5 LangChain Checkpointers: Save and Load Agent State (2026)
Learn how to persist and restore LangGraph agent state using MemorySaver, SqliteSaver, and PostgresSaver, with full Python code examples.
State persistence is one of those features that separates a toy agent from a real product. You build a multi-step research assistant, it works beautifully in your notebook, and then your user closes the browser tab. All that progress — the queries it ran, the documents it read, the plan it was executing — gone. LangGraph checkpointers solve this problem by giving you a clean API to save and restore agent state at every execution step.
This guide walks through five checkpointer backends available in 2026, shows you exactly how to wire them up, and gives you a comparison table so you can pick the right one for your situation without guessing.
Why Checkpointing Matters for Production Agents
Before writing any code, it is worth being concrete about what "state" means in this context. In a LangGraph graph, state is the entire dictionary that flows between nodes. For a typical AI agent memory and planning setup this includes:
- The conversation message history
- Tool call results accumulated so far
- Custom fields like current_plan, retrieved_docs, or user_profile
- Metadata like iteration count and timestamps
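As a rough illustration of the fields listed above, a state schema might look like the sketch below. The class name `ResearchState`, the field `tool_results`, and the helper `merge_update` are hypothetical; `merge_update` is only a toy stand-in for how reducer-annotated fields are combined with a node's update, not LangGraph's actual merge code.

```python
import operator
from typing import Annotated, List, TypedDict


class ResearchState(TypedDict, total=False):
    # Conversation history; the operator.add annotation means "append, don't overwrite"
    messages: Annotated[list, operator.add]
    # Tool call results accumulated so far (also append-only)
    tool_results: Annotated[list, operator.add]
    # Custom fields: a plain annotation means the last write wins
    current_plan: str
    retrieved_docs: List[str]
    user_profile: dict
    # Metadata
    iteration: int
    updated_at: str


APPEND_ONLY = ("messages", "tool_results")


def merge_update(state: dict, update: dict) -> dict:
    """Toy stand-in for merging a node's partial update into the existing state."""
    merged = dict(state)
    for key, value in update.items():
        if key in APPEND_ONLY:
            merged[key] = operator.add(state.get(key, []), value)
        else:
            merged[key] = value
    return merged


state = merge_update(
    {"messages": ["hi"], "iteration": 1},
    {"messages": ["there"], "iteration": 2},
)
```

Everything in this dictionary is what a checkpointer has to serialize, which is why keeping the fields simple and JSON-friendly pays off later.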
Without checkpointing, all of this lives in RAM. That is fine for a single HTTP request, but agents that run for minutes or hours — autonomous research agents, document processing pipelines, multi-turn customer interactions — need their progress saved persistently.
LangGraph's checkpointing API intercepts state after every node execution and serializes it to your chosen backend. When the graph resumes (new HTTP request, same thread ID), it deserializes the last checkpoint and continues from exactly where it stopped.
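The save-after-every-node idea can be sketched with nothing but the standard library. This is a toy model under stated assumptions, not LangGraph's implementation: `ToyCheckpointStore`, `run`, and the `completed` bookkeeping field are all hypothetical names chosen for illustration.

```python
import json
from typing import Callable, Dict, List, Optional, Tuple


class ToyCheckpointStore:
    """Hypothetical in-process store: thread_id -> list of serialized snapshots."""

    def __init__(self) -> None:
        self._threads: Dict[str, List[str]] = {}

    def save(self, thread_id: str, state: dict) -> None:
        self._threads.setdefault(thread_id, []).append(json.dumps(state))

    def latest(self, thread_id: str) -> Optional[dict]:
        snapshots = self._threads.get(thread_id)
        return json.loads(snapshots[-1]) if snapshots else None


def run(store: ToyCheckpointStore, thread_id: str,
        nodes: List[Tuple[str, Callable[[dict], dict]]], initial: dict) -> dict:
    # Resume from the last snapshot for this thread, or start fresh
    state = store.latest(thread_id) or {**initial, "completed": []}
    for name, node in nodes:
        if name in state["completed"]:
            continue  # this node was already checkpointed in an earlier run
        state = {**state, **node(state)}
        state["completed"] = state["completed"] + [name]
        store.save(thread_id, state)  # checkpoint after every node
    return state


calls: List[str] = []


def search(state: dict) -> dict:
    calls.append("search")
    return {"docs": ["doc-1"]}


def flaky_summarize(state: dict) -> dict:
    calls.append("summarize")
    if calls.count("summarize") == 1:
        raise RuntimeError("simulated crash")
    return {"summary": "summary of " + ", ".join(state["docs"])}


store = ToyCheckpointStore()
nodes = [("search", search), ("summarize", flaky_summarize)]
try:
    run(store, "thread-1", nodes, {"docs": []})
except RuntimeError:
    pass  # the process "dies" after the search node was checkpointed

final = run(store, "thread-1", nodes, {"docs": []})
```

On the second call the search node is skipped because its result is already in the last snapshot; only the failed step runs again.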
A 2024 benchmark from Weights and Biases found that agents with persistent state completed 38% more long-horizon tasks successfully compared to stateless equivalents. The difference is the ability to retry individual steps without re-running everything from scratch.
Setting Up the Environment
pip install langgraph langchain-core langchain-openai
pip install langgraph-checkpoint-sqlite # provides SqliteSaver and AsyncSqliteSaver
pip install aiosqlite # for SqliteSaver async
pip install "psycopg[binary,pool]" # for PostgresSaver
pip install langgraph-checkpoint-postgres
All examples below use this minimal graph to keep the focus on the checkpointer, not the agent logic.
from typing import TypedDict, Annotated
import operator

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage


class AgentState(TypedDict):
    # operator.add appends each node's new messages to the existing list
    messages: Annotated[list, operator.add]
    step_count: int


llm = ChatOpenAI(model="gpt-4o-mini")


def chat_node(state: AgentState) -> AgentState:
    response = llm.invoke(state["messages"])
    # Return only the fields that changed; they are merged into the saved state
    return {
        "messages": [response],
        "step_count": state.get("step_count", 0) + 1
    }


def build_graph(checkpointer):
    builder = StateGraph(AgentState)
    builder.add_node("chat", chat_node)
    builder.set_entry_point("chat")
    builder.add_edge("chat", END)
    return builder.compile(checkpointer=checkpointer)
Checkpointer 1: MemorySaver
MemorySaver stores checkpoints in a Python dictionary inside the current process. It requires zero setup and is perfect for unit tests and interactive prototyping.
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = build_graph(checkpointer)
config = {"configurable": {"thread_id": "user-session-001"}}
# First turn
result = graph.invoke(
{"messages": [HumanMessage(content="Hello, what is LangGraph?")],
"step_count": 0},
config=config
)
print(result["messages"][-1].content)
# Second turn — graph reads previous state from MemorySaver automatically
result = graph.invoke(
{"messages": [HumanMessage(content="Can you give me a code example?")]},
config=config
)
print(result["messages"][-1].content)
print(f"Total steps taken: {result['step_count']}")
The thread_id is the key concept. Every unique thread_id is an isolated conversation. The same graph instance handles thousands of parallel threads, each with completely separate state.
Limitation: State disappears when the process exits. Do not use MemorySaver in any deployed service.
Checkpointer 2: SqliteSaver
SqliteSaver persists checkpoints to a SQLite database file. It survives process restarts and is the easiest step up from MemorySaver for small applications or local tools.
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
conn = sqlite3.connect("agent_state.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
graph = build_graph(checkpointer)
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke(
{"messages": [HumanMessage(content="Summarize machine learning in 3 bullets.")],
"step_count": 0},
config=config
)
# Restart your process — state is still here
# graph.invoke picks up from the last checkpoint for the same thread_id
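To see why a file-backed store survives a restart, here is a minimal standard-library sketch. The table `toy_checkpoints` and the helpers `save_snapshot` and `load_latest` are hypothetical and deliberately simpler than whatever schema SqliteSaver uses internally; each call opens a fresh connection to mimic a new process.

```python
import json
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import Optional


def _connect(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS toy_checkpoints ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "thread_id TEXT NOT NULL, state TEXT NOT NULL)"
    )
    return conn


def save_snapshot(db_path: str, thread_id: str, state: dict) -> None:
    with closing(_connect(db_path)) as conn:
        conn.execute(
            "INSERT INTO toy_checkpoints (thread_id, state) VALUES (?, ?)",
            (thread_id, json.dumps(state)),
        )
        conn.commit()


def load_latest(db_path: str, thread_id: str) -> Optional[dict]:
    # A brand-new connection stands in for a restarted process
    with closing(_connect(db_path)) as conn:
        row = conn.execute(
            "SELECT state FROM toy_checkpoints WHERE thread_id = ? "
            "ORDER BY id DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
    return json.loads(row[0]) if row else None


db_path = os.path.join(tempfile.mkdtemp(), "toy_state.db")
save_snapshot(db_path, "user-123", {"step_count": 1})
save_snapshot(db_path, "user-123", {"step_count": 2})
restored = load_latest(db_path, "user-123")
```

Nothing is held in memory between calls, so the latest snapshot comes back from disk alone.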
For async FastAPI endpoints, use the async variant:
import asyncio

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
import aiosqlite


async def run_async_agent():
    async with AsyncSqliteSaver.from_conn_string("agent_state.db") as checkpointer:
        graph = build_graph(checkpointer)
        config = {"configurable": {"thread_id": "async-user-456"}}
        result = await graph.ainvoke(
            {"messages": [HumanMessage(content="What is async programming?")],
             "step_count": 0},
            config=config
        )
        return result


asyncio.run(run_async_agent())
Inspecting saved state without replaying the graph:
# List all checkpoints for a thread
history = list(graph.get_state_history(config))
for checkpoint in history:
    print(f"Step: {checkpoint.values.get('step_count')}, "
          f"Messages: {len(checkpoint.values.get('messages', []))}, "
          f"ID: {checkpoint.config['configurable']['checkpoint_id']}")
Checkpointer 3: PostgresSaver
For production multi-worker deployments, PostgresSaver is the right choice. It uses row-level locking so multiple workers can safely process different threads concurrently.
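from langgraph.checkpoint.postgres import PostgresSaver
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

DB_URI = "postgresql://user:password@localhost:5432/agent_db"

# Manually created connections should use autocommit and return rows as dicts
pool = ConnectionPool(
    conninfo=DB_URI,
    max_size=20,
    kwargs={"autocommit": True, "row_factory": dict_row},
)
checkpointer = PostgresSaver(pool)

# Run setup once to create required tables
checkpointer.setup()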
graph = build_graph(checkpointer)
config = {"configurable": {"thread_id": "prod-user-789"}}
result = graph.invoke(
{"messages": [HumanMessage(content="Analyze the pros and cons of microservices.")],
"step_count": 0},
config=config
)
print(result["messages"][-1].content)
For async usage with FastAPI:
from fastapi import FastAPI
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg.rows import dict_row
from psycopg_pool import AsyncConnectionPool

app = FastAPI()
agent_graph = None


async def setup_prod_graph():
    # Create the pool closed, then open it explicitly inside the running event loop
    pool = AsyncConnectionPool(
        conninfo=DB_URI,
        max_size=20,
        kwargs={"autocommit": True, "row_factory": dict_row},
        open=False,
    )
    await pool.open()
    checkpointer = AsyncPostgresSaver(pool)
    await checkpointer.setup()
    return build_graph(checkpointer)


@app.on_event("startup")
async def startup():
    global agent_graph
    agent_graph = await setup_prod_graph()


@app.post("/chat/{thread_id}")
async def chat(thread_id: str, message: str):
    config = {"configurable": {"thread_id": thread_id}}
    # Send only the new message; passing step_count here would overwrite the saved value
    result = await agent_graph.ainvoke(
        {"messages": [HumanMessage(content=message)]},
        config=config
    )
    return {"response": result["messages"][-1].content,
            "steps": result["step_count"]}
Rolling back to a previous checkpoint — a powerful debugging and safety feature:
# Get all checkpoints for a thread
history = list(graph.get_state_history(config))
# Pick the third-most-recent checkpoint
target_checkpoint = history[2]
rollback_config = target_checkpoint.config
# Resume from that exact point
result = graph.invoke(
{"messages": [HumanMessage(content="Let's try a different approach.")]},
config=rollback_config
)
This is useful in research-agent workflows where you want to branch from a known-good state and try different strategies.
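Branching is easier to reason about once you picture checkpoints as a tree in which every snapshot remembers its parent. The sketch below is a toy model of that idea; `ToyCheckpointTree` and its methods are hypothetical names, and the integer IDs are a simplification of real checkpoint IDs.

```python
from typing import Dict, List, Optional


class ToyCheckpointTree:
    """Hypothetical store where every snapshot records its parent."""

    def __init__(self) -> None:
        self._states: Dict[int, dict] = {}
        self._parents: Dict[int, Optional[int]] = {}

    def commit(self, state: dict, parent: Optional[int] = None) -> int:
        checkpoint_id = len(self._states)
        self._states[checkpoint_id] = dict(state)
        self._parents[checkpoint_id] = parent
        return checkpoint_id

    def state(self, checkpoint_id: int) -> dict:
        return dict(self._states[checkpoint_id])

    def lineage(self, checkpoint_id: int) -> List[int]:
        """Return IDs from the given checkpoint back to the root, newest first."""
        chain: List[int] = []
        current: Optional[int] = checkpoint_id
        while current is not None:
            chain.append(current)
            current = self._parents[current]
        return chain


tree = ToyCheckpointTree()
root = tree.commit({"plan": "draft"})
second = tree.commit({"plan": "strategy A"}, parent=root)
third = tree.commit({"plan": "strategy A, step 2"}, parent=second)

# Roll back to the root and try something else; the original line stays intact
branch = tree.commit({**tree.state(root), "plan": "strategy B"}, parent=root)
```

Resuming from an older checkpoint adds a new branch next to the existing one, so the earlier attempt remains available for comparison.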
Checkpointer 4: MongoDBSaver
If your stack already runs MongoDB, the community langgraph-checkpoint-mongodb package gives you a document-store backend.
pip install langgraph-checkpoint-mongodb
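from langgraph.checkpoint.mongodb import MongoDBSaver
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client["agent_checkpoints"]
# MongoDBSaver takes the client plus the name of the database to write to
checkpointer = MongoDBSaver(client, db_name="agent_checkpoints")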
graph = build_graph(checkpointer)
config = {"configurable": {"thread_id": "mongo-session-001"}}
result = graph.invoke(
{"messages": [HumanMessage(content="What are the best vector databases in 2026?")],
"step_count": 0},
config=config
)
# Each checkpoint is stored as a document in the "checkpoints" collection.
# Document fields can differ between package versions, so read them defensively.
for doc in db.checkpoints.find({"thread_id": "mongo-session-001"}):
    print(doc.get("checkpoint_id"))
MongoDB's flexible schema means new fields appear and disappear without schema migrations as your agent evolves.
Checkpointer 5: Custom Redis Checkpointer
When none of the built-in backends fit, you can build your own by subclassing BaseCheckpointSaver. Redis is a natural choice for automatic TTL expiry.
import base64
import json
from typing import Iterator, Optional

import redis
from langgraph.checkpoint.base import (
    BaseCheckpointSaver, Checkpoint, CheckpointMetadata, CheckpointTuple,
)
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer


class RedisCheckpointer(BaseCheckpointSaver):
    """Store checkpoints in Redis with configurable TTL (a simplified sketch)."""

    def __init__(self, redis_client, ttl_seconds: int = 86400):
        super().__init__(serde=JsonPlusSerializer())
        self.r = redis_client
        self.ttl = ttl_seconds

    def _key(self, thread_id: str, checkpoint_id: str) -> str:
        return f"checkpoint:{thread_id}:{checkpoint_id}"

    def _index_key(self, thread_id: str) -> str:
        return f"checkpoint_index:{thread_id}"

    def _with_id(self, config: dict, checkpoint_id: str) -> dict:
        return {**config, "configurable": {
            **config["configurable"],
            "checkpoint_id": checkpoint_id
        }}

    def put(self, config: dict, checkpoint: Checkpoint,
            metadata: CheckpointMetadata, new_versions: dict) -> dict:
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = checkpoint["id"]
        # Serialize checkpoint and metadata together so message objects survive
        serialized_type, serialized_data = self.serde.dumps_typed(
            {"checkpoint": checkpoint, "metadata": dict(metadata)}
        )
        data = json.dumps({
            "type": serialized_type,
            # dumps_typed returns bytes, which JSON cannot hold directly
            "blob": base64.b64encode(serialized_data).decode("ascii"),
        })
        self.r.setex(self._key(thread_id, checkpoint_id), self.ttl, data)
        self.r.rpush(self._index_key(thread_id), checkpoint_id)
        self.r.expire(self._index_key(thread_id), self.ttl)
        return self._with_id(config, checkpoint_id)

    def put_writes(self, config: dict, writes, task_id: str, task_path: str = "") -> None:
        # Simplification: pending writes are not persisted in this sketch, so a
        # crash mid-step resumes from the last full checkpoint
        return None

    def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = config["configurable"].get("checkpoint_id")
        if not checkpoint_id:
            # No explicit ID: use the newest entry in the index list
            ids = self.r.lrange(self._index_key(thread_id), -1, -1)
            if not ids:
                return None
            checkpoint_id = ids[0].decode()
        raw = self.r.get(self._key(thread_id, checkpoint_id))
        if not raw:
            return None  # unknown ID, or the key has expired
        stored = json.loads(raw)
        bundle = self.serde.loads_typed(
            (stored["type"], base64.b64decode(stored["blob"]))
        )
        return CheckpointTuple(
            config=self._with_id(config, checkpoint_id),
            checkpoint=bundle["checkpoint"],
            metadata=bundle["metadata"],
        )

    def list(self, config: dict, *, filter=None, before=None,
             limit: Optional[int] = None) -> Iterator[CheckpointTuple]:
        # filter and before are accepted for interface compatibility but ignored here
        thread_id = config["configurable"]["thread_id"]
        ids = [i.decode() for i in reversed(
            self.r.lrange(self._index_key(thread_id), 0, -1)
        )]
        for checkpoint_id in (ids[:limit] if limit else ids):
            tup = self.get_tuple(self._with_id(config, checkpoint_id))
            if tup:
                yield tup
# Usage
r = redis.Redis(host="localhost", port=6379, db=0)
checkpointer = RedisCheckpointer(r, ttl_seconds=3600)
graph = build_graph(checkpointer)
config = {"configurable": {"thread_id": "redis-session-001"}}
result = graph.invoke(
{"messages": [HumanMessage(content="Explain Redis Streams.")],
"step_count": 0},
config=config
)
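The expiry behaviour that motivates Redis here can be imitated in a few lines, which makes it easy to unit-test retention logic without a running server. `ToyTTLStore` is a hypothetical stand-in, and the injected `clock` parameter is an assumption added so that time can be faked in tests; Redis itself handles expiry server-side.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class ToyTTLStore:
    """Hypothetical key-value store whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._data: Dict[str, Tuple[Any, float]] = {}

    def set(self, key: str, value: Any) -> None:
        # Remember the value together with its absolute expiry time
        self._data[key] = (value, self._clock() + self._ttl)

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # lazily evict on read
            return None
        return value


now = [0.0]  # fake clock the test can advance
store = ToyTTLStore(ttl_seconds=3600, clock=lambda: now[0])
store.set("checkpoint:redis-session-001:abc", {"step_count": 1})
fresh = store.get("checkpoint:redis-session-001:abc")

now[0] = 3601.0  # just over an hour later
expired = store.get("checkpoint:redis-session-001:abc")
```

The practical consequence is the same as with a real TTL: a thread that stays idle past the window comes back with no saved state, so the application has to treat a missing checkpoint as a normal case.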
Advanced Pattern: Interrupt and Resume
One of the most powerful uses of checkpointing is mid-graph interruption: pausing execution for human review. This is the foundation of agent planning with human oversight.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
class ReviewState(TypedDict):
    messages: Annotated[list, operator.add]
    plan: str
    approved: bool


def plan_node(state: ReviewState) -> ReviewState:
    plan = "Step 1: Search the web\nStep 2: Summarize results\nStep 3: Write report"
    return {"plan": plan, "approved": False}


def execute_node(state: ReviewState) -> ReviewState:
    if not state.get("approved"):
        return {"messages": [{"role": "assistant",
                              "content": "Execution blocked — plan not approved."}]}
    return {"messages": [{"role": "assistant",
                          "content": f"Executing approved plan: {state['plan']}"}]}
conn = sqlite3.connect("interrupt_demo.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
builder = StateGraph(ReviewState)
builder.add_node("plan", plan_node)
builder.add_node("execute", execute_node)
builder.set_entry_point("plan")
builder.add_edge("plan", "execute")
builder.add_edge("execute", END)
# interrupt_before pauses graph before "execute" runs
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["execute"]
)
config = {"configurable": {"thread_id": "approval-flow-001"}}
# First invocation — runs "plan" then stops before "execute"
graph.invoke({"messages": [], "plan": "", "approved": False}, config=config)
state = graph.get_state(config)
print("Graph paused. Current plan:")
print(state.values["plan"])
# Human approves by updating state directly
graph.update_state(config, {"approved": True})
# Resume — starts from "execute" with approved=True
final_result = graph.invoke(None, config=config)
print(final_result["messages"][-1].content)
Migrating State Between Checkpointers
When you need to move from SQLite to Postgres as your app scales, this migration pattern preserves all history:
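import sqlite3

from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver


def migrate_checkpoints(sqlite_path: str, pg_uri: str, thread_ids: list):
    """Migrate all checkpoints from SQLite to PostgreSQL (pending writes are not copied)."""
    sqlite_conn = sqlite3.connect(sqlite_path, check_same_thread=False)
    sqlite_saver = SqliteSaver(sqlite_conn)
    pg_pool = ConnectionPool(
        conninfo=pg_uri, max_size=5,
        kwargs={"autocommit": True, "row_factory": dict_row},
    )
    pg_saver = PostgresSaver(pg_pool)
    pg_saver.setup()
    for thread_id in thread_ids:
        config = {"configurable": {"thread_id": thread_id}}
        all_checkpoints = list(sqlite_saver.list(config))
        print(f"Migrating {len(all_checkpoints)} checkpoints for thread {thread_id}")
        # list() yields CheckpointTuple objects newest first; write in chronological order
        for tup in reversed(all_checkpoints):
            # put() treats the checkpoint_id in its config as the parent, so pass the parent config
            ns = tup.config["configurable"].get("checkpoint_ns", "")
            target_config = tup.parent_config or {
                "configurable": {"thread_id": thread_id, "checkpoint_ns": ns}}
            # Assumption: marking every channel version as new rewrites all channel values
            new_versions = tup.checkpoint["channel_versions"]
            pg_saver.put(target_config, tup.checkpoint, tup.metadata, new_versions)
    print("Migration complete.")
    sqlite_conn.close()
    pg_pool.close()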
migrate_checkpoints(
sqlite_path="agent_state.db",
pg_uri="postgresql://user:password@localhost:5432/agent_db",
thread_ids=["user-123", "user-456", "prod-user-789"]
)
Thread Management in Production
Tie thread IDs to authenticated user sessions to prevent state collisions:
import hashlib

from fastapi import FastAPI, HTTPException
from langchain_core.messages import HumanMessage

app = FastAPI()
# agent_graph is the compiled graph created at startup in the PostgresSaver section


def get_thread_id(user_id: str, conversation_id: str) -> str:
    raw = f"{user_id}:{conversation_id}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


@app.post("/conversations/{conversation_id}/messages")
async def send_message(conversation_id: str, message: str, user_id: str = "demo_user"):
    thread_id = get_thread_id(user_id, conversation_id)
    config = {"configurable": {"thread_id": thread_id}}
    try:
        # Send only the new message so the persisted step_count is not reset
        result = await agent_graph.ainvoke(
            {"messages": [HumanMessage(content=message)]},
            config=config
        )
        state = await agent_graph.aget_state(config)
        return {
            "response": result["messages"][-1].content,
            "thread_id": thread_id,
            "total_steps": state.values.get("step_count", 0),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
This architecture scales well on production model-serving infrastructure and pairs naturally with standard LangChain agent patterns.
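One detail worth checking in any thread ID scheme is whether two different (user, conversation) pairs can produce the same input string. Joining with a bare colon maps ("a:b", "c") and ("a", "b:c") to the same text if IDs may contain colons. The variant below is a hypothetical alternative, `derive_thread_id`, that encodes the pair as a JSON array before hashing and keeps a longer digest prefix; whether you need it depends on what characters your IDs allow.

```python
import hashlib
import json


def derive_thread_id(user_id: str, conversation_id: str) -> str:
    # JSON encoding keeps the boundary between the two parts unambiguous
    raw = json.dumps([user_id, conversation_id], separators=(",", ":"))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]


# Same inputs always give the same thread ID
same = derive_thread_id("alice", "conv-1") == derive_thread_id("alice", "conv-1")

# Pairs that a plain colon join would confuse stay distinct
ambiguous = derive_thread_id("a:b", "c") == derive_thread_id("a", "b:c")
```

Because the ID is derived from the authenticated user, a client cannot read another user's thread simply by guessing a conversation ID.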
Checkpoint Monitoring and Cleanup
Long-running applications accumulate thousands of checkpoints. Add maintenance routines:
from datetime import datetime, timedelta, timezone

import psycopg

# Assumption: the saver's checkpoints table has no created_at column, so the
# timestamp is read from the "ts" field of its JSONB checkpoint column.
# Verify this against the schema your installed version creates.
TS_EXPR = "(checkpoint->>'ts')::timestamptz"


def cleanup_old_checkpoints(pg_uri: str, days_to_keep: int = 30) -> int:
    cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
    with psycopg.connect(pg_uri) as conn:
        result = conn.execute(
            f"DELETE FROM checkpoints WHERE {TS_EXPR} < %s",
            (cutoff,)
        )
        deleted_count = result.rowcount
        conn.commit()
    print(f"Deleted {deleted_count} checkpoints older than {cutoff.date()}")
    return deleted_count


def get_checkpoint_stats(pg_uri: str) -> dict:
    with psycopg.connect(pg_uri) as conn:
        stats = conn.execute(f"""
            SELECT
                COUNT(DISTINCT thread_id) AS active_threads,
                COUNT(*) AS total_checkpoints,
                MAX({TS_EXPR}) AS latest_checkpoint
            FROM checkpoints
        """).fetchone()
    return {
        "active_threads": stats[0],
        "total_checkpoints": stats[1],
        "latest_checkpoint": stats[2].isoformat() if stats[2] else None
    }
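Age-based deletion is one policy; another common one is to keep only the newest few checkpoints per thread. The selection logic can be written and tested as a pure function before it touches a database. `select_prunable` and the `(thread_id, checkpoint_id, timestamp)` row shape are hypothetical, and the timestamps are assumed to be ISO 8601 strings in one timezone so that they sort correctly as text.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, str, str]  # (thread_id, checkpoint_id, ISO timestamp)


def select_prunable(rows: List[Row], keep_per_thread: int = 3) -> List[Tuple[str, str]]:
    """Return (thread_id, checkpoint_id) pairs beyond the newest N per thread."""
    by_thread: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    for thread_id, checkpoint_id, ts in rows:
        by_thread[thread_id].append((ts, checkpoint_id))

    prunable: List[Tuple[str, str]] = []
    for thread_id, items in by_thread.items():
        items.sort(reverse=True)  # newest first
        prunable.extend((thread_id, cid) for _, cid in items[keep_per_thread:])
    return sorted(prunable)


rows = [
    ("user-123", "cp-1", "2026-01-01T10:00:00"),
    ("user-123", "cp-2", "2026-01-01T10:05:00"),
    ("user-123", "cp-3", "2026-01-01T10:10:00"),
    ("user-456", "cp-9", "2026-01-02T09:00:00"),
]
to_delete = select_prunable(rows, keep_per_thread=2)
```

Keeping the decision separate from the DELETE statement means the same function can drive any backend, and a dry run is just a matter of printing the result.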
Checkpointer Comparison Table
| Feature | MemorySaver | SqliteSaver | PostgresSaver | MongoDBSaver | Redis Custom |
|---|---|---|---|---|---|
| Survives restart | No | Yes | Yes | Yes | Yes (TTL) |
| Multi-worker safe | No | No | Yes | Yes | Yes |
| Setup complexity | None | Low | Medium | Medium | Medium |
| Query flexibility | None | SQL | SQL | MQL | Key-based |
| Auto expiry | No | No | No | No | Yes |
| Best for | Testing | Dev / small apps | Production API | MongoDB stacks | Low-latency |
| Max state size | RAM | ~TB | ~TB | ~TB | RAM-limited |
| Async support | Yes | Yes | Yes | Partial | Manual |
This mirrors the tradeoffs you find in the vector database guide — pick based on existing infrastructure and scale requirements, not feature count.
Serialization: What Gets Saved
LangGraph uses JsonPlusSerializer by default. It handles Python-native types, Pydantic models, and LangChain message objects. Custom types need explicit handling:
from typing import TypedDict

import numpy as np


# Bad: numpy arrays fail serialization
class BadState(TypedDict):
    embeddings: object  # np.ndarray will break


# Good: convert to list before storing
def safe_store_embedding(embedding) -> list:
    return embedding.tolist() if hasattr(embedding, "tolist") else list(embedding)


def restore_embedding(stored: list) -> np.ndarray:
    return np.array(stored)
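The same convert-before-store pattern applies to any custom type. The sketch below uses the standard `json` module as a stand-in serializer to show the failure and the fix; JsonPlusSerializer handles more types than plain `json`, so treat this as an illustration of the pattern rather than a statement about what it rejects. `Embedding`, `to_storable`, and `from_storable` are hypothetical names.

```python
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class Embedding:
    model: str
    vector: List[float]


def to_storable(embedding: Embedding) -> dict:
    # Reduce the object to plain dicts, lists, strings, and numbers
    return asdict(embedding)


def from_storable(stored: dict) -> Embedding:
    return Embedding(**stored)


original = Embedding(model="hypothetical-embedder", vector=[0.1, 0.2, 0.3])

# Plain json cannot encode the dataclass instance directly
try:
    json.dumps({"embedding": original})
    raw_object_serializes = True
except TypeError:
    raw_object_serializes = False

# After conversion the value round-trips cleanly
payload = json.dumps({"embedding": to_storable(original)})
restored = from_storable(json.loads(payload)["embedding"])
```

Doing the conversion at the node boundary keeps the stored state independent of any one serializer, which also makes migrating between backends less fragile.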
Key Takeaways
Checkpointing transforms stateless graph executions into durable, resumable agent workflows. Start with MemorySaver during development, move to SqliteSaver for lightweight services, and graduate to PostgresSaver when you need concurrent workers and operational reliability.
The interrupt-and-resume pattern shown above is what makes human-in-the-loop oversight practical in production. The agent can pause, wait days if necessary, and continue exactly where it stopped.
For agents that retrieve information from external sources, combining checkpointing with the RAG system tutorial gives you both retrieval quality and state durability. And for agents built on OpenAI, the OpenAI API integration guide covers token management patterns that complement persistent state well.
Frequently Asked Questions
What is a LangChain checkpointer? A checkpointer saves and restores the state of a LangGraph agent between runs. It lets conversations and task progress survive process restarts, server crashes, or multi-session workflows.
Which checkpointer should I use in production? PostgresSaver is the best choice for production because it is durable, concurrent-safe, and supports multiple workers. SqliteSaver works well for single-process services or local development.
Can I migrate state from one checkpointer to another? Yes. You can read checkpoints from one backend and write them to another by iterating over the checkpoint history and replaying each snapshot into the new saver.
AiTechWorlds Team