5 AutoGen Tool Integration Patterns (Sync, Async, Callback)
Master AutoGen's 5 tool integration patterns — synchronous, asynchronous, callback-based, chained, and error-handled — with complete code examples for each.
Get more content like this on Telegram!
Daily AI tips, notes & resources — free
Adding custom tools to AutoGen agents is where the framework's flexibility really shows. The built-in capabilities — web search, code execution, file operations — only get you so far. Real production systems need agents that can call your internal APIs, query your databases, trigger your workflows, and integrate with third-party services.
This guide covers five distinct tool integration patterns, each suited to different requirements. By the end, you'll know when to use each one and have working code you can adapt directly.
For foundational AutoGen knowledge before diving into tools, see AI agents explained and AutoGen conversational patterns.
How AutoGen Tool Integration Works
Before the patterns, a quick model of how tools work in AutoGen. An agent's llm_config includes a functions list that describes available tools. When the LLM decides a tool should be called, AutoGen routes the call to a registered function, captures the return value, and feeds it back into the conversation as a tool result.
import autogen
# Basic tool registration pattern
llm_config = {
"config_list": [{"model": "gpt-4o", "api_key": "YOUR_KEY"}],
"functions": [
{
"name": "tool_name",
"description": "What this tool does",
"parameters": {
"type": "object",
"properties": {
"param1": {"type": "string", "description": "What param1 is"}
},
"required": ["param1"]
}
}
]
}
# Function that implements the tool
def tool_name(param1: str) -> str:
return f"Result for: {param1}"
# Register function with the agent
agent = autogen.AssistantAgent(
name="ToolUser",
llm_config=llm_config,
function_map={"tool_name": tool_name}
)
Pattern 1: Synchronous Tool
Best for: Fast operations, database queries, file reads, simple calculations, API calls with response times under 5 seconds.
The synchronous pattern is the most straightforward — the tool runs, returns a result, and the agent continues. No threading, no waiting. Use this as your default until you have a concrete reason to go async.
# pattern_1_sync_tool.py
import autogen
import requests
import json
from datetime import datetime
# --- Tool Implementation ---
def get_stock_price(ticker: str, currency: str = "USD") -> str:
"""Fetch current stock price for a given ticker symbol."""
# Using a free financial API
url = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}"
params = {"interval": "1d", "range": "1d"}
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
result = data["chart"]["result"][0]
meta = result["meta"]
price_info = {
"ticker": ticker,
"current_price": meta.get("regularMarketPrice"),
"previous_close": meta.get("previousClose"),
"currency": meta.get("currency", currency),
"market_state": meta.get("marketState"),
"timestamp": datetime.utcnow().isoformat()
}
return json.dumps(price_info)
except requests.RequestException as e:
return json.dumps({"error": f"API request failed: {str(e)}", "ticker": ticker})
except (KeyError, IndexError) as e:
return json.dumps({"error": f"Data parsing failed: {str(e)}", "ticker": ticker})
def search_web(query: str, num_results: int = 5) -> str:
"""Search the web and return top results."""
# DuckDuckGo instant answer API (no key required)
url = "https://api.duckduckgo.com/"
params = {
"q": query,
"format": "json",
"no_html": 1,
"skip_disambig": 1
}
try:
response = requests.get(url, params=params, timeout=10)
data = response.json()
results = []
# Abstract result
if data.get("Abstract"):
results.append({
"type": "summary",
"text": data["Abstract"][:500],
"source": data.get("AbstractSource")
})
# Related topics
for topic in data.get("RelatedTopics", [])[:num_results]:
if isinstance(topic, dict) and topic.get("Text"):
results.append({
"type": "related",
"text": topic["Text"][:300],
"url": topic.get("FirstURL")
})
return json.dumps(results)
except Exception as e:
return json.dumps({"error": str(e)})
# --- Agent Configuration ---
llm_config = {
"config_list": [{"model": "gpt-4o", "api_key": "YOUR_KEY"}],
"functions": [
{
"name": "get_stock_price",
"description": "Get current stock price and market data for a ticker symbol",
"parameters": {
"type": "object",
"properties": {
"ticker": {"type": "string", "description": "Stock ticker symbol (e.g., AAPL, GOOGL)"},
"currency": {"type": "string", "description": "Currency for price display", "default": "USD"}
},
"required": ["ticker"]
}
},
{
"name": "search_web",
"description": "Search the web for current information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"num_results": {"type": "integer", "description": "Number of results to return", "default": 5}
},
"required": ["query"]
}
}
]
}
assistant = autogen.AssistantAgent(
name="FinancialAnalyst",
llm_config=llm_config,
function_map={
"get_stock_price": get_stock_price,
"search_web": search_web
}
)
user = autogen.UserProxyAgent(
name="User",
human_input_mode="NEVER",
max_consecutive_auto_reply=5
)
user.initiate_chat(
assistant,
message="What's the current stock price of NVIDIA and any recent news about it?"
)
Pattern 2: Asynchronous Tool
Best for: Long-running operations, parallel API calls, I/O-bound tasks that would block the main thread (database queries, external services with high latency).
# pattern_2_async_tool.py
import autogen
import asyncio
import aiohttp
import json
from concurrent.futures import ThreadPoolExecutor
# AsyncIO tools require a bridge to AutoGen's synchronous interface
executor = ThreadPoolExecutor(max_workers=4)
def run_async(coro):
"""Run an async coroutine from synchronous code."""
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
async def fetch_multiple_apis(urls: list[str]) -> list[dict]:
"""Fetch multiple URLs in parallel."""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""Fetch a single URL with error handling."""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.content_type == "application/json":
data = await response.json()
else:
text = await response.text()
data = {"text": text[:1000]}
return {"url": url, "status": response.status, "data": data}
except Exception as e:
return {"url": url, "error": str(e)}
# Sync wrapper that AutoGen can call
def parallel_web_research(urls: str) -> str:
"""Fetch multiple URLs in parallel. urls should be comma-separated."""
url_list = [u.strip() for u in urls.split(",")]
results = run_async(fetch_multiple_apis(url_list))
return json.dumps(results, indent=2)
async def async_database_query(query: str, db_connection_string: str = None) -> dict:
"""Run an async database query (example with aiopg)."""
# Placeholder — replace with actual async DB driver
await asyncio.sleep(0.1) # Simulate DB latency
return {"query": query, "rows": [], "message": "Replace with actual async DB call"}
# Sync wrapper
def query_database(sql: str) -> str:
"""Execute a database query asynchronously."""
result = run_async(async_database_query(sql))
return json.dumps(result)
llm_config = {
"config_list": [{"model": "gpt-4o", "api_key": "YOUR_KEY"}],
"functions": [
{
"name": "parallel_web_research",
"description": "Fetch multiple web pages in parallel for research",
"parameters": {
"type": "object",
"properties": {
"urls": {
"type": "string",
"description": "Comma-separated list of URLs to fetch"
}
},
"required": ["urls"]
}
}
]
}
research_agent = autogen.AssistantAgent(
name="ResearchAgent",
llm_config=llm_config,
function_map={"parallel_web_research": parallel_web_research}
)
Pattern 3: Callback Tool (Event-Driven)
Best for: Long-running tasks where you want to stream progress back to the agent, webhook-style integrations, tasks with multiple intermediate states.
# pattern_3_callback_tool.py
import autogen
import json
import time
from typing import Callable
class ProgressTracker:
"""Shared state for tracking tool progress."""
def __init__(self):
self.steps = []
self.current_status = "idle"
def update(self, step: str, status: str = "running"):
self.steps.append({"step": step, "timestamp": time.time(), "status": status})
self.current_status = status
print(f"[Progress] {step}")
def make_data_pipeline_tool(callback: Callable[[str], None] = None) -> Callable:
"""Factory that creates a data pipeline tool with an optional progress callback."""
def run_data_pipeline(
source_path: str,
output_path: str,
transformations: str
) -> str:
"""Run a multi-step data transformation pipeline."""
steps_log = []
transform_list = [t.strip() for t in transformations.split(",")]
# Simulate a multi-step pipeline with progress reporting
steps = [
("Loading data", 0.5),
("Validating schema", 0.3),
*[(f"Applying: {t}", 0.8) for t in transform_list],
("Writing output", 0.4),
("Generating report", 0.3)
]
for step_name, duration in steps:
if callback:
callback(f"Step: {step_name}")
time.sleep(duration) # Simulate work
steps_log.append({
"step": step_name,
"status": "completed",
"duration_secs": duration
})
result = {
"source": source_path,
"output": output_path,
"transformations_applied": transform_list,
"steps": steps_log,
"total_time_secs": sum(s[1] for s in steps),
"status": "completed"
}
return json.dumps(result)
return run_data_pipeline
# --- Webhook-style callback tool ---
def register_webhook_tool(endpoint_url: str, event_type: str, payload: str) -> str:
"""Register a webhook and return a tracking ID."""
import requests
import uuid
tracking_id = str(uuid.uuid4())
try:
response = requests.post(
endpoint_url,
json={
"tracking_id": tracking_id,
"event_type": event_type,
"payload": json.loads(payload) if payload else {},
"callback_registered": True
},
timeout=5
)
return json.dumps({
"tracking_id": tracking_id,
"webhook_registered": response.status_code == 200,
"message": "Webhook registered. Check status with tracking ID."
})
except Exception as e:
return json.dumps({
"tracking_id": tracking_id,
"error": str(e),
"webhook_registered": False
})
# Create the tool with a progress callback
tracker = ProgressTracker()
pipeline_tool = make_data_pipeline_tool(callback=tracker.update)
llm_config = {
"config_list": [{"model": "gpt-4o", "api_key": "YOUR_KEY"}],
"functions": [
{
"name": "run_data_pipeline",
"description": "Run a data transformation pipeline with progress tracking",
"parameters": {
"type": "object",
"properties": {
"source_path": {"type": "string"},
"output_path": {"type": "string"},
"transformations": {
"type": "string",
"description": "Comma-separated list of transformations to apply"
}
},
"required": ["source_path", "output_path", "transformations"]
}
}
]
}
pipeline_agent = autogen.AssistantAgent(
name="DataEngineer",
llm_config=llm_config,
function_map={"run_data_pipeline": pipeline_tool}
)
Pattern 4: Chained Tools
Best for: Multi-step workflows where each tool's output becomes the next tool's input, ETL pipelines, sequential API workflows.
# pattern_4_chained_tools.py
import autogen
import json
from typing import Optional
# Tool 1: Search
def search_companies(industry: str, min_revenue: str = "1M") -> str:
"""Find companies in an industry."""
# Simulate company search
companies = [
{"name": "TechStartup A", "revenue": "5M", "employees": 45},
{"name": "AI Firm B", "revenue": "12M", "employees": 120},
{"name": "DataCorp C", "revenue": "3M", "employees": 28}
]
return json.dumps({"industry": industry, "companies": companies})
# Tool 2: Enrich — uses output from Tool 1
def enrich_company_data(company_name: str) -> str:
"""Fetch detailed data for a specific company."""
# Simulate enrichment API call
enriched = {
"company": company_name,
"founded": 2019,
"headquarters": "San Francisco, CA",
"tech_stack": ["Python", "React", "AWS"],
"recent_funding": "Series A - $8M",
"linkedin_url": f"https://linkedin.com/company/{company_name.lower().replace(' ', '-')}",
"contact_email": "info@company.com"
}
return json.dumps(enriched)
# Tool 3: Score — uses output from Tools 1 and 2
def score_lead(
company_name: str,
revenue: str,
employees: int,
recent_funding: str
) -> str:
"""Score a company as a sales lead."""
score = 0
# Revenue scoring
revenue_val = int(revenue.replace("M", "")) if "M" in revenue else 0
if revenue_val > 10:
score += 40
elif revenue_val > 5:
score += 25
else:
score += 10
# Employee scoring
if 50 < employees < 500:
score += 30 # Sweet spot for many SaaS products
elif employees >= 500:
score += 20
else:
score += 15
# Funding scoring
if "Series B" in recent_funding or "Series C" in recent_funding:
score += 30
elif "Series A" in recent_funding:
score += 20
else:
score += 5
return json.dumps({
"company": company_name,
"lead_score": score,
"tier": "A" if score >= 70 else "B" if score >= 50 else "C",
"priority": "High" if score >= 70 else "Medium" if score >= 50 else "Low"
})
# Tool 4: Save — final step in the chain
def save_to_crm(company_name: str, lead_tier: str, enrichment_data: str) -> str:
"""Save lead data to CRM (simulated)."""
import datetime
crm_record = {
"id": f"LEAD-{hash(company_name) % 100000:05d}",
"company": company_name,
"tier": lead_tier,
"created_at": datetime.datetime.utcnow().isoformat(),
"enrichment": json.loads(enrichment_data) if enrichment_data else {},
"status": "new"
}
# In production: POST to your CRM API
return json.dumps({"saved": True, "crm_id": crm_record["id"], "record": crm_record})
# --- LLM config with all 4 tools ---
llm_config = {
"config_list": [{"model": "gpt-4o", "api_key": "YOUR_KEY"}],
"functions": [
{
"name": "search_companies",
"description": "Search for companies in a given industry",
"parameters": {
"type": "object",
"properties": {
"industry": {"type": "string"},
"min_revenue": {"type": "string", "default": "1M"}
},
"required": ["industry"]
}
},
{
"name": "enrich_company_data",
"description": "Get detailed information about a specific company",
"parameters": {
"type": "object",
"properties": {"company_name": {"type": "string"}},
"required": ["company_name"]
}
},
{
"name": "score_lead",
"description": "Score a company as a sales lead based on company data",
"parameters": {
"type": "object",
"properties": {
"company_name": {"type": "string"},
"revenue": {"type": "string"},
"employees": {"type": "integer"},
"recent_funding": {"type": "string"}
},
"required": ["company_name", "revenue", "employees", "recent_funding"]
}
},
{
"name": "save_to_crm",
"description": "Save a scored lead to the CRM system",
"parameters": {
"type": "object",
"properties": {
"company_name": {"type": "string"},
"lead_tier": {"type": "string"},
"enrichment_data": {"type": "string"}
},
"required": ["company_name", "lead_tier"]
}
}
]
}
sales_agent = autogen.AssistantAgent(
name="SalesResearcher",
llm_config=llm_config,
function_map={
"search_companies": search_companies,
"enrich_company_data": enrich_company_data,
"score_lead": score_lead,
"save_to_crm": save_to_crm
},
system_message="You are a sales research agent. For each company found, enrich its data, score it as a lead, and save high-priority leads (tier A or B) to the CRM."
)
Pattern 5: Error-Handled Tool
Best for: Any production tool — external APIs, database operations, file system operations where failures need graceful handling.
# pattern_5_error_handled_tool.py
import autogen
import json
import requests
import logging
from functools import wraps
from typing import Any, Callable
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def resilient_tool(max_retries: int = 3, timeout: int = 30):
"""Decorator that adds retry logic and standardized error handling to tools."""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> str:
last_error = None
for attempt in range(max_retries):
try:
result = func(*args, **kwargs)
logger.info(f"Tool {func.__name__} succeeded on attempt {attempt + 1}")
return result
except requests.Timeout:
last_error = f"Request timed out after {timeout}s"
logger.warning(f"Timeout on attempt {attempt + 1} for {func.__name__}")
except requests.HTTPError as e:
status_code = e.response.status_code if e.response else None
# Don't retry client errors (4xx) — they won't fix themselves
if status_code and 400 <= status_code < 500:
return json.dumps({
"success": False,
"error": f"Client error {status_code}: {str(e)}",
"retried": False
})
last_error = f"HTTP {status_code}: {str(e)}"
logger.warning(f"HTTP error on attempt {attempt + 1}: {last_error}")
except json.JSONDecodeError as e:
last_error = f"Invalid JSON response: {str(e)}"
logger.error(f"JSON decode error in {func.__name__}: {last_error}")
break # JSON errors won't fix on retry
except Exception as e:
last_error = f"Unexpected error: {str(e)}"
logger.error(f"Unexpected error in {func.__name__}: {last_error}")
# Exponential backoff between retries
if attempt < max_retries - 1:
import time
wait = 2 ** attempt
logger.info(f"Retrying in {wait}s...")
time.sleep(wait)
return json.dumps({
"success": False,
"error": last_error,
"attempts": max_retries,
"tool": func.__name__
})
return wrapper
return decorator
@resilient_tool(max_retries=3, timeout=15)
def call_external_api(endpoint: str, method: str = "GET", payload: str = None) -> str:
"""Call any external REST API with full error handling."""
payload_dict = json.loads(payload) if payload else None
response = requests.request(
method=method.upper(),
url=endpoint,
json=payload_dict,
timeout=15,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
try:
data = response.json()
except json.JSONDecodeError:
data = {"text": response.text[:2000]}
return json.dumps({
"success": True,
"status_code": response.status_code,
"data": data
})
@resilient_tool(max_retries=2)
def read_file_safe(file_path: str, encoding: str = "utf-8") -> str:
"""Read a file with error handling for missing files and encoding issues."""
import os
if not os.path.exists(file_path):
return json.dumps({"success": False, "error": f"File not found: {file_path}"})
if os.path.getsize(file_path) > 10 * 1024 * 1024: # 10MB limit
return json.dumps({"success": False, "error": "File exceeds 10MB size limit"})
with open(file_path, "r", encoding=encoding) as f:
content = f.read()
return json.dumps({
"success": True,
"path": file_path,
"content": content,
"size_bytes": len(content)
})
Tool Integration Pattern Comparison Table
| Pattern | Use Case | Complexity | Error Handling | When to Use |
|---|---|---|---|---|
| Synchronous | API calls, DB queries | Low | Basic | Default choice for most tools |
| Asynchronous | Parallel I/O, high-latency APIs | Medium | Manual | Multiple concurrent API calls |
| Callback | Long pipelines, progress tracking | Medium | Via callback | Multi-step with status reporting |
| Chained | Multi-step workflows, ETL | Medium-High | Per tool | Sequential dependency chains |
| Error-Handled | Any production tool | Medium | Full retry/logging | Always in production |
Combining Patterns in Production
Real production agents combine multiple patterns. A typical architecture:
# production_agent_tools.py
# Each tool category uses the appropriate pattern
tools = {
# Fast lookups: Synchronous
"get_user_data": sync_user_lookup,
"check_inventory": sync_inventory_check,
# External APIs: Error-handled + sync
"call_payment_api": resilient_payment_api,
"send_notification": resilient_notification_sender,
# Heavy processing: Async + error-handled
"process_batch_data": async_batch_processor,
# Multi-step: Chained
"complete_order_flow": chained_order_pipeline,
}
For more complex architectures involving multiple coordinating agents with shared tool pools, see AI agent memory and planning and Build AI agent with LangChain.
FAQs
Can AutoGen tools call other AutoGen agents?
Yes — this is actually one of AutoGen's most powerful patterns. A tool function can instantiate or call another agent, enabling hierarchical agent chains where the outer agent delegates subtasks to inner agents via tool calls. Keep in mind that nested agents share your API budget, so set tight iteration limits on inner agents.
How do I pass context from the conversation into a tool call?
Tools receive only the arguments defined in their schema. To pass conversation context, include it as an argument in the tool's parameter schema, or use a closure that captures shared state. For persistent context, a shared memory object or external store works better than trying to pass everything through tool arguments.
What happens when an AutoGen tool throws an exception?
By default, AutoGen catches tool exceptions and passes the error message back to the agent as the tool result. The agent then decides how to respond — usually by retrying with different arguments or telling the user about the failure. You can customize this behavior by wrapping tools in explicit error handlers that return structured error objects.
Frequently Asked Questions
AiTechWorlds Team
✓ Verified WriterThe AiTechWorlds team is passionate about AI, technology, and education. We create high-quality, research-backed content to help you learn, grow, and succeed in the modern digital world.
Related Articles
5 AutoGen Agent Roles (Assistant, UserProxy, CodeExecutor)
Understand the 5 core AutoGen agent types — AssistantAgent, UserProxyAgent, CodeExecutorAgent, and more — with code examples and a comparison table for each role.
How to Deploy AutoGen Agents as APIs with FastAPI (2026)
Learn to serve AutoGen multi-agent systems as production REST APIs using FastAPI with async endpoints and real-time streaming responses.
How to Use AutoGen with Azure OpenAI (Enterprise Security)
Connect Microsoft AutoGen to Azure OpenAI for enterprise-grade AI agents. Step-by-step setup with private endpoints, OAI_CONFIG_LIST, and deployment config.
Build a Code Debugging Agent with AutoGen (Auto-Fix PRs)
Build an AutoGen agent that reviews code, analyzes PR diffs, suggests fixes, and automates code quality improvements with a full working implementation.