How to Deploy a LangChain App as a FastAPI REST Endpoint
Serve a LangChain app as a production FastAPI REST endpoint with streaming, async chains, error handling, and Docker deployment — full Python code included.
At some point, your LangChain prototype needs to stop living in a Jupyter notebook and start serving real HTTP requests. Whether you are building an internal tool, a customer-facing feature, or a backend microservice, the path from LangChain chain to production REST API runs through FastAPI.
FastAPI is the natural pairing for LangChain. Both are async-first, both use Python type hints extensively, and both have strong communities. The combination gives you a production-quality API in a few hundred lines of code.
This guide walks through a complete FastAPI deployment: basic endpoint, streaming response, async chain, middleware, error handling, and a Docker setup. Once the app is built, see the companion guide, Deploy AI model to production, for infrastructure considerations.
Why FastAPI for LangChain
LangChain chains are fully async-compatible — they implement both invoke() and ainvoke(), stream() and astream(). FastAPI runs on uvicorn with an async event loop, which means LangChain's async methods integrate without blocking the server.
Key benefits:
- Automatic docs: FastAPI generates Swagger UI from your Pydantic schemas
- Request validation: Input models are validated before your chain runs
- Streaming: StreamingResponse works cleanly with async generators
- Type safety: Pydantic catches malformed requests early
- Performance: Async I/O means one worker handles many concurrent requests
A properly configured FastAPI + LangChain deployment can handle 50–200 concurrent streaming requests per worker, depending on the upstream LLM's throughput limits.
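To see why one worker can hold many requests open at once, here is a minimal sketch using only the standard library. The function fake_ainvoke is a hypothetical stand-in for chain.ainvoke(), and the 0.2 second delay is an arbitrary assumption standing in for LLM latency.

```python
import asyncio
import time


async def fake_ainvoke(prompt: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for chain.ainvoke(): waits on I/O without blocking."""
    await asyncio.sleep(delay)  # simulates waiting for the LLM provider
    return f"answer to: {prompt}"


async def handle_many(prompts: list[str]) -> tuple[list[str], float]:
    """Run all calls concurrently on one event loop and time the batch."""
    start = time.perf_counter()
    answers = await asyncio.gather(*(fake_ainvoke(p) for p in prompts))
    return list(answers), time.perf_counter() - start


if __name__ == "__main__":
    answers, elapsed = asyncio.run(handle_many([f"q{i}" for i in range(5)]))
    print(len(answers), f"{elapsed:.2f}s")
```

Five calls finish in roughly the time of one because the event loop switches between them while each is waiting. A blocking call such as time.sleep() or chain.invoke() inside an async endpoint would serialize them instead.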
Setup
Install dependencies:
pip install langchain langchain-openai langchain-community chromadb fastapi "uvicorn[standard]" pydantic python-dotenv
Project structure:
langchain_api/
├── main.py
├── chains.py
├── models.py
├── middleware.py
├── Dockerfile
├── requirements.txt
└── .env
.env file:
OPENAI_API_KEY=your-key-here
MODEL_NAME=gpt-4o-mini
MAX_TOKENS=1000
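It can help to validate these variables once at startup instead of calling os.getenv() throughout the code. The sketch below is one possible shape, not part of the original project: the Settings class and load_settings function are hypothetical names, and the defaults mirror the .env file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    model_name: str
    max_tokens: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read and validate the variables from the .env file shown above."""
    env = os.environ if env is None else env
    api_key = env.get("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    max_tokens = int(env.get("MAX_TOKENS", "1000"))
    if max_tokens <= 0:
        raise RuntimeError("MAX_TOKENS must be a positive integer")
    return Settings(
        openai_api_key=api_key,
        model_name=env.get("MODEL_NAME", "gpt-4o-mini"),
        max_tokens=max_tokens,
    )
```

Calling load_settings() after load_dotenv() makes a missing key fail at boot rather than on the first request.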
Step 1: Define Request and Response Models
Start with clean Pydantic models for request validation:
# models.py
from typing import Optional

from pydantic import BaseModel, Field


class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=4096, description="User message")
    session_id: Optional[str] = Field(None, description="Session ID for conversation memory")
    temperature: Optional[float] = Field(0.7, ge=0.0, le=2.0, description="LLM temperature")
    stream: Optional[bool] = Field(False, description="Enable streaming response")

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "message": "Explain how RAG works in three sentences.",
                    "session_id": "user-123",
                    "temperature": 0.7,
                    "stream": False,
                }
            ]
        }
    }


class ChatResponse(BaseModel):
    answer: str
    session_id: Optional[str]
    tokens_used: Optional[int]
    model: str


class RAGRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2048)
    collection: str = Field("default", description="Vector store collection to query")
    k: int = Field(4, ge=1, le=20, description="Number of documents to retrieve")


class RAGResponse(BaseModel):
    answer: str
    sources: list[str]
    question: str
Step 2: Build the Chains
Keep chain construction separate from your API layer:
# chains.py
import os

from dotenv import load_dotenv
from langchain_community.vectorstores import Chroma
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

load_dotenv()


def build_chat_chain(temperature: float = 0.7):
    """Build a simple conversational chain."""
    llm = ChatOpenAI(
        model=os.getenv("MODEL_NAME", "gpt-4o-mini"),
        temperature=temperature,
        max_tokens=int(os.getenv("MAX_TOKENS", "1000")),
        streaming=True,  # stream tokens from the provider as they are generated
    )
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful AI assistant. Be concise and accurate."),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{message}"),
    ])
    chain = prompt | llm | StrOutputParser()
    return chain


def build_rag_chain(collection: str = "default", k: int = 4):
    """Build a RAG chain for a specific vector store collection."""
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = Chroma(
        collection_name=collection,
        embedding_function=embeddings,
        persist_directory=f"./chroma_db/{collection}",
    )
    retriever = vectorstore.as_retriever(search_kwargs={"k": k})
    llm = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"), streaming=True)
    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful assistant. Answer the question using only the provided context.
If the context does not contain enough information, say so clearly.
Context:
{context}"""),
        ("human", "{question}"),
    ])

    def format_docs(docs):
        # Join retrieved documents into one context string
        return "\n\n".join(doc.page_content for doc in docs)

    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    return rag_chain, retriever


# In-memory session storage (replace with Redis in production)
session_histories: dict[str, list] = {}


def get_session_history(session_id: str) -> list:
    if session_id not in session_histories:
        session_histories[session_id] = []
    return session_histories[session_id]


def update_session_history(session_id: str, human_msg: str, ai_msg: str):
    history = get_session_history(session_id)
    history.append(HumanMessage(content=human_msg))
    history.append(AIMessage(content=ai_msg))
    # Keep only last 20 messages (10 turns)
    if len(history) > 20:
        session_histories[session_id] = history[-20:]
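The trimming logic above can also be expressed with a bounded deque, which discards the oldest entry automatically. This is a sketch under assumptions rather than a drop-in replacement: append_turn and get_history are hypothetical names, and messages are stored as plain (role, text) tuples. LangChain's message conversion generally accepts such tuples in a MessagesPlaceholder, but verify that against your installed version.

```python
from collections import defaultdict, deque

MAX_MESSAGES = 20  # 10 turns, matching the limit used in chains.py

# Each session gets a deque that silently drops its oldest entry when full.
histories: defaultdict[str, deque] = defaultdict(lambda: deque(maxlen=MAX_MESSAGES))


def append_turn(session_id: str, human_msg: str, ai_msg: str) -> None:
    """Store one turn as plain (role, text) tuples."""
    history = histories[session_id]
    history.append(("human", human_msg))
    history.append(("ai", ai_msg))


def get_history(session_id: str) -> list[tuple[str, str]]:
    """Return a copy so callers cannot mutate the stored deque."""
    return list(histories[session_id])
```

Like the dictionary above, this store lives in a single process, so it is not shared between uvicorn workers and is lost on restart.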
Step 3: Build the FastAPI Application
# main.py
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import asyncio
import time
import logging
import os
from dotenv import load_dotenv
from models import ChatRequest, ChatResponse, RAGRequest, RAGResponse
from chains import (
    build_chat_chain,
    build_rag_chain,
    get_session_history,
    session_histories,
    update_session_history,
)

load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="LangChain API",
    description="Production LangChain REST API with streaming support",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS for frontend access
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "https://yourfrontend.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Gzip compression for non-streaming responses
app.add_middleware(GZipMiddleware, minimum_size=1000)


# Request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    logger.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} duration={duration:.3f}s"
    )
    return response


@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers."""
    return {"status": "healthy", "model": os.getenv("MODEL_NAME", "gpt-4o-mini")}


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Non-streaming chat endpoint with conversation memory."""
    try:
        # Compare against None so an explicit temperature of 0.0 is respected
        temperature = 0.7 if request.temperature is None else request.temperature
        chain = build_chat_chain(temperature=temperature)
        # Requests without a session_id start from an empty history
        history = get_session_history(request.session_id) if request.session_id else []
        response = await chain.ainvoke({
            "message": request.message,
            "history": history,
        })
        if request.session_id:
            update_session_history(request.session_id, request.message, response)
        return ChatResponse(
            answer=response,
            session_id=request.session_id,
            tokens_used=None,  # populate from callbacks if needed
            model=os.getenv("MODEL_NAME", "gpt-4o-mini"),
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        # logger.exception records the full traceback
        logger.exception("Chat endpoint error")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint using Server-Sent Events."""
    temperature = 0.7 if request.temperature is None else request.temperature
    chain = build_chat_chain(temperature=temperature)
    history = get_session_history(request.session_id) if request.session_id else []

    async def token_generator():
        full_response = []
        try:
            async for chunk in chain.astream({
                "message": request.message,
                "history": history,
            }):
                if chunk:
                    full_response.append(chunk)
                    # SSE format
                    yield f"data: {chunk}\n\n"
            # Update history after streaming completes
            if request.session_id:
                update_session_history(
                    request.session_id,
                    request.message,
                    "".join(full_response),
                )
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.exception("Streaming error")
            yield f"data: [ERROR] {str(e)}\n\n"

    return StreamingResponse(
        token_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )


@app.post("/rag", response_model=RAGResponse)
async def rag_query(request: RAGRequest):
    """RAG endpoint that retrieves documents and generates an answer."""
    try:
        rag_chain, retriever = build_rag_chain(
            collection=request.collection,
            k=request.k,
        )
        # Get source documents for citation
        source_docs = await retriever.ainvoke(request.question)
        sources = [
            doc.metadata.get("source", "Unknown")
            for doc in source_docs
        ]
        answer = await rag_chain.ainvoke(request.question)
        return RAGResponse(
            answer=answer,
            sources=list(set(sources)),  # deduplicate sources
            question=request.question,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        logger.exception("RAG endpoint error")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/rag/stream")
async def rag_stream(request: RAGRequest):
    """Streaming RAG endpoint."""
    rag_chain, _ = build_rag_chain(collection=request.collection, k=request.k)

    async def rag_token_generator():
        try:
            async for chunk in rag_chain.astream(request.question):
                if chunk:
                    yield f"data: {chunk}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.exception("RAG streaming error")
            yield f"data: [ERROR] {str(e)}\n\n"

    return StreamingResponse(
        rag_token_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@app.delete("/sessions/{session_id}")
async def clear_session(session_id: str):
    """Clear conversation history for a session."""
    if session_id in session_histories:
        del session_histories[session_id]
        return {"message": f"Session {session_id} cleared"}
    raise HTTPException(status_code=404, detail="Session not found")
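One caveat with the streaming endpoints above: a chunk that contains a newline splits into several SSE lines, and sending str(e) exposes internal error text to the client. A possible alternative, sketched here with the standard library only, is to JSON-encode each event. The helper names and the "type"/"content" payload shape are assumptions for illustration, and a client would need to json.loads() each data line to match.

```python
import json


def sse_event(payload: dict) -> str:
    """Frame one JSON payload as a single Server-Sent Events message."""
    # json.dumps escapes newlines, so the payload always fits on one data: line.
    return f"data: {json.dumps(payload)}\n\n"


def token_event(token: str) -> str:
    return sse_event({"type": "token", "content": token})


def done_event() -> str:
    return sse_event({"type": "done"})


def error_event(public_message: str = "stream failed") -> str:
    # Send a generic message; keep the real exception in server logs.
    return sse_event({"type": "error", "message": public_message})
```

Inside a generator, yield token_event(chunk) would take the place of the raw f-string, with done_event() and error_event() replacing the [DONE] and [ERROR] sentinels.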
Step 4: Run the Application
# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# Production with multiple workers
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop
Visit http://localhost:8000/docs for the interactive Swagger UI.
Step 5: Test the Endpoints
Test with Python's requests library:
import requests
import json

BASE_URL = "http://localhost:8000"

# Test basic chat
response = requests.post(
    f"{BASE_URL}/chat",
    json={
        "message": "What are the main components of LangChain?",
        "session_id": "test-session-1",
        "temperature": 0.7,
    },
)
print(response.json())


# Test streaming with requests
def stream_chat(message: str, session_id: str):
    with requests.post(
        f"{BASE_URL}/chat/stream",
        json={"message": message, "session_id": session_id, "stream": True},
        stream=True,
    ) as response:
        for line in response.iter_lines():
            if line:
                line = line.decode("utf-8")
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    print(data, end="", flush=True)
        print()  # newline after stream ends


stream_chat("Explain vector embeddings step by step.", "test-session-1")

# Test RAG endpoint
rag_response = requests.post(
    f"{BASE_URL}/rag",
    json={
        "question": "How does retrieval augmented generation work?",
        "collection": "default",
        "k": 4,
    },
)
print(json.dumps(rag_response.json(), indent=2))
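The parsing inside stream_chat can be pulled into a pure function, which makes it testable without a running server and lets it surface the [ERROR] sentinel that the streaming endpoints emit. This is a minimal sketch; iter_sse_data is a hypothetical helper name, and it assumes the same "data: " line format the server uses above.

```python
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[bytes]) -> Iterator[str]:
    """Yield the payload of each data: line until the [DONE] sentinel."""
    for raw in lines:
        if not raw:  # blank separator line between events
            continue
        line = raw.decode("utf-8")
        if not line.startswith("data: "):
            continue  # ignore comments and other SSE fields
        data = line[len("data: "):]
        if data == "[DONE]":
            return
        if data.startswith("[ERROR]"):
            raise RuntimeError(data)
        yield data
```

With requests, this would be used as: for token in iter_sse_data(response.iter_lines()): print(token, end="", flush=True).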
Step 6: Rate Limiting and Auth Middleware
Add rate limiting and API key auth for production:
# middleware.py
from fastapi import HTTPException, Security, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from collections import defaultdict
import secrets
import time
import os

security = HTTPBearer()

# Simple in-memory rate limiter (use Redis in production)
rate_limit_store: dict[str, list[float]] = defaultdict(list)
RATE_LIMIT_REQUESTS = 20
RATE_LIMIT_WINDOW = 60  # seconds


def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify Bearer token API key."""
    # Drop empty entries so an unset API_KEYS never matches an empty token
    valid_keys = [k.strip() for k in os.getenv("API_KEYS", "").split(",") if k.strip()]
    token = credentials.credentials
    # compare_digest avoids leaking key contents through timing differences
    if not any(secrets.compare_digest(token.encode(), key.encode()) for key in valid_keys):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return token


def check_rate_limit(request: Request):
    """Simple rate limiter by client IP."""
    # request.client can be None (for example under some test clients)
    client_ip = request.client.host if request.client else "unknown"
    now = time.time()
    window_start = now - RATE_LIMIT_WINDOW
    # Clean old entries
    rate_limit_store[client_ip] = [
        t for t in rate_limit_store[client_ip] if t > window_start
    ]
    if len(rate_limit_store[client_ip]) >= RATE_LIMIT_REQUESTS:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Max {RATE_LIMIT_REQUESTS} requests per minute.",
        )
    rate_limit_store[client_ip].append(now)
Apply to endpoints that need protection:
from fastapi import Depends
from middleware import verify_api_key, check_rate_limit


@app.post("/chat", response_model=ChatResponse, dependencies=[Depends(check_rate_limit)])
async def chat(request: ChatRequest, api_key: str = Depends(verify_api_key)):
    ...  # endpoint implementation as in Step 3
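The sliding-window idea behind check_rate_limit can be isolated from FastAPI so it is easy to unit test. The sketch below is an illustration, not the article's implementation: SlidingWindowLimiter is a hypothetical class, and the injectable clock parameter exists only so tests can control time.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` calls per `window` seconds for each key."""

    def __init__(self, limit: int = 20, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self._hits: defaultdict[str, deque] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window.
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

A dependency could call limiter.allow(client_ip) and raise a 429 when it returns False. As with the in-memory version above, each worker process keeps its own counters, so the effective limit scales with the number of workers unless the state moves to a shared store such as Redis.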
Step 7: Docker Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN adduser --disabled-password --gecos "" appuser
USER appuser
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
requirements.txt:
langchain==0.3.0
langchain-openai==0.2.0
langchain-community==0.3.0
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.0
python-dotenv==1.0.0
chromadb==0.5.0
Build and run:
docker build -t langchain-api .
docker run -p 8000:8000 --env-file .env langchain-api
Docker Compose for local development with Redis for session storage:
# docker-compose.yml
version: "3.9"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - MODEL_NAME=gpt-4o-mini
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    volumes:
      - ./chroma_db:/app/chroma_db
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
Comparison Table: Serving Options for LangChain
| Method | Setup Time | Streaming | Auth/Rate Limiting | Customization | Best For |
|---|---|---|---|---|---|
| FastAPI (custom) | Medium | Full control | Full control | Maximum | Production APIs |
| LangServe | Very fast | Built-in | Limited | Low | Prototypes |
| LangChain + Flask | Medium | Manual | Manual | High | Simple REST APIs |
| Modal / Fly.io | Fast | Depends | Managed | Medium | Serverless deploy |
| AWS Lambda | Slow setup | Limited | AWS IAM | Medium | Event-driven |
Async Chain Patterns
Understanding async patterns prevents the most common performance mistakes:
import asyncio

from fastapi import HTTPException

from chains import build_chat_chain

# Pattern 1: Simple async invocation (inside an async function)
# The prompt requires both keys, so pass an empty history when there is none.
result = await chain.ainvoke({"message": "Hello", "history": []})


# Pattern 2: Parallel invocations (run multiple chains concurrently)
async def run_parallel_queries(queries: list[str]) -> list[str]:
    chain = build_chat_chain()
    tasks = [chain.ainvoke({"message": q, "history": []}) for q in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r if not isinstance(r, Exception) else f"Error: {r}" for r in results]


# Pattern 3: Timeout handling
async def chat_with_timeout(message: str, timeout: float = 30.0) -> str:
    chain = build_chat_chain()
    try:
        result = await asyncio.wait_for(
            chain.ainvoke({"message": message, "history": []}),
            timeout=timeout,
        )
        return result
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Chain execution timed out")
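Patterns 2 and 3 combine naturally with a concurrency cap, so a large batch does not fire every request at the provider at once. The following is a sketch with the standard library only; run_bounded is a hypothetical helper, and call stands for any coroutine function such as a wrapper around chain.ainvoke().

```python
import asyncio
from typing import Awaitable, Callable


async def run_bounded(
    call: Callable[[str], Awaitable[str]],
    queries: list[str],
    max_concurrency: int = 5,
    timeout: float = 30.0,
) -> list[str]:
    """Run `call` for every query, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def one(query: str) -> str:
        async with semaphore:
            try:
                return await asyncio.wait_for(call(query), timeout=timeout)
            except asyncio.TimeoutError:
                return "Error: timed out"
            except Exception as exc:  # keep one failure from sinking the batch
                return f"Error: {exc}"

    return list(await asyncio.gather(*(one(q) for q in queries)))
```

Results come back in the same order as the input queries, with failures reported as strings in the style of Pattern 2.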
Connecting to OpenAI API Integration
The streaming endpoint above uses OpenAI's streaming API under the hood. LangChain's streaming=True on the ChatOpenAI object activates token-by-token streaming from the OpenAI API, which FastAPI then forwards to the client.
For agents instead of chains, the pattern is nearly identical — swap chain.astream() for agent.astream_events() and parse the event types in your generator. See Build AI agent with LangChain for agent construction, then drop the agent into any of the endpoints above.
Production Checklist
Before pushing to production:
- Replace in-memory session storage with Redis using langchain_community.chat_message_histories.RedisChatMessageHistory
- Add structured logging with correlation IDs for request tracing
- Set a retry limit (for example, the max_retries parameter on ChatOpenAI) and configure exponential backoff
- Add a /metrics endpoint for Prometheus scraping
- Configure health checks in your load balancer pointing to /health
- Start from workers = (2 * cpu_count) + 1 in your uvicorn config and tune under load
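For the retry item, exponential backoff with jitter can be sketched as follows. This is an illustrative helper with hypothetical names and defaults, not a LangChain or OpenAI API. The OpenAI client already retries some failures on its own, so a wrapper like this is mainly useful around other calls or for custom policies.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry `call` with exponential backoff and full jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Ceiling doubles each attempt (0.5s, 1s, 2s, ...) up to max_delay.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            await sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

In real use, narrow the except clause to transient errors such as rate limits and connection failures, so that invalid requests fail immediately.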
For full deployment infrastructure guidance, see Deploy AI model to production.
Frequently Asked Questions
Can I stream LangChain responses through FastAPI?
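Yes. Use FastAPI's StreamingResponse with an async generator that yields tokens from the LangChain chain's astream() method. Set media_type to text/event-stream for Server-Sent Events. Note that the browser's built-in EventSource only issues GET requests, so for the POST endpoints in this guide the frontend should read the stream with fetch() and a stream reader instead.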
How do I handle LangChain errors in a FastAPI endpoint?
Wrap chain invocations in try/except blocks and raise HTTPException with appropriate status codes. Catch ValueError for invalid inputs (400), timeout errors (504), and use a broad Exception handler for unexpected failures (500). Log the full traceback for the 500 errors to aid debugging.
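The mapping described above can be kept in one place so every endpoint answers consistently. A minimal sketch, assuming the same three status codes; status_for is a hypothetical helper name:

```python
import asyncio


def status_for(exc: BaseException) -> tuple[int, str]:
    """Map an exception to (HTTP status, client-safe detail)."""
    if isinstance(exc, (asyncio.TimeoutError, TimeoutError)):
        return 504, "Chain execution timed out"
    if isinstance(exc, ValueError):
        return 400, str(exc)
    # Anything else: hide the details from the client and log them server-side.
    return 500, "Internal server error"
```

An endpoint's except block could then unpack the result into HTTPException(status_code=..., detail=...).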
Is LangServe better than building my own FastAPI endpoint?
LangServe is faster to set up and provides a built-in playground UI, but building your own FastAPI endpoint gives you full control over auth, rate limiting, request validation, custom response formats, and middleware. For a prototype, LangServe. For a production service with specific requirements, custom FastAPI is almost always worth the extra hour of setup.
AiTechWorlds Team