Parallel & Sequential Agent Workflows
Parallel and Sequential Agent Execution
How you arrange agents — whether they work one at a time or simultaneously — dramatically affects both performance and cost. This lesson covers the patterns for orchestrating multiple agents efficiently.
Sequential vs. Parallel: The Core Trade-off
Sequential: Agent B starts only after Agent A finishes, so Agent B can use Agent A's output.
Parallel: Agent A and Agent B run simultaneously, so neither can use the other's output.
The choice depends on data dependencies:
- If Agent B needs Agent A's output → Sequential
- If Agent A and Agent B are independent → Parallel (usually faster)
Sequential Patterns
Pipeline: Each Agent Enriches the Previous Output
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class PipelineState(TypedDict):
    """State dictionary handed from node to node in the sequential pipeline."""

    # The request the pipeline starts from.
    input: str
    # Written by the research node.
    research: str
    # Written by the analysis node, which works from the research.
    analysis: str
    # Written by the writing node, which works from the analysis.
    report: str
def research_node(state: PipelineState) -> PipelineState:
    """Step 1: run the research agent on the pipeline input.

    Returns a copy of ``state`` whose ``"research"`` entry holds whatever
    ``research_agent.invoke`` returned for ``state["input"]``.
    """
    findings = research_agent.invoke(state["input"])
    updated = dict(state)
    updated["research"] = findings
    return updated
def analysis_node(state: PipelineState) -> PipelineState:
    """Step 2: analyze the research, so this must run after the research node.

    Returns a copy of ``state`` with ``"analysis"`` set to the analysis
    agent's output.
    """
    prompt = f"Research findings:\n{state['research']}\n\nAnalyze these findings."
    verdict = analysis_agent.invoke(prompt)
    return dict(state, analysis=verdict)
def writing_node(state: PipelineState) -> PipelineState:
    """Step 3: write the report from both the research and the analysis.

    Returns a copy of ``state`` with ``"report"`` set to the writing agent's
    output.
    """
    # Spelled out line by line so the prompt text does not depend on how the
    # source file happens to be indented.
    prompt = (
        "\n"
        f"Research: {state['research']}\n"
        f"Analysis: {state['analysis']}\n"
        "Write a comprehensive report.\n"
    )
    report = writing_agent.invoke(prompt)
    return dict(state, report=report)
# Sequential pipeline: research -> analysis -> writing -> END, in that order.
pipeline = StateGraph(PipelineState)

# Register each stage under the name the edges below refer to.
for _stage_name, _stage_fn in (
    ("research", research_node),
    ("analysis", analysis_node),
    ("writing", writing_node),
):
    pipeline.add_node(_stage_name, _stage_fn)

pipeline.set_entry_point("research")

# One edge per hand-off; no stage has more than one successor.
for _source, _target in (
    ("research", "analysis"),
    ("analysis", "writing"),
    ("writing", END),
):
    pipeline.add_edge(_source, _target)

graph = pipeline.compile()
Iterative Refinement: Sequential with Feedback Loops
class RefinementState(TypedDict):
    """State carried around the write -> critique -> revise loop."""

    # What the writing agent is asked to produce.
    task: str
    # Most recent draft.
    draft: str
    # Most recent critique of the draft.
    critique: str
    # Number of critiques performed so far.
    iteration: int
    # Upper bound on critique rounds before the loop stops.
    max_iterations: int
    # Completion flag; the functions shown alongside this class do not read it.
    done: bool
def write_draft(state: RefinementState) -> RefinementState:
    """Write the first draft, or revise the current one.

    When ``state["iteration"]`` is 0 the writing agent gets only the task;
    otherwise it gets the current draft together with its critique. Returns a
    copy of ``state`` with ``"draft"`` replaced by the agent's output.
    """
    is_first_pass = state["iteration"] == 0
    if is_first_pass:
        prompt = f"Write: {state['task']}"
    else:
        prompt = (
            "Revise this draft based on the critique:\n\n"
            f"Draft:\n{state['draft']}\n\n"
            f"Critique:\n{state['critique']}"
        )
    new_draft = writing_agent.invoke(prompt)
    return dict(state, draft=new_draft)
def critique_draft(state: RefinementState) -> RefinementState:
    """Critique the current draft and advance the iteration counter.

    Returns a copy of ``state`` with ``"critique"`` set to the critique
    agent's output and ``"iteration"`` increased by one.
    """
    prompt = f"Critique this draft on factual accuracy, clarity, and completeness:\n{state['draft']}"
    feedback = critique_agent.invoke(prompt)
    next_state = dict(state)
    next_state.update(critique=feedback, iteration=state["iteration"] + 1)
    return next_state
def should_continue_refinement(state: RefinementState) -> str:
    """Decide whether the write/critique loop runs another round.

    Args:
        state: Current loop state; ``iteration`` and ``max_iterations`` are
            required, ``critique`` may be missing, empty or ``None``.

    Returns:
        ``"done"`` once ``iteration`` has reached ``max_iterations`` or the
        critique contains "no significant issues" (case-insensitive);
        ``"revise"`` otherwise.
    """
    if state["iteration"] >= state["max_iterations"]:
        return "done"
    # ``or ""`` also covers a critique that is present but None, which
    # ``.get("critique", "")`` alone would hand straight to ``.lower()``.
    critique_text = state.get("critique") or ""
    if "no significant issues" in critique_text.lower():
        return "done"
    return "revise"
# Refinement loop: write -> critique, then either back to write or stop.
refine_graph = StateGraph(RefinementState)
refine_graph.add_node("write", write_draft)
refine_graph.add_node("critique", critique_draft)

refine_graph.set_entry_point("write")
refine_graph.add_edge("write", "critique")

# should_continue_refinement returns "revise" or "done"; map each label to
# the node (or END) that should run next.
refinement_routes = {"revise": "write", "done": END}
refine_graph.add_conditional_edges(
    "critique", should_continue_refinement, refinement_routes
)

graph = refine_graph.compile()

# Every state key is given an explicit starting value.
initial_state = {
    "task": "Write a competitive analysis",
    "iteration": 0,
    "max_iterations": 3,
    "done": False,
    "draft": "",
    "critique": "",
}
result = graph.invoke(initial_state)
Parallel Patterns
Fan-Out / Fan-In: Parallel Research, Combined Synthesis
from typing import TypedDict, Annotated, List
import operator
class ParallelResearchState(TypedDict):
    """State for the fan-out / fan-in competitor research graph."""

    # Subject every research branch investigates.
    topic: str
    # One slot per branch, so each branch writes to its own key.
    competitor_a_research: str
    competitor_b_research: str
    competitor_c_research: str
    # Combined result written by the synthesis node.
    synthesis: str
# Three independent research tasks run in parallel
def research_competitor_a(state: ParallelResearchState) -> dict:
    """Research the topic with a focus on Company A.

    Returns only the ``"competitor_a_research"`` key rather than the whole
    state.
    """
    prompt = f"Research {state['topic']} focusing on Company A"
    return {"competitor_a_research": research_agent.invoke(prompt)}
def research_competitor_b(state: ParallelResearchState) -> dict:
    """Research the topic with a focus on Company B.

    Returns only the ``"competitor_b_research"`` key rather than the whole
    state.
    """
    prompt = f"Research {state['topic']} focusing on Company B"
    return {"competitor_b_research": research_agent.invoke(prompt)}
def research_competitor_c(state: ParallelResearchState) -> dict:
    """Research the topic with a focus on Company C.

    Returns only the ``"competitor_c_research"`` key rather than the whole
    state.
    """
    prompt = f"Research {state['topic']} focusing on Company C"
    return {"competitor_c_research": research_agent.invoke(prompt)}
def synthesize_research(state: ParallelResearchState) -> dict:
    """Combine the three competitor write-ups into one synthesis.

    Meant to run only after all three research nodes have completed, since
    it reads all three of their result keys. Returns only the
    ``"synthesis"`` key.
    """
    # Spelled out line by line so the prompt text does not depend on how the
    # source file happens to be indented.
    prompt = (
        "\n"
        "Synthesize this competitive research:\n"
        f"A: {state['competitor_a_research']}\n"
        f"B: {state['competitor_b_research']}\n"
        f"C: {state['competitor_c_research']}\n"
    )
    return {"synthesis": synthesis_agent.invoke(prompt)}
# Fan-out / fan-in wiring: three research branches feed one synthesis node.
parallel_graph = StateGraph(ParallelResearchState)
parallel_graph.add_node("research_a", research_competitor_a)
parallel_graph.add_node("research_b", research_competitor_b)
parallel_graph.add_node("research_c", research_competitor_c)
parallel_graph.add_node("synthesize", synthesize_research)

# Fan-out: every branch gets its own edge from the start node. With a single
# entry point on "research_a", the other two branches would be unreachable
# and "synthesize" would read result keys nobody wrote.
parallel_graph.add_edge("__start__", "research_a")
parallel_graph.add_edge("__start__", "research_b")
parallel_graph.add_edge("__start__", "research_c")

# Fan-in: each branch hands off to the synthesis node.
parallel_graph.add_edge("research_a", "synthesize")
parallel_graph.add_edge("research_b", "synthesize")
parallel_graph.add_edge("research_c", "synthesize")
parallel_graph.add_edge("synthesize", END)
The Send API for Dynamic Parallelism
When you don't know how many parallel tasks you need until runtime:
from langgraph.types import Send
class MapReduceState(TypedDict):
    """State for the map-reduce summarization graph."""

    # Documents to summarize, one branch per entry.
    documents: list
    # operator.add is attached as metadata so the lists returned by the
    # individual branches are accumulated together.
    summaries: Annotated[list, operator.add]
    # Combined summary written by the merge node.
    final_summary: str
def summarize_document(state: dict) -> dict:
    """Summarize one document; invoked once per document, in parallel.

    ``state`` is the per-branch payload and must carry a ``"document"`` key.
    The summary comes back wrapped in a one-element list under
    ``"summaries"``.
    """
    document_summary = summarize_agent.invoke(state["document"])
    return {"summaries": [document_summary]}
def merge_summaries(state: MapReduceState) -> dict:
    """Ask the synthesis agent to combine the collected summaries.

    Returns only the ``"final_summary"`` key.
    """
    prompt = f"Combine these summaries:\n{state['summaries']}"
    combined = synthesis_agent.invoke(prompt)
    return {"final_summary": combined}
def route_to_parallel_summarization(state: MapReduceState):
    """Create one parallel branch per document.

    Returns a list of ``Send`` objects, each targeting the
    ``"summarize_document"`` node with a ``{"document": ...}`` payload.
    """
    branches = []
    for document in state["documents"]:
        branches.append(Send("summarize_document", {"document": document}))
    return branches
# Map-reduce wiring: dynamic fan-out from the start node, then one merge.
map_reduce = StateGraph(MapReduceState)
map_reduce.add_node("summarize_document", summarize_document)
map_reduce.add_node("merge", merge_summaries)

# The router returns one Send per document, so the number of branches is
# only known when the graph runs.
map_reduce.add_conditional_edges(
    "__start__", route_to_parallel_summarization, ["summarize_document"]
)
map_reduce.add_edge("summarize_document", "merge")
map_reduce.add_edge("merge", END)

# All documents are summarized in parallel, then merged
compiled_map_reduce = map_reduce.compile()
result = compiled_map_reduce.invoke(
    {
        "documents": ["doc1 content", "doc2 content", "doc3 content"],
        "summaries": [],
        "final_summary": "",
    }
)
Timing and Performance
import time
from langchain_community.callbacks import get_openai_callback
def benchmark_sequential_vs_parallel(task: str):
    """Compare time and cost for sequential vs parallel execution.

    Invokes ``sequential_graph`` and then ``parallel_graph`` with
    ``{"task": task}``, each inside its own ``get_openai_callback`` context,
    and prints the elapsed seconds, the callback's ``total_cost`` and the
    resulting speedup. Nothing is returned.

    NOTE(review): both graphs are assumed to be compiled graphs whose state
    accepts a ``"task"`` key -- confirm against where they are defined.

    Args:
        task: Task description passed to both graphs.
    """
    # perf_counter is monotonic; time.time() can jump if the system clock is
    # adjusted mid-run, which would corrupt the measured durations.
    start = time.perf_counter()
    with get_openai_callback() as cb_seq:
        sequential_graph.invoke({"task": task})
    sequential_time = time.perf_counter() - start

    start = time.perf_counter()
    with get_openai_callback() as cb_par:
        parallel_graph.invoke({"task": task})
    parallel_time = time.perf_counter() - start

    print(f"Sequential: {sequential_time:.1f}s, ${cb_seq.total_cost:.4f}")
    print(f"Parallel: {parallel_time:.1f}s, ${cb_par.total_cost:.4f}")
    # Guard the ratio so a zero-length measurement cannot raise.
    if parallel_time > 0:
        print(f"Speedup: {sequential_time/parallel_time:.1f}x")
    else:
        print("Speedup: n/a (parallel run took no measurable time)")
    # Usually same cost, much faster for independent tasks
When Each Pattern Makes Sense
| Pattern | When to Use |
|---|---|
| Sequential pipeline | Each step depends on the previous step's output |
| Iterative refinement | Quality improvement cycles (write → critique → revise) |
| Fan-out / fan-in | Same task for multiple independent inputs |
| Dynamic parallel (Send) | Process a variable-length list of items |
| Mixed | Complex tasks with both sequential and parallel phases |
The general rule: Map out data dependencies first. If A → B → C (B needs A's output, C needs B's), sequential is your only option. If A, B, and C are all independent, parallel saves time.
Next lesson: Human-in-the-loop — building agents that can pause and ask for human input.
Get this course's notes on Telegram!
Free cheat sheets, summaries & practice exercises