GadaaLabs
AI Automation — Production Agents & Agentic Systems
Lesson 8

Agent Observability — Tracing, Logging & Evaluation

26 min

Why Observability Is Hard for Agents

Traditional software has deterministic, synchronous execution. If something fails, you know exactly which line caused it. Agent systems are different in four ways:

Non-determinism: the same input can produce different traces on different runs because LLM sampling is probabilistic.

Multi-step failures: a task may fail on step 7 because of a bad decision on step 2. The failure point and the root cause are separated by several steps and thousands of tokens.

Nested calls: agents call tools which call APIs which may call other agents. The trace is a tree, not a line.

Emergent costs: a single agent run can involve 5 LLM calls and 8 tool calls. The total cost and latency emerge from the combination and are not visible from any single call.

Without structured observability, debugging a production agent failure means reading hundreds of log lines manually. With it, you can filter to the failed trace, see every LLM call and tool call in order, and identify the exact point of failure in seconds.

What to Capture

For every agent run, capture:

  • Run-level: run_id, task, user_id, start_time, total_duration_ms, total_cost_usd, step_count, final_status (success/failed), final_answer
  • LLM call-level: model, input_tokens, output_tokens, latency_ms, cost_usd, prompt hash
  • Tool call-level: tool_name, input_args, output_summary, latency_ms, success/error

This data enables: failure diagnosis, cost analysis, latency profiling, and automated evaluation.

OpenTelemetry Tracing

OpenTelemetry is the standard for distributed tracing. It produces traces visible in tools like Jaeger, Zipkin, Grafana Tempo, or Langfuse.

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource
import functools
import time

# Configure the tracer
resource = Resource.create({"service.name": "ai-agent", "service.version": "1.0.0"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)


def trace_llm_call(model: str = "unknown"):
    """Decorator: add OpenTelemetry tracing to any LLM call function."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(f"llm_call.{model}") as span:
                t0 = time.perf_counter()
                try:
                    result = fn(*args, **kwargs)
                    latency_ms = (time.perf_counter() - t0) * 1000

                    # Extract token counts if the result is an OpenAI-style response
                    if hasattr(result, "usage"):
                        span.set_attribute("llm.input_tokens", result.usage.prompt_tokens)
                        span.set_attribute("llm.output_tokens", result.usage.completion_tokens)
                        span.set_attribute("llm.total_tokens", result.usage.total_tokens)

                    span.set_attribute("llm.model", model)
                    span.set_attribute("llm.latency_ms", round(latency_ms, 2))
                    span.set_attribute("llm.success", True)
                    return result
                except Exception as e:
                    span.set_attribute("llm.success", False)
                    span.set_attribute("llm.error", str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator


def trace_tool_call(fn):
    """Decorator: add OpenTelemetry tracing to any tool function."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(f"tool_call.{fn.__name__}") as span:
            span.set_attribute("tool.name", fn.__name__)
            span.set_attribute("tool.args", str(kwargs)[:500])
            t0 = time.perf_counter()
            try:
                result = fn(*args, **kwargs)
                latency_ms = (time.perf_counter() - t0) * 1000
                span.set_attribute("tool.latency_ms", round(latency_ms, 2))
                span.set_attribute("tool.success", True)
                span.set_attribute("tool.output_summary", str(result)[:200])
                return result
            except Exception as e:
                span.set_attribute("tool.success", False)
                span.set_attribute("tool.error", str(e))
                span.record_exception(e)
                raise
    return wrapper

Structured JSON Logging

Every agent event should be logged as a JSON object so it can be queried programmatically:

python
import json
import logging
import sys
import uuid
import datetime


class StructuredLogger:
    """
    Logger that emits every event as a single-line JSON object.
    Compatible with log aggregation systems (Datadog, Splunk, CloudWatch).
    """

    def __init__(self, service_name: str):
        self.service = service_name
        # Use Python's logging module as the transport layer
        self._logger = logging.getLogger(service_name)
        if not self._logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(logging.Formatter("%(message)s"))
            self._logger.addHandler(handler)
            self._logger.setLevel(logging.INFO)

    def _emit(self, event_type: str, metadata: dict) -> None:
        record = {
            "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
            "service": self.service,
            "event_type": event_type,
            **metadata,
        }
        self._logger.info(json.dumps(record))

    def log_run_start(self, run_id: str, task: str, user_id: str) -> None:
        self._emit("run_start", {"run_id": run_id, "task": task[:200], "user_id": user_id})

    def log_run_end(self, run_id: str, status: str, steps: int, cost_usd: float, duration_ms: float) -> None:
        self._emit("run_end", {
            "run_id": run_id, "status": status, "steps": steps,
            "cost_usd": round(cost_usd, 6), "duration_ms": round(duration_ms, 2),
        })

    def log_llm_call(self, run_id: str, model: str, input_tokens: int, output_tokens: int, latency_ms: float) -> None:
        cost = self._estimate_cost(model, input_tokens, output_tokens)
        self._emit("llm_call", {
            "run_id": run_id, "model": model,
            "input_tokens": input_tokens, "output_tokens": output_tokens,
            "latency_ms": round(latency_ms, 2), "cost_usd": cost,
        })

    def log_tool_call(self, run_id: str, tool_name: str, success: bool, latency_ms: float, error: str = "") -> None:
        self._emit("tool_call", {
            "run_id": run_id, "tool_name": tool_name,
            "success": success, "latency_ms": round(latency_ms, 2), "error": error,
        })

    @staticmethod
    def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
        """Rough cost estimates in USD per token."""
        rates = {
            "llama-3.3-70b-versatile": (0.0, 0.0),   # Groq free tier
            "gpt-4o": (0.000005, 0.000015),
            "claude-sonnet-4-6": (0.000003, 0.000015),
        }
        in_rate, out_rate = rates.get(model, (0.000001, 0.000002))
        return round(in_rate * input_tokens + out_rate * output_tokens, 8)

The Complete AgentTracer Class

python
import sqlite3
import datetime


class AgentTracer:
    """
    Complete observability wrapper for agent runs.
    Captures every LLM call, tool call, and run summary to SQLite.
    """

    def __init__(self, db_path: str = "agent_traces.db"):
        self.db_path = db_path
        self.logger = StructuredLogger("agent")
        self._init_db()

    def _init_db(self) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS runs (
                    run_id TEXT PRIMARY KEY, task TEXT, user_id TEXT,
                    status TEXT, steps INTEGER, cost_usd REAL,
                    duration_ms REAL, started_at TEXT, ended_at TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS llm_calls (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    run_id TEXT, model TEXT, input_tokens INTEGER,
                    output_tokens INTEGER, latency_ms REAL, cost_usd REAL, called_at TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tool_calls (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    run_id TEXT, tool_name TEXT, success INTEGER,
                    latency_ms REAL, error TEXT, called_at TEXT
                )
            """)

    def start_run(self, task: str, user_id: str = "anonymous") -> str:
        run_id = str(uuid.uuid4())[:8]
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO runs (run_id, task, user_id, status, steps, cost_usd, duration_ms, started_at) VALUES (?, ?, ?, 'running', 0, 0, 0, ?)",
                (run_id, task[:500], user_id, datetime.datetime.utcnow().isoformat()),
            )
        self.logger.log_run_start(run_id, task, user_id)
        return run_id

    def record_llm_call(self, run_id: str, model: str, input_tokens: int, output_tokens: int, latency_ms: float) -> None:
        cost = StructuredLogger._estimate_cost(model, input_tokens, output_tokens)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO llm_calls (run_id, model, input_tokens, output_tokens, latency_ms, cost_usd, called_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (run_id, model, input_tokens, output_tokens, latency_ms, cost, datetime.datetime.utcnow().isoformat()),
            )
            # Update run cost
            conn.execute("UPDATE runs SET cost_usd = cost_usd + ?, steps = steps + 1 WHERE run_id = ?", (cost, run_id))
        self.logger.log_llm_call(run_id, model, input_tokens, output_tokens, latency_ms)

    def record_tool_call(self, run_id: str, tool_name: str, success: bool, latency_ms: float, error: str = "") -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO tool_calls (run_id, tool_name, success, latency_ms, error, called_at) VALUES (?, ?, ?, ?, ?, ?)",
                (run_id, tool_name, int(success), latency_ms, error, datetime.datetime.utcnow().isoformat()),
            )
        self.logger.log_tool_call(run_id, tool_name, success, latency_ms, error)

    def end_run(self, run_id: str, status: str, started_at_iso: str) -> None:
        now = datetime.datetime.utcnow()
        started = datetime.datetime.fromisoformat(started_at_iso)
        duration_ms = (now - started).total_seconds() * 1000
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute("SELECT steps, cost_usd FROM runs WHERE run_id=?", (run_id,)).fetchone()
            steps, cost = (row[0], row[1]) if row else (0, 0.0)
            conn.execute(
                "UPDATE runs SET status=?, duration_ms=?, ended_at=? WHERE run_id=?",
                (status, duration_ms, now.isoformat(), run_id),
            )
        self.logger.log_run_end(run_id, status, steps, cost, duration_ms)

    def get_metrics(self, hours: int = 24) -> dict:
        """Aggregate metrics for the last N hours."""
        since = (datetime.datetime.utcnow() - datetime.timedelta(hours=hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            runs = conn.execute(
                "SELECT status, steps, cost_usd, duration_ms FROM runs WHERE started_at > ?", (since,)
            ).fetchall()
            tool_errors = conn.execute(
                "SELECT COUNT(*) FROM tool_calls WHERE called_at > ? AND success=0", (since,)
            ).fetchone()[0]

        if not runs:
            return {}

        total = len(runs)
        successes = sum(1 for r in runs if r[0] == "success")
        return {
            "total_runs": total,
            "task_success_rate": successes / total,
            "avg_steps": sum(r[1] for r in runs) / total,
            "avg_cost_usd": sum(r[2] for r in runs) / total,
            "p99_latency_ms": sorted(r[3] for r in runs)[int(total * 0.99)] if total > 1 else 0,
            "tool_error_rate": tool_errors / max(total, 1),
        }

Automated Evaluation Pipeline

Record 100 runs, then replay them through an LLM-as-judge to measure task completion quality:

python
from groq import Groq

client = Groq()

JUDGE_PROMPT = """You are evaluating whether an AI agent successfully completed a task.

TASK: {task}

AGENT'S FINAL ANSWER:
{answer}

Did the agent correctly and completely complete the task?
Return JSON: {{"completed": true | false, "score": 1-5, "reasoning": str}}"""


def evaluate_run(task: str, answer: str) -> dict:
    """LLM-as-judge evaluation of a single agent run."""
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{"role": "user", "content": JUDGE_PROMPT.format(task=task[:500], answer=answer[:1000])}],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)


def run_evaluation_pipeline(
    recorded_runs: list[dict],
    sample_fraction: float = 0.1,
) -> dict:
    """
    Evaluate a sample of recorded runs.
    recorded_runs: list of {run_id, task, answer} dicts
    """
    import random
    sample_size = max(1, int(len(recorded_runs) * sample_fraction))
    sample = random.sample(recorded_runs, sample_size)

    scores = []
    for run in sample:
        try:
            evaluation = evaluate_run(run["task"], run["answer"])
            scores.append(evaluation["score"])
        except Exception:
            pass

    return {
        "sampled": len(scores),
        "avg_score": sum(scores) / len(scores) if scores else 0,
        "completion_rate": sum(1 for s in scores if s >= 4) / len(scores) if scores else 0,
    }

Anomaly Detection

python
def check_for_anomalies(tracer: AgentTracer, alert_fn=print) -> None:
    """
    Check current metrics against baselines and alert on anomalies.
    In production: replace alert_fn with a Slack/PagerDuty call.
    """
    metrics = tracer.get_metrics(hours=1)
    baseline = tracer.get_metrics(hours=24 * 7)  # last week as baseline

    if not metrics or not baseline:
        return

    # Alert if avg_steps doubled (agent may be looping)
    if baseline["avg_steps"] > 0 and metrics["avg_steps"] > baseline["avg_steps"] * 2:
        alert_fn(f"ALERT: avg_steps doubled — possible agent loop. Current: {metrics['avg_steps']:.1f}, baseline: {baseline['avg_steps']:.1f}")

    # Alert if cost spiked
    if baseline["avg_cost_usd"] > 0 and metrics["avg_cost_usd"] > baseline["avg_cost_usd"] * 3:
        alert_fn(f"ALERT: cost spike. Current avg: ${metrics['avg_cost_usd']:.4f}, baseline: ${baseline['avg_cost_usd']:.4f}")

    # Alert if success rate dropped
    if metrics["task_success_rate"] < 0.8:
        alert_fn(f"ALERT: success rate dropped to {metrics['task_success_rate']:.1%}")

Key Takeaways

  • Agent observability requires three layers: structured per-event logs, OpenTelemetry distributed traces, and aggregate metrics stored in a queryable database.
  • Capture every LLM call with (model, input_tokens, output_tokens, latency_ms, cost) — this data drives both debugging and cost optimisation.
  • Use a unique run_id on every task execution and attach it to every downstream log event — this is what enables end-to-end trace reconstruction.
  • The @trace_llm_call and @trace_tool_call decorators add observability to existing functions without changing their logic.
  • Record runs to SQLite; replay a 10% sample through LLM-as-judge evaluation to continuously measure task quality without evaluating every run.
  • Anomaly detection alerts on avg_steps doubling (looping) or cost spiking (runaway LLM calls) — both are production failure modes that latency alone won't catch.
  • Structured JSON logs are queryable with any log analytics tool; unstructured text logs require manual parsing.
  • Track five metrics in production: task_success_rate, avg_steps, avg_cost_usd, p99_latency, and tool_error_rate. Dashboard these and set alert thresholds before launch.