7 Communication Protocols in Multi-Agent Systems (Message Passing, Blackboard)
From message passing to publish-subscribe and contract net — here are the 7 communication protocols used in multi-agent systems, with Python examples for each.
The way agents talk to each other is just as important as what they say. I've seen well-designed agent systems fall apart because the communication layer was an afterthought — agents stepping on each other's messages, duplicating work, or producing conflicting outputs because nobody thought through how information flows between them.
Communication protocols are the connective tissue of any multi-agent system. This article covers seven distinct protocols with Python implementations for each. Not all of them are equally practical for LLM-based agents, but understanding the full landscape helps you make better design decisions.
If you're new to multi-agent systems, start with multi-agent systems explained for the foundational concepts.
Why Communication Protocol Matters
Here's a concrete example of why protocol choice matters: suppose you have three agents working on a research task. Agent A finds a fact. Should it:
a) Send the fact directly to Agent B, who needs it?
b) Post it to a shared workspace that all agents can see?
c) Broadcast it to everyone and let interested agents subscribe?
d) Wait for Agent B to request it?
Each of these is a different protocol. Each has different implications for latency, coupling, fault tolerance, and scalability. The "right" answer depends on your system's requirements.
According to a 2023 paper on distributed AI coordination (Wooldridge & Jennings), the choice of communication protocol accounts for up to 40% of the variance in multi-agent system performance on cooperative tasks. (source)
Protocol 1: Direct Message Passing (Point-to-Point)
The simplest and most common protocol: Agent A sends a message directly to Agent B, either synchronously or asynchronously.
Structure: Messages contain a sender, recipient, performative (intent), and content.
```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class Performative(Enum):
    REQUEST = "request"
    INFORM = "inform"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"
    QUERY = "query"
    CONFIRM = "confirm"


@dataclass
class ACLMessage:
    """FIPA ACL-inspired message structure."""
    sender: str
    receiver: str
    performative: Performative
    content: Any
    conversation_id: Optional[str] = None
    reply_to: Optional[str] = None


class MessagePassingAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.message_log = []

    async def send(self, recipient: "MessagePassingAgent", performative: Performative, content: Any):
        msg = ACLMessage(
            sender=self.agent_id,
            receiver=recipient.agent_id,
            performative=performative,
            content=content,
        )
        await recipient.inbox.put(msg)
        self.message_log.append(("sent", msg))

    async def receive(self) -> ACLMessage:
        msg = await self.inbox.get()
        self.message_log.append(("received", msg))
        return msg


# Usage
async def demo_message_passing():
    agent_a = MessagePassingAgent("researcher")
    agent_b = MessagePassingAgent("writer")

    # Researcher sends findings to writer
    await agent_a.send(
        agent_b,
        Performative.INFORM,
        {"topic": "async Python", "summary": "asyncio is the standard library for async"},
    )
    msg = await agent_b.receive()
    print(f"Writer received from {msg.sender}: {msg.content}")


asyncio.run(demo_message_passing())
```
Best for: Tightly coupled agent pairs, request-response patterns, situations where you need explicit accountability for message delivery.
Weakness: Doesn't scale well. If Agent A needs to send to 20 agents, that's 20 direct connections to manage.
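The `conversation_id` and `reply_to` fields in the message structure are what let a receiver answer one specific request rather than just "some message from A". The sketch below shows one way that could look. It assumes a simplified `Message` dataclass with string performatives and a hypothetical `make_reply` helper; neither is part of the code above.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Message:
    """Simplified stand-in for the ACLMessage structure (assumption)."""
    sender: str
    receiver: str
    performative: str
    content: Any
    conversation_id: Optional[str] = None
    reply_to: Optional[str] = None


def make_reply(request: Message, performative: str, content: Any) -> Message:
    """Hypothetical helper: build a reply that stays in the same conversation."""
    return Message(
        sender=request.receiver,
        # Honour reply_to if the requester asked for answers to go elsewhere
        receiver=request.reply_to or request.sender,
        performative=performative,
        content=content,
        conversation_id=request.conversation_id,
    )


async def demo_reply() -> Message:
    inboxes = {"researcher": asyncio.Queue(), "writer": asyncio.Queue()}

    request = Message(
        sender="writer",
        receiver="researcher",
        performative="query",
        content="What is asyncio?",
        conversation_id=str(uuid.uuid4()),
    )
    await inboxes[request.receiver].put(request)

    # Researcher side: read the query and answer it
    incoming = await inboxes["researcher"].get()
    reply = make_reply(incoming, "inform", "asyncio is the standard library for async")
    await inboxes[reply.receiver].put(reply)

    # Writer side: match the answer to the original question
    answer = await inboxes["writer"].get()
    assert answer.conversation_id == request.conversation_id
    return answer


reply_msg = asyncio.run(demo_reply())
print(f"{reply_msg.receiver} got a reply from {reply_msg.sender}: {reply_msg.content}")
```

Carrying the conversation ID on every reply is what keeps point-to-point traffic traceable once an agent has more than one request in flight.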
Protocol 2: Blackboard System
A shared memory structure where agents read and write. No direct agent-to-agent communication. Agents are triggered by changes to the blackboard that match their interests.
```python
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List


class Blackboard:
    def __init__(self):
        self._data: Dict[str, Any] = {}
        self._watchers: Dict[str, List[Callable]] = defaultdict(list)
        self._lock = threading.Lock()
        self._history: List[dict] = []

    def write(self, key: str, value: Any, author: str):
        with self._lock:
            self._data[key] = value
            entry = {"key": key, "value": value, "author": author, "time": time.time()}
            self._history.append(entry)
            # Snapshot the watchers while holding the lock
            watchers = list(self._watchers.get(key, []))
        # Notify outside the lock: a watcher may call write() itself, and
        # threading.Lock is not reentrant, so notifying inside would deadlock
        for watcher in watchers:
            watcher(key, value, author)

    def read(self, key: str) -> Any:
        with self._lock:
            return self._data.get(key)

    def watch(self, key: str, callback: Callable):
        """Register a callback for when a key changes."""
        with self._lock:
            self._watchers[key].append(callback)

    def get_history(self) -> List[dict]:
        with self._lock:
            return self._history.copy()


# Example agents using blackboard
blackboard = Blackboard()


def research_agent():
    """Writes findings to blackboard."""
    print("Researcher: gathering data...")
    time.sleep(0.5)  # Simulate work
    blackboard.write("research_findings", {
        "sources": ["arxiv.org/abs/1234", "papers.org/456"],
        "key_points": ["Transformers scale well", "Attention is quadratic"],
        "status": "complete",
    }, author="Researcher")


def writer_agent(key: str, value: Any, author: str):
    """Triggered when research_findings is updated."""
    if key == "research_findings" and value.get("status") == "complete":
        print(f"Writer: research done (by {author}), starting draft...")
        blackboard.write("draft", {
            "content": f"Based on findings: {value['key_points']}",
            "status": "draft",
        }, author="Writer")


# Writer watches for research completion
blackboard.watch("research_findings", writer_agent)


def reviewer_agent(key: str, value: Any, author: str):
    """Triggered when a draft appears."""
    if key == "draft" and value.get("status") == "draft":
        print(f"Reviewer: reviewing draft from {author}...")
        blackboard.write("review", {"approved": True, "notes": "Looks good"}, author="Reviewer")


blackboard.watch("draft", reviewer_agent)

# Run the pipeline
research_agent()
print("Blackboard history:", blackboard.get_history())
```
Best for: Problems with multiple independent "experts" that contribute to a shared solution. Classic AI planning, diagnosis systems, NLP annotation pipelines.
Weakness: The blackboard becomes a global state that's hard to reason about. Debugging write conflicts is painful.
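One way to make write conflicts visible instead of silent is to version each key and reject any write that was based on a stale read. The sketch below is a minimal illustration of that idea. `VersionedBlackboard` and `write_if_version` are hypothetical names, not part of the implementation above.

```python
import threading
from typing import Any, Dict, Tuple


class VersionedBlackboard:
    """Hypothetical variant: every key carries a version counter."""

    def __init__(self):
        self._data: Dict[str, Tuple[int, Any]] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> Tuple[int, Any]:
        """Return (version, value); version 0 means the key was never written."""
        with self._lock:
            return self._data.get(key, (0, None))

    def write_if_version(self, key: str, value: Any, expected_version: int) -> bool:
        """Write only if nobody else has written since the caller's read."""
        with self._lock:
            current_version, _ = self._data.get(key, (0, None))
            if current_version != expected_version:
                return False  # conflict: the caller worked from stale data
            self._data[key] = (current_version + 1, value)
            return True


board = VersionedBlackboard()

# Two agents read the same (empty) state, then both try to write
version_seen_by_a, _ = board.read("summary")
version_seen_by_b, _ = board.read("summary")

first = board.write_if_version("summary", "draft from A", version_seen_by_a)
second = board.write_if_version("summary", "draft from B", version_seen_by_b)
print(first, second)  # True False

# Agent B re-reads and retries against the current version
version_now, current = board.read("summary")
retry = board.write_if_version("summary", current + " + edits from B", version_now)
print(retry, board.read("summary"))
```

The rejected write tells Agent B exactly when it lost a race, which is usually easier to debug than discovering an overwritten value later.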
Protocol 3: Publish-Subscribe (Pub/Sub)
Agents publish messages to named topics. Other agents subscribe to topics they care about. No direct coupling between publishers and subscribers — they don't know about each other.
```python
import threading
from collections import defaultdict
from typing import Any, Callable, Dict, List


class EventBus:
    """Simple in-process pub/sub event bus."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
        self._lock = threading.Lock()

    def subscribe(self, topic: str, handler: Callable):
        with self._lock:
            self._subscribers[topic].append(handler)
        print(f"Subscribed to '{topic}'")

    def publish(self, topic: str, payload: Any, publisher: str = "unknown"):
        print(f"[{publisher}] published to '{topic}': {str(payload)[:50]}...")
        # Snapshot the handlers under the lock, then call them outside it,
        # so a handler can publish or (un)subscribe without deadlocking
        with self._lock:
            handlers = list(self._subscribers.get(topic, []))
        for handler in handlers:
            handler(payload)

    def unsubscribe(self, topic: str, handler: Callable):
        with self._lock:
            self._subscribers[topic] = [
                h for h in self._subscribers[topic] if h != handler
            ]


# Setup
bus = EventBus()


# Agent subscriptions
def analyst_on_data_ready(data):
    print(f"Analyst processing: {data['topic']}")
    bus.publish("analysis_complete", {"result": "processed"}, publisher="Analyst")


def writer_on_analysis(analysis):
    print(f"Writer drafting based on analysis: {analysis}")


bus.subscribe("data_ready", analyst_on_data_ready)
bus.subscribe("analysis_complete", writer_on_analysis)

# Trigger pipeline
bus.publish("data_ready", {"topic": "AI trends 2026", "raw_data": "..."}, publisher="DataCollector")
```
Best for: Event-driven agent architectures, fan-out patterns where one event should trigger multiple agents, loosely coupled systems where agents may come and go dynamically.
Weakness: Hard to trace causality when things go wrong. Topic design requires discipline — bad topic naming leads to confused subscriptions.
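A common mitigation for the tracing problem is to wrap every payload in an envelope that carries a correlation ID plus the ID of the event that caused it. The sketch below assumes a hypothetical `Event` envelope and `TracingBus` class. Both are illustrative names and not part of the `EventBus` above.

```python
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Event:
    topic: str
    payload: Any
    publisher: str
    correlation_id: str  # shared by every event in one causal chain
    caused_by: Optional[str] = None  # event_id of the triggering event, if any
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))


class TracingBus:
    """Hypothetical pub/sub bus that records who caused what."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable[[Event], None]]] = defaultdict(list)
        self.log: List[Event] = []

    def subscribe(self, topic: str, handler: Callable[[Event], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: Any, publisher: str,
                cause: Optional[Event] = None) -> Event:
        event = Event(
            topic=topic,
            payload=payload,
            publisher=publisher,
            # Reuse the chain's ID when reacting to an event, else start a new chain
            correlation_id=cause.correlation_id if cause else str(uuid.uuid4()),
            caused_by=cause.event_id if cause else None,
        )
        self.log.append(event)
        for handler in list(self._subscribers.get(topic, [])):
            handler(event)
        return event

    def trace(self, correlation_id: str) -> List[str]:
        """List every event in one chain, in publication order."""
        return [f"{e.publisher} -> {e.topic}" for e in self.log
                if e.correlation_id == correlation_id]


tracing_bus = TracingBus()


def analyst(event: Event) -> None:
    tracing_bus.publish("analysis_complete", {"result": "processed"},
                        publisher="Analyst", cause=event)


tracing_bus.subscribe("data_ready", analyst)
root = tracing_bus.publish("data_ready", {"topic": "AI trends"}, publisher="DataCollector")
chain = tracing_bus.trace(root.correlation_id)
print(chain)
```

Filtering the log by correlation ID reconstructs the chain of reactions behind any single output, which is the information plain pub/sub throws away.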
Protocol 4: Request-Reply (Synchronous)
Classic request-response. Agent A sends a request and blocks until Agent B replies. Simple, predictable, but can create bottlenecks.
```python
import asyncio
import inspect
import uuid
from typing import Any, Callable, Dict


class RequestReplyBus:
    def __init__(self):
        self._pending: Dict[str, asyncio.Future] = {}
        self._handlers: Dict[str, Callable] = {}
        self._tasks: set = set()  # keep references so running tasks are not garbage-collected

    def register_handler(self, agent_id: str, handler: Callable):
        self._handlers[agent_id] = handler

    async def request(self, target_agent: str, query: Any, timeout: float = 30.0) -> Any:
        # Look up the handler first so a bad target leaves no pending entry behind
        handler = self._handlers.get(target_agent)
        if not handler:
            raise ValueError(f"No handler registered for {target_agent}")

        request_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future

        # Process asynchronously
        task = asyncio.create_task(self._process_request(handler, query, request_id))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)
            raise TimeoutError(f"Request to {target_agent} timed out")

    async def _process_request(self, handler: Callable, query: Any, request_id: str):
        try:
            # Support both plain functions and coroutine functions as handlers
            result = handler(query)
            if inspect.isawaitable(result):
                result = await result
        except Exception as exc:
            # Forward handler errors to the caller instead of waiting for the timeout
            future = self._pending.pop(request_id, None)
            if future is not None and not future.done():
                future.set_exception(exc)
            return
        future = self._pending.pop(request_id, None)
        if future is not None and not future.done():
            future.set_result(result)
```
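The bottleneck mentioned above shows up when a caller waits for each reply before sending the next request. The sketch below compares that with issuing the same requests concurrently. `slow_agent` is a hypothetical responder standing in for a real agent, and the 50 ms delay is an arbitrary assumption.

```python
import asyncio
import time
from typing import List, Tuple


async def slow_agent(query: str) -> str:
    """Hypothetical responder that takes about 50 ms per request."""
    await asyncio.sleep(0.05)
    return f"answer to {query}"


async def ask(query: str, timeout: float = 1.0) -> str:
    """Request-reply with a deadline, so a stuck agent cannot block the caller forever."""
    return await asyncio.wait_for(slow_agent(query), timeout=timeout)


async def compare() -> Tuple[List[str], List[str], float, float]:
    queries = ["q1", "q2", "q3"]

    # Sequential: each request waits for the previous reply
    start = time.perf_counter()
    sequential = [await ask(q) for q in queries]
    sequential_s = time.perf_counter() - start

    # Concurrent: all requests are in flight at once
    start = time.perf_counter()
    concurrent = list(await asyncio.gather(*(ask(q) for q in queries)))
    concurrent_s = time.perf_counter() - start

    return sequential, concurrent, sequential_s, concurrent_s


sequential, concurrent, sequential_s, concurrent_s = asyncio.run(compare())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

Both versions return the same answers in the same order; only the waiting pattern differs. If the replies do not depend on each other, issuing them together removes most of the latency cost of request-reply.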
Protocol 5: Contract Net Protocol (CNP)
A task-allocation mechanism: a manager announces a task, workers bid on it, and the manager awards the contract to the best bid. Excellent for dynamic load balancing.
```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskAnnouncement:
    task_id: str
    description: str
    required_skills: List[str]
    deadline: float


@dataclass
class Bid:
    agent_id: str
    task_id: str
    confidence: float  # 0.0 to 1.0
    estimated_time: float  # seconds


class ContractNetManager:
    def __init__(self, manager_id: str):
        self.manager_id = manager_id
        self.workers: List["ContractNetWorker"] = []

    def register_worker(self, worker: "ContractNetWorker"):
        self.workers.append(worker)

    async def allocate_task(self, task: TaskAnnouncement) -> Optional[str]:
        # Step 1: Announce task to all workers and collect their bids
        bids = []
        for worker in self.workers:
            bid = await worker.evaluate_bid(task)
            if bid:
                bids.append(bid)
        if not bids:
            return None

        # Step 2: Select best bid (highest confidence, lowest time)
        winner = max(bids, key=lambda b: b.confidence / (b.estimated_time + 1))

        # Step 3: Award contract
        for worker in self.workers:
            if worker.worker_id == winner.agent_id:
                return await worker.execute_task(task)
        return None


class ContractNetWorker:
    def __init__(self, worker_id: str, skills: List[str]):
        self.worker_id = worker_id
        self.skills = skills

    async def evaluate_bid(self, task: TaskAnnouncement) -> Optional[Bid]:
        if task.required_skills:
            skill_match = len(set(task.required_skills) & set(self.skills)) / len(task.required_skills)
        else:
            # Assumption: a task with no required skills is a full match
            # (this also avoids dividing by zero)
            skill_match = 1.0
        if skill_match == 0:
            return None
        return Bid(
            agent_id=self.worker_id,
            task_id=task.task_id,
            confidence=skill_match,
            estimated_time=10.0 / skill_match,
        )

    async def execute_task(self, task: TaskAnnouncement) -> str:
        return f"{self.worker_id} completed: {task.description}"
```
Best for: Heterogeneous agent pools, dynamic task allocation, resource-constrained systems.
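To see how the selection rule `confidence / (estimated_time + 1)` behaves, here is a small worked example. The worker names and skill sets are made up for illustration. The formulas mirror the ones in `evaluate_bid` and `allocate_task`, wrapped in a hypothetical `score_bid` helper.

```python
from typing import Dict, List, Optional, Tuple


def score_bid(required: List[str], skills: List[str]) -> Optional[Tuple[float, float, float]]:
    """Return (confidence, estimated_time, score), or None if the worker would not bid."""
    confidence = len(set(required) & set(skills)) / len(required)
    if confidence == 0:
        return None
    estimated_time = 10.0 / confidence
    return confidence, estimated_time, confidence / (estimated_time + 1)


required_skills = ["python", "sql"]
workers: Dict[str, List[str]] = {
    "generalist": ["python"],         # matches 1 of 2 skills
    "specialist": ["python", "sql"],  # matches 2 of 2 skills
    "designer": ["figma"],            # matches nothing, so no bid
}

bids: Dict[str, Tuple[float, float, float]] = {}
for name, skills in workers.items():
    result = score_bid(required_skills, skills)
    if result is not None:
        bids[name] = result

winner = max(bids, key=lambda name: bids[name][2])
for name, (confidence, estimated_time, score) in bids.items():
    print(f"{name}: confidence={confidence:.2f} time={estimated_time:.1f}s score={score:.4f}")
print("winner:", winner)
```

The generalist scores 0.5 / 21 and the specialist 1.0 / 11, so the specialist wins. Because the estimated time is itself derived from the skill match in this example, the score always rises with confidence. A real system would likely have workers estimate time independently, for instance from their current load.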
Protocol 6: Gossip Protocol
Agents randomly share state with neighbors. Information propagates through the network without central coordination. Used more in distributed systems than LLM pipelines, but relevant for large-scale simulations.
```python
import random
from typing import List


class GossipAgent:
    def __init__(self, agent_id: str, initial_state: dict):
        self.agent_id = agent_id
        self.state = initial_state
        self.peers: List["GossipAgent"] = []
        self.round_count = 0

    def add_peer(self, peer: "GossipAgent"):
        self.peers.append(peer)

    def gossip_round(self):
        """Share state with a random peer."""
        if not self.peers:
            return
        target = random.choice(self.peers)
        # Merge states. On conflicting keys this agent's value wins; true
        # last-write-wins would need a version or timestamp per key.
        merged = {**target.state, **self.state}
        self.state = merged
        target.state = merged.copy()
        self.round_count += 1
```
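To get a feel for how quickly information spreads, the sketch below runs the same merge-with-a-random-peer step on a fully connected network until every agent knows a fact that only one agent started with. The function name, the fixed seed, and the round cap are assumptions made for illustration.

```python
import random
from typing import Dict, List


def gossip_until_converged(num_agents: int, seed: int = 0, max_rounds: int = 10_000) -> int:
    """Run gossip rounds until every agent holds the fact; return the rounds needed."""
    rng = random.Random(seed)  # seeded so the run is reproducible
    # Each agent's state is a dict; only agent 0 knows the fact at the start
    states: List[Dict[str, str]] = [{} for _ in range(num_agents)]
    states[0]["fact"] = "transformers scale well"

    for round_number in range(1, max_rounds + 1):
        for sender in range(num_agents):
            # Pick any peer other than the sender itself
            target = rng.choice([i for i in range(num_agents) if i != sender])
            # Same merge as gossip_round: both sides end up with the union
            merged = {**states[target], **states[sender]}
            states[sender] = merged
            states[target] = dict(merged)
        if all("fact" in state for state in states):
            return round_number
    raise RuntimeError("did not converge within max_rounds")


rounds = gossip_until_converged(16)
print(f"16 agents converged after {rounds} round(s)")
```

No agent ever talks to a coordinator, yet the fact reaches everyone. The price is that you cannot say in advance which round a given agent learns it in, which is why the comparison table rates gossip as very hard to debug.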
Protocol 7: FIPA ACL (Formal Standard)
The Foundation for Intelligent Physical Agents Agent Communication Language is the formal standard for agent communication. Used in academic research and some enterprise systems.
```python
import uuid
from typing import Optional


class FIPAACLMessage:
    """Implementation of FIPA ACL message structure."""

    PERFORMATIVES = [
        "accept-proposal", "agree", "cancel", "cfp",
        "confirm", "disconfirm", "failure", "inform",
        "inform-if", "inform-ref", "not-understood",
        "propagate", "propose", "proxy", "query-if",
        "query-ref", "refuse", "reject-proposal", "request",
        "request-when", "request-whenever", "subscribe",
    ]

    def __init__(self, performative: str, sender: str, receiver: str, content: str):
        if performative not in self.PERFORMATIVES:
            raise ValueError(f"Invalid performative: {performative}")
        self.performative = performative
        self.sender = sender
        self.receiver = receiver
        self.content = content
        self.language = "FIPA-SL"
        self.ontology: Optional[str] = None
        self.conversation_id = str(uuid.uuid4())

    def to_dict(self) -> dict:
        return {
            "performative": self.performative,
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "language": self.language,
            "conversation-id": self.conversation_id,
        }
```
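Much of the value of a formal performative vocabulary is that conversations can be checked against an expected sequence. The sketch below validates a contract-net style exchange using a simplified reply table. The table loosely follows the FIPA Contract Net interaction protocol but leaves out cancellation and deadlines, so treat it as an illustration rather than a conformant implementation. `ALLOWED_REPLIES` and `is_valid_conversation` are hypothetical names.

```python
from typing import Dict, List, Set

# Simplified reply table (assumption: cancellation and deadlines are omitted)
ALLOWED_REPLIES: Dict[str, Set[str]] = {
    "cfp": {"propose", "refuse", "not-understood"},
    "propose": {"accept-proposal", "reject-proposal"},
    "accept-proposal": {"inform", "failure"},
}


def is_valid_conversation(performatives: List[str]) -> bool:
    """Check that the exchange starts with cfp and each message is an allowed reply."""
    if not performatives or performatives[0] != "cfp":
        return False
    for previous, current in zip(performatives, performatives[1:]):
        if current not in ALLOWED_REPLIES.get(previous, set()):
            return False
    return True


print(is_valid_conversation(["cfp", "propose", "accept-proposal", "inform"]))  # True
print(is_valid_conversation(["cfp", "refuse"]))                                # True
print(is_valid_conversation(["cfp", "accept-proposal"]))                       # False
```

A check like this can sit in front of an LLM agent to reject replies that make no sense at the current step, such as accepting a proposal nobody has made.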
Protocol Comparison Table
| Protocol | Coupling | Scalability | Debugging | Latency | Best Use Case |
|---|---|---|---|---|---|
| Direct Message Passing | Tight | Low | Easy | Low | Simple pipelines |
| Blackboard | Loose | Medium | Hard | Low | Expert collaboration |
| Publish-Subscribe | Very Loose | High | Medium | Low | Event-driven systems |
| Request-Reply | Tight | Low | Easy | Medium | Synchronous APIs |
| Contract Net | Loose | High | Medium | Medium | Dynamic allocation |
| Gossip | Very Loose | Very High | Very Hard | High | Large simulations |
| FIPA ACL | Formal | Medium | Medium | Medium | Formal/academic MAS |
For LLM-based multi-agent systems like those built with AutoGen or CrewAI, you'll almost always use some variant of message passing or pub/sub. The multi-agent architecture patterns article explores how these protocols map to different architectural choices.
Which Protocol Should You Use?
For most practical LLM multi-agent systems:
- Building a sequential pipeline? Direct message passing. Simple, traceable, debuggable.
- Building a collaborative research system? Blackboard with typed fields for different content types.
- Building an event-driven system where agents react to signals? Pub/sub with well-defined topics.
- Building a system with heterogeneous agents and variable task loads? Contract Net.
The multi-agent research team article shows a practical implementation using message passing and shared state together, which is the combination you'll find in most real LangChain and AutoGen pipelines.
The build AI agent with LangChain guide also covers how LangChain's runnable interface provides an implicit message-passing layer between chain components.
Conclusion
Communication protocol isn't just an implementation detail — it shapes how your agents coordinate, fail, and scale. Direct message passing is where most developers should start: explicit, traceable, easy to debug. Move to pub/sub when you need decoupling, and consider Contract Net when you have dynamic task allocation requirements.
The pattern that best fits LLM-based MAS in 2026 is a hybrid: structured message passing for explicit handoffs, with a shared state store (blackboard variant) for context accumulation. Most mature frameworks like LangGraph implement exactly this without calling it by name.
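The hybrid described above can be sketched in a few lines: agents hand off explicitly through a queue, while the actual content accumulates in a shared store. Everything here (`Handoff`, `researcher`, `writer`, `run_pipeline`) is a hypothetical illustration of the pattern and does not reflect any particular framework's API.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Handoff:
    """Explicit message: who is handing off, and which context key to read."""
    sender: str
    context_key: str


async def researcher(shared: Dict[str, Any], outbox: asyncio.Queue) -> None:
    # Bulky content goes into the shared store...
    shared["findings"] = ["Transformers scale well", "Attention is quadratic"]
    # ...and only a small pointer travels in the message
    await outbox.put(Handoff(sender="researcher", context_key="findings"))


async def writer(shared: Dict[str, Any], inbox: asyncio.Queue) -> None:
    handoff = await inbox.get()  # waits until the researcher hands off
    points = shared[handoff.context_key]
    shared["draft"] = f"Draft based on {len(points)} findings from {handoff.sender}"


async def run_pipeline() -> Dict[str, Any]:
    shared: Dict[str, Any] = {}               # blackboard-style context store
    channel: asyncio.Queue = asyncio.Queue()  # point-to-point handoff channel
    await asyncio.gather(researcher(shared, channel), writer(shared, channel))
    return shared


state = asyncio.run(run_pipeline())
print(state["draft"])
```

The message says who should act next and where to look, and the shared store holds what has been learned so far. That split keeps handoffs traceable without copying the full context into every message.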
Explore the AutoGen multi-agent group chat tutorial to see these protocols in action with a real working system.
Frequently Asked Questions
What is message passing in multi-agent systems?
Message passing is when agents communicate by sending structured messages directly to each other (point-to-point) or through a message broker. Each message contains sender, recipient, content, and often a performative (intent) like REQUEST, INFORM, or PROPOSE.
What is the blackboard system in AI?
A blackboard system is a shared memory architecture where multiple agents (called knowledge sources) read from and write to a common data structure (the blackboard). Agents are triggered when relevant data appears on the blackboard. Good for problems where multiple experts need to collaborate on a solution.
What is the Contract Net Protocol in multi-agent systems?
The Contract Net Protocol (CNP) is a task-sharing mechanism where a manager agent announces a task, worker agents submit bids with their capability to handle it, and the manager awards the contract to the best bidder. It enables dynamic task allocation in decentralized systems.
AiTechWorlds Team