Follow AiTechWorlds on LinkedIn for professional AI content!Follow Now →
18 minLesson 15 of 23
Multi-Agent Systems with LangGraph

Parallel & Sequential Agent Workflows

Parallel and Sequential Agent Execution

How you arrange agents — whether they work one at a time or simultaneously — dramatically affects both performance and cost. This lesson covers the patterns for orchestrating multiple agents efficiently.

Sequential vs. Parallel: The Core Trade-off

Sequential: Agent B starts only after Agent A finishes. Agent B can use Agent A's output. Parallel: Agent A and Agent B run simultaneously. Neither can use the other's output.

The choice depends on data dependencies:

  • If Agent B needs Agent A's output → Sequential
  • If Agent A and Agent B are independent → Parallel (usually faster)

Sequential Patterns

Pipeline: Each Agent Enriches the Previous Output

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class PipelineState(TypedDict):
    input: str
    research: str       # Filled by research agent
    analysis: str       # Filled by analysis agent (uses research)
    report: str         # Filled by writing agent (uses analysis)

def research_node(state: PipelineState) -> PipelineState:
    """Step 1: Gather information."""
    result = research_agent.invoke(state["input"])
    return {**state, "research": result}

def analysis_node(state: PipelineState) -> PipelineState:
    """Step 2: Analyze the research (must come after research)."""
    analysis_input = f"Research findings:\n{state['research']}\n\nAnalyze these findings."
    result = analysis_agent.invoke(analysis_input)
    return {**state, "analysis": result}

def writing_node(state: PipelineState) -> PipelineState:
    """Step 3: Write the report (uses both research and analysis)."""
    writing_input = f"""
Research: {state['research']}
Analysis: {state['analysis']}

Write a comprehensive report.
"""
    result = writing_agent.invoke(writing_input)
    return {**state, "report": result}

# Sequential pipeline — strict ordering
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("writing", writing_node)

pipeline.set_entry_point("research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "writing")
pipeline.add_edge("writing", END)

graph = pipeline.compile()

Iterative Refinement: Sequential with Feedback Loops

class RefinementState(TypedDict):
    task: str
    draft: str
    critique: str
    iteration: int
    max_iterations: int
    done: bool

def write_draft(state: RefinementState) -> RefinementState:
    """Generate or revise the draft."""
    if state["iteration"] == 0:
        draft = writing_agent.invoke(f"Write: {state['task']}")
    else:
        draft = writing_agent.invoke(
            f"Revise this draft based on the critique:\n\nDraft:\n{state['draft']}\n\nCritique:\n{state['critique']}"
        )
    return {**state, "draft": draft}

def critique_draft(state: RefinementState) -> RefinementState:
    """Evaluate the draft."""
    critique = critique_agent.invoke(
        f"Critique this draft on factual accuracy, clarity, and completeness:\n{state['draft']}"
    )
    return {**state, "critique": critique, "iteration": state["iteration"] + 1}

def should_continue_refinement(state: RefinementState) -> str:
    if state["iteration"] >= state["max_iterations"]:
        return "done"
    if "no significant issues" in state.get("critique", "").lower():
        return "done"
    return "revise"

refine_graph = StateGraph(RefinementState)
refine_graph.add_node("write", write_draft)
refine_graph.add_node("critique", critique_draft)

refine_graph.set_entry_point("write")
refine_graph.add_edge("write", "critique")
refine_graph.add_conditional_edges(
    "critique",
    should_continue_refinement,
    {"revise": "write", "done": END}
)

graph = refine_graph.compile()
result = graph.invoke({"task": "Write a competitive analysis", "iteration": 0, "max_iterations": 3, "done": False, "draft": "", "critique": ""})

Parallel Patterns

Fan-Out / Fan-In: Parallel Research, Combined Synthesis

from typing import TypedDict, Annotated, List
import operator

class ParallelResearchState(TypedDict):
    topic: str
    competitor_a_research: str
    competitor_b_research: str  
    competitor_c_research: str
    synthesis: str

# Three independent research tasks run in parallel
def research_competitor_a(state: ParallelResearchState) -> dict:
    result = research_agent.invoke(f"Research {state['topic']} focusing on Company A")
    return {"competitor_a_research": result}

def research_competitor_b(state: ParallelResearchState) -> dict:
    result = research_agent.invoke(f"Research {state['topic']} focusing on Company B")
    return {"competitor_b_research": result}

def research_competitor_c(state: ParallelResearchState) -> dict:
    result = research_agent.invoke(f"Research {state['topic']} focusing on Company C")
    return {"competitor_c_research": result}

def synthesize_research(state: ParallelResearchState) -> dict:
    """Runs only after all three parallel nodes complete."""
    synthesis = synthesis_agent.invoke(f"""
Synthesize this competitive research:
A: {state['competitor_a_research']}
B: {state['competitor_b_research']}
C: {state['competitor_c_research']}
""")
    return {"synthesis": synthesis}

parallel_graph = StateGraph(ParallelResearchState)
parallel_graph.add_node("research_a", research_competitor_a)
parallel_graph.add_node("research_b", research_competitor_b)
parallel_graph.add_node("research_c", research_competitor_c)
parallel_graph.add_node("synthesize", synthesize_research)

parallel_graph.set_entry_point("research_a")
# Note: In LangGraph, add parallel edges from a shared entry or use Send API
parallel_graph.add_edge("research_a", "synthesize")
parallel_graph.add_edge("research_b", "synthesize")
parallel_graph.add_edge("research_c", "synthesize")
parallel_graph.add_edge("synthesize", END)

The Send API for Dynamic Parallelism

When you don't know how many parallel tasks you need until runtime:

from langgraph.types import Send

class MapReduceState(TypedDict):
    documents: list
    summaries: Annotated[list, operator.add]  # Accumulated from all branches
    final_summary: str

def summarize_document(state: dict) -> dict:
    """Summarizes a single document — called in parallel for each."""
    summary = summarize_agent.invoke(state["document"])
    return {"summaries": [summary]}

def merge_summaries(state: MapReduceState) -> dict:
    final = synthesis_agent.invoke(f"Combine these summaries:\n{state['summaries']}")
    return {"final_summary": final}

def route_to_parallel_summarization(state: MapReduceState):
    """Dynamically create parallel branches — one per document."""
    return [
        Send("summarize_document", {"document": doc})
        for doc in state["documents"]
    ]

map_reduce = StateGraph(MapReduceState)
map_reduce.add_node("summarize_document", summarize_document)
map_reduce.add_node("merge", merge_summaries)

map_reduce.add_conditional_edges(
    "__start__",
    route_to_parallel_summarization,
    ["summarize_document"]
)
map_reduce.add_edge("summarize_document", "merge")
map_reduce.add_edge("merge", END)

# All documents are summarized in parallel, then merged
result = map_reduce.compile().invoke({
    "documents": ["doc1 content", "doc2 content", "doc3 content"],
    "summaries": [],
    "final_summary": ""
})

Timing and Performance

import time
from langchain_community.callbacks import get_openai_callback

def benchmark_sequential_vs_parallel(task: str):
    """Compare time and cost for sequential vs parallel execution."""
    
    # Sequential
    start = time.time()
    with get_openai_callback() as cb_seq:
        sequential_result = sequential_graph.invoke({"task": task})
    sequential_time = time.time() - start
    
    # Parallel
    start = time.time()
    with get_openai_callback() as cb_par:
        parallel_result = parallel_graph.invoke({"task": task})
    parallel_time = time.time() - start
    
    print(f"Sequential: {sequential_time:.1f}s, ${cb_seq.total_cost:.4f}")
    print(f"Parallel:   {parallel_time:.1f}s, ${cb_par.total_cost:.4f}")
    print(f"Speedup: {sequential_time/parallel_time:.1f}x")
    # Usually same cost, much faster for independent tasks

When Each Pattern Makes Sense

PatternWhen to Use
Sequential pipelineEach step depends on the previous step's output
Iterative refinementQuality improvement cycles (write → critique → revise)
Fan-out / fan-inSame task for multiple independent inputs
Dynamic parallel (Send)Process a variable-length list of items
MixedComplex tasks with both sequential and parallel phases

The general rule: Map out data dependencies first. If A → B → C (B needs A's output, C needs B's), sequential is your only option. If A, B, and C are all independent, parallel saves time.

Next lesson: Human-in-the-loop — building agents that can pause and ask for human input.

📱

Get this course's notes on Telegram!

Free cheat sheets, summaries & practice exercises

Get Notes Free →
!