24 min · Lesson 20 of 23
Production Agents
Deploying Agents with FastAPI
Deploying Agents with FastAPI
Building an agent is only half the work. Deploying it as a reliable, scalable API that other services or users can call is the other half. This lesson covers a production-quality FastAPI deployment for your agents.
Project Structure for Deployment
agent-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── research.py      # Agent definitions
│   │   └── base.py
│   ├── api/
│   │   ├── routes.py        # API endpoints
│   │   └── models.py        # Pydantic request/response models
│   ├── core/
│   │   ├── config.py        # Settings
│   │   └── logging.py       # Logging setup
│   └── middleware/
│       └── auth.py          # API key authentication
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── .env
The FastAPI Application
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.api.routes import router
from app.core.config import settings
from app.core.logging import setup_logging
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before ``yield`` and shutdown work after it.

    Startup configures logging and stores the object returned by
    ``initialize_agent()`` on ``app.state.research_agent`` so request
    handlers can reuse it. No shutdown cleanup is performed yet.
    """
    setup_logging()

    # Imported inside the hook rather than at module level, so the agent
    # module is only loaded when the application actually starts.
    from app.agents.research import initialize_agent

    # Pre-load models/indexes that are expensive to initialize
    research_agent = await initialize_agent()
    app.state.research_agent = research_agent

    yield

    # Cleanup
    # await cleanup_resources()
# Application instance; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(title="AI Agent API", version="1.0.0", lifespan=lifespan)

# CORS policy: allowed origins come from settings; credentials are permitted
# and every HTTP method and request header is accepted.
_cors_options = {
    "allow_origins": settings.ALLOWED_ORIGINS,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Every route declared on `router` is served under the /api/v1 prefix.
app.include_router(router, prefix="/api/v1")
Request and Response Models
# app/api/models.py
from pydantic import BaseModel, Field
from typing import Optional, List
from uuid import UUID, uuid4
# Request body accepted by the agent endpoints.
class AgentRequest(BaseModel):
    # The task text; must be between 1 and 5000 characters.
    task: str = Field(default=..., min_length=1, max_length=5000)
    # For multi-turn conversations
    session_id: Optional[str] = Field(default=None)
    # Whether to stream the response
    stream: bool = Field(default=False)
# One tool invocation reported in an agent response; all three fields are required strings.
class ToolCallInfo(BaseModel):
    tool_name: str = Field(default=...)
    input: str = Field(default=...)
    output: str = Field(default=...)
def _new_request_id() -> str:
    # A fresh random UUID4, rendered as a string, for each response.
    return str(uuid4())


# Response body returned by the non-streaming agent endpoint.
class AgentResponse(BaseModel):
    # Generated per response unless the caller supplies one.
    request_id: str = Field(default_factory=_new_request_id)
    answer: str
    sources: List[str] = Field(default=[])
    tool_calls: List[ToolCallInfo] = Field(default=[])
    # Duration in milliseconds (per the field name); supplied by the caller.
    duration_ms: int
    session_id: Optional[str] = Field(default=None)
# Error payload shape; all three fields are required strings.
class ErrorResponse(BaseModel):
    error: str = Field(default=...)
    error_type: str = Field(default=...)
    request_id: str = Field(default=...)
API Routes
# app/api/routes.py
from fastapi import APIRouter, Request, HTTPException, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from app.api.models import AgentRequest, AgentResponse
from app.middleware.auth import verify_api_key
import time
import json
# Router collecting the endpoints defined below; app/main.py mounts it under the /api/v1 prefix.
router = APIRouter()
# POST /agent/run — run the agent to completion and return a single AgentResponse.
#
# Parameters:
#   request      - validated request body (task text and optional session_id).
#   http_request - the raw request, used only to reach app.state.research_agent.
#   api_key      - resolved by the verify_api_key dependency; not used further here.
# Returns an AgentResponse with the answer, sources, tool calls and elapsed milliseconds.
# Raises HTTPException(500) carrying the error text and exception type on any failure.
@router.post("/agent/run", response_model=AgentResponse)
async def run_agent(
    request: AgentRequest,
    http_request: Request,
    api_key: str = Depends(verify_api_key)
):
    """Run the agent and return when complete."""
    # Local import so this block does not depend on a file-level import change.
    import asyncio

    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump if the wall clock is adjusted mid-request.
    start_time = time.perf_counter()
    agent = http_request.app.state.research_agent

    config = {}
    if request.session_id:
        # A session id is forwarded as the thread id for multi-turn conversations.
        config["configurable"] = {"thread_id": request.session_id}

    try:
        # agent.run is called without `await`, i.e. it is a blocking call. Running
        # it directly inside this coroutine would stall the event loop (and every
        # other request on this worker) until the agent finishes, so it is
        # offloaded to a worker thread.
        # NOTE(review): assumes agent.run may be called from a worker thread — confirm.
        result = await asyncio.to_thread(agent.run, request.task, config=config)
        return AgentResponse(
            answer=result["answer"],
            sources=result.get("sources", []),
            tool_calls=result.get("tool_calls", []),
            duration_ms=int((time.perf_counter() - start_time) * 1000),
            session_id=request.session_id
        )
    except Exception as e:
        # Top-level boundary: convert any failure into a 500 while keeping the
        # original exception chained for server-side tracebacks.
        raise HTTPException(
            status_code=500,
            detail={"error": str(e), "error_type": type(e).__name__}
        ) from e
# POST /agent/stream — relay agent events to the client as Server-Sent Events.
# Each event is JSON-encoded into one "data:" frame; a final {'type': 'done'}
# frame marks normal completion, and a {'type': 'error', ...} frame is sent
# instead if anything raises while streaming.
# NOTE(review): request.session_id is not forwarded to stream_events here — confirm intended.
@router.post("/agent/stream")
async def stream_agent(
    request: AgentRequest,
    http_request: Request,
    api_key: str = Depends(verify_api_key)
):
    """Stream agent output as Server-Sent Events."""
    research_agent = http_request.app.state.research_agent

    def sse_frame(payload):
        # One SSE message: a "data:" line terminated by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    async def generate():
        try:
            async for event in research_agent.stream_events(request.task):
                yield sse_frame(event)
            yield sse_frame({'type': 'done'})
        except Exception as e:
            yield sse_frame({'type': 'error', 'message': str(e)})

    sse_headers = {"X-Accel-Buffering": "no"}
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
# GET /health — static health payload; requires no API key and touches no agent state.
@router.get("/health")
async def health_check():
    payload = {"status": "healthy", "version": "1.0.0"}
    return payload
API Key Authentication
# app/middleware/auth.py
from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader
import hashlib
import os
# Security scheme that reads the API key from the "X-API-Key" request header.
api_key_header = APIKeyHeader(name="X-API-Key")
# In production, store hashed keys in database
# API_KEYS is a comma-separated list read once at import time. Each entry is
# stripped of surrounding whitespace and blank entries are dropped, so a value
# such as "key1, key2" or "key1, ,key2" cannot register " key2" or " " as a
# valid key. Only the SHA-256 hex digests are kept in memory.
VALID_API_KEYS = {
    hashlib.sha256(key.encode()).hexdigest()
    for key in (raw.strip() for raw in os.environ.get("API_KEYS", "").split(","))
    if key
}
def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    """Validate the API key supplied in the request header.

    The key is hashed with SHA-256 and the hex digest is looked up in
    ``VALID_API_KEYS``.

    Returns:
        The original (unhashed) key when it is recognised.

    Raises:
        HTTPException: status 403 when the digest is not in ``VALID_API_KEYS``.
    """
    digest = hashlib.sha256(api_key.encode()).hexdigest()
    if digest in VALID_API_KEYS:
        return api_key
    raise HTTPException(status_code=403, detail="Invalid API key")
Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
from fastapi import Request
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@router.post("/agent/run")
@limiter.limit("10/minute") # 10 requests per minute per IP
async def run_agent(request: Request, ...):
...
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first so this layer is reused when only app code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy app
# NOTE: the project root contains a .env file; list it in .dockerignore so
# secrets are not copied into the image by the line below.
COPY . .
# Run with gunicorn + uvicorn workers for production.
# Exec (JSON) form spread over several lines: every continued line must end
# with a backslash, otherwise each line is parsed as a separate instruction.
CMD ["gunicorn", "app.main:app", \
     "-w", "2", \
     "-k", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "300", \
     "--graceful-timeout", "30"]
Background Tasks for Long-Running Agents
For agents that might take several minutes:
from fastapi import BackgroundTasks
import uuid
from redis import Redis
# Module-level Redis client built from the REDIS_URL environment variable;
# a missing REDIS_URL raises KeyError when this module is imported.
# NOTE(review): this snippet uses `os` (and, further down, `json` and
# `HTTPException`) without importing them — they must already be in scope.
redis = Redis.from_url(os.environ["REDIS_URL"])
# POST /agent/async — schedule an agent run and return immediately.
#
# Parameters:
#   request          - validated request body; only `task` is used here.
#   background_tasks - FastAPI hook used to run the job after the response is sent.
# Returns {"job_id": <uuid string>, "status": "queued"}; the job id is what the
# client passes to GET /agent/async/{job_id}.
@router.post("/agent/async")
async def start_async_agent(request: AgentRequest, background_tasks: BackgroundTasks):
    """Start agent asynchronously, return job ID for polling."""
    job_id = str(uuid.uuid4())
    # Persist the "queued" state before responding. The background task only
    # writes its first status once it starts, so without this a client that
    # polls right after receiving the job id would get "Job not found".
    # The expiry keeps the key from lingering if the task never starts.
    redis.set(f"job:{job_id}:status", "queued", ex=3600)
    background_tasks.add_task(
        run_agent_background,
        job_id=job_id,
        task=request.task,
        redis=redis
    )
    return {"job_id": job_id, "status": "queued"}
async def run_agent_background(job_id: str, task: str, redis: Redis):
    """Run the agent for one job and record its outcome in Redis.

    Keys written (all under ``job:{job_id}:``):
      * ``status`` - "running", then "complete" or "failed".
      * ``result`` - JSON-encoded agent result on success.
      * ``error``  - exception text on failure.

    Args:
        job_id: Identifier used to build the Redis keys.
        task: Task text passed to ``agent.run``.
        redis: Redis client used for all writes.

    Any exception from the agent or from encoding its result is caught and
    recorded as a failed job; nothing is re-raised.
    """
    # Local import so this block does not depend on a file-level import change.
    import asyncio

    status_key = f"job:{job_id}:status"
    redis.set(status_key, "running")
    try:
        # agent.run is a blocking call; awaiting it in a worker thread keeps the
        # event loop (and therefore every other request) responsive while a
        # multi-minute job runs.
        # NOTE(review): `agent` is not defined in this snippet — it must be a
        # module-level name in the real file, and is assumed callable from a
        # worker thread. Confirm both.
        result = await asyncio.to_thread(agent.run, task)
        # Store the result before flipping the status, so a poller can never
        # observe "complete" while the result key is still missing.
        redis.set(f"job:{job_id}:result", json.dumps(result), ex=3600)
        # Terminal status shares the result's expiry, so an expired job
        # disappears entirely instead of reporting "complete" with no result.
        redis.set(status_key, "complete", ex=3600)
    except Exception as e:
        redis.set(f"job:{job_id}:error", str(e), ex=3600)
        redis.set(status_key, "failed", ex=3600)
# GET /agent/async/{job_id} — report the state of a background job.
#
# Returns {"job_id", "status"} plus "result" (decoded JSON) when the job is
# complete, or "error" (text) when it failed.
# Raises HTTPException(404) when the job is unknown, or when it is marked
# complete but its result key is missing (e.g. the result has expired).
@router.get("/agent/async/{job_id}")
async def get_job_status(job_id: str):
    status = redis.get(f"job:{job_id}:status")
    if not status:
        raise HTTPException(status_code=404, detail="Job not found")
    # Values read from Redis are bytes here, hence the decode and the b"..." comparisons.
    response = {"job_id": job_id, "status": status.decode()}
    if status == b"complete":
        result = redis.get(f"job:{job_id}:result")
        if result is None:
            # The result key carries an expiry and can be gone while the status
            # is still present; json.loads(None) would raise TypeError.
            raise HTTPException(status_code=404, detail="Job result not found or expired")
        response["result"] = json.loads(result)
    elif status == b"failed":
        error = redis.get(f"job:{job_id}:error")
        response["error"] = error.decode() if error else "Unknown error"
    return response
Deploy to Railway or Render
Railway (simplest):
# Install Railway CLI (installed globally through npm, so npm must be available)
npm install -g @railway/cli
# Login and deploy
railway login
railway init
railway up
Set environment variables in the Railway dashboard. Railway auto-detects your Dockerfile.
Render:
- Connect your GitHub repository
- Select "Web Service"
- Set Build Command: `pip install -r requirements.txt`
- Set Start Command: `uvicorn app.main:app --host 0.0.0.0 --port $PORT`
- Add environment variables
Next lesson: Project — building a research agent that autonomously gathers, synthesizes, and reports.
📱
Get Notes Free →Get this course's notes on Telegram!
Free cheat sheets, summaries & practice exercises