Follow AiTechWorlds on LinkedIn for professional AI content!Follow Now →
16 minLesson 18 of 23
Production Agents

Streaming Agent Output to Users

Streaming Agent Output

Nobody wants to stare at a spinner for 30 seconds while an agent works. Streaming shows users what's happening in real time — tokens as they're generated, tool calls as they happen, intermediate results as they arrive. This lesson covers how to implement streaming at every level of your agent stack.

Why Streaming Matters

Without streaming: user submits task → wait 30 seconds → full response appears. Users assume it's broken.

With streaming: typing indicator starts immediately → tokens appear as generated → tool calls visible → final answer complete. Users understand what's happening.

Streaming is especially important for agents because they take many steps and call multiple tools. Without visibility, a 60-second agent run feels like a failure.

Token-Level Streaming from the LLM

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", streaming=True)

# Synchronous streaming
for chunk in llm.stream("Write a 3-paragraph essay about AI agents"):
    print(chunk.content, end="", flush=True)

# Or with astream for async
import asyncio

async def stream_response():
    async for chunk in llm.astream("Write a poem"):
        print(chunk.content, end="", flush=True)
        
asyncio.run(stream_response())

Streaming from LangGraph Agents

LangGraph's .stream() method emits events at each step of the graph:

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_community.tools import TavilySearchResults

llm = ChatOpenAI(model="gpt-4o", streaming=True)
tools = [TavilySearchResults(max_results=3)]
agent = create_react_agent(llm, tools)

# Stream mode: "values" — emit full state at each step
for event in agent.stream(
    {"messages": [("human", "Research the current state of AI agents in 2025")]},
    stream_mode="values"
):
    last_msg = event["messages"][-1]
    msg_type = last_msg.__class__.__name__
    
    if hasattr(last_msg, 'content') and last_msg.content:
        print(f"[{msg_type}]: {last_msg.content[:200]}")

# Stream mode: "updates" — emit only what changed at each step
for event in agent.stream(
    {"messages": [("human", "Research AI agents")]},
    stream_mode="updates"
):
    for node_name, node_output in event.items():
        print(f"\n[Node: {node_name}]")
        if "messages" in node_output:
            for msg in node_output["messages"]:
                print(f"  {msg.content[:100] if hasattr(msg, 'content') else str(msg)[:100]}")

Event-Based Streaming (Most Granular)

# stream_mode="events" gives you every event type
for event in agent.astream_events(
    {"messages": [("human", "What's the weather in Tokyo?")]},
    version="v2"
):
    event_type = event["event"]
    
    if event_type == "on_chat_model_stream":
        # LLM is generating tokens
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)
    
    elif event_type == "on_tool_start":
        # A tool is being called
        tool_name = event["name"]
        tool_input = event["data"]["input"]
        print(f"\n🔧 Calling tool: {tool_name}")
        print(f"   Input: {tool_input}")
    
    elif event_type == "on_tool_end":
        # Tool finished
        tool_name = event["name"]
        print(f"   ✓ {tool_name} completed")
    
    elif event_type == "on_chain_end" and event["name"] == "LangGraph":
        # Entire agent finished
        print("\n\n✅ Agent completed")

Server-Sent Events for Web Applications

Stream agent output to a browser using SSE:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
import asyncio
import json

app = FastAPI()
llm = ChatOpenAI(model="gpt-4o", streaming=True)
agent = create_react_agent(llm, tools)

@app.get("/agent/stream")
async def stream_agent(task: str):
    async def event_generator():
        try:
            async for event in agent.astream_events(
                {"messages": [("human", task)]},
                version="v2"
            ):
                event_type = event["event"]
                
                if event_type == "on_chat_model_stream":
                    chunk = event["data"]["chunk"]
                    if chunk.content:
                        yield f"data: {json.dumps({'type': 'token', 'content': chunk.content})}\n\n"
                
                elif event_type == "on_tool_start":
                    yield f"data: {json.dumps({'type': 'tool_start', 'tool': event['name'], 'input': str(event['data'].get('input', ''))[:100]})}\n\n"
                
                elif event_type == "on_tool_end":
                    yield f"data: {json.dumps({'type': 'tool_end', 'tool': event['name']})}\n\n"
                
                elif event_type == "on_chain_end" and event["name"] == "LangGraph":
                    yield f"data: {json.dumps({'type': 'done'})}\n\n"
                    
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable Nginx buffering
        }
    )

Frontend: Consuming SSE in React

// React component that streams agent output
function AgentStream({ task }: { task: string }) {
    const [output, setOutput] = useState<string>("");
    const [toolCalls, setToolCalls] = useState<string[]>([]);
    const [isComplete, setIsComplete] = useState(false);
    
    useEffect(() => {
        const eventSource = new EventSource(
            `/agent/stream?task=${encodeURIComponent(task)}`
        );
        
        eventSource.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            if (data.type === "token") {
                setOutput(prev => prev + data.content);
            } else if (data.type === "tool_start") {
                setToolCalls(prev => [...prev, `🔧 ${data.tool}: ${data.input}`]);
            } else if (data.type === "done") {
                setIsComplete(true);
                eventSource.close();
            } else if (data.type === "error") {
                console.error("Agent error:", data.message);
                eventSource.close();
            }
        };
        
        return () => eventSource.close();
    }, [task]);
    
    return (
        <div>
            {toolCalls.length > 0 && (
                <div className="tool-calls">
                    {toolCalls.map((call, i) => <p key={i} className="text-gray-500 text-sm">{call}</p>)}
                </div>
            )}
            <div className="output">
                {output}
                {!isComplete && <span className="animate-pulse">▊</span>}
            </div>
        </div>
    );
}

Streaming Intermediate Results

For agents that produce structured intermediate results:

from langchain_core.callbacks import BaseCallbackHandler

class StreamingProgressCallback(BaseCallbackHandler):
    """Custom callback for tracking and streaming agent progress."""
    
    def __init__(self, progress_queue: asyncio.Queue):
        self.queue = progress_queue
    
    def on_tool_start(self, serialized, input_str, **kwargs):
        tool_name = serialized.get("name", "unknown")
        asyncio.create_task(
            self.queue.put({"type": "tool_start", "tool": tool_name, "input": input_str[:100]})
        )
    
    def on_tool_end(self, output, **kwargs):
        asyncio.create_task(
            self.queue.put({"type": "tool_end", "output": str(output)[:200]})
        )
    
    def on_llm_new_token(self, token, **kwargs):
        asyncio.create_task(
            self.queue.put({"type": "token", "content": token})
        )

Async Streaming Best Practices

Buffer for jitter: Tokens arrive in tiny bursts. Buffer 10-20 tokens before sending to avoid excessive HTTP round-trips:

async def buffered_token_stream(agent, task: str):
    buffer = []
    async for event in agent.astream_events({"messages": [("human", task)]}, version="v2"):
        if event["event"] == "on_chat_model_stream":
            buffer.append(event["data"]["chunk"].content)
            if len(buffer) >= 10:
                yield "".join(buffer)
                buffer = []
    if buffer:
        yield "".join(buffer)

Handle disconnections: SSE connections can drop. Implement reconnection with last-event-id:

headers = {"Last-Event-ID": "0"}  # Client sends this on reconnect
# Server uses this to resume from where it left off

Set appropriate timeouts: Long agent runs need long connection timeouts — configure your web server accordingly.

Next lesson: Agent error handling — building resilient agents that fail gracefully.

📱

Get this course's notes on Telegram!

Free cheat sheets, summaries & practice exercises

Get Notes Free →
!