GadaaLabs
AI Automation — Production Agents & Agentic Systems
Lesson 4

Planning Agents — Task Decomposition & Execution

26 min

When ReAct Is Not Enough

The ReAct loop is powerful for exploratory tasks where the next action depends on the previous observation. But it has structural limitations:

No parallelism: ReAct is inherently sequential — think, act, observe, repeat. If three independent sub-tasks exist, they run one after another.

No upfront validation: ReAct discovers problems during execution. A planning agent can detect invalid tool names or missing dependencies before running a single step.

Poor handling of long tasks: tasks with 10+ steps produce very long context windows. The LLM must reason over an increasingly long trajectory, which degrades quality and inflates cost.

No separation of concerns: the same LLM is doing both the high-level planning and the low-level execution, which forces a generalised model where specialisation would be better.

Plan-and-Execute separates the Planner (generates a full JSON task graph) from the Executor (runs steps in dependency order with parallel execution where possible).

Plan-and-Execute Architecture

The Planner receives a goal and outputs a JSON array of steps. Each step has:

  • id: unique step identifier (e.g. "step_1")
  • description: human-readable description
  • tool: the tool to call
  • args: tool arguments (can reference outputs of previous steps with {{step_id.output}})
  • depends_on: list of step IDs that must complete before this step runs

The Executor performs a topological sort, then executes all steps whose dependencies are satisfied — potentially in parallel.

The Planner

python
import json
from groq import Groq

client = Groq()

PLANNER_PROMPT = """You are a task planning agent. Given a goal, create an execution plan as a JSON array.

Each step must have:
- "id": unique string like "step_1", "step_2"
- "description": what this step does (one sentence)
- "tool": one of {available_tools}
- "args": dict of tool arguments
- "depends_on": list of step IDs that must complete first (empty list if no deps)

Use depends_on to express dependencies correctly.
Steps with no dependencies can run in parallel.

GOAL: {goal}

Return ONLY valid JSON — a list of step objects. No explanation."""


def create_plan(goal: str, available_tools: list[str]) -> list[dict]:
    """
    Ask the LLM to generate a JSON execution plan for a goal.
    Returns a list of step dicts.
    """
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": PLANNER_PROMPT.format(
                goal=goal,
                available_tools=", ".join(available_tools),
            ),
        }],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    data = json.loads(response.choices[0].message.content)
    # Normalise: model may wrap in a key
    if isinstance(data, dict):
        data = next(iter(data.values()))
    return data

Plan Validation

Validate the plan before executing a single step. Catch structural errors early.

python
from collections import defaultdict


class PlanValidationError(Exception):
    pass


def validate_plan(steps: list[dict], available_tools: set[str]) -> None:
    """
    Validate a plan for structural correctness.
    Raises PlanValidationError with a descriptive message if invalid.
    """
    step_ids = {step["id"] for step in steps}

    for step in steps:
        # Check required fields
        for field in ("id", "description", "tool", "args", "depends_on"):
            if field not in step:
                raise PlanValidationError(f"Step {step.get('id', '?')} missing field '{field}'")

        # Check tool exists
        if step["tool"] not in available_tools:
            raise PlanValidationError(
                f"Step {step['id']} references unknown tool '{step['tool']}'. "
                f"Available: {available_tools}"
            )

        # Check dependency IDs exist
        for dep_id in step["depends_on"]:
            if dep_id not in step_ids:
                raise PlanValidationError(
                    f"Step {step['id']} depends on '{dep_id}' which does not exist in the plan"
                )

    # Check for cycles with DFS topological sort
    _check_no_cycles(steps, step_ids)


def _check_no_cycles(steps: list[dict], step_ids: set[str]) -> None:
    """Detect cycles using DFS. Raises PlanValidationError if a cycle is found."""
    graph = {step["id"]: step["depends_on"] for step in steps}
    visited = set()
    in_stack = set()

    def dfs(node: str) -> None:
        visited.add(node)
        in_stack.add(node)
        for neighbour in graph.get(node, []):
            if neighbour not in visited:
                dfs(neighbour)
            elif neighbour in in_stack:
                raise PlanValidationError(
                    f"Cycle detected in plan: {neighbour} is an ancestor of itself"
                )
        in_stack.discard(node)

    for step_id in step_ids:
        if step_id not in visited:
            dfs(step_id)

Topological Sort

python
def topological_sort(steps: list[dict]) -> list[list[dict]]:
    """
    Sort steps into execution waves.
    Each wave is a list of steps that can execute in parallel
    (all their dependencies are in earlier waves).

    Returns a list of waves (each wave is a list of step dicts).
    """
    remaining = {step["id"]: step for step in steps}
    completed: set[str] = set()
    waves = []

    while remaining:
        # Find all steps whose dependencies are fully satisfied
        ready = [
            step for step in remaining.values()
            if all(dep in completed for dep in step["depends_on"])
        ]
        if not ready:
            raise PlanValidationError("No progress possible — potential cycle not caught by validator")

        waves.append(ready)
        for step in ready:
            completed.add(step["id"])
            del remaining[step["id"]]

    return waves

The Executor

python
import asyncio
import time


class StepExecutionError(Exception):
    def __init__(self, step_id: str, original: Exception):
        self.step_id = step_id
        self.original = original
        super().__init__(f"Step {step_id} failed: {original}")


def resolve_args(args: dict, completed_results: dict) -> dict:
    """
    Replace template references like {{step_1.output}} with actual results.
    This allows later steps to consume outputs from earlier steps.
    """
    resolved = {}
    for key, value in args.items():
        if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
            ref = value[2:-2].strip()  # strip {{ and }}
            parts = ref.split(".")
            step_id = parts[0]
            field = parts[1] if len(parts) > 1 else "output"
            resolved[key] = completed_results.get(step_id, {}).get(field, value)
        else:
            resolved[key] = value
    return resolved


async def execute_step(step: dict, tool_registry: dict, completed_results: dict) -> dict:
    """Execute a single step asynchronously, resolving arg templates first."""
    tool_fn = tool_registry.get(step["tool"])
    if tool_fn is None:
        raise StepExecutionError(step["id"], ValueError(f"Tool '{step['tool']}' not found"))

    resolved_args = resolve_args(step["args"], completed_results)

    t0 = time.perf_counter()
    try:
        result = await asyncio.to_thread(tool_fn, **resolved_args)
        latency = (time.perf_counter() - t0) * 1000
        return {"output": result, "latency_ms": latency, "status": "success"}
    except Exception as e:
        raise StepExecutionError(step["id"], e)


async def execute_plan(
    steps: list[dict],
    tool_registry: dict,
    on_step_complete=None,
) -> dict[str, dict]:
    """
    Execute the plan in dependency order, running independent steps in parallel.

    Returns a dict mapping step_id -> {output, latency_ms, status}.
    """
    waves = topological_sort(steps)
    completed_results: dict[str, dict] = {}

    for wave_idx, wave in enumerate(waves):
        print(f"Wave {wave_idx + 1}: executing {[s['id'] for s in wave]} in parallel")

        # Execute all steps in this wave concurrently
        tasks = [execute_step(step, tool_registry, completed_results) for step in wave]
        wave_results = await asyncio.gather(*tasks, return_exceptions=True)

        for step, result in zip(wave, wave_results):
            if isinstance(result, StepExecutionError):
                completed_results[step["id"]] = {"status": "failed", "error": str(result)}
            else:
                completed_results[step["id"]] = result
                if on_step_complete:
                    on_step_complete(step["id"], result)

    return completed_results

Replanning on Failure

When a step fails, pass the failure context back to the Planner and ask it to regenerate a recovery plan:

python
REPLAN_PROMPT = """You are a task planning agent. A previous execution plan failed.

ORIGINAL GOAL: {goal}
COMPLETED STEPS: {completed}
FAILED STEP: {failed_step}
ERROR: {error}

Generate a recovery plan to complete the original goal, given what has already been done.
Only include the REMAINING steps needed. Use the same JSON format.
Step IDs must not conflict with already-completed step IDs."""


def replan(
    goal: str,
    completed_steps: list[str],
    failed_step: dict,
    error: str,
    available_tools: list[str],
) -> list[dict]:
    """Ask the LLM to generate a recovery plan after a step failure."""
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{
            "role": "user",
            "content": REPLAN_PROMPT.format(
                goal=goal,
                completed=", ".join(completed_steps),
                failed_step=json.dumps(failed_step),
                error=error,
                available_tools=available_tools,
            ),
        }],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    data = json.loads(response.choices[0].message.content)
    return data if isinstance(data, list) else next(iter(data.values()))

LLM Compiler — Batch Tool Calls

The LLM Compiler pattern generates all tool calls in a single LLM pass rather than iterating. This is faster for deterministic tasks but cannot handle conditional branches.

python
LLM_COMPILER_PROMPT = """Given the goal below, generate ALL tool calls needed to complete it in a single pass.
Output a JSON object: {{"calls": [{{"tool": str, "args": dict, "id": str}}]}}
Order calls so dependencies come first.

GOAL: {goal}
AVAILABLE TOOLS: {tools}"""


def llm_compiler_plan(goal: str, available_tools: list[str]) -> list[dict]:
    """Generate all tool calls in one shot — no iterative replanning."""
    response = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{"role": "user", "content": LLM_COMPILER_PROMPT.format(goal=goal, tools=available_tools)}],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    data = json.loads(response.choices[0].message.content)
    return data.get("calls", [])

Complete Data Analysis Agent

python
import pandas as pd


# --- Tool implementations ---

def load_data(filepath: str) -> dict:
    df = pd.read_csv(filepath)
    return {"rows": len(df), "columns": list(df.columns), "data": df.to_dict(orient="records")}


def compute_stats(data: dict) -> dict:
    df = pd.DataFrame(data["data"])
    numeric_cols = df.select_dtypes(include="number").columns.tolist()
    return {"stats": df[numeric_cols].describe().to_dict()}


def find_anomalies(stats: dict, data: dict) -> dict:
    df = pd.DataFrame(data["data"])
    anomalies = []
    for col, col_stats in stats["stats"].items():
        mean = col_stats["mean"]
        std = col_stats["std"]
        outliers = df[abs(df[col] - mean) > 3 * std][col].tolist()
        if outliers:
            anomalies.append({"column": col, "outlier_values": outliers[:5]})
    return {"anomalies": anomalies}


def generate_summary(stats: dict, anomalies: dict) -> dict:
    summary_prompt = f"Summarise these dataset statistics and anomalies in 3 sentences:\nStats: {stats}\nAnomalies: {anomalies}"
    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[{"role": "user", "content": summary_prompt}],
        max_tokens=200,
    )
    return {"summary": response.choices[0].message.content.strip()}


TOOL_REGISTRY = {
    "load_data": load_data,
    "compute_stats": compute_stats,
    "find_anomalies": find_anomalies,
    "generate_summary": generate_summary,
}

# --- Run the planning agent ---

async def run_data_analysis_agent(filepath: str) -> dict:
    goal = f"Analyse the CSV file at '{filepath}': load it, compute statistics, find anomalies, and generate a summary."
    available_tools = list(TOOL_REGISTRY.keys())

    # 1. Plan
    plan = create_plan(goal, available_tools)
    print(f"Generated plan with {len(plan)} steps")

    # 2. Validate
    validate_plan(plan, set(available_tools))

    # 3. Execute
    results = await execute_plan(plan, TOOL_REGISTRY)

    # 4. Return final output
    summary_result = next(
        (v for k, v in results.items() if "summary" in k.lower()),
        list(results.values())[-1]
    )
    return {"plan": plan, "results": results, "summary": summary_result.get("output", {})}


# asyncio.run(run_data_analysis_agent("sales_data.csv"))

Key Takeaways

  • ReAct is sequential; planning agents separate concerns and enable parallel execution of independent steps, reducing total wall-clock time significantly.
  • Plan validation — checking tool names, dependency IDs, and cycle detection before execution — catches structural errors at zero cost instead of at step 7 of 12.
  • Topological sort into execution waves enables asyncio.gather-based parallel execution: independent steps run simultaneously.
  • Template references ({{step_1.output}}) allow later steps to consume earlier outputs without hard-coded data passing.
  • Replanning passes the failure context back to the Planner LLM, enabling graceful recovery without restarting the entire task.
  • The LLM Compiler generates all tool calls in a single pass — lower latency for deterministic pipelines, but cannot handle conditional branching.
  • Keep Planner and Executor as separate concerns; the Planner can be a large, slow model; the Executor is pure Python logic.
  • Always validate plans before executing — never trust raw LLM output to be structurally valid without checking it.