How to Use AutoGen with Milvus (Vector Database Memory)
Integrate Milvus vector database with AutoGen agents for large-scale persistent memory. Full setup guide with LangChain integration and vector DB comparison table.
AutoGen agents have a problem most demos don't show: they forget everything between sessions and lose context as conversations grow. The context window fills up. Old information gets truncated. Agents start contradicting themselves because they can't access what they learned two hours ago.
Large-scale memory through a vector database solves this. Milvus is purpose-built for exactly this use case — billions of vectors, millisecond retrieval, and metadata filtering that lets agents find precisely what they need from massive knowledge stores.
This guide walks through integrating Milvus with AutoGen to give your agents genuinely persistent, scalable memory.
Why Milvus for Agent Memory
The core problem with agent memory is retrieval speed at scale. A RAG system that works fine with 10,000 documents often becomes a bottleneck at 1 million. Milvus is designed from the ground up for this scale.
Key Milvus capabilities that matter for agents:
- HNSW indexing: approximate nearest-neighbor search that typically answers in milliseconds, even on very large collections
- Scalar filtering: combine vector similarity with metadata filters (agent ID, timestamp, topic)
- Partitioning: split a collection into partitions for different agent types or knowledge domains
- Hybrid search: combine dense (semantic) and sparse (keyword) vectors in one query
- Persistence: data survives restarts, unlike in-memory solutions
For a deeper look at vector database choices overall, the Vector database guide covers the broader landscape. This guide focuses on Milvus specifically because of its performance at the scale where agent memory actually needs help.
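To make the scale argument concrete, here is a minimal sketch of exact (brute-force) cosine search in plain Python. It is illustrative only: `cosine` and `brute_force_top_k` are names made up for this example and are not part of pymilvus. Every query scores every stored vector, which is the linear cost an approximate index such as HNSW is designed to avoid.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_top_k(query, vectors, k=2):
    """Exact search: score every stored vector, then keep the k best."""
    scored = [(cosine(query, vec), idx) for idx, vec in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [idx for _, idx in scored[:k]]


# Toy 3-dimensional "memories"; real embeddings have hundreds of dimensions
stored = [
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
]
nearest = brute_force_top_k([1.0, 0.05, 0.0], stored, k=2)
print(nearest)
```

With four vectors the scan is instant. With a million 1536-dimensional vectors, the same loop performs a million dot products per query, which is why an index matters once memory grows.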
Installation and Setup
Option A: Milvus Lite (development, no Docker needed)
pip install "pymilvus[model]" autogen openai langchain-community langchain-openai
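Milvus Lite runs embedded, with no server and no Docker. It suits development and uses the same client API as full Milvus, though it supports only a subset of the server's features (index types in particular), so re-check index settings once you move to a real deployment.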
Option B: Milvus Standalone (Docker)
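# docker-compose.yml
version: '3.5'

services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  # Milvus standalone stores data in MinIO by default. This service follows the
  # official standalone compose file; verify the image tag for your Milvus release.
  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data

  milvus:
    image: milvusdb/milvus:v2.4.0
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - etcd
      - minio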
docker-compose up -d
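After the containers start, Milvus can take a little while before it accepts connections. The following is a minimal readiness sketch using only the standard library. `wait_for_port` is a hypothetical helper, and it only checks that the published port (19530 in the compose file) accepts TCP connections, not that Milvus is fully healthy.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example call (commented out so importing this file does not block):
# wait_for_port("localhost", 19530, timeout=60.0)
```

Calling this before `connections.connect(...)` avoids a confusing connection error on the first run after `docker-compose up -d`.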
Setting Up Milvus for Agent Memory
import time
from typing import Optional

from pymilvus import (
    connections,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    utility,
)


class MilvusAgentMemory:
    """Persistent vector memory for AutoGen agents."""

    def __init__(
        self,
        collection_name: str = "agent_memory",
        dimension: int = 1536,  # text-embedding-3-small dimension
        use_lite: bool = True,
        host: str = "localhost",
        port: int = 19530,
    ):
        self.collection_name = collection_name
        self.dimension = dimension

        # Connect to Milvus
        if use_lite:
            # Milvus Lite: embedded, no server
            connections.connect("default", uri="./milvus_agent_memory.db")
        else:
            connections.connect("default", host=host, port=port)

        self._create_collection()

    def _create_collection(self):
        """Create collection with schema for agent memory."""
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
            self.collection.load()
            return

        # Define schema
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="agent_id", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=8192),
            FieldSchema(name="role", dtype=DataType.VARCHAR, max_length=50),
            FieldSchema(name="timestamp", dtype=DataType.INT64),
            FieldSchema(name="importance_score", dtype=DataType.FLOAT),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
        ]
        schema = CollectionSchema(
            fields=fields,
            description="AutoGen agent persistent memory",
        )
        self.collection = Collection(
            name=self.collection_name,
            schema=schema,
        )

        # Create HNSW index for fast similarity search.
        # Milvus Lite may substitute a simpler index type; these settings
        # matter mainly on a full Milvus server.
        index_params = {
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {
                "M": 16,  # Graph connectivity: higher = better recall, more memory
                "efConstruction": 200,  # Build-time search depth
            },
        }
        self.collection.create_index(
            field_name="embedding",
            index_params=index_params,
        )
        self.collection.load()
        print(f"Collection '{self.collection_name}' created and loaded.")

    def add_memory(
        self,
        agent_id: str,
        session_id: str,
        content: str,
        role: str,
        embedding: list,
        importance_score: float = 1.0,
    ) -> int:
        """Store a memory entry with its embedding."""
        data = {
            "agent_id": agent_id,
            "session_id": session_id,
            # Stay under the 8192 VARCHAR limit. The limit counts bytes, so
            # heavily non-ASCII text may need a tighter cut.
            "content": content[:8000],
            "role": role,
            "timestamp": int(time.time()),
            "importance_score": importance_score,
            "embedding": embedding,
        }
        result = self.collection.insert([data])
        return result.primary_keys[0]

    def search_memory(
        self,
        query_embedding: list,
        agent_id: Optional[str] = None,
        top_k: int = 5,
        min_importance: float = 0.0,
    ) -> list:
        """Retrieve most relevant memories for a query."""
        search_params = {
            "metric_type": "COSINE",
            "params": {"ef": 50},  # Search-time parameter: higher = better recall
        }

        # Build filter expression. agent_id is interpolated as-is, so it must
        # come from trusted code and must not contain double quotes.
        filters = []
        if agent_id:
            filters.append(f'agent_id == "{agent_id}"')
        if min_importance > 0:
            filters.append(f"importance_score >= {min_importance}")
        expr = " && ".join(filters) if filters else None

        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=expr,
            output_fields=["agent_id", "content", "role", "timestamp", "importance_score"],
        )

        memories = []
        for hit in results[0]:
            memories.append({
                "id": hit.id,
                "content": hit.entity.get("content"),
                "role": hit.entity.get("role"),
                "timestamp": hit.entity.get("timestamp"),
                "importance": hit.entity.get("importance_score"),
                "similarity": hit.score,
            })
        return memories

    def get_stats(self) -> dict:
        """Get memory statistics."""
        return {
            "collection": self.collection_name,
            # May lag behind recent inserts until Milvus flushes them
            "total_vectors": self.collection.num_entities,
            "dimension": self.dimension,
        }
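The `search_memory` method interpolates `agent_id` straight into the filter expression. If agent IDs can ever come from outside your own code, it is worth validating them first. The sketch below shows one way to do that. `build_filter`, the `since_ts` cutoff, and the allowed-character rule are assumptions made for this example, not part of pymilvus.

```python
import re
from typing import Optional

# Assumption: agent IDs are simple slugs (letters, digits, "_", ".", "-")
_SAFE_ID = re.compile(r"[A-Za-z0-9_.\-]{1,100}")


def build_filter(
    agent_id: Optional[str] = None,
    min_importance: float = 0.0,
    since_ts: Optional[int] = None,
) -> Optional[str]:
    """Build a boolean filter expression, or None when no filter applies."""
    clauses = []
    if agent_id is not None:
        # Reject anything that could break out of the quoted string
        if not _SAFE_ID.fullmatch(agent_id):
            raise ValueError(f"unsafe agent_id: {agent_id!r}")
        clauses.append(f'agent_id == "{agent_id}"')
    if min_importance > 0:
        clauses.append(f"importance_score >= {float(min_importance)}")
    if since_ts is not None:
        clauses.append(f"timestamp >= {int(since_ts)}")
    return " && ".join(clauses) if clauses else None


print(build_filter("Memory_Assistant", 0.5, 1700000000))
```

The returned string can be passed as the `expr` argument of `collection.search`. The timestamp clause is a cheap way to limit recall to recent sessions.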
Embedding Service
from openai import OpenAI


class EmbeddingService:
    """Handles embedding generation with caching."""

    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = OpenAI()  # Reads OPENAI_API_KEY from the environment
        self.model = model
        self._cache = {}

    def embed(self, text: str) -> list:
        """Generate embedding for text."""
        # Truncate first (rough guard against the token limit) so the cache
        # is checked and filled under the same key
        text = text[:8000]

        # Simple in-memory cache for identical strings
        if text in self._cache:
            return self._cache[text]

        response = self.client.embeddings.create(
            model=self.model,
            input=text,
        )
        embedding = response.data[0].embedding
        self._cache[text] = embedding
        return embedding

    def embed_batch(self, texts: list) -> list:
        """Batch embedding for efficiency."""
        texts = [t[:8000] for t in texts]

        # Filter out cached items and drop duplicates, keeping order
        uncached = list(dict.fromkeys(t for t in texts if t not in self._cache))
        if uncached:
            response = self.client.embeddings.create(
                model=self.model,
                input=uncached,
            )
            for text, result in zip(uncached, response.data):
                self._cache[text] = result.embedding
        return [self._cache[text] for text in texts]
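The batch-caching idea can be exercised without calling the OpenAI API by swapping in a stub. The sketch below is an illustration under that assumption: `CachedEmbedder` and `fake_embed_many` are hypothetical names, and the fake vectors carry no semantic meaning.

```python
from typing import Callable, Dict, List


class CachedEmbedder:
    """Cache wrapper around any batch-embedding callable."""

    def __init__(self, embed_many: Callable[[List[str]], List[List[float]]]):
        self._embed_many = embed_many
        self._cache: Dict[str, List[float]] = {}

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        # Order-preserving de-duplication of texts that are not cached yet
        missing = list(dict.fromkeys(t for t in texts if t not in self._cache))
        if missing:
            for text, vector in zip(missing, self._embed_many(missing)):
                self._cache[text] = vector
        return [self._cache[t] for t in texts]


calls = []


def fake_embed_many(batch: List[str]) -> List[List[float]]:
    """Stand-in for a real embedding API; records each batch it receives."""
    calls.append(list(batch))
    return [[float(len(t)), float(t.count(" "))] for t in batch]


cached = CachedEmbedder(fake_embed_many)
first = cached.embed_batch(["hello world", "hi", "hello world"])
second = cached.embed_batch(["hi", "new text"])
print(calls)
```

Only texts that are new and unique reach the backend, so repeated prompts in a long conversation do not trigger repeated embedding calls.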
AutoGen Agent with Milvus Memory
import os
from datetime import datetime

import autogen

embedder = EmbeddingService()
memory_store = MilvusAgentMemory(use_lite=True)


class MemoryEnabledAssistant(autogen.AssistantAgent):
    """AutoGen AssistantAgent with Milvus persistent memory."""

    def __init__(self, memory: MilvusAgentMemory, embedding_service: EmbeddingService, **kwargs):
        super().__init__(**kwargs)
        self.memory = memory
        self.embedder = embedding_service
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")

    def generate_reply(self, messages=None, sender=None, **kwargs):
        """Override to inject relevant memories before generating reply."""
        if messages:
            # Get the last user message (content can be None, e.g. for tool calls)
            last_user_msg = next(
                (
                    m.get("content")
                    for m in reversed(messages)
                    if m.get("role") == "user" and isinstance(m.get("content"), str)
                ),
                "",
            )
            if last_user_msg:
                # Retrieve relevant memories. With the scoring heuristic below,
                # short replies score under 0.5 and are filtered out here;
                # lower min_importance if you want them recalled.
                query_embedding = self.embedder.embed(last_user_msg)
                relevant_memories = self.memory.search_memory(
                    query_embedding=query_embedding,
                    agent_id=self.name,
                    top_k=5,
                    min_importance=0.5,
                )
                if relevant_memories:
                    memory_context = self._format_memories(relevant_memories)
                    # Inject memories into system context
                    memory_message = {
                        "role": "system",
                        "content": f"[RELEVANT PAST CONTEXT]\n{memory_context}\n[END PAST CONTEXT]",
                    }
                    messages = [memory_message] + list(messages)

        # Generate reply with enriched context
        reply = super().generate_reply(messages=messages, sender=sender, **kwargs)

        # Store the reply in memory
        if reply and isinstance(reply, str):
            reply_embedding = self.embedder.embed(reply[:1000])
            self.memory.add_memory(
                agent_id=self.name,
                session_id=self.session_id,
                content=reply,
                role="assistant",
                embedding=reply_embedding,
                importance_score=self._score_importance(reply),
            )
        return reply

    def _format_memories(self, memories: list) -> str:
        """Format retrieved memories for context injection."""
        formatted = []
        for m in memories:
            score = f"{m['similarity']:.2f}"
            ts = datetime.fromtimestamp(m["timestamp"]).strftime("%Y-%m-%d")
            formatted.append(f"[{ts}, relevance: {score}] {m['content'][:300]}")
        return "\n".join(formatted)

    def _score_importance(self, content: str) -> float:
        """Heuristic importance scoring; customize for your use case."""
        # Higher importance for longer, more detailed responses
        length_score = min(len(content) / 2000, 1.0)
        # Higher importance for content with key indicators
        key_indicators = ["important", "remember", "note", "critical", "key finding"]
        indicator_score = sum(1 for k in key_indicators if k in content.lower()) * 0.1
        return min(length_score + indicator_score, 1.0)


# Create memory-enabled agents
llm_config = {
    # Read the key from the environment rather than hard-coding it
    "config_list": [{"model": "gpt-4o", "api_key": os.environ.get("OPENAI_API_KEY")}],
    "temperature": 0.1,
}

memory_assistant = MemoryEnabledAssistant(
    memory=memory_store,
    embedding_service=embedder,
    name="Memory_Assistant",
    llm_config=llm_config,
    system_message="""You are a knowledgeable assistant with long-term memory.
You recall relevant past interactions to provide better, more consistent responses.
Reference past context when relevant.""",
)

user_proxy = autogen.UserProxyAgent(
    name="User",
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=10,
    code_execution_config=False,
    # content can be None, so fall back to an empty string before searching it
    is_termination_msg=lambda x: "TERMINATE" in (x.get("content") or ""),
)
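One interaction in this design is easy to miss: `generate_reply` searches with `min_importance=0.5`, while `_score_importance` mostly rewards length. The sketch below copies the heuristic into a standalone function (named `score_importance` here for illustration) so you can see which replies would clear the threshold. The sample texts are invented.

```python
KEY_INDICATORS = ["important", "remember", "note", "critical", "key finding"]


def score_importance(content: str) -> float:
    """Standalone copy of the heuristic used by MemoryEnabledAssistant."""
    length_score = min(len(content) / 2000, 1.0)
    lowered = content.lower()
    indicator_score = sum(1 for k in KEY_INDICATORS if k in lowered) * 0.1
    return min(length_score + indicator_score, 1.0)


short_reply = "The deploy window is Friday."
flagged_reply = "Important: remember the deploy window is Friday."
long_reply = "x" * 1200  # stands in for a long, detailed answer

for label, text in [("short", short_reply), ("flagged", flagged_reply), ("long", long_reply)]:
    score = score_importance(text)
    print(f"{label}: score={score:.3f} retrievable={score >= 0.5}")
```

Short replies fall below 0.5 even when they contain indicator words, so they are stored but never recalled. If brief facts matter in your domain, lower `min_importance` or give the indicator words more weight.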
LangChain + AutoGen + Milvus Pattern
For more sophisticated retrieval (hybrid search, document chunking), use LangChain's Milvus integration as the retrieval layer:
from langchain_community.vectorstores import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
import autogen

# Set up LangChain Milvus vector store
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Milvus(
    embedding_function=embeddings,
    collection_name="agent_knowledge_base",
    connection_args={"uri": "./langchain_milvus.db"},  # Milvus Lite
    drop_old=False,
)

# Create retriever
retriever = vector_store.as_retriever(
    search_type="mmr",  # Maximum Marginal Relevance: diverse results
    search_kwargs={
        "k": 6,
        "fetch_k": 20,  # Candidate pool for MMR
        "lambda_mult": 0.6,  # Diversity vs relevance balance
    },
)


def load_documents_to_milvus(file_paths: list):
    """Load and index documents into Milvus."""
    from langchain_community.document_loaders import (
        PyPDFLoader, TextLoader, WebBaseLoader
    )

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    all_docs = []
    for path in file_paths:
        if path.startswith("http"):
            loader = WebBaseLoader(path)
        elif path.endswith(".pdf"):
            loader = PyPDFLoader(path)
        else:
            loader = TextLoader(path)
        docs = loader.load()
        chunks = splitter.split_documents(docs)
        all_docs.extend(chunks)

    vector_store.add_documents(all_docs)
    print(f"Loaded {len(all_docs)} chunks into Milvus")


def retrieve_context(query: str, top_k: int = 5) -> str:
    """Retrieve relevant context for an agent query."""
    docs = retriever.invoke(query)
    context_parts = []
    for i, doc in enumerate(docs[:top_k], 1):
        source = doc.metadata.get("source", "unknown")
        context_parts.append(f"[Source {i}: {source}]\n{doc.page_content}")
    return "\n\n".join(context_parts)


# AutoGen with LangChain Milvus retrieval
def create_rag_autogen_agent():
    """Create AutoGen agent that uses Milvus for RAG."""
    system_message = """You are a knowledge assistant with access to a document database.
When answering questions, always consider the retrieved context provided.
Cite sources when using specific information from documents."""

    # llm_config is the dict defined in the previous section
    assistant = autogen.AssistantAgent(
        name="RAG_Assistant",
        llm_config=llm_config,
        system_message=system_message,
    )

    def rag_reply_function(recipient, messages=None, sender=None, config=None):
        """Inject RAG context, then let the LLM answer the augmented message."""
        if not messages:
            return False, None  # Nothing to augment; fall through to defaults

        last_msg = messages[-1].get("content") or ""
        context = retrieve_context(last_msg)
        augmented_message = f"""RETRIEVED CONTEXT:
{context}

USER QUESTION: {last_msg}

Answer based on the retrieved context. Cite specific sources."""

        # Replace the last message with the augmented version for this call only
        augmented_messages = list(messages[:-1]) + [
            {"role": "user", "content": augmented_message}
        ]
        # Hand the augmented history to the built-in LLM reply function
        return recipient.generate_oai_reply(messages=augmented_messages, sender=sender)

    # Register the function so it runs before the default reply functions
    assistant.register_reply(
        [autogen.Agent, None],
        reply_func=rag_reply_function,
        position=0,
    )
    return assistant
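The retriever above uses `search_type="mmr"`. To show what `fetch_k`, `k`, and `lambda_mult` control, here is a plain-Python sketch of Maximal Marginal Relevance re-ranking. It is an illustration of the technique, not LangChain's implementation; `mmr_select` and the toy vectors are made up for this example. The candidate list plays the role of the `fetch_k` pool, and `k` is how many survive.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mmr_select(query, candidates, k=2, lambda_mult=0.6):
    """Pick k candidate indices, trading relevance against redundancy."""
    selected = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:
        best_idx, best_score = None, -math.inf
        for idx in remaining:
            relevance = cosine(query, candidates[idx])
            # Similarity to the closest already-selected item (0 if none yet)
            redundancy = max(
                (cosine(candidates[idx], candidates[s]) for s in selected),
                default=0.0,
            )
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = idx, score
        selected.append(best_idx)
        remaining.remove(best_idx)
    return selected


# Candidates 0 and 1 are near-duplicates; candidate 2 points elsewhere
candidates = [[1.0, 0.0], [0.99, 0.05], [0.6, 0.8]]
query = [1.0, 0.2]

relevance_only = mmr_select(query, candidates, k=2, lambda_mult=1.0)
diverse = mmr_select(query, candidates, k=2, lambda_mult=0.3)
print(relevance_only, diverse)
```

With `lambda_mult=1.0` the two near-duplicates win on relevance alone. Lowering it penalizes the duplicate and lets the less similar but novel candidate into the result, which is the behavior you want when several chunks of a document say nearly the same thing.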
Milvus vs Other Vector Databases for Agent Memory
| Feature | Milvus | Pinecone | ChromaDB | Weaviate | pgvector |
|---|---|---|---|---|---|
| Scale | Billions | Hundreds of millions | Millions | Hundreds of millions | Tens of millions |
| Query speed | Sub-ms at scale | Sub-ms | ms-range | ms-range | Slower at scale |
| Self-hosted | Yes (free) | No | Yes (free) | Yes | Yes (PostgreSQL) |
| Managed cloud | Zilliz Cloud | Native | Yes | Weaviate Cloud | Supabase/Neon |
| Hybrid search | Yes | Beta | No | Yes | Limited |
| Metadata filtering | Yes | Yes | Yes | Yes | Yes |
| Setup complexity | Medium | Low | Very Low | Medium | Low |
| Agent memory fit | Excellent | Good | Good for small | Good | OK for moderate |
For most AutoGen projects at startup scale (under 1M vectors), ChromaDB's simplicity wins. Milvus becomes the right choice when you need performance guarantees at scale, hybrid search, or fine-grained partitioning for multiple agent types.
Running the Complete System
# Full example: Document Q&A agent with Milvus memory

# 1. Load documents
load_documents_to_milvus([
    "company_handbook.pdf",
    "product_documentation.pdf",
    "https://docs.your-company.com/api",
])

# 2. Create agents
rag_assistant = create_rag_autogen_agent()
user_proxy = autogen.UserProxyAgent(
    name="User",
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=5,
    code_execution_config=False,
    # content can be None, so fall back to an empty string before searching it
    is_termination_msg=lambda x: "TERMINATE" in (x.get("content") or ""),
)

# 3. Start conversation; the agent has access to all loaded documents
user_proxy.initiate_chat(
    rag_assistant,
    message="What is our company's remote work policy?",
)

# Check memory stats (memory_store is the MilvusAgentMemory instance created earlier)
stats = memory_store.get_stats()
print(f"\nMemory stats: {stats}")
For more on how this integrates with broader AutoGen architectures, the AI research agent build guide shows Milvus memory in a research workflow. The AI agent memory and planning post explains the theoretical foundations for why vector memory improves agent performance on long-horizon tasks.
The Build AI chatbot Python tutorial shows a simpler memory pattern that's worth understanding before adopting full vector database memory.
Milvus with AutoGen gives you the infrastructure foundation for agents that genuinely improve over time — agents that remember customer preferences, learn from past mistakes, and build up specialized knowledge about your domain. That's where large-scale memory stops being an infrastructure concern and starts being a product differentiator.
Frequently Asked Questions
Why use Milvus with AutoGen instead of simpler memory solutions?
Milvus handles millions of vectors with millisecond-scale query times, which matters when your agent needs to retrieve from large knowledge bases. Simpler solutions such as in-memory lists or SQLite compare the query against every stored vector, so latency grows linearly and typically becomes a problem beyond roughly 100K documents. Milvus also supports filtering by metadata alongside vector similarity, enabling precise contextual retrieval.
Can AutoGen connect to Milvus directly?
AutoGen doesn't have a native Milvus connector, but integrating them is straightforward: use pymilvus to manage the vector store and inject retrieved context into agent messages before LLM calls. Alternatively, use LangChain's Milvus integration as the retrieval layer within an AutoGen workflow.
How does vector database memory work for AI agents?
The agent converts important information (past conversations, documents, facts) into embeddings, which are numerical vectors representing semantic meaning. When the agent needs context, it embeds the current query, searches the vector database for similar stored vectors, and retrieves the most relevant past information. This gives the agent access to far more history than fits in its context window, because only the few most relevant entries are injected into the prompt.
Is Milvus free to use for AutoGen projects?
Milvus is fully open source and free to self-host. Zilliz Cloud offers a managed Milvus service with a free tier (described here as 1 collection and up to 1M vectors; limits change, so check the current pricing page). For development, Milvus Lite runs as an embedded library with no server required, which makes it a good starting point before scaling to a full deployment.
What embedding model should I use with Milvus and AutoGen?
OpenAI's text-embedding-3-small (1536 dimensions) is the most common choice — good quality, cost-effective, and integrates cleanly with AutoGen's OpenAI-based setup. For cost-sensitive or local deployments, sentence-transformers/all-MiniLM-L6-v2 (384 dimensions) runs locally and performs well for most retrieval tasks.
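Switching embedding models changes the vector dimension, and a Milvus collection fixes its `dim` when it is created. The sketch below is a small guard you could run before inserting. `MODEL_DIMS` holds only the two dimensions quoted above, and `check_embedding` is a hypothetical helper, not a pymilvus function.

```python
MODEL_DIMS = {
    "text-embedding-3-small": 1536,
    "sentence-transformers/all-MiniLM-L6-v2": 384,
}


def check_embedding(embedding, model: str, collection_dim: int) -> None:
    """Raise ValueError if the model or vector does not fit the collection."""
    expected = MODEL_DIMS.get(model)
    if expected is not None and expected != collection_dim:
        raise ValueError(
            f"{model} produces {expected}-dim vectors, "
            f"but the collection was created with dim={collection_dim}"
        )
    if len(embedding) != collection_dim:
        raise ValueError(
            f"got a {len(embedding)}-dim vector, expected {collection_dim}"
        )


# Matches the 1536-dim schema used earlier, so this passes silently
check_embedding([0.0] * 1536, "text-embedding-3-small", 1536)
```

In practice this means moving from the OpenAI model to MiniLM requires a new collection (or a new `dimension` argument and collection name) and re-embedding the stored memories.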
AiTechWorlds Team