Parallel Agent Supervision Patterns

Module 17: Multi-Agent Workflows | Expansion Guide

The Problem

You have a big task: build a new API endpoint. It needs code, tests, documentation, and type definitions. With one agent, you do it serially: code first, then tests, then docs. It takes 20 minutes.

What if three agents worked in parallel? Code agent writes the endpoint, test agent writes tests, docs agent writes documentation - all simultaneously. Should take 7 minutes, right?

Wrong. They finish in 25 minutes because they overlap, contradict each other, and you spend 15 minutes manually resolving conflicts. Parallel agents without coordination create chaos, not speed.

The Core Insight

Parallel agent supervision works when tasks are independent and a supervisor can merge results without human intervention. Fail either condition, and you're slower than single-agent.

Think of it like cooking with multiple chefs: three chefs each plating a separate dish finish dinner faster than one chef alone, but three chefs stirring the same pot just collide.

The pattern has three roles:

| Role | Responsibility | Example |
| --- | --- | --- |
| Supervisor | Decompose task, assign agents, merge results | Splits "build API" into code/test/docs |
| Worker agents | Execute independent subtasks | Code agent writes endpoint logic |
| Coordinator | Orchestrate execution, handle errors | Your code managing asyncio tasks |

The Walkthrough

Step 1: Task Decomposition

The supervisor breaks the task into independent subtasks. Key: they must be truly independent - no shared state, no conflicting edits.

from anthropic import Anthropic
import asyncio

client = Anthropic()

async def supervisor_decompose(task: str) -> list[dict]:
    """Supervisor agent decomposes task into subtasks."""

    prompt = f"""You are a task decomposition supervisor. Break this task into 2-4 independent subtasks that can be executed in parallel.

Requirements:
- Subtasks must NOT depend on each other
- Subtasks must NOT edit the same files
- Each subtask should be completable in one LLM call

Task: {task}

Return JSON array of subtasks:
[
  {{"agent": "code", "task": "Write the API endpoint handler", "output": "app/endpoints/users.py"}},
  {{"agent": "test", "task": "Write tests for the endpoint", "output": "tests/test_users.py"}},
  ...
]"""

    # The SDK call is blocking; run it in a worker thread so concurrent
    # callers don't serialize on the event loop
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": prompt}],
    )

    # Parse JSON response (strip markdown fences if the model added them)
    import json
    text = response.content[0].text.strip()
    if text.startswith("```"):
        text = text.strip("`").removeprefix("json").strip()
    subtasks = json.loads(text)
    return subtasks

# Example (top-level await works in a notebook; otherwise wrap in asyncio.run)
task = "Build a new /users endpoint that lists all users from the database"
subtasks = await supervisor_decompose(task)
# Returns:
# [
#   {"agent": "code", "task": "Write endpoint handler", "output": "app/endpoints/users.py"},
#   {"agent": "test", "task": "Write integration tests", "output": "tests/test_users_endpoint.py"},
#   {"agent": "docs", "task": "Write API documentation", "output": "docs/api/users.md"}
# ]
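The independence requirements above can be checked mechanically before any agent runs. A minimal sketch (`validate_independence` is a hypothetical helper, not part of the SDK):

```python
def validate_independence(subtasks: list[dict]) -> None:
    """Raise if two subtasks declare the same output file -
    a quick mechanical check of the no-shared-files rule."""
    seen: dict[str, str] = {}
    for st in subtasks:
        path = st["output"]
        if path in seen:
            raise ValueError(
                f"Conflict: '{seen[path]}' and '{st['task']}' both write {path}"
            )
        seen[path] = st["task"]
```

Calling this right after decomposition catches overlapping outputs before you pay for three LLM calls.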

Step 2: Parallel Execution

Execute each subtask concurrently using asyncio:

async def execute_subtask(subtask: dict, context: dict) -> dict:
    """Execute a single subtask with an agent."""

    prompt = f"""You are a specialized {subtask['agent']} agent.

Context (read-only, for reference):
{context.get('schema', 'No schema provided')}

Task: {subtask['task']}

Output file: {subtask['output']}

Generate the complete file content. Do not reference other agents' work - you're working independently."""

    # Blocking SDK call runs in a worker thread so the asyncio.gather
    # below can actually overlap the agents
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=4000,
        messages=[{"role": "user", "content": prompt}],
    )

    return {
        "subtask": subtask,
        "output": response.content[0].text,
        "file": subtask["output"]
    }

async def parallel_execute(subtasks: list[dict], context: dict) -> list[dict]:
    """Execute all subtasks in parallel."""

    # Create tasks
    tasks = [execute_subtask(st, context) for st in subtasks]

    # Execute concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle failures
    successful = []
    failed = []
    for result in results:
        if isinstance(result, Exception):
            failed.append(str(result))
        else:
            successful.append(result)

    if failed:
        print(f"⚠️ {len(failed)} subtasks failed: {failed}")

    return successful

# Execute
context = {"schema": "User model has id, name, email fields"}
results = await parallel_execute(subtasks, context)
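One caveat the snippet above doesn't cover: with more than a handful of agents, an unbounded gather can trip API rate limits. A sketch of capping concurrency with asyncio.Semaphore (`bounded_gather` is a hypothetical helper, not part of the pattern above):

```python
import asyncio

async def bounded_gather(coros, limit: int = 3):
    """Run coroutines concurrently, but never more than `limit` at once."""
    sem = asyncio.Semaphore(limit)

    async def _run(coro):
        async with sem:  # waits here whenever `limit` tasks are in flight
            return await coro

    return await asyncio.gather(*(_run(c) for c in coros), return_exceptions=True)
```

Dropping this in place of the raw gather keeps the failure-handling loop unchanged, since it returns the same mix of results and exceptions.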

Step 3: Result Merging

The supervisor merges results. For file-based outputs, this is straightforward (each agent wrote to different files). For more complex merges, the supervisor agent reconciles:

async def supervisor_merge(results: list[dict]) -> str:
    """Supervisor merges parallel agent outputs."""

    # If outputs are independent files, just write them
    if all_files_unique(results):
        for result in results:
            write_file(result["file"], result["output"])
        return "All files written successfully"

    # If there are conflicts, supervisor reconciles
    prompt = f"""You are a merge supervisor. Review outputs from parallel agents and resolve any conflicts.

Outputs:
{format_results(results)}

Merge strategy:
1. If files are different: accept all
2. If same file edited by multiple agents: merge intelligently
3. If contradictory logic: flag for human review

Return merged result or conflict report."""

    # Run the blocking SDK call in a thread to keep the event loop free
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=4000,
        messages=[{"role": "user", "content": prompt}],
    )

    return response.content[0].text

def all_files_unique(results: list[dict]) -> bool:
    """Check if all agents wrote to different files."""
    files = [r["file"] for r in results]
    return len(files) == len(set(files))
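supervisor_merge also leans on two helpers the snippet leaves undefined, write_file and format_results. Minimal sketches, assuming results have the shape produced by execute_subtask:

```python
import os

def write_file(path: str, content: str) -> None:
    """Write agent output to disk, creating parent directories as needed."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)

def format_results(results: list[dict]) -> str:
    """Render each agent's output for the merge supervisor's prompt."""
    return "\n\n".join(
        f"--- {r['file']} ({r['subtask']['agent']} agent) ---\n{r['output']}"
        for r in results
    )
```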

The Independence Test

Before running agents in parallel, ask: "Could I give these tasks to three junior developers who can't talk to each other, and merge their outputs without conflicts?" If no, don't parallelize.

When Parallel Supervision Works

Good use cases:

- Independent files from a fixed spec (endpoint code, tests, and docs against a shared schema)
- Multi-format output from one source (README, HTML docs, API reference)
- Batch processing of unrelated items (summarizing 50 separate documents)

Bad use cases (use sequential instead):

- Subtasks that read each other's output (a refactor where each edit depends on the last)
- Multiple agents editing the same file
- Tasks that need shared mutable state or mid-flight coordination

Complete Example: Documentation Generator

import time

async def generate_multi_format_docs(codebase_path: str):
    """Generate docs in parallel: markdown, HTML, and an API reference."""

    # Step 1: Supervisor decomposes (hardcoded here for clarity;
    # supervisor_decompose would produce this list dynamically)
    subtasks = [
        {"agent": "markdown", "task": "Generate README.md", "output": "README.md"},
        {"agent": "html", "task": "Generate HTML docs", "output": "docs/index.html"},
        {"agent": "api", "task": "Generate API reference", "output": "docs/api.md"}
    ]

    # Step 2: Gather context (shared read-only data)
    context = {
        "file_tree": get_file_tree(codebase_path),
        "main_modules": identify_main_modules(codebase_path)
    }

    # Step 3: Execute in parallel
    print("🚀 Launching 3 agents in parallel...")
    start = time.time()
    results = await parallel_execute(subtasks, context)
    duration = time.time() - start
    print(f"✓ Completed in {duration:.1f}s ({len(results)} files)")

    # Step 4: Supervisor merge (in this case, just write files)
    for result in results:
        write_file(result["file"], result["output"])
        print(f"  ✓ {result['file']}")

    return results

# Run (top-level await in a notebook; otherwise use asyncio.run)
await generate_multi_format_docs("/path/to/code")

# Output:
# 🚀 Launching 3 agents in parallel...
# ✓ Completed in 8.3s (3 files)
#   ✓ README.md
#   ✓ docs/index.html
#   ✓ docs/api.md

Three agents finished in 8 seconds. Sequential would take ~20 seconds (7s per agent). Parallel achieved 2.4x speedup.

Failure Patterns

1. Merge Hell

Symptom: Agents return conflicting outputs. Merge takes longer than the parallel execution saved.

Fix: Ensure tasks are truly independent. If merge is complex, don't parallelize.

2. Context Explosion

Symptom: Each agent needs tons of context, making parallel execution slow and expensive.

Fix: Limit shared context. Give agents only what they need. Use references (file paths) instead of content.
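One way to implement that fix is a per-role allowlist of context keys, so each agent receives only the fields its role needs (the `needs` mapping below is an illustrative assumption, not part of the pattern above):

```python
def minimal_context(subtask: dict, full_context: dict) -> dict:
    """Slice the shared context down to the keys this agent's role needs;
    everything else should be passed by reference (file paths), not content."""
    needs = {
        "code": ["schema"],
        "test": ["schema"],
        "docs": ["file_tree"],
    }
    keys = needs.get(subtask["agent"], [])
    return {k: full_context[k] for k in keys if k in full_context}
```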

3. One Slow Agent Blocks All

Symptom: Three agents finish in 5s each; a fourth takes 30s. Wall-clock time is 30s - the slowest agent sets the pace, so most of the parallelism is wasted.

Fix: Balance subtask complexity. Split the big subtask into smaller ones.
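When splitting the big subtask isn't possible, another mitigation is a deadline: keep whatever finishes in time and retry stragglers sequentially. A sketch using asyncio.wait (`gather_with_deadline` is a hypothetical helper):

```python
import asyncio

async def gather_with_deadline(coros, deadline_s: float = 15.0):
    """Return (results finished within the deadline, number of stragglers)."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, timeout=deadline_s)
    for t in pending:
        t.cancel()  # stragglers are cancelled; the caller can retry them sequentially
    results = [t.result() for t in done if t.exception() is None]
    return results, len(pending)
```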

4. Parallel Overhead Exceeds Gains

Symptom: Coordinating agents takes longer than just doing it sequentially.

Fix: Only parallelize if subtasks take >5s each. For quick tasks, sequential is faster.

The Coordination Tax

Parallel execution has overhead: decomposition prompt, merging, error handling. If your task takes <15 seconds total, the coordination tax will make parallel slower. Parallelize big tasks, not small ones.
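The tax can be estimated with back-of-envelope arithmetic: parallel wins only when decompose + slowest subtask + merge beats the sequential sum. A sketch, with illustrative (assumed) overhead numbers:

```python
def parallel_break_even(subtask_times: list[float],
                        decompose_s: float = 2.0,
                        merge_s: float = 2.0) -> bool:
    """True if parallel execution beats sequential for these subtask durations."""
    sequential = sum(subtask_times)
    parallel = decompose_s + max(subtask_times) + merge_s
    return parallel < sequential
```

For the three ~7s agents above, parallel costs roughly 2 + 7 + 2 = 11s against 21s sequential; for three 2s subtasks it costs 6s against 6s, a wash - that gap is the coordination tax.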

Quick Reference

Parallel Supervision Checklist:

- Subtasks are truly independent (no shared state, no same-file edits)
- Each subtask takes >5s and the whole task >15s
- Merge is mechanical (distinct files), or a supervisor can reconcile without a human
- Shared context is read-only and kept minimal
- Failures are handled (return_exceptions=True, retry or fall back to sequential)

Implementation Pattern:

# 1. Supervisor decomposes
subtasks = await supervisor_decompose(task)

# 2. Execute in parallel
results = await asyncio.gather(*[
    execute_subtask(st, context)
    for st in subtasks
], return_exceptions=True)

# 3. Supervisor merges
final = await supervisor_merge(results)

When to Use:

- Big tasks (>15s total) that decompose into independent, file-separated subtasks
- Outputs that merge mechanically (each agent owns its own files)
- Small, read-only shared context

When NOT to Use:

- Quick tasks, where the coordination tax exceeds any savings
- Subtasks that depend on each other's output or edit the same file
- Merges that would need human conflict resolution