5 Multi-Agent Orchestration Patterns (Sequential, Parallel, Branching)
Master sequential, parallel, conditional, loop, and DAG orchestration patterns for multi-agent systems. Full Python and LangGraph code examples with comparison table.
Orchestration is how you answer the question: "In what order do the agents run, and how do they exchange results?"
It sounds simple, but the answer shapes everything about your system's performance, cost, and reliability. A sequential pipeline that takes 60 seconds could run in 20 with parallel execution. A conditional branching pattern can cut average API cost by roughly 30-50% by calling expensive agents only when they're actually needed.
This article covers five orchestration patterns — sequential, parallel, conditional, loop, and DAG-based — with full Python implementations and LangGraph examples for each. By the end, you'll have a clear mental model for choosing the right pattern and the code to implement it.
For background on why orchestration decisions matter, multi-agent architecture patterns explains how orchestration fits into broader system design.
The Five Patterns at a Glance
Pattern 1: Sequential
A → B → C → D
Each agent waits for the previous to complete.
Pattern 2: Parallel
A ──→
B ──→ [Merge] → D
C ──→
Pattern 3: Conditional
           ┌─→ B (if condition X)
A → Router
           └─→ C (if condition Y)
Pattern 4: Loop
A → B → [Evaluator] → A (if retry needed) → Final Output
Pattern 5: DAG
A → B → D
  ↘ C ↗
Pattern 1: Sequential Orchestration
The simplest pattern. Agent A runs, its output becomes Agent B's input, which becomes Agent C's input, and so on. Clear dependency chain, no concurrency.
import asyncio
from typing import Dict, Any

from langchain_openai import ChatOpenAI
# langchain.schema is an older import path; the message classes live in langchain_core.messages
from langchain_core.messages import HumanMessage, SystemMessage


class SequentialPipeline:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

    async def _run_agent(self, role: str, instructions: str, input_text: str) -> str:
        """Run one agent: a system prompt defining the role plus the stage input."""
        messages = [
            SystemMessage(content=f"You are a {role}. {instructions}"),
            HumanMessage(content=input_text)
        ]
        response = await self.llm.ainvoke(messages)
        return response.content

    async def run(self, initial_input: str) -> Dict[str, str]:
        results = {}

        # Stage 1: Research
        print("[Stage 1] Researching...")
        research = await self._run_agent(
            "researcher",
            "Gather key facts and context on the topic. Be thorough.",
            initial_input
        )
        results["research"] = research

        # Stage 2: Analysis (takes research output)
        print("[Stage 2] Analyzing...")
        analysis = await self._run_agent(
            "analyst",
            "Analyze the provided research. Identify key patterns and implications.",
            f"Topic: {initial_input}\n\nResearch:\n{research}"
        )
        results["analysis"] = analysis

        # Stage 3: Writing (takes analysis output)
        print("[Stage 3] Writing...")
        report = await self._run_agent(
            "writer",
            "Write a clear, well-structured report based on the analysis.",
            f"Analysis:\n{analysis}"
        )
        results["report"] = report
        return results


async def demo_sequential():
    pipeline = SequentialPipeline()
    results = await pipeline.run("The impact of LLM agents on software development workflows")
    print("\n--- Final Report ---")
    print(results["report"][:500])


asyncio.run(demo_sequential())
When to use: Strict data dependencies (Stage 2 genuinely needs Stage 1's output), simple pipelines where predictability matters more than speed, debugging-sensitive systems.
Performance profile: Total time = sum of all agent times. No concurrency benefit.
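To make the additive latency concrete, here is a minimal sketch that replaces the LLM calls with a stub. `fake_agent`, `run_sequential`, and the stage durations are all hypothetical and exist only for this illustration; the point is that the measured wall-clock time tracks the sum of the individual stage times.

```python
import asyncio
import time


async def fake_agent(name: str, seconds: float, input_text: str) -> str:
    """Hypothetical stand-in for an LLM call: sleeps, then tags the input."""
    await asyncio.sleep(seconds)
    return f"{input_text} -> {name}"


async def run_sequential(stages, initial_input: str):
    """Run stages in order, feeding each stage's output into the next."""
    current = initial_input
    timings = {}
    for name, seconds in stages:
        start = time.perf_counter()
        current = await fake_agent(name, seconds, current)
        timings[name] = time.perf_counter() - start
    return current, timings


# Made-up durations, far shorter than real model latency
stages = [("research", 0.05), ("analysis", 0.03), ("writing", 0.02)]

start = time.perf_counter()
output, timings = asyncio.run(run_sequential(stages, "topic"))
total = time.perf_counter() - start

print(output)
print(f"total={total:.2f}s, sum of stages={sum(timings.values()):.2f}s")
```

Because each stage awaits the previous one, shortening the pipeline means either making a stage faster or removing the dependency between stages.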
Pattern 2: Parallel Orchestration
Independent agents run simultaneously. A merge step combines their outputs.
# Reuses the imports from the sequential example (asyncio, Dict, ChatOpenAI, message classes)
async def run_parallel_pipeline(topic: str) -> Dict[str, str]:
    llm = ChatOpenAI(model="gpt-4o", temperature=0.2)

    async def run_agent(role: str, instructions: str, input_text: str) -> str:
        response = await llm.ainvoke([
            SystemMessage(content=f"You are a {role}. {instructions}"),
            HumanMessage(content=input_text)
        ])
        return response.content

    # Stage 1: Three independent research tasks run in parallel.
    # Calling run_agent() only creates coroutines; nothing runs until gather() awaits them.
    print("[Stage 1] Running parallel research tasks...")
    technical_task = run_agent(
        "technical researcher",
        "Focus on technical implementation details and code examples.",
        topic
    )
    business_task = run_agent(
        "business analyst",
        "Focus on business impact, ROI, and adoption trends.",
        topic
    )
    risk_task = run_agent(
        "risk assessor",
        "Focus on risks, limitations, and failure modes.",
        topic
    )

    # Run all three concurrently
    technical, business, risk = await asyncio.gather(
        technical_task, business_task, risk_task
    )
    print(f"[Stage 1 Complete] Technical: {len(technical)} chars, "
          f"Business: {len(business)} chars, Risk: {len(risk)} chars")

    # Stage 2: Synthesis (waits for all parallel tasks)
    print("[Stage 2] Synthesizing parallel outputs...")
    synthesis = await run_agent(
        "synthesis specialist",
        "Combine the technical, business, and risk perspectives into a comprehensive report.",
        (
            f"Topic: {topic}\n\n"
            f"Technical Findings:\n{technical}\n\n"
            f"Business Analysis:\n{business}\n\n"
            f"Risk Assessment:\n{risk}"
        )
    )
    return {
        "technical": technical,
        "business": business,
        "risk": risk,
        "synthesis": synthesis
    }


# Parallel is up to ~3x faster than sequential for the research stage
asyncio.run(run_parallel_pipeline("Multi-agent AI systems in enterprise automation"))
Performance comparison (illustrative; actual timings depend on model latency and output length):
- Sequential (3 research agents + 1 synthesis): ~40-60s total
- Parallel (3 concurrent research agents + 1 synthesis): ~20-25s total
When to use: Independent subtasks that don't depend on each other, batch processing (10 documents analyzed simultaneously), any situation where you want to reduce wall-clock time. Total token usage stays roughly the same as in the sequential version; what rises is the peak request rate, so keep an eye on your provider's rate limits.
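For the batch case, launching every call at once can run into rate limits. One common way to cap the number of in-flight calls is an `asyncio.Semaphore`. The sketch below is an assumption-laden illustration, not part of the pipeline above: `analyze_document`, `run_batch`, and the limit of 3 are hypothetical, and the agent call is a stub.

```python
import asyncio


async def analyze_document(doc: str) -> str:
    """Hypothetical stand-in for one agent call on one document."""
    await asyncio.sleep(0.01)
    return f"summary of {doc}"


async def run_batch(docs, max_concurrency: int = 3):
    """Analyze all docs concurrently, with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0  # highest number of simultaneous calls observed

    async def guarded(doc: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await analyze_document(doc)
            finally:
                in_flight -= 1

    # return_exceptions=True puts a failure into the results list as an
    # exception object instead of raising it out of gather()
    results = await asyncio.gather(
        *(guarded(doc) for doc in docs), return_exceptions=True
    )
    return results, peak


docs = [f"doc-{i}" for i in range(10)]
results, peak = asyncio.run(run_batch(docs))
print(f"{len(results)} results, peak concurrency {peak}")
```

`asyncio.gather` returns results in the order the awaitables were passed, so `results[i]` always corresponds to `docs[i]` regardless of which call finished first.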
Pattern 3: Conditional (Branching) Orchestration
A router agent evaluates the task and routes to different specialist agents based on the content.
from enum import Enum


class TaskType(Enum):
    TECHNICAL = "technical"
    CREATIVE = "creative"
    RESEARCH = "research"
    GENERAL = "general"


class ConditionalRouter:
    def __init__(self):
        # Cheap model for classification, stronger model for the actual work
        self.router_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.specialist_llm = ChatOpenAI(model="gpt-4o", temperature=0.2)

    async def _classify_task(self, task: str) -> TaskType:
        """Router agent determines which specialist should handle the task."""
        response = await self.router_llm.ainvoke([
            SystemMessage(content=(
                "Classify the task as one of: technical, creative, research, general.\n"
                "Output ONLY the type word."
            )),
            HumanMessage(content=task)
        ])
        try:
            return TaskType(response.content.strip().lower())
        except ValueError:
            # Unrecognized label: fall back to the general handler
            return TaskType.GENERAL

    async def _handle_technical(self, task: str) -> str:
        return (await self.specialist_llm.ainvoke([
            SystemMessage(content="You are a technical expert. Provide code examples and technical depth."),
            HumanMessage(content=task)
        ])).content

    async def _handle_creative(self, task: str) -> str:
        # Higher temperature for more varied output
        creative_llm = ChatOpenAI(model="gpt-4o", temperature=0.8)
        return (await creative_llm.ainvoke([
            SystemMessage(content="You are a creative writer. Be imaginative and engaging."),
            HumanMessage(content=task)
        ])).content

    async def _handle_research(self, task: str) -> str:
        return (await self.specialist_llm.ainvoke([
            SystemMessage(content="You are a research specialist. Be thorough, cite sources, be accurate."),
            HumanMessage(content=task)
        ])).content

    async def _handle_general(self, task: str) -> str:
        return (await self.specialist_llm.ainvoke([
            SystemMessage(content="Answer clearly and helpfully."),
            HumanMessage(content=task)
        ])).content

    async def route_and_execute(self, task: str) -> Dict[str, Any]:
        task_type = await self._classify_task(task)
        print(f"[Router] Routing to {task_type.value} specialist")
        handlers = {
            TaskType.TECHNICAL: self._handle_technical,
            TaskType.CREATIVE: self._handle_creative,
            TaskType.RESEARCH: self._handle_research,
            TaskType.GENERAL: self._handle_general
        }
        result = await handlers[task_type](task)
        return {"task_type": task_type.value, "result": result}
When to use: Heterogeneous task queues, customer support systems, content platforms where different content types need different treatment.
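Cost benefit: Routing simpler tasks to cheaper models and reserving expensive models for complex ones can reduce average cost by 30-50%. The saving comes from the handlers, not the router: it only materializes if at least some routes actually use a cheaper model.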
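The arithmetic behind a figure like this is a weighted average: the router's own cost plus each route's cost multiplied by its share of traffic. The sketch below works one case through. Every number in it is a made-up assumption (per-request costs and the 60/40 traffic split are not real pricing), and `average_cost` is a hypothetical helper.

```python
def average_cost(router_cost: float, routes) -> float:
    """Expected cost per request: router cost plus traffic-weighted handler costs.

    routes is a list of (traffic_share, cost_per_request) pairs whose shares sum to 1.
    """
    return router_cost + sum(share * cost for share, cost in routes)


# Hypothetical per-request costs in dollars (assumptions, not real pricing)
EXPENSIVE = 0.010
CHEAP = 0.001
ROUTER = 0.0005

# Baseline: every request goes straight to the expensive model
baseline = EXPENSIVE

# Routed: assume 60% of requests are simple enough for the cheap model
routed = average_cost(ROUTER, [(0.6, CHEAP), (0.4, EXPENSIVE)])

savings = 1 - routed / baseline
print(f"baseline=${baseline:.4f}, routed=${routed:.4f}, savings={savings:.0%}")
```

With these assumed inputs the saving lands near the top of the 30-50% range. If most traffic is complex, or the cheap model's answers need frequent retries on the expensive one, the saving shrinks and the router call becomes pure overhead.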
Pattern 4: Loop (Critique-Revise) Orchestration
An agent generates output, an evaluator critiques it, and the generator revises until quality criteria are met.
import json


class CritiqueReviseLoop:
    def __init__(self, max_iterations: int = 3, quality_threshold: float = 0.8):
        self.max_iterations = max_iterations
        self.quality_threshold = quality_threshold
        self.generator_llm = ChatOpenAI(model="gpt-4o", temperature=0.4)
        self.critic_llm = ChatOpenAI(model="gpt-4o", temperature=0)

    async def _generate(self, task: str, previous: str = "", feedback: str = "") -> str:
        context = task
        # On revisions, show the generator its previous attempt and the critic's feedback
        if previous and feedback:
            context += f"\n\nPrevious attempt:\n{previous}\n\nFeedback to address:\n{feedback}"
        return (await self.generator_llm.ainvoke([
            SystemMessage(content="Generate high-quality output. If given feedback, address all points."),
            HumanMessage(content=context)
        ])).content

    async def _critique(self, task: str, output: str) -> Dict[str, Any]:
        response = await self.critic_llm.ainvoke([
            SystemMessage(content=(
                "Evaluate the output against the task requirements.\n"
                'Output JSON: {"score": 0.0-1.0, "feedback": "specific issues to fix", "approved": true/false}\n'
                "Score 0.8+ means approved."
            )),
            HumanMessage(content=f"Task: {task}\n\nOutput to evaluate:\n{output}")
        ])
        content = response.content
        # Strip a Markdown code fence if the model wrapped its JSON in one
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            return {"score": 0.5, "feedback": "Could not parse evaluation", "approved": False}

    async def run(self, task: str) -> Dict[str, Any]:
        current_output = ""
        feedback = ""
        history = []
        for iteration in range(self.max_iterations):
            print(f"[Loop] Iteration {iteration + 1}/{self.max_iterations}")

            # Generate
            current_output = await self._generate(task, current_output, feedback)

            # Critique
            evaluation = await self._critique(task, current_output)
            score = evaluation.get("score", 0)
            feedback = evaluation.get("feedback", "")
            history.append({
                "iteration": iteration + 1,
                "score": score,
                "feedback": feedback
            })
            print(f"[Loop] Score: {score:.2f}, Approved: {evaluation.get('approved', False)}")

            if evaluation.get("approved") or score >= self.quality_threshold:
                print(f"[Loop] Quality threshold reached at iteration {iteration + 1}")
                break

        # max_iterations caps cost even if the threshold is never reached
        return {
            "final_output": current_output,
            "iterations": len(history),
            "final_score": history[-1]["score"] if history else 0,
            "history": history
        }
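The weakest link in a critique loop is usually parsing the critic's reply, since models sometimes add prose around the JSON or return the score as a string. Below is a sketch of a more defensive parser under stated assumptions: `parse_evaluation` is a hypothetical helper, not part of any library, and it deliberately derives `approved` from the score alone so the two fields cannot disagree (a design choice that differs from the loop above, which accepts either signal).

```python
import json
import re
from typing import Any, Dict


def parse_evaluation(raw: str, threshold: float = 0.8) -> Dict[str, Any]:
    """Extract the first {...} span from a critic reply and normalize its fields."""
    fallback = {"score": 0.0, "feedback": "Could not parse evaluation", "approved": False}

    # Grab everything from the first "{" to the last "}", ignoring surrounding text
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if not match:
        return fallback
    try:
        data = json.loads(match.group(0))
        score = float(data.get("score", 0.0))  # accepts 0.9 as well as "0.9"
    except (json.JSONDecodeError, TypeError, ValueError):
        return fallback

    score = min(max(score, 0.0), 1.0)  # clamp to the documented 0.0-1.0 range
    return {
        "score": score,
        "feedback": str(data.get("feedback", "")),
        "approved": score >= threshold,
    }


fenced = 'Here is my review:\n{"score": 0.9, "feedback": "ok", "approved": true}\nDone.'
print(parse_evaluation(fenced))
print(parse_evaluation('{"score": "0.4", "feedback": "too short"}'))
print(parse_evaluation("I could not evaluate this."))
```

Falling back to a score of 0.0 rather than 0.5 means an unparseable reply never counts toward approval; the iteration cap still guarantees the loop ends.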
Pattern 5: DAG-Based Orchestration with LangGraph
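LangGraph models a workflow as a graph: nodes are agents or functions, edges are transitions, and conditional edges enable branching. It handles DAG-style dependencies natively and also allows cycles, which the example below uses for its revision loop (reviewer back to writer), so that graph is not strictly acyclic.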
from typing import TypedDict

from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI


# Define shared state
class ResearchState(TypedDict):
    topic: str
    plan: str
    search_results: str
    draft: str
    reviewed_draft: str
    final_report: str
    iteration: int


# Node functions (each is an agent). Each returns only the keys it updates;
# LangGraph merges that partial update into the shared state.
def planner_node(state: ResearchState) -> dict:
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    response = llm.invoke([
        SystemMessage(content="Create a research plan with 4-5 specific questions."),
        HumanMessage(content=f"Topic: {state['topic']}")
    ])
    return {"plan": response.content}


def searcher_node(state: ResearchState) -> dict:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    response = llm.invoke([
        SystemMessage(content="Find information for each research question."),
        HumanMessage(content=f"Plan:\n{state['plan']}")
    ])
    return {"search_results": response.content}


def writer_node(state: ResearchState) -> dict:
    llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
    prompt = f"Research:\n{state['search_results']}"
    # On a revision pass, include the previous draft and the reviewer's feedback
    # so the rewrite actually addresses the listed issues
    feedback = state.get("reviewed_draft", "")
    if feedback.startswith("REVISE:"):
        prompt += f"\n\nPrevious draft:\n{state['draft']}\n\nReviewer feedback:\n{feedback}"
    response = llm.invoke([
        SystemMessage(content="Write a comprehensive report from the research."),
        HumanMessage(content=prompt)
    ])
    return {"draft": response.content, "iteration": state.get("iteration", 0) + 1}


def reviewer_node(state: ResearchState) -> dict:
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    response = llm.invoke([
        SystemMessage(content=(
            "Review the draft.\n"
            "If quality is sufficient, start with 'APPROVED:'.\n"
            "Otherwise, start with 'REVISE:' and list specific issues."
        )),
        HumanMessage(content=f"Draft:\n{state['draft']}")
    ])
    return {"reviewed_draft": response.content}


def finalizer_node(state: ResearchState) -> dict:
    """Apply reviewer feedback to produce final report."""
    if state["reviewed_draft"].startswith("APPROVED:"):
        return {"final_report": state["draft"]}
    # Reached only when the iteration cap forced completion with feedback outstanding
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    response = llm.invoke([
        SystemMessage(content="Apply the feedback to improve the draft."),
        HumanMessage(content=f"Draft:\n{state['draft']}\n\nFeedback:\n{state['reviewed_draft']}")
    ])
    return {"final_report": response.content}


def should_revise(state: ResearchState) -> str:
    """Conditional edge: route based on reviewer decision."""
    if state["reviewed_draft"].startswith("APPROVED:"):
        return "finalize"
    elif state.get("iteration", 0) >= 3:
        return "finalize"  # Force completion after 3 iterations
    else:
        return "revise"


# Build the graph
def build_research_graph():
    workflow = StateGraph(ResearchState)

    # Add nodes
    workflow.add_node("planner", planner_node)
    workflow.add_node("searcher", searcher_node)
    workflow.add_node("writer", writer_node)
    workflow.add_node("reviewer", reviewer_node)
    workflow.add_node("finalizer", finalizer_node)

    # Add edges (sequential dependencies)
    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "searcher")
    workflow.add_edge("searcher", "writer")
    workflow.add_edge("writer", "reviewer")

    # Conditional edge from reviewer
    workflow.add_conditional_edges(
        "reviewer",
        should_revise,
        {
            "finalize": "finalizer",
            "revise": "writer"  # Loop back for revision
        }
    )
    workflow.add_edge("finalizer", END)
    return workflow.compile()


# Run the graph
graph = build_research_graph()
result = graph.invoke({"topic": "State of AI agent frameworks in 2026", "iteration": 0})
print(result["final_report"][:500])
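The diamond from the overview (A feeds B and C, which both feed D) shows the core DAG rule without any framework: a node runs only once all of its upstream nodes have finished. The sketch below uses plain `asyncio` and the standard-library `graphlib`, not the LangGraph API. `DEPENDENCIES`, `run_node`, and `run_dag` are hypothetical names, and the agents are stubs.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Dict, List

# Each node maps to the nodes it depends on: B and C need A, D needs B and C
DEPENDENCIES: Dict[str, List[str]] = {
    "A": [],
    "B": ["A"],
    "C": ["A"],
    "D": ["B", "C"],
}


async def run_node(name: str, inputs: Dict[str, str]) -> str:
    """Hypothetical stand-in for an agent: tags the joined upstream outputs."""
    await asyncio.sleep(0.01)
    upstream = "+".join(inputs[dep] for dep in sorted(inputs)) or "start"
    return f"{name}({upstream})"


async def run_dag(dependencies: Dict[str, List[str]]):
    # Raises graphlib.CycleError on a cycle, which would otherwise deadlock below
    tuple(TopologicalSorter(dependencies).static_order())

    tasks: Dict[str, asyncio.Task] = {}
    order: List[str] = []  # completion order, for inspection

    async def execute(name: str) -> str:
        inputs = {}
        for dep in dependencies[name]:
            inputs[dep] = await tasks[dep]  # wait for each upstream node
        result = await run_node(name, inputs)
        order.append(name)
        return result

    # Create every task before yielding control, so lookups in tasks[] always succeed
    for name in dependencies:
        tasks[name] = asyncio.create_task(execute(name))

    results = {name: await task for name, task in tasks.items()}
    return results, order


results, order = asyncio.run(run_dag(DEPENDENCIES))
print(results["D"])
print(order)
```

B and C start as soon as A finishes and run concurrently, while D waits for both. That is the combination of sequential and parallel behavior that makes a DAG more flexible than either pattern alone.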
Orchestration Pattern Comparison Table
| Pattern | Complexity | Latency | Cost | Best For | Debugging |
|---|---|---|---|---|---|
| Sequential | Very Low | High (additive) | Low | Simple pipelines | Easy |
| Parallel | Low | Low (concurrent) | Medium | Independent tasks | Easy |
| Conditional | Medium | Low-Medium | Low-Medium | Heterogeneous tasks | Medium |
| Loop | Medium | Variable | Variable | Quality iteration | Medium |
| DAG | High | Optimized | Optimized | Complex workflows | Hard |
Choosing the Right Pattern
A quick decision guide:
- All tasks depend on each other, strict order required? Sequential.
- Tasks are independent and you want speed? Parallel.
- Different inputs need different handlers? Conditional.
- You need output quality iteration? Loop.
- Complex mix of dependencies, conditions, and parallels? DAG with LangGraph.
Most real systems use a hybrid: parallel at the research stage, sequential for planning-to-writing, conditional routing for different query types, and a loop for final quality check.
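A compact way to picture such a hybrid is a single coroutine that uses each pattern for one stage. The sketch below is only an illustration under heavy assumptions: `stub_agent` and `stub_score` are hypothetical stand-ins for LLM calls, the keyword check stands in for a real router, and the scoring rule is invented so the loop has something to react to.

```python
import asyncio
from typing import Any, Dict, List


async def stub_agent(role: str, text: str) -> str:
    """Hypothetical stand-in for an LLM-backed agent."""
    await asyncio.sleep(0.01)
    return f"[{role}] {text}"


def stub_score(draft: str, attempt: int) -> float:
    """Hypothetical evaluator: pretends quality improves with each attempt."""
    return min(1.0, 0.5 + 0.2 * attempt)


async def hybrid_pipeline(
    query: str, max_iterations: int = 3, threshold: float = 0.8
) -> Dict[str, Any]:
    # Conditional: choose research angles from the query (toy keyword rule)
    angles = ["technical", "risk"] if "code" in query.lower() else ["business", "risk"]

    # Parallel: independent research angles run concurrently
    research: List[str] = await asyncio.gather(
        *(stub_agent(angle, query) for angle in angles)
    )

    # Sequential: planning feeds writing
    plan = await stub_agent("planner", " | ".join(research))
    draft = await stub_agent("writer", plan)

    # Loop: revise until the score clears the threshold or the budget runs out
    attempts = 1
    while stub_score(draft, attempts) < threshold and attempts < max_iterations:
        draft = await stub_agent("reviser", draft)
        attempts += 1

    return {"angles": angles, "draft": draft, "attempts": attempts}


result = asyncio.run(hybrid_pipeline("Review this code change"))
print(result["angles"], result["attempts"])
```

Once the stages and their conditions multiply beyond what fits comfortably in one function like this, that is usually the point where an explicit graph becomes easier to maintain.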
The AutoGen multi-agent group chat tutorial shows how AutoGen's GroupChat implements these patterns through conversation dynamics. The multi-agent research team article shows a sequential-to-parallel hybrid in a complete working system.
For LangGraph-specific patterns, the official documentation at langchain-ai.github.io/langgraph covers the state machine model in depth.
Conclusion
Orchestration pattern choice has a direct impact on performance, cost, and reliability. Sequential is where to start — it's the easiest to understand and debug. Parallel execution is the first upgrade once you identify independent tasks. Conditional branching improves cost efficiency. The loop pattern improves output quality. DAG-based orchestration with LangGraph gives you the full control you need for production systems.
The LangChain RAG pipeline article shows how these orchestration patterns apply to document processing and retrieval workflows if you're building knowledge-intensive agent systems.
Start with the simplest pattern that solves your problem. Add complexity only where you've measured that it's needed.
Frequently Asked Questions
What is sequential orchestration in multi-agent systems? Sequential orchestration runs agents one after another, where each agent receives the output of the previous one as input. It is simple to implement and debug, but doesn't take advantage of parallelism. It works well when tasks have strict dependencies.
How does parallel agent execution work? In parallel execution, multiple agents run simultaneously on independent tasks, then their outputs are merged. This can dramatically reduce total execution time when tasks don't depend on each other. It is implemented using asyncio.gather() in Python or parallel nodes in LangGraph.
What is a DAG orchestration pattern for agents? A DAG (Directed Acyclic Graph) pattern defines agent execution order as a graph where nodes are agents and edges represent dependencies. An agent runs only after all its upstream dependencies complete. LangGraph implements this natively, making complex conditional and branching workflows manageable.
AiTechWorlds Team