Production Agent Deployment — Reliability, Cost & Scaling
28 min
The Reliability Problem at Scale
A single agent run that works 99% of the time sounds reliable. At 1000 tasks per day, that is 10 failures per day — which means 10 users, customers, or automated pipelines hitting errors. At 10,000 tasks per day, it is 100 failures.
Production reliability requires addressing failures at every layer:
Transient failures (network timeouts, rate limits, temporary service unavailability): retry with exponential backoff
Duplicate execution (the same task submitted twice): idempotency keys
Cascading failures (one service failing causes agents to queue up and crash): circuit breakers
from tenacity import (
    retry,
    wait_exponential,
    stop_after_attempt,
    retry_if_exception_type,
    before_sleep_log,
)
import logging
import time

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Retryable: network error, rate limit, 5xx from API."""


class PermanentError(Exception):
    """Non-retryable: invalid input, 4xx (except 429), authentication failure."""


@retry(
    retry=retry_if_exception_type(TransientError),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def call_llm_reliable(messages: list[dict], model: str) -> str:
    """Call the LLM, retrying automatically on transient failures.

    Groq SDK errors are translated into the two-class hierarchy above so the
    tenacity decorator retries only what is actually retryable: rate limits,
    connection errors, and 5xx responses. Any other 4xx is raised as a
    PermanentError and surfaces to the caller without retries.
    """
    from groq import Groq, RateLimitError, APIConnectionError, APIStatusError

    groq_client = Groq()
    try:
        response = groq_client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=800,
            timeout=30,  # hard timeout per call
        )
    except RateLimitError as e:
        raise TransientError(f"Rate limited: {e}") from e
    except APIConnectionError as e:
        raise TransientError(f"Connection error: {e}") from e
    except APIStatusError as e:
        # 5xx is the server's fault — retry; any other status here is ours.
        if e.status_code >= 500:
            raise TransientError(f"Server error {e.status_code}: {e}") from e
        raise PermanentError(f"Client error {e.status_code}: {e}") from e
    return response.choices[0].message.content
Idempotency Keys
python
import redis
import json
import hashlib

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

IDEMPOTENCY_TTL = 86400  # 24 hours


def run_idempotent(idempotency_key: str, task_fn, *args, **kwargs) -> dict:
    """Run *task_fn* at most once per idempotency key.

    The result is cached in Redis for IDEMPOTENCY_TTL seconds; repeated calls
    with the same key return the cached JSON-decoded result instead of
    re-executing the task.

    NOTE(review): this is check-then-set, not atomic — two concurrent calls
    that both miss the cache will both execute task_fn. Confirm whether a
    SET NX lock is needed for this workload.
    """
    cache_key = f"idempotent:{idempotency_key}"
    if (cached := redis_client.get(cache_key)):
        return json.loads(cached)

    result = task_fn(*args, **kwargs)
    # default=str lets non-JSON-native values (datetimes, UUIDs) serialize.
    redis_client.setex(cache_key, IDEMPOTENCY_TTL, json.dumps(result, default=str))
    return result
Circuit Breaker
A circuit breaker prevents cascading failures by stopping calls to a failing service before they pile up:
python
import threading
import time
from enum import Enum


class CircuitState(str, Enum):
    CLOSED = "closed"        # normal operation, calls pass through
    OPEN = "open"            # service is failing, calls immediately fail
    HALF_OPEN = "half_open"  # probe: try one call, close if it succeeds


class CircuitBreaker:
    """
    Circuit breaker for any external service call.

    Opens after ``failure_threshold`` failures within ``window_seconds``.
    After ``recovery_timeout`` seconds in OPEN, the next call is allowed
    through as a probe (HALF_OPEN): success closes the circuit, failure
    re-opens it immediately.
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        window_seconds: float = 60.0,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.window_seconds = window_seconds
        self._state = CircuitState.CLOSED
        self._failures: list[float] = []   # timestamps of recent failures
        self._opened_at: float = 0.0
        self._lock = threading.Lock()

    def call(self, fn, *args, **kwargs):
        """Call fn through the circuit breaker.

        Raises RuntimeError immediately while the circuit is OPEN; otherwise
        forwards fn's result or re-raises fn's exception after recording it.
        """
        with self._lock:
            if self._state == CircuitState.OPEN:
                if time.time() - self._opened_at > self.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                else:
                    raise RuntimeError(f"Circuit '{self.name}' is OPEN — service unavailable")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            with self._lock:
                now = time.time()
                if self._state == CircuitState.HALF_OPEN:
                    # BUG FIX: a failed probe must re-open the circuit
                    # immediately. Previously the probe failure was only
                    # counted toward the windowed threshold, so if older
                    # failures had expired out of the window the breaker
                    # stayed HALF_OPEN and let all traffic through to a
                    # still-failing service.
                    self._state = CircuitState.OPEN
                    self._opened_at = now
                    print(f"Circuit '{self.name}' OPENED after failed probe")
                else:
                    # Expire old failures outside the window, then record this one.
                    self._failures = [
                        t for t in self._failures if now - t < self.window_seconds
                    ]
                    self._failures.append(now)
                    if len(self._failures) >= self.failure_threshold:
                        self._state = CircuitState.OPEN
                        self._opened_at = now
                        print(
                            f"Circuit '{self.name}' OPENED after "
                            f"{len(self._failures)} failures"
                        )
            raise
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.CLOSED
                self._failures.clear()
                print(f"Circuit '{self.name}' CLOSED — service recovered")
        return result
Token Budget Management
python
def count_tokens_approx(text: str) -> int:
    """Rough token estimate: 1 token ≈ 4 characters."""
    return len(text) // 4


def build_messages_within_budget(
    system_prompt: str,
    conversation_history: list[dict],
    user_message: str,
    max_tokens: int = 6000,
    reserve_for_output: int = 800,
) -> list[dict]:
    """
    Build a message list that fits within the token budget.

    Keeps the system prompt and the most recent history messages, dropping
    the oldest first. If the system prompt plus the user message alone
    exceed the budget, the user message is truncated to whatever fits after
    the system prompt (and no history is included).

    Returns a list of {"role", "content"} dicts: system prompt first,
    selected history in original order, user message last.
    """
    budget = max_tokens - reserve_for_output
    system_tokens = count_tokens_approx(system_prompt)
    user_tokens = count_tokens_approx(user_message)

    available_for_history = budget - system_tokens - user_tokens
    if available_for_history < 0:
        # BUG FIX: the original comment promised to truncate the user
        # message here but never did, so over-budget requests went through
        # untrimmed. Truncate to the token budget remaining after the
        # system prompt (4 chars ≈ 1 token, matching count_tokens_approx).
        allowed_user_tokens = max(0, budget - system_tokens)
        user_message = user_message[: allowed_user_tokens * 4]
        available_for_history = 0

    # Fill history from newest to oldest until we exhaust the budget.
    selected_history: list[dict] = []
    used_tokens = 0
    for msg in reversed(conversation_history):
        msg_tokens = count_tokens_approx(msg["content"])
        if used_tokens + msg_tokens > available_for_history:
            break
        selected_history.insert(0, msg)  # keep chronological order
        used_tokens += msg_tokens

    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(selected_history)
    messages.append({"role": "user", "content": user_message})
    return messages
Model Routing — Cost Optimisation
Route simple tasks to a cheap, fast model and complex tasks to a capable, expensive one:
python
ROUTING_CONFIG = {
    "simple": {
        "model": "llama-3.1-8b-instant",
        "max_tokens": 400,
        "description": "Simple Q&A, short summaries, classification",
    },
    "standard": {
        "model": "llama-3.3-70b-versatile",
        "max_tokens": 800,
        "description": "Multi-step reasoning, code generation, complex analysis",
    },
}

SIMPLE_TASK_INDICATORS = [
    "what is", "define", "summarise", "list", "classify",
    "yes or no", "true or false", "how many",
]


def route_to_model(task_description: str) -> dict:
    """Pick a model config for the task using cheap complexity heuristics.

    A task goes to the small/fast model only when it both contains a
    simple-task phrase and is under 20 words; everything else goes to the
    larger model.
    """
    text = task_description.lower()
    looks_simple = any(hint in text for hint in SIMPLE_TASK_INDICATORS)
    is_short = len(task_description.split()) < 20

    tier = "simple" if (looks_simple and is_short) else "standard"
    return ROUTING_CONFIG[tier]
Production FastAPI Service
python
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio
import uuid
import time

app = FastAPI(title="Production Agent Service")

# Task state store (use Redis in production for horizontal scaling)
task_store: dict[str, dict] = {}


class TaskRequest(BaseModel):
    task: str
    user_id: str = "anonymous"
    idempotency_key: str | None = None


class TaskResponse(BaseModel):
    task_id: str
    status: str  # "pending" | "running" | "complete" | "failed"
    result: str | None = None
    error: str | None = None
    cost_usd: float | None = None
    duration_ms: float | None = None


@app.post("/tasks", response_model=TaskResponse, status_code=202)
async def create_task(request: TaskRequest, background_tasks: BackgroundTasks) -> TaskResponse:
    """Submit a new agent task. Returns immediately with a task_id for polling."""
    # Idempotency: check if we've seen this key before
    if request.idempotency_key:
        existing = _find_by_idempotency_key(request.idempotency_key)
        if existing:
            return TaskResponse(**existing)

    # BUG FIX: the original truncated the UUID to 8 hex chars, which risks
    # task_id collisions at production volume. Use the full UUID; it is
    # still an opaque string from the client's point of view.
    task_id = str(uuid.uuid4())
    task_store[task_id] = {
        "task_id": task_id,
        "task": request.task,
        "user_id": request.user_id,
        "idempotency_key": request.idempotency_key,
        "status": "pending",
        "result": None,
        "error": None,
        "cost_usd": 0.0,
        "duration_ms": 0.0,
        "created_at": time.time(),
    }
    background_tasks.add_task(_run_agent_task, task_id, request.task)
    return TaskResponse(task_id=task_id, status="pending")


@app.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str) -> TaskResponse:
    """Poll task status and retrieve result when complete."""
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskResponse(**task)


async def _run_agent_task(task_id: str, task_description: str) -> None:
    """Background worker that executes the agent task and records the outcome."""
    task_store[task_id]["status"] = "running"
    t_start = time.perf_counter()
    try:
        # Route to appropriate model
        routing = route_to_model(task_description)
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Complete the task concisely."},
            {"role": "user", "content": task_description},
        ]
        # BUG FIX: call_llm_reliable is synchronous (blocking HTTP + retries).
        # Calling it directly from this coroutine stalled the event loop for
        # the entire request, including retry backoff. Run it in a worker
        # thread instead.
        result = await asyncio.to_thread(call_llm_reliable, messages, routing["model"])
        duration_ms = (time.perf_counter() - t_start) * 1000
        task_store[task_id].update({
            "status": "complete",
            "result": result,
            "duration_ms": round(duration_ms, 2),
        })
    except Exception as e:
        duration_ms = (time.perf_counter() - t_start) * 1000
        task_store[task_id].update({
            "status": "failed",
            "error": str(e),
            "duration_ms": round(duration_ms, 2),
        })


def _find_by_idempotency_key(key: str) -> dict | None:
    """Linear scan of the in-memory store for a task with this idempotency key."""
    for task in task_store.values():
        if task.get("idempotency_key") == key:
            return task
    return None


@app.get("/health")
async def health() -> dict:
    """Liveness probe: reports count of tasks still pending or running."""
    pending = sum(1 for t in task_store.values() if t["status"] in {"pending", "running"})
    return {"status": "ok", "tasks_in_flight": pending}
Graceful Shutdown
python
import signal
import asyncio
import time  # BUG FIX: wait_for_drain uses time.time() but this was never imported

_shutdown_event = asyncio.Event()


def handle_shutdown(sig, frame):
    """Signal handler: record that a shutdown was requested.

    Sets the module-level event so async code can observe the request and
    begin draining instead of being killed mid-task.
    """
    print(f"Received {signal.Signals(sig).name} — initiating graceful shutdown")
    _shutdown_event.set()


signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)


async def wait_for_drain(max_wait_seconds: int = 30) -> None:
    """Wait for in-flight tasks to complete before shutting down.

    Polls task_store every 2 seconds until no task is pending/running, or
    until max_wait_seconds elapses, whichever comes first.
    """
    deadline = time.time() + max_wait_seconds
    while time.time() < deadline:
        # task_store is the service's task state dict (defined alongside the app).
        in_flight = sum(1 for t in task_store.values() if t["status"] in {"pending", "running"})
        if in_flight == 0:
            print("All tasks drained. Shutting down cleanly.")
            return
        print(f"Draining: {in_flight} tasks in flight. Waiting...")
        await asyncio.sleep(2)
    print(f"Drain timeout ({max_wait_seconds}s). Forcing shutdown.")
Production Dockerfile
python
# This is a Dockerfile (not Python) — shown as a comment-formatted code block
# syntax=docker/dockerfile:1
#
# Stage 1: build dependencies
# FROM python:3.12-slim AS builder
# WORKDIR /app
# COPY requirements.txt .
# RUN pip install --no-cache-dir --user -r requirements.txt
#
# Stage 2: production image
# FROM python:3.12-slim
# WORKDIR /app
# COPY --from=builder /root/.local /root/.local
# COPY . .
# ENV PATH=/root/.local/bin:$PATH
# ENV PYTHONDONTWRITEBYTECODE=1
# ENV PYTHONUNBUFFERED=1
# RUN useradd -m -u 1000 agent && chown -R agent:agent /app
# USER agent
# HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
#     CMD curl -f http://localhost:8000/health || exit 1
# EXPOSE 8000
# CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

print("See Dockerfile in repo root — multi-stage build with non-root user and health check")