Build a LangChain Multi-User Chatbot with Session Management
Build a production LangChain chatbot that handles multiple users simultaneously with isolated sessions, Redis-backed memory, and FastAPI async endpoints.
The single-user LangChain chatbot is a tutorial staple. You create one ConversationBufferMemory, one chain, run a loop, done. Then you deploy it and realize that every user is sharing the same memory, reading each other's conversation history, and the whole thing is a disaster.
Multi-user session management is the gap between "tutorial project" and "production application." It's not complicated once you see the pattern, but it's rarely explained well.
This guide covers session isolation, Redis-backed persistence, FastAPI async endpoints, and the scaling considerations that matter once real users show up.
The Core Problem: Shared vs Isolated Memory
Here's what goes wrong with naive multi-user setups:
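# WRONG: Global memory, so all users share the same history
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
memory = ConversationBufferMemory()
chain = ConversationChain(llm=llm, memory=memory)
# User A asks something
chain.run("My name is Alice")
# User B asks something
chain.run("Who am I?")
# Likely output: "You are Alice", because both users share one memory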
The fix: create separate memory instances per user/session:
# RIGHT: Per-user memory
from langchain.memory import ConversationBufferMemory
memory_store: dict[str, ConversationBufferMemory] = {}
def get_memory(session_id: str) -> ConversationBufferMemory:
    # Lazily create one memory object per session ID
    if session_id not in memory_store:
        memory_store[session_id] = ConversationBufferMemory(return_messages=True)
    return memory_store[session_id]
This works for simple cases but loses all history on server restart. Let's build it properly.
Session Management with RunnableWithMessageHistory
LangChain's RunnableWithMessageHistory is the clean abstraction for this:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Be concise and friendly."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])
chain = prompt | llm
# In-memory store (replace with Redis for production)
session_store: dict[str, ChatMessageHistory] = {}
def get_session_history(session_id: str) -> ChatMessageHistory:
    if session_id not in session_store:
        session_store[session_id] = ChatMessageHistory()
    return session_store[session_id]
# Wrap chain with session history management
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
# Usage: each user gets isolated memory via session_id
response_alice = chain_with_history.invoke(
    {"input": "My name is Alice"},
    config={"configurable": {"session_id": "user-alice-123"}},
)
response_bob = chain_with_history.invoke(
    {"input": "My name is Bob"},
    config={"configurable": {"session_id": "user-bob-456"}},
)
# Alice's follow-up
response = chain_with_history.invoke(
    {"input": "What's my name?"},
    config={"configurable": {"session_id": "user-alice-123"}},
)
print(response.content)  # e.g. "Your name is Alice."
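Conceptually, RunnableWithMessageHistory does three things on every call: it looks up the history for the session_id, injects those messages into the prompt under history_messages_key, and, once the model replies, appends both the new input and the reply to that same history. The stdlib-only sketch below mimics that flow with a toy model so the isolation is easy to see without an API key. SessionHistoryWrapper and fake_model are hypothetical names, and this is not LangChain's implementation.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

Message = Tuple[str, str]  # (role, content)


@dataclass
class SessionHistoryWrapper:
    """Toy stand-in for RunnableWithMessageHistory (hypothetical, not LangChain code)."""

    model: Callable[[List[Message]], str]  # any function from messages to a reply
    store: Dict[str, List[Message]] = field(default_factory=dict)

    def get_session_history(self, session_id: str) -> List[Message]:
        # One list per session ID, created on first use
        return self.store.setdefault(session_id, [])

    def invoke(self, user_input: str, session_id: str) -> str:
        history = self.get_session_history(session_id)
        # 1. Build the prompt: system message, this session's history, new input
        prompt = [("system", "You are a helpful assistant.")] + history + [("human", user_input)]
        # 2. Call the model
        reply = self.model(prompt)
        # 3. Record both sides of the turn in this session only
        history.append(("human", user_input))
        history.append(("ai", reply))
        return reply


def fake_model(messages: List[Message]) -> str:
    """Answers "What's my name?" from earlier "My name is X" turns in the prompt."""
    if messages[-1][1] == "What's my name?":
        for role, content in messages:
            if role == "human" and content.startswith("My name is "):
                return f"Your name is {content[len('My name is '):]}."
        return "I don't know your name yet."
    return "Nice to meet you!"


bot = SessionHistoryWrapper(model=fake_model)
bot.invoke("My name is Alice", "user-alice-123")
bot.invoke("My name is Bob", "user-bob-456")
print(bot.invoke("What's my name?", "user-alice-123"))  # Your name is Alice.
```

Bob's turn never reaches Alice's prompt because each call only reads the list stored under its own session ID; swapping the dict for Redis changes where the lists live, not this flow.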
Redis-Backed Session Storage
In-memory dicts vanish on server restart. Redis is the standard solution:
pip install redis langchain-community
from langchain_community.chat_message_histories import RedisChatMessageHistory
import os
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
def get_redis_session_history(session_id: str) -> RedisChatMessageHistory:
    return RedisChatMessageHistory(
        session_id=session_id,
        url=REDIS_URL,
        ttl=86400,  # Key expires 24 hours after the last message is written
        key_prefix="chat_history:",  # Namespace keys to avoid conflicts
    )
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_redis_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
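RedisChatMessageHistory stores each session as a Redis list of JSON-serialized messages under a key like chat_history:user-alice-123. Because the TTL is reset every time a message is added, a session expires 24 hours after its last message, so stale sessions clean up automatically.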
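To make that storage model concrete, here is an in-memory approximation: one list per namespaced key, new messages pushed to the head (like Redis LPUSH), reads reversed into chronological order, and the expiry pushed forward on every write. FakeRedisList and SessionHistory are hypothetical names, and the injectable clock is an assumption added so the TTL behavior can be tested without waiting. This is a sketch of the pattern, not the langchain_community source.

```python
import json
import time
from typing import Callable, Dict, List, Optional


class FakeRedisList:
    """In-memory stand-in for the few Redis list/expiry commands used here."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._data: Dict[str, List[str]] = {}
        self._expires_at: Dict[str, float] = {}
        self._clock = clock

    def _purge_if_expired(self, key: str) -> None:
        deadline = self._expires_at.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._data.pop(key, None)
            self._expires_at.pop(key, None)

    def lpush(self, key: str, value: str) -> None:
        self._purge_if_expired(key)
        self._data.setdefault(key, []).insert(0, value)  # LPUSH prepends

    def expire(self, key: str, seconds: int) -> None:
        if key in self._data:
            self._expires_at[key] = self._clock() + seconds

    def lrange_all(self, key: str) -> List[str]:
        self._purge_if_expired(key)
        return list(self._data.get(key, []))


class SessionHistory:
    """One namespaced list per session; the TTL is refreshed on every write."""

    def __init__(self, redis: FakeRedisList, session_id: str,
                 key_prefix: str = "chat_history:", ttl: Optional[int] = None):
        self.redis = redis
        self.key = key_prefix + session_id
        self.ttl = ttl

    def add_message(self, role: str, content: str) -> None:
        self.redis.lpush(self.key, json.dumps({"type": role, "content": content}))
        if self.ttl:
            self.redis.expire(self.key, self.ttl)  # push the expiry out again

    @property
    def messages(self) -> List[dict]:
        # Newest entries sit at the head of the list, so reverse for chronological order
        return [json.loads(m) for m in reversed(self.redis.lrange_all(self.key))]
```

In real Redis the server enforces the expiry, but the observable effect is the same: a session survives as long as new messages keep arriving within the TTL, and reads alone do not extend it.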
FastAPI Multi-User Chatbot
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uuid
app = FastAPI(title="LangChain Multi-User Chatbot")
# Wide-open CORS is convenient in development; list your real origins in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
class ChatResponse(BaseModel):
    response: str
    session_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # Create new session if not provided
    session_id = request.session_id or str(uuid.uuid4())
    try:
        response = await chain_with_history.ainvoke(
            {"input": request.message},
            config={"configurable": {"session_id": session_id}},
        )
        return ChatResponse(
            response=response.content,
            session_id=session_id,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Plain `def` (not `async def`): RedisChatMessageHistory uses a blocking client,
# so FastAPI runs these handlers in its threadpool instead of on the event loop.
@app.get("/history/{session_id}")
def get_history(session_id: str):
    history = get_redis_session_history(session_id)
    messages = history.messages
    return {
        "session_id": session_id,
        "message_count": len(messages),
        "messages": [
            {"role": msg.type, "content": msg.content}
            for msg in messages
        ],
    }
@app.delete("/history/{session_id}")
def clear_history(session_id: str):
    history = get_redis_session_history(session_id)
    history.clear()
    return {"message": f"Session {session_id} cleared"}
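The handlers above accept any session_id string from the client and turn it into a Redis key. A minimal guard, assuming session IDs are always server-minted UUIDs (as the /chat endpoint does when none is supplied), is to canonicalize the value and reject everything else. resolve_session_id is a hypothetical helper; in FastAPI you would map its ValueError to an HTTP 400. Under this assumption, readable IDs like user-alice-123 from the earlier examples would be rejected.

```python
import uuid
from typing import Optional


def resolve_session_id(raw: Optional[str]) -> str:
    """Return a canonical session ID, minting a fresh UUID4 when none is supplied.

    Hypothetical helper: anything that is not a well-formed UUID is rejected,
    so arbitrary client strings never become Redis keys.
    """
    if not raw:
        return str(uuid.uuid4())
    try:
        # uuid.UUID accepts upper case, braces and missing hyphens; str() canonicalizes
        return str(uuid.UUID(raw))
    except ValueError:
        raise ValueError(f"invalid session_id: {raw!r}") from None


print(resolve_session_id("3F2B8C1E-9D4A-4E6B-8F1A-2C3D4E5F6A7B"))
# 3f2b8c1e-9d4a-4e6b-8f1a-2c3d4e5f6a7b
```

Format validation is not authorization: a well-formed UUID that belongs to someone else still passes, so production code should also check that the session belongs to the authenticated user before reading or deleting it.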
Streaming Responses for Better UX
Nobody wants to stare at a loading spinner for 3 seconds. Stream the response as tokens arrive:
import json
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    session_id = request.session_id or str(uuid.uuid4())
    async def generate():
        # Send session_id first so the client can reuse it on the next request
        yield f"data: {json.dumps({'session_id': session_id})}\n\n"
        async for chunk in chain_with_history.astream(
            {"input": request.message},
            config={"configurable": {"session_id": session_id}},
        ):
            if getattr(chunk, "content", None):
                # json.dumps escapes quotes, backslashes and newlines for the SSE data line
                yield f"data: {json.dumps({'token': chunk.content})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
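Building each data line with json.dumps rather than string formatting matters because a token containing a double quote or backslash would otherwise yield a frame the client cannot JSON.parse. A small helper makes the framing reusable and easy to test; sse_event is a hypothetical name.

```python
import json
from typing import Any


def sse_event(payload: Any) -> str:
    """Frame one Server-Sent Event whose data line carries a JSON document."""
    # json.dumps escapes quotes, backslashes and newlines, so the event stays on a
    # single "data:" line and ends with the blank line that terminates an SSE event.
    return f"data: {json.dumps(payload)}\n\n"


# A token that would break hand-built JSON
frame = sse_event({"token": 'She said "hi"\nthen left'})
print(frame, end="")
# data: {"token": "She said \"hi\"\nthen left"}
```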
Client-side JavaScript to consume the stream:
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: {'Content-Type': 'application/json'},
  body: JSON.stringify({message: userInput, session_id: currentSessionId})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let finished = false;
while (!finished) {
  const {done, value} = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, {stream: true});
  const events = buffer.split('\n\n');
  buffer = events.pop();  // keep any incomplete event for the next read
  for (const event of events) {
    if (!event.startsWith('data: ')) continue;
    const payload = event.slice(6);
    if (payload === '[DONE]') { finished = true; break; }  // sentinel is not JSON
    const data = JSON.parse(payload);
    if (data.token) appendToUI(data.token);
    if (data.session_id) currentSessionId = data.session_id;
  }
}
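The same buffering logic is handy in Python, for example in an integration test or a backend service that consumes the stream. This sketch assumes the frame format of the /chat/stream handler (JSON data lines plus a final [DONE] sentinel) and takes already-decoded text chunks split at arbitrary points; iter_sse_payloads is a hypothetical helper.

```python
import json
from typing import Iterable, Iterator


def iter_sse_payloads(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from SSE text that arrives in arbitrary chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Events end with a blank line; the last piece may still be incomplete
        *events, buffer = buffer.split("\n\n")
        for event in events:
            if not event.startswith("data: "):
                continue
            data = event[len("data: "):]
            if data == "[DONE]":
                return  # end-of-stream sentinel, not JSON
            yield json.loads(data)


stream = 'data: {"session_id": "abc"}\n\ndata: {"token": "Hel"}\n\ndata: {"token": "lo"}\n\ndata: [DONE]\n\n'
chunks = [stream[i:i + 7] for i in range(0, len(stream), 7)]  # simulate network chunking
payloads = list(iter_sse_payloads(chunks))
print("".join(p.get("token", "") for p in payloads))  # Hello
```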
Handling Rate Limits and Errors
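With multiple concurrent users, you are likely to hit OpenAI rate limits. ChatOpenAI accepts a max_retries argument for client-level retries; an outer retry with a longer backoff gives rate-limited requests more time to succeed:
from openai import RateLimitError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
@retry(
    retry=retry_if_exception_type(RateLimitError),  # only retry rate-limit (429) errors
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,  # surface the original error, not tenacity.RetryError
)
async def invoke_with_retry(message: str, session_id: str) -> str:
    response = await chain_with_history.ainvoke(
        {"input": message},
        config={"configurable": {"session_id": session_id}},
    )
    return response.content
# Replaces the earlier /chat handler: Starlette dispatches to the first matching
# route, so registering both would leave this version unreachable.
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    session_id = request.session_id or str(uuid.uuid4())
    try:
        content = await invoke_with_retry(request.message, session_id)
        return ChatResponse(response=content, session_id=session_id)
    except Exception:
        raise HTTPException(status_code=503, detail="AI service temporarily unavailable")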
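When many users hit a rate limit at the same moment, a deterministic backoff schedule makes them all retry in lockstep. Adding random jitter spreads the retries out. The stdlib sketch below shows exponential backoff with full jitter that retries only a rate-limit error; RateLimited is a hypothetical stand-in for the provider's exception (for OpenAI, openai.RateLimitError), and the injectable sleep parameter exists only to make the function testable.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for a provider's 429 error (e.g. openai.RateLimitError)."""


async def call_with_backoff(
    make_call: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base: float = 1.0,
    cap: float = 10.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry an async call on RateLimited, using exponential backoff with full jitter."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except RateLimited:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller turn this into a 503
            # Full jitter: wait a random time between 0 and the exponential ceiling
            ceiling = min(cap, base * 2 ** attempt)
            await sleep(random.uniform(0, ceiling))
    raise RuntimeError("attempts must be at least 1")
```

tenacity ships the same idea as wait_random_exponential, which can stand in for wait_exponential in a retry decorator.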
Session Storage Comparison
| Storage | Persistence | Latency | TTL Support | Scale | Best For |
|---|---|---|---|---|---|
| In-memory dict | No (restarts wipe it) | Microseconds | Manual | Single server | Development |
| Redis | Yes (with RDB/AOF) | Milliseconds | Built-in | Horizontal | Production |
| PostgreSQL | Yes | Low tens of ms | Manual/pg_cron | Vertical + read replicas | Audit logs |
| DynamoDB | Yes | Single-digit ms | Built-in | Horizontal (managed) | AWS-native |
| SQLite | Yes (local file) | Low | Manual | Single server | Small apps |
The Vector database guide covers storage tradeoffs in more depth, including the case for using the same Redis instance for both session storage and semantic caching.
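For the SQLite row in the table, the standard library is enough for a small single-server deployment. The sketch stores one row per message keyed by session_id, and the "Manual" TTL becomes a periodic DELETE. SQLiteChatHistory and purge_older_than are hypothetical names, and this class does not implement LangChain's BaseChatMessageHistory interface.

```python
import sqlite3
from typing import Dict, List


class SQLiteChatHistory:
    """Hypothetical sketch: one row per message, keyed by session_id."""

    def __init__(self, conn: sqlite3.Connection, session_id: str):
        self.conn = conn
        self.session_id = session_id
        conn.execute(
            """CREATE TABLE IF NOT EXISTS messages (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   session_id TEXT NOT NULL,
                   role TEXT NOT NULL,
                   content TEXT NOT NULL,
                   created_at REAL NOT NULL DEFAULT (julianday('now'))
               )"""
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_messages_session ON messages (session_id, id)"
        )

    def add_message(self, role: str, content: str) -> None:
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?)",
                (self.session_id, role, content),
            )

    @property
    def messages(self) -> List[Dict[str, str]]:
        rows = self.conn.execute(
            "SELECT role, content FROM messages WHERE session_id = ? ORDER BY id",
            (self.session_id,),
        )
        return [{"role": role, "content": content} for role, content in rows]

    def clear(self) -> None:
        with self.conn:
            self.conn.execute("DELETE FROM messages WHERE session_id = ?", (self.session_id,))


def purge_older_than(conn: sqlite3.Connection, days: float) -> int:
    """The 'manual TTL': delete messages older than `days`; returns rows removed."""
    with conn:
        cur = conn.execute(
            "DELETE FROM messages WHERE created_at < julianday('now') - ?", (days,)
        )
    return cur.rowcount
```

Run purge_older_than from a scheduled job; unlike Redis, nothing expires unless you delete it.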
Production Checklist
Before going live with a multi-user chatbot:
✅ Sessions isolated by user ID or conversation ID
✅ Redis (or equivalent) for persistent message history
✅ TTL on sessions to prevent unbounded storage growth
✅ Async FastAPI endpoints with ainvoke/astream
✅ Rate limiting per user (requests per minute)
✅ Error handling with retries for LLM API failures
✅ Session deletion endpoint (GDPR compliance)
✅ Message count limit per session (prevent context overflow)
✅ Input sanitization (strip extra whitespace, limit length)
✅ Logging per session for debugging
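Two of the checklist items, per-user rate limiting and a per-session message cap, fit in a few lines. This is an in-process sketch: SlidingWindowLimiter and cap_history are hypothetical names, the limiter's state lives in a single worker's memory, and with several workers or servers you would keep the counters in Redis instead.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List, TypeVar

T = TypeVar("T")


class SlidingWindowLimiter:
    """Hypothetical per-user limiter: at most `limit` requests per `window` seconds."""

    def __init__(self, limit: int, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, user_id: str) -> bool:
        now = self.clock()
        hits = self._hits[user_id]
        # Forget requests that have slid out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # caller should answer with HTTP 429
        hits.append(now)
        return True


def cap_history(messages: List[T], max_messages: int) -> List[T]:
    """Keep only the most recent `max_messages` entries before they reach the prompt."""
    # Guard needed because messages[-0:] would return the whole list
    return messages[-max_messages:] if max_messages > 0 else []
```

cap_history counts messages, not tokens; langchain_core's trim_messages utility can trim by token count when message sizes vary widely.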
For the memory management patterns behind session design, see AI agent memory and planning. If you want to add tools to this chatbot (web search, database queries), Build AI agent with LangChain covers the LangGraph patterns that work well with the session architecture here.
The Deploy AI model to production guide covers Docker containerization and cloud deployment for when you're ready to ship this.
Conclusion
Multi-user session management boils down to three things: use RunnableWithMessageHistory with a session_id, back your history store with Redis so sessions survive restarts, and write async endpoints so users don't block each other.
The in-memory prototype is fine for learning, but don't let it get anywhere near production. Two users hitting the same endpoint at the same time with shared memory is a debugging nightmare you can avoid entirely.
Start with the Redis-backed version from the beginning. The setup takes maybe 15 minutes, and you'll never have to migrate a production system away from in-memory storage at an inconvenient moment.
For a complete production-ready chatbot example with authentication, rate limiting, and monitoring, the Build AI chatbot Python guide walks through the full stack.
Frequently Asked Questions
How does LangChain isolate memory between different users? LangChain's RunnableWithMessageHistory reads a session_id (a string) from config["configurable"] and passes it to your get_session_history function, which returns the history store for that session. By mapping session_id to a user's ID or conversation thread, each user gets completely separate memory. The key is never reusing session IDs across users and, in production, binding each session ID to the authenticated user instead of trusting whatever ID the client sends.
What's the best database for storing LangChain chat history in production? Redis is the most popular choice for active sessions: it's fast, supports TTL for automatic expiry, and scales well. For long-term history storage (audit logs, analytics), PostgreSQL is better. Many production systems use both: Redis for active sessions (24-48 hour TTL) and PostgreSQL for permanent archive.
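How many concurrent users can a FastAPI + LangChain chatbot handle? With async FastAPI and async LangChain (ainvoke), a single server can typically handle hundreds of concurrent users, because each request spends most of its time waiting on the LLM API. The bottleneck is usually the LLM provider's rate limits and your server's memory. OpenAI's limits vary by model and by your account's usage tier, and they are enforced on both requests and tokens per minute, so check your organization's limits rather than assuming a fixed number. Use Redis to cache common responses and exponential backoff for rate limit handling.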
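Because the provider's limits cap throughput, it can help to bound how many LLM calls each process has in flight, so bursts queue locally instead of turning into a wave of 429 responses. ConcurrencyGate is a hypothetical helper built on asyncio.Semaphore; the peak counter exists only so the behavior can be observed.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class ConcurrencyGate:
    """Hypothetical helper: bound the number of in-flight LLM calls in one process."""

    def __init__(self, max_in_flight: int):
        self._sem = asyncio.Semaphore(max_in_flight)
        self.in_flight = 0
        self.peak = 0  # highest concurrency observed, for monitoring or tests

    async def run(self, make_call: Callable[[], Awaitable[T]]) -> T:
        async with self._sem:  # extra callers wait here instead of hitting the API
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                return await make_call()
            finally:
                self.in_flight -= 1


async def demo() -> None:
    gate = ConcurrencyGate(max_in_flight=3)  # create inside the running event loop

    async def fake_llm_call() -> str:
        await asyncio.sleep(0.01)
        return "reply"

    results = await asyncio.gather(*(gate.run(fake_llm_call) for _ in range(10)))
    print(len(results), gate.peak)  # 10 3


asyncio.run(demo())
```

The limit is per process, so the effective ceiling is max_in_flight multiplied by the number of workers; size it against your provider limits accordingly.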
AiTechWorlds Team
Verified Writer. The AiTechWorlds team is passionate about AI, technology, and education. We create high-quality, research-backed content to help you learn, grow, and succeed in the modern digital world.