Agents are the right tool when the task requires dynamic reasoning, tool selection, and adaptive decision-making. But not every automation problem needs an agent. Workflows — deterministic pipelines with predefined steps — are better when:
- **Determinism is required:** an invoice processing pipeline must always run the same steps in the same order. A non-deterministic agent that "decides" to skip validation is a liability.
- **Audit trails matter:** a compliance workflow needs a complete, reproducible record of every step. A DAG-based workflow provides this naturally; an agent's reasoning trace does not.
- **Cost is constrained:** a simple extract-transform-load pipeline with one LLM call per document does not need an agent loop with 5 LLM calls to "decide" what to do.
- **Reliability is paramount:** workflows are easier to test, retry, and monitor because the state machine is explicit.
The correct architecture uses both: workflows handle the deterministic orchestration; agents handle the steps that require language understanding.
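As a minimal sketch of this hybrid (all names here are hypothetical, and the agent step is stubbed), the pipeline stays deterministic while one step delegates to language understanding:

```python
from typing import Callable


def classify_ticket(text: str) -> str:
    """Agent-backed step: the only non-deterministic point (stubbed here).

    In production this would call an LLM or agent; the surrounding
    pipeline remains a fixed sequence of steps.
    """
    return "billing" if "invoice" in text.lower() else "general"


def run_ticket_pipeline(ticket: dict, classify: Callable[[str], str] = classify_ticket) -> dict:
    """Deterministic orchestration: validate -> classify -> route, always in this order."""
    if not ticket.get("text"):
        raise ValueError("ticket text is required")  # validation is never skipped
    category = classify(ticket["text"])  # the one step needing language understanding
    return {"id": ticket["id"], "category": category, "queue": f"{category}_queue"}
```

The workflow owns control flow and error handling; the agent-backed function is just another injectable step, which also makes it trivial to stub in tests.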
Event-Driven Webhooks
Webhook endpoints receive HTTP POST requests when external events occur (GitHub push, Stripe payment, Slack message). The pattern:
1. Receive the POST request
2. Validate the signature (HMAC-SHA256)
3. Acknowledge immediately (return 200)
4. Enqueue the actual processing for async execution
```python
import hashlib
import hmac
import json

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from redis import Redis

app = FastAPI()
redis = Redis(host="localhost", port=6379, decode_responses=True)

WEBHOOK_SECRET = "your_webhook_secret"  # shared secret from the sender


def verify_webhook_signature(payload: bytes, signature_header: str, secret: str) -> bool:
    """Verify HMAC-SHA256 webhook signature. Reject the request if invalid."""
    expected = "sha256=" + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature_header)


@app.post("/webhooks/document-updated")
async def document_webhook(request: Request, background_tasks: BackgroundTasks):
    """
    Receive document update events.
    Acknowledge immediately, process in background.
    """
    payload = await request.body()
    signature = request.headers.get("X-Signature", "")
    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = json.loads(payload)
    # Enqueue for async processing — never block the webhook response
    background_tasks.add_task(process_document_event, event)
    return {"status": "accepted"}


async def process_document_event(event: dict) -> None:
    """Background processing of a document update event."""
    doc_id = event.get("doc_id")
    action = event.get("action")  # "created" | "updated" | "deleted"
    if action == "deleted":
        redis.rpush("delete_queue", json.dumps({"doc_id": doc_id}))
    elif action in ("created", "updated"):
        redis.rpush("index_queue", json.dumps({"doc_id": doc_id, "url": event.get("url")}))
```
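The signature check is pure stdlib and can be sanity-tested in isolation, without a running server. A quick check of the HMAC-SHA256 scheme (payload and secret values are illustrative):

```python
import hashlib
import hmac


def verify_webhook_signature(payload: bytes, signature_header: str, secret: str) -> bool:
    """Same check as the endpoint: constant-time HMAC-SHA256 comparison."""
    expected = "sha256=" + hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)


secret = "your_webhook_secret"
payload = b'{"doc_id": "doc-42", "action": "updated"}'

# A legitimate sender computes the same HMAC over the raw body
good = "sha256=" + hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()

assert verify_webhook_signature(payload, good, secret)          # valid signature
assert not verify_webhook_signature(payload, "sha256=0", secret)  # forged signature
assert not verify_webhook_signature(b"tampered", good, secret)    # tampered body
```

Note that the HMAC must be computed over the raw request bytes; re-serialising the parsed JSON can change whitespace or key order and break verification.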
Message Queue with Redis and Celery
For workloads that need reliable delivery, retry, and distributed processing, use a message queue:
```python
# celery_app.py
from celery import Celery
from kombu import Queue

celery_app = Celery(
    "ai_workflows",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)
celery_app.conf.update(
    task_queues=(
        Queue("high_priority", routing_key="high"),
        Queue("default", routing_key="default"),
        Queue("low_priority", routing_key="low"),
    ),
    task_default_queue="default",
    task_acks_late=True,  # ack only after successful completion, not on receipt
    task_reject_on_worker_lost=True,
)


@celery_app.task(
    bind=True,
    max_retries=3,
    queue="default",
    name="process_document",
)
def process_document_task(self, doc_id: str, doc_url: str) -> dict:
    """Process a single document: download, chunk, embed, index."""
    try:
        content = download_document(doc_url)
        chunks = chunk_document(content)
        embeddings = embed_chunks(chunks)
        upsert_to_index(doc_id, chunks, embeddings)
        return {"doc_id": doc_id, "chunks_indexed": len(chunks), "status": "success"}
    except DownloadError as e:
        # Retry with exponential backoff for transient errors
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
    except PermanentError as e:
        # Do not retry — log and fail permanently
        return {"doc_id": doc_id, "status": "failed", "error": str(e)}
```
Retry with Exponential Backoff
```python
import logging

from tenacity import (
    retry,
    wait_exponential,
    stop_after_attempt,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Retryable error: network timeout, rate limit, temporary service unavailability."""


class PermanentError(Exception):
    """Non-retryable error: invalid input, authentication failure, resource not found."""


@retry(
    retry=retry_if_exception_type(TransientError),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def call_llm_with_retry(prompt: str) -> str:
    """Call an LLM API with automatic retry on transient failures."""
    from groq import Groq, RateLimitError, APIConnectionError

    client = Groq()
    try:
        response = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
        )
        return response.choices[0].message.content
    except RateLimitError as e:
        raise TransientError(f"Rate limited: {e}") from e
    except APIConnectionError as e:
        raise TransientError(f"Connection error: {e}") from e
```
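Under the hood, `wait_exponential` is doing roughly the following. A stdlib-only sketch (hypothetical helper, shown with small delays so it runs fast) for environments without tenacity:

```python
import random
import time


def retry_with_backoff(fn, retryable=(TimeoutError,), max_attempts=5, base=0.01, cap=1.0):
    """Retry fn on retryable exceptions with exponential backoff and full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # attempts exhausted: surface the last error
            delay = min(cap, base * 2 ** attempt)  # 0.01, 0.02, 0.04, ... up to cap
            time.sleep(random.uniform(0, delay))   # full jitter spreads out retry storms


attempts = {"n": 0}


def flaky() -> str:
    """Simulated transient failure: fails twice, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("simulated transient failure")
    return "ok"
```

The jitter matters in production: if many workers retry on the same schedule after an outage, they hit the recovering service in synchronised waves; randomising the delay breaks that synchronisation.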
Idempotency
Every task must be idempotent: if the same task arrives twice (due to network retry, duplicate event), the second execution must produce the same result without double-processing.
```python
import json

import redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

IDEMPOTENCY_TTL = 86400  # 24 hours


def idempotent_task(idempotency_key: str, task_fn, *args, **kwargs):
    """
    Execute a task idempotently.
    If idempotency_key has been seen before, return the cached result.
    """
    cache_key = f"idempotency:{idempotency_key}"
    cached = redis_client.get(cache_key)
    if cached:
        print(f"Idempotency hit for key: {idempotency_key}")
        return json.loads(cached)

    result = task_fn(*args, **kwargs)
    redis_client.setex(cache_key, IDEMPOTENCY_TTL, json.dumps(result))
    return result
```
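The same pattern works with any key-value store. A minimal in-memory variant (hypothetical stand-in for the Redis version, without the TTL) makes the behaviour easy to see:

```python
import json

_cache: dict[str, str] = {}


def idempotent_task_local(idempotency_key: str, task_fn, *args, **kwargs):
    """In-memory stand-in for the Redis-backed idempotent_task above."""
    if idempotency_key in _cache:
        return json.loads(_cache[idempotency_key])  # duplicate delivery: cached result
    result = task_fn(*args, **kwargs)
    _cache[idempotency_key] = json.dumps(result)
    return result


calls = []


def index_document(doc_id: str) -> dict:
    """Side-effecting task we must not run twice for the same event."""
    calls.append(doc_id)
    return {"doc_id": doc_id, "status": "indexed"}


first = idempotent_task_local("event-123", index_document, "doc-42")
second = idempotent_task_local("event-123", index_document, "doc-42")  # duplicate event
assert first == second
assert len(calls) == 1  # the task body ran exactly once
```

The idempotency key should come from the event source (e.g. the sender's event ID), not from a hash of the payload, so that retried deliveries of the same event always map to the same key.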
DAG-Based Workflow with State Machine
A state machine makes workflow transitions explicit and prevents invalid state jumps: each state carries a whitelist of allowed successor states, and any transition outside that whitelist fails loudly instead of silently corrupting the workflow.
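A minimal sketch of such a state machine (the specific states, including WAITING_REVIEW, are illustrative):

```python
from enum import Enum


class State(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    WAITING_REVIEW = "waiting_review"
    APPROVED = "approved"
    FAILED = "failed"


# Allowed transitions: anything not listed here is impossible by construction.
TRANSITIONS: dict[State, set[State]] = {
    State.PENDING: {State.PROCESSING},
    State.PROCESSING: {State.WAITING_REVIEW, State.FAILED},
    State.WAITING_REVIEW: {State.APPROVED, State.FAILED},
    State.APPROVED: set(),   # terminal
    State.FAILED: set(),     # terminal
}


class Workflow:
    def __init__(self):
        self.state = State.PENDING
        self.history: list[State] = [State.PENDING]  # audit trail of every transition

    def transition(self, new_state: State) -> None:
        if new_state not in TRANSITIONS[self.state]:
            raise ValueError(f"Illegal transition: {self.state.name} -> {new_state.name}")
        self.state = new_state
        self.history.append(new_state)
```

Jumping from PENDING straight to APPROVED, bypassing review, raises immediately; the history list doubles as the reproducible audit trail that agents cannot provide.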
Saga Pattern for Rollback
When a multi-step workflow partially succeeds and then fails, the saga pattern runs compensating actions to undo the completed steps:
```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SagaStep:
    name: str
    execute: Callable     # the forward action
    compensate: Callable  # the rollback action (undo)


class Saga:
    """
    Execute a sequence of steps with automatic rollback on failure.
    Each step must define a compensating action.
    """

    def __init__(self, steps: list[SagaStep]):
        self.steps = steps

    def run(self, context: dict) -> dict:
        """Execute all steps. On failure, roll back all completed steps in reverse order."""
        completed: list[SagaStep] = []
        for step in self.steps:
            try:
                print(f"Executing: {step.name}")
                result = step.execute(context)
                context[step.name] = result
                completed.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}. Rolling back {len(completed)} completed steps.")
                # Compensate in reverse order
                for completed_step in reversed(completed):
                    try:
                        completed_step.compensate(context)
                        print(f"Compensated: {completed_step.name}")
                    except Exception as comp_err:
                        print(f"Compensation failed for '{completed_step.name}': {comp_err}")
                raise RuntimeError(f"Workflow failed at step '{step.name}': {e}") from e
        return context
```
Complete Report Generation Workflow
```python
import datetime
import smtplib
from email.mime.text import MIMEText

from groq import Groq

client = Groq()


def load_data_step(context: dict) -> dict:
    """Step 1: Load data from the database."""
    # In production: run a SQL query
    return {"rows": 1500, "date_range": "2025-01-01 to 2025-03-31", "data": []}


def analyse_with_llm_step(context: dict) -> dict:
    """Step 2: Use LLM to generate qualitative analysis of the data."""
    data_summary = context.get("load_data_step", {})
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": f"Analyse this data summary and provide 3 key insights: {data_summary}",
        }],
        max_tokens=300,
    )
    return {"insights": response.choices[0].message.content}


def generate_charts_step(context: dict) -> dict:
    """Step 3: Generate chart descriptions (placeholder for real chart generation)."""
    return {"charts": ["revenue_trend.png", "cost_breakdown.png"]}


def compose_html_report_step(context: dict) -> dict:
    """Step 4: Compose the full HTML report."""
    analysis = context.get("analyse_with_llm_step", {})
    charts = context.get("generate_charts_step", {})
    html = f"""
    <html><body>
      <h1>Quarterly Report — {datetime.date.today()}</h1>
      <h2>Key Insights</h2>
      <p>{analysis.get('insights', 'No insights generated')}</p>
      <h2>Charts</h2>
      <p>{', '.join(charts.get('charts', []))}</p>
    </body></html>
    """
    return {"html_report": html, "report_path": "/tmp/quarterly_report.html"}


def send_email_step(context: dict) -> dict:
    """Step 5: Email the report."""
    report = context.get("compose_html_report_step", {})
    # In production: use SES, SendGrid, etc.
    # msg = MIMEText(report["html_report"], "html")
    # ... smtp send logic ...
    return {"sent_to": "team@company.com", "status": "sent"}


def run_report_workflow() -> dict:
    """Execute the full report generation workflow with saga rollback."""
    steps = [
        SagaStep(
            name="load_data_step",
            execute=load_data_step,
            compensate=lambda ctx: print("Nothing to compensate for data load"),
        ),
        SagaStep(
            name="analyse_with_llm_step",
            execute=analyse_with_llm_step,
            compensate=lambda ctx: print("Nothing to compensate for analysis"),
        ),
        SagaStep(
            name="generate_charts_step",
            execute=generate_charts_step,
            compensate=lambda ctx: print("Deleting generated chart files"),
        ),
        SagaStep(
            name="compose_html_report_step",
            execute=compose_html_report_step,
            compensate=lambda ctx: print(
                f"Deleting report: {ctx.get('compose_html_report_step', {}).get('report_path')}"
            ),
        ),
        SagaStep(
            name="send_email_step",
            execute=send_email_step,
            compensate=lambda ctx: print("Cannot unsend email — logging compensation failure"),
        ),
    ]
    saga = Saga(steps)
    return saga.run(context={})
```
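To close the cron → load → analyse → compose → send loop, the workflow can be scheduled with Celery beat. A hypothetical configuration sketch (task name and schedule are illustrative; assumes the `celery_app` from `celery_app.py` above):

```python
# Hypothetical: run the report workflow every Monday at 08:00.
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "weekly-report": {
        "task": "run_report_workflow_task",
        "schedule": crontab(hour=8, minute=0, day_of_week=1),
    },
}


@celery_app.task(name="run_report_workflow_task")
def run_report_workflow_task() -> dict:
    """Thin task wrapper so the saga runs on a worker, not in the beat process."""
    return run_report_workflow()
```

A plain OS crontab invoking a script works just as well for a single machine; beat becomes worthwhile once you already run Celery workers and want retries and monitoring in one place.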
Key Takeaways
- Choose workflows over agents when the task is deterministic, requires an audit trail, or has strict cost and reliability constraints.
- Always validate webhook signatures before processing — unauthenticated webhooks are a common attack vector.
- Acknowledge webhook requests immediately and process asynchronously — never make the sender wait for your processing.
- Use tenacity with wait_exponential for transient errors (rate limits, network timeouts); never retry a PermanentError (invalid input, auth failures).
- Idempotency keys prevent double-processing when events are delivered more than once — store results keyed by idempotency_key with a TTL.
- State machines make valid workflow transitions explicit; a transition that bypasses WAITING_REVIEW should be impossible by construction.
- The saga pattern is the correct rollback mechanism for multi-step workflows — each step must define a compensating action that runs if a later step fails.
- The cron → load → analyse → compose → send pattern is the template for most automated reporting workflows; parametrise the steps for different report types.