7 LangChain Streaming Examples (Stream Tokens to UI)
Master LangChain streaming with 7 real examples: .stream(), .astream(), astream_events(), FastAPI SSE endpoints, and React token consumers for real-time AI output.
Waiting for a full LLM response before showing anything to the user is a solved problem. Every major AI product streams tokens to the UI as they arrive, creating the impression of a thinking, typing assistant rather than a black box that stalls for five seconds and then dumps a wall of text.
LangChain has first-class streaming support built into its LCEL (LangChain Expression Language) runtime. Every component in a chain can participate in streaming, and there are several APIs to choose from depending on how much control you need. This guide covers seven practical streaming patterns with working code, from the simplest synchronous case to a full React + FastAPI production setup.
Start with the LangChain tutorial 2025 if you are new to LCEL, and see Build AI agent with LangChain for how streaming fits into agent architectures.
Why Streaming Matters for User Experience
A 2024 study by Andreessen Horowitz found that perceived response quality correlates more strongly with time-to-first-token than total response time. Users rate a response that starts appearing in 0.5 seconds as better than an identical response that appears all at once after 3 seconds.
For a 200-token response at typical GPT-4o-mini speeds:
- Without streaming: ~2.5 seconds of nothing, then full text appears
- With streaming: first token at ~0.4 seconds, full text by ~2.5 seconds
The total time is the same. The perceived experience is completely different.
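To check these two numbers for your own chain, you can time the first chunk and the last chunk separately. The following is a minimal sketch, not part of LangChain: `fake_token_stream` and `measure_stream` are hypothetical helpers, and the fake stream stands in for a real model so the example runs without an API key.

```python
import time
from typing import Iterable, Iterator, Tuple


def fake_token_stream(n_tokens: int, delay_s: float) -> Iterator[str]:
    """Stand-in for an LLM stream: yields one token every `delay_s` seconds."""
    for i in range(n_tokens):
        time.sleep(delay_s)
        yield f"tok{i} "


def measure_stream(chunks: Iterable[str]) -> Tuple[float, float]:
    """Return (time_to_first_token, total_time) in seconds for any chunk iterator."""
    start = time.perf_counter()
    first = None
    for _ in chunks:
        if first is None:
            # Record the moment the very first chunk arrives.
            first = time.perf_counter() - start
    total = time.perf_counter() - start
    return (first if first is not None else total), total


ttft, total = measure_stream(fake_token_stream(n_tokens=20, delay_s=0.005))
print(f"first token after {ttft:.3f}s, full response after {total:.3f}s")
```

Because `measure_stream` accepts any iterable of chunks, the same function should work if you pass it the result of `chain.stream(...)` from the examples that follow.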
Example 1: Synchronous Streaming with .stream()
The simplest streaming API. Use this for scripts, CLI tools, or any synchronous context.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7, streaming=True)
prompt = ChatPromptTemplate.from_template(
    "Write a short paragraph about {topic}."
)
chain = prompt | llm | StrOutputParser()

# Stream tokens to the console
print("Response: ", end="", flush=True)
for chunk in chain.stream({"topic": "the future of AI"}):
    print(chunk, end="", flush=True)
print()  # newline at end
Each chunk is a string containing one or more tokens. The end="" and flush=True arguments ensure tokens appear immediately without buffering.
Example 2: Async Streaming with .astream()
For any async application (FastAPI, Starlette, Django ASGI), use .astream() to avoid blocking the event loop:
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7, streaming=True)
prompt = ChatPromptTemplate.from_template(
    "Explain {concept} in simple terms."
)
chain = prompt | llm | StrOutputParser()

async def stream_response(concept: str) -> str:
    full_response = ""
    async for chunk in chain.astream({"concept": concept}):
        print(chunk, end="", flush=True)
        full_response += chunk
    print()
    return full_response

asyncio.run(stream_response("quantum entanglement"))
The async version yields control back to the event loop between each token, allowing other requests to be handled concurrently, which is critical in a web server context.
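The effect of yielding between tokens can be seen without calling a model. This is a minimal sketch using only asyncio: `fake_token_stream` and `consume` are hypothetical stand-ins for `.astream()` and a request handler, and `asyncio.sleep(0)` plays the role of waiting on the network for the next token.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def fake_token_stream(tokens: List[str]) -> AsyncIterator[str]:
    """Stand-in for chain.astream(): hands control to the event loop before each token."""
    for tok in tokens:
        await asyncio.sleep(0)  # simulated wait for the next token
        yield tok


async def consume(label: str, tokens: List[str], log: List[Tuple[str, str]]) -> None:
    """Stand-in for one request handler reading its own stream."""
    async for tok in fake_token_stream(tokens):
        log.append((label, tok))


async def main() -> List[Tuple[str, str]]:
    log: List[Tuple[str, str]] = []
    # Two "requests" run concurrently on a single event loop.
    await asyncio.gather(
        consume("A", ["a1", "a2", "a3"], log),
        consume("B", ["b1", "b2", "b3"], log),
    )
    return log


interleaved = asyncio.run(main())
print(interleaved)
```

The log should show tokens from A and B mixed together rather than all of A followed by all of B, which is the behavior a web server relies on when several clients stream at once.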
Example 3: Streaming with astream_events()
astream_events() gives you structured events from every component in the chain, not just the final output tokens. This lets you build UIs that show intermediate states.
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_template(
    "Answer this question: {question}"
)
chain = prompt | llm

async def stream_with_events(question: str):
    async for event in chain.astream_events(
        {"question": question},
        version="v2"
    ):
        event_type = event["event"]
        if event_type == "on_chat_model_start":
            print("\n[LLM started]")
        elif event_type == "on_chat_model_stream":
            # This is where the actual tokens come through
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
        elif event_type == "on_chat_model_end":
            print("\n[LLM finished]")
        elif event_type == "on_chain_start":
            print(f"[Chain {event['name']} started]")
        elif event_type == "on_chain_end":
            print(f"[Chain {event['name']} finished]")

asyncio.run(stream_with_events("What is the speed of light?"))
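The version="v2" parameter selects the current event schema; pass it explicitly, because some langchain-core releases require it. Each event carries the event type, the name of the component that emitted it, a run ID, tags and metadata, and the data payload for that stage.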
Example 4: Streaming a RAG Chain
Streaming through a RAG chain requires the retriever to finish before the LLM starts: you cannot stream retrieval results, only the generation stage. LangChain handles this automatically:
import asyncio
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Chroma(
    collection_name="docs",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_template("""
Answer based on the context below.
Context: {context}
Question: {question}
""")

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

async def stream_rag_response(question: str):
    print(f"Q: {question}\nA: ", end="", flush=True)
    async for chunk in rag_chain.astream(question):
        print(chunk, end="", flush=True)
    print()

asyncio.run(stream_rag_response("How does the indexing API work?"))
The retrieval step blocks briefly (typically 50 to 200 ms), then the LLM tokens start streaming immediately.
Example 5: FastAPI SSE Endpoint
Server-Sent Events (SSE) is the standard protocol for streaming text from a server to a browser. FastAPI makes it straightforward to wrap a LangChain stream in an SSE response:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from pydantic import BaseModel
import json

app = FastAPI()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_template("Answer: {question}")
chain = prompt | llm | StrOutputParser()

class ChatRequest(BaseModel):
    question: str

async def generate_sse_stream(question: str):
    """
    Generator that yields SSE-formatted data events.
    """
    try:
        async for chunk in chain.astream({"question": question}):
            # SSE format: "data: <content>\n\n"
            data = json.dumps({"token": chunk, "done": False})
            yield f"data: {data}\n\n"
        # Signal completion
        yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
    except Exception as e:
        error_data = json.dumps({"error": str(e), "done": True})
        yield f"data: {error_data}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        generate_sse_stream(request.question),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # Disable Nginx buffering
        }
    )

@app.get("/health")
async def health():
    return {"status": "ok"}
The X-Accel-Buffering: no header is critical if you are running behind Nginx. Without it, Nginx buffers the entire response before sending it, defeating the purpose of streaming.
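The SSE wire format itself is small enough to isolate in one helper: each event is one or more `field: value` lines followed by a blank line. The sketch below is one possible way to factor that out; `format_sse` is a hypothetical name, not a FastAPI or LangChain function, and it assumes the payload is JSON-serializable.

```python
import json
from typing import Any, Optional


def format_sse(payload: Any, event: Optional[str] = None) -> str:
    """Serialize `payload` as JSON and wrap it in a single SSE event frame."""
    lines = []
    if event is not None:
        # Optional named event type, readable by clients that dispatch on it.
        lines.append(f"event: {event}")
    # json.dumps escapes newlines inside strings, so the payload fits on one data line.
    lines.append(f"data: {json.dumps(payload)}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


frame = format_sse({"token": "Hello\nworld", "done": False})
print(repr(frame))
```

Encoding the payload as JSON matters here: a raw token containing a newline would otherwise end the `data:` line early and corrupt the frame.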
Example 6: React Consumer for SSE Streaming
Here is a minimal React component that connects to the FastAPI endpoint above and renders tokens as they arrive:
import { useState, useCallback } from "react";

function ChatInterface() {
  const [question, setQuestion] = useState("");
  const [response, setResponse] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async () => {
    if (!question.trim() || isStreaming) return;
    setResponse("");
    setIsStreaming(true);
    try {
      const res = await fetch("/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ question }),
      });
      if (!res.ok) throw new Error(`HTTP ${res.status}`);
      if (!res.body) throw new Error("No response body");

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = ""; // partial line left over from the previous read

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? ""; // the last piece may be an incomplete line
        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          try {
            const data = JSON.parse(line.slice(6));
            if (data.error) {
              // The server reports failures as { error, done: true }
              setResponse(`Error: ${data.error}`);
              return;
            }
            if (data.done) return; // the finally block resets isStreaming
            if (data.token) {
              setResponse((prev) => prev + data.token);
            }
          } catch {
            // Skip malformed JSON lines
          }
        }
      }
    } catch (err) {
      setResponse(`Error: ${err.message}`);
    } finally {
      setIsStreaming(false);
    }
  }, [question, isStreaming]);

  return (
    <div style={{ maxWidth: 700, margin: "40px auto", fontFamily: "sans-serif" }}>
      <h2>LangChain Streaming Chat</h2>
      <div style={{ display: "flex", gap: 8, marginBottom: 16 }}>
        <input
          value={question}
          onChange={(e) => setQuestion(e.target.value)}
          onKeyDown={(e) => e.key === "Enter" && sendMessage()}
          placeholder="Ask anything..."
          style={{ flex: 1, padding: 10, fontSize: 16 }}
        />
        <button
          onClick={sendMessage}
          disabled={isStreaming}
          style={{ padding: "10px 20px", fontSize: 16 }}
        >
          {isStreaming ? "Thinking..." : "Send"}
        </button>
      </div>
      <div
        style={{
          minHeight: 200,
          padding: 16,
          background: "#f5f5f5",
          borderRadius: 8,
          whiteSpace: "pre-wrap",
          lineHeight: 1.6
        }}
      >
        {response || <span style={{ color: "#999" }}>Response appears here...</span>}
        {isStreaming && <span style={{ animation: "blink 1s infinite" }}>▌</span>}
      </div>
    </div>
  );
}

export default ChatInterface;
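A network read can end in the middle of an SSE event, so any consumer, in the browser or elsewhere, generally needs to hold back incomplete text until the terminating blank line arrives. The following Python sketch illustrates that buffering logic in isolation. `SSEBuffer` is a hypothetical class written for this example, and it assumes every event is a single `data: <json>` line, as in the endpoint from Example 5.

```python
import json
from typing import Dict, List


class SSEBuffer:
    """Accumulates raw text and returns only complete `data:` events."""

    def __init__(self) -> None:
        self._pending = ""

    def feed(self, text: str) -> List[Dict]:
        self._pending += text
        # Events are separated by a blank line; the final piece may be incomplete,
        # so it is kept in the buffer until more text arrives.
        *complete, self._pending = self._pending.split("\n\n")
        events = []
        for block in complete:
            for line in block.split("\n"):
                if line.startswith("data: "):
                    events.append(json.loads(line[len("data: "):]))
        return events


buf = SSEBuffer()
# First read ends in the middle of a JSON payload.
first = buf.feed('data: {"token": "Hel')
# Second read completes that event and delivers another one.
second = buf.feed('lo", "done": false}\n\ndata: {"token": "", "done": true}\n\n')
print(first, second)
```

Parsing each network chunk on its own, without a buffer like this, would raise a JSON error on the first read and silently lose the token.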
Example 7: Streaming with astream_events() for Multi-Step Progress UI
The most sophisticated pattern shows UI updates for every step in a complex pipeline: retrieval status, tool calls, intermediate results.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
import json

app = FastAPI()
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Chroma(
    collection_name="docs",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_template("""
Context: {context}
Question: {question}
""")

rag_chain = (
    {
        "context": retriever | (lambda docs: "\n\n".join(d.page_content for d in docs)),
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)

async def generate_rich_stream(question: str):
    """
    Stream events with stage indicators for a progress-aware UI.
    """
    async for event in rag_chain.astream_events(question, version="v2"):
        event_type = event["event"]
        name = event.get("name", "")
        if event_type == "on_retriever_start":
            msg = json.dumps({"type": "status", "message": "Searching documents..."})
            yield f"data: {msg}\n\n"
        elif event_type == "on_retriever_end":
            docs = event["data"].get("output", [])
            msg = json.dumps({
                "type": "status",
                "message": f"Found {len(docs)} relevant documents."
            })
            yield f"data: {msg}\n\n"
        elif event_type == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                msg = json.dumps({"type": "token", "content": token})
                yield f"data: {msg}\n\n"
        elif event_type == "on_chain_end" and name == "RunnableSequence":
            msg = json.dumps({"type": "done"})
            yield f"data: {msg}\n\n"

@app.post("/chat/rich-stream")
async def chat_rich_stream(request: dict):
    return StreamingResponse(
        generate_rich_stream(request["question"]),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
Streaming API Comparison
| API | Sync/Async | Output type | Best for |
|---|---|---|---|
| .stream() | Sync | String chunks | CLI tools, scripts |
| .astream() | Async | String chunks | FastAPI, async servers |
| .astream_events() | Async | Event objects | Multi-step progress UI |
| LangServe streaming | Async | String chunks | Auto-generated API |
| Callback streaming | Sync/Async | Callback invocations | Custom integrations |
Performance Tips
Use streaming=True when creating the LLM. Some older integrations require this flag explicitly even when you call .stream() or .astream(). Always set it to be safe.
Set appropriate chunk sizes. OpenAI streams roughly one token per chunk. Some providers send larger chunks. If you are accumulating chunks for display, consider debouncing your UI updates to avoid excessive re-renders.
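One simple way to reduce update frequency is to coalesce small chunks before they reach the UI layer. The sketch below batches by size rather than by time, so it is a simplification of true debouncing; `batch_chunks` and its `min_chars` parameter are hypothetical names chosen for this example.

```python
from typing import Iterable, Iterator


def batch_chunks(chunks: Iterable[str], min_chars: int) -> Iterator[str]:
    """Coalesce small chunks so each yielded string has at least `min_chars` characters."""
    pending = ""
    for chunk in chunks:
        pending += chunk
        if len(pending) >= min_chars:
            yield pending
            pending = ""
    if pending:
        # Flush whatever is left when the stream ends.
        yield pending


batched = list(batch_chunks(["a", "b", "c", "d", "e"], min_chars=2))
print(batched)  # ['ab', 'cd', 'e']
```

A larger `min_chars` means fewer UI updates but a choppier typing effect, so the value is worth tuning against the real interface.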
Handle backpressure. If your client is consuming tokens slower than the LLM is producing them, tokens queue up in memory. For high-traffic applications, implement a queue with a maximum depth and drop connections that are too slow.
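A bounded queue between the token producer and the client writer is one way to enforce a maximum depth. The following is a minimal sketch using only asyncio, with simulated tokens instead of a real model; `produce`, `consume`, and `demo` are hypothetical helpers, and the queue size of 3 is an arbitrary choice for the demonstration.

```python
import asyncio
from typing import Iterable, List


async def produce(tokens: Iterable[str], queue: asyncio.Queue) -> bool:
    """Push tokens into a bounded queue. Return False if the consumer fell too far behind."""
    for tok in tokens:
        try:
            queue.put_nowait(tok)
        except asyncio.QueueFull:
            return False  # the caller would close this client's connection here
        await asyncio.sleep(0)  # give the consumer a chance to run
    return True


async def consume(queue: asyncio.Queue, n: int, delay_s: float) -> List[str]:
    """Simulated client that takes `delay_s` seconds to handle each token."""
    received = []
    for _ in range(n):
        received.append(await queue.get())
        await asyncio.sleep(delay_s)
    return received


async def demo(consumer_delay_s: float) -> bool:
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)
    consumer = asyncio.create_task(consume(queue, 10, consumer_delay_s))
    ok = await produce([f"t{i}" for i in range(10)], queue)
    consumer.cancel()
    return ok


fast_ok = asyncio.run(demo(consumer_delay_s=0))
slow_ok = asyncio.run(demo(consumer_delay_s=0.05))
print(fast_ok, slow_ok)
```

With a consumer that keeps up, the queue never fills and the producer finishes normally; with a slow consumer, the queue reaches its limit and the producer gives up instead of buffering without bound.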
Test with network throttling. Your SSE implementation might look fine on localhost but break on a 4G connection with intermittent packet loss. Test with Chrome DevTools' network throttling before deploying.
For more on building agents that benefit from streaming, see AI agents explained and the OpenAI Assistants API guide.
Frequently Asked Questions
What is the difference between .stream() and .astream() in LangChain?
.stream() is synchronous: it blocks the thread while yielding tokens, which suits scripts and synchronous frameworks. .astream() is asynchronous: it yields tokens without blocking the event loop, which is required for async frameworks like FastAPI, Starlette, or any application using asyncio. For web APIs, always use .astream().
How do I stream LangChain output to a React frontend?
Use FastAPI on the backend to serve a Server-Sent Events (SSE) endpoint that yields tokens from .astream(). On the React side, use the fetch API with a ReadableStream reader, as in Example 6, to consume the token stream and append each token to your UI state as it arrives. The browser's EventSource API is an option only for GET endpoints, because it cannot send a POST body.
What does astream_events() return and when should I use it?
astream_events() returns a stream of structured event objects, not just tokens. Each event has a type (like on_chat_model_stream, on_retriever_end, on_chain_end) and metadata about which component emitted it. Use it when you need to show different UI states for different pipeline stages, for example showing "Searching documents..." while the retriever runs and then the streaming answer when the LLM starts.
AiTechWorlds Team