10 LCEL Patterns: RunnableParallel and RunnableLambda (2026)
Master LangChain LCEL with 10 real patterns for RunnableParallel, RunnableLambda, branching, fan-out, and streaming with full Python code examples.
LangChain's Expression Language (LCEL) ships with a handful of Runnable primitives that can compose into almost any execution topology you need. The problem is that most documentation shows simple linear chains and leaves the interesting patterns — fan-out execution, conditional branching, mixing sync and async, injecting side effects — to be figured out on your own.
This guide covers 10 concrete LCEL patterns built around RunnableParallel and RunnableLambda. Each pattern solves a real problem you will encounter building production LLM applications.
LCEL Quick Refresher
The pipe operator | in LCEL is syntactic sugar for RunnableSequence. Everything in a chain is a Runnable — a prompt, an LLM, a retriever, an output parser, or any of the primitives below.
from langchain_core.runnables import (
RunnableParallel,
RunnableLambda,
RunnablePassthrough,
RunnableBranch,
)
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
Pattern 1: Basic RunnableParallel Fan-Out
Run two independent LLM calls at the same time. This is the most common use case for RunnableParallel — you need multiple perspectives on the same input.
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
pros_prompt = ChatPromptTemplate.from_template("List 3 pros of: {technology}")
cons_prompt = ChatPromptTemplate.from_template("List 3 cons of: {technology}")
pros_chain = pros_prompt | llm | StrOutputParser()
cons_chain = cons_prompt | llm | StrOutputParser()
analysis_chain = RunnableParallel(
pros=pros_chain,
cons=cons_chain,
technology=RunnablePassthrough() # Pass input through unchanged
)
result = analysis_chain.invoke({"technology": "serverless functions"})
print("PROS:", result["pros"])
print("CONS:", result["cons"])
print("Input was:", result["technology"]) # {'technology': 'serverless functions'}
The two LLM calls execute concurrently via ThreadPoolExecutor, not sequentially. On a typical gpt-4o-mini call, parallel execution cuts latency roughly in half compared to sequential.
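To make the latency claim concrete without calling a real model, here is a minimal standard-library sketch of the same fan-out idea. It assumes each LLM call can be modelled as a blocking 0.2-second sleep; `fake_llm_call`, `run_sequential`, and `run_parallel` are hypothetical names used only for this illustration, not LangChain APIs.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_llm_call(label: str, delay: float = 0.2) -> str:
    """Stand-in for a blocking, I/O-bound LLM request (assumption: ~0.2s each)."""
    time.sleep(delay)
    return f"{label} done"


def run_sequential(labels):
    """Call each branch one after another and time the whole run."""
    start = time.perf_counter()
    results = {label: fake_llm_call(label) for label in labels}
    return results, time.perf_counter() - start


def run_parallel(labels):
    """Submit every branch to a thread pool, then collect results by key."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(labels)) as pool:
        futures = {label: pool.submit(fake_llm_call, label) for label in labels}
        results = {label: future.result() for label, future in futures.items()}
    return results, time.perf_counter() - start


seq_results, seq_time = run_sequential(["pros", "cons"])
par_results, par_time = run_parallel(["pros", "cons"])
print(f"sequential: {seq_time:.2f}s, parallel: {par_time:.2f}s")
```

With two equally slow branches the parallel run takes about as long as one call, which is where the "roughly half" figure comes from. With uneven branches, the total is bounded by the slowest one.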
Pattern 2: RunnableParallel with Dict Shorthand
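The dict shorthand creates a RunnableParallel implicitly whenever a dict is composed with another Runnable using |. A bare dict on its own is still a plain dict and has no .invoke():
# Equivalent once they are part of a chain:
explicit = RunnableParallel(pros=pros_chain, cons=cons_chain)
shorthand = {"pros": pros_chain, "cons": cons_chain} # coerced to RunnableParallel when piped into a Runnable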
# Shorthand works anywhere in a chain
summary_prompt = ChatPromptTemplate.from_template(
"Given these pros: {pros}\nAnd these cons: {cons}\nWrite a balanced one-paragraph summary."
)
full_chain = (
{"pros": pros_chain, "cons": cons_chain} # parallel fan-out
| summary_prompt # merge into single prompt
| llm
| StrOutputParser()
)
summary = full_chain.invoke({"technology": "microservices architecture"})
print(summary)
This is the foundation of the patterns in the LangChain tutorial 2025 that build multi-step reasoning chains.
Pattern 3: RunnableLambda for Custom Transformations
RunnableLambda turns any Python function into a chain-compatible step:
from langchain_core.runnables import RunnableLambda
import re
def extract_bullet_points(text: str) -> list:
    """Extract bullet points from LLM output."""
    lines = text.strip().split("\n")
    bullets = [line.lstrip("•-*123456789. ").strip() for line in lines if line.strip()]
    return [b for b in bullets if len(b) > 5]
def format_as_html_list(items: list) -> str:
    """Format a list as an HTML unordered list."""
    items_html = "".join(f"<li>{item}</li>" for item in items)
    return f"<ul>{items_html}</ul>"
extract_bullets = RunnableLambda(extract_bullet_points)
format_html = RunnableLambda(format_as_html_list)
html_bullets_chain = (
pros_prompt
| llm
| StrOutputParser()
| extract_bullets
| format_html
)
result = html_bullets_chain.invoke({"technology": "Python type hints"})
print(result)
# Output: <ul><li>Improves IDE autocomplete</li><li>Catches bugs early</li>...</ul>
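One caveat with the lstrip approach: str.lstrip takes a set of characters, so a bullet whose text itself starts with a digit (for example "3D rendering") loses that digit as well. A possible alternative, sketched here with a regular expression, removes only a single leading marker. `BULLET_PREFIX` and `extract_bullet_points_strict` are hypothetical names for this sketch, and the minimum-length filter is left out for brevity.

```python
import re

# One leading marker: a bullet symbol, or a number followed by "." or ")"
BULLET_PREFIX = re.compile(r"^\s*(?:[•\-*]|\d+[.)])\s+")


def extract_bullet_points_strict(text: str) -> list[str]:
    """Strip exactly one list marker per line and keep the rest of the text."""
    items = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        items.append(BULLET_PREFIX.sub("", line).strip())
    return items


sample = "1. 3D rendering is fast\n- 2x cheaper hosting\n• Simple API"
print(extract_bullet_points_strict(sample))
# ['3D rendering is fast', '2x cheaper hosting', 'Simple API']

# For comparison, the character-set strip eats the leading "3" too:
print("1. 3D rendering is fast".lstrip("•-*123456789. "))
# D rendering is fast
```

The function can be wrapped with RunnableLambda in the same way as extract_bullet_points.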
Pattern 4: RunnablePassthrough.assign — Enriching Without Replacing
RunnablePassthrough.assign() adds keys to the existing input dictionary without dropping anything. This is essential for chains where downstream steps need both the original input and computed values.
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
import numpy as np
embeddings_model = OpenAIEmbeddings(model="text-embedding-3-small")
def get_embedding(data: dict) -> list:
    """Compute embedding for the query."""
    return embeddings_model.embed_query(data["question"])
def estimate_complexity(data: dict) -> str:
    """Heuristic complexity estimate."""
    word_count = len(data["question"].split())
    if word_count < 8:
        return "simple"
    elif word_count < 20:
        return "moderate"
    return "complex"
enriched_chain = (
RunnablePassthrough.assign(
embedding=RunnableLambda(get_embedding),
complexity=RunnableLambda(estimate_complexity)
)
)
result = enriched_chain.invoke({"question": "What is the capital of France?"})
print(result.keys()) # dict_keys(['question', 'embedding', 'complexity'])
print(result["complexity"]) # 'simple'
print(len(result["embedding"])) # 1536
Pattern 5: RunnableBranch for Conditional Logic
RunnableBranch routes to different chains based on a condition function:
from langchain_core.runnables import RunnableBranch
simple_prompt = ChatPromptTemplate.from_template(
"Answer briefly: {question}"
)
detailed_prompt = ChatPromptTemplate.from_template(
"Provide a thorough explanation with examples: {question}"
)
technical_prompt = ChatPromptTemplate.from_template(
"Give a technical deep-dive with code examples: {question}"
)
simple_chain = simple_prompt | llm | StrOutputParser()
detailed_chain = detailed_prompt | llm | StrOutputParser()
technical_chain = technical_prompt | llm | StrOutputParser()
routing_chain = RunnableBranch(
(lambda x: x.get("complexity") == "simple", simple_chain),
(lambda x: x.get("complexity") == "complex", technical_chain),
detailed_chain # default branch
)
full_pipeline = (
RunnablePassthrough.assign(
complexity=RunnableLambda(estimate_complexity)
)
| routing_chain
)
# Routes to simple_chain
result1 = full_pipeline.invoke({"question": "What is Python?"})
# Routes to detailed_chain: this question has 15 words, so estimate_complexity returns "moderate" and the default branch runs
result2 = full_pipeline.invoke({
"question": "Explain how Python's garbage collector handles reference cycles in complex object graphs with weak references"
})
print("Simple answer:", result1[:100])
print("Detailed answer:", result2[:100])
This branching pattern connects directly to the routing logic discussed in AI agent memory and planning where different agent paths handle different intent categories.
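Because the route depends entirely on the word-count thresholds, it helps to check which branch a given question reaches before spending tokens. The following plain-Python sketch mirrors the first-match behaviour of RunnableBranch under the same conditions; `first_match` and `route` are hypothetical helpers, and the branch targets are represented by their names rather than real chains.

```python
def estimate_complexity(data: dict) -> str:
    """Same heuristic as above: under 8 words, under 20 words, or longer."""
    word_count = len(data["question"].split())
    if word_count < 8:
        return "simple"
    elif word_count < 20:
        return "moderate"
    return "complex"


def first_match(branches, default, data):
    """Return the target of the first condition that is true, else the default."""
    for condition, target in branches:
        if condition(data):
            return target
    return default


branches = [
    (lambda x: x.get("complexity") == "simple", "simple_chain"),
    (lambda x: x.get("complexity") == "complex", "technical_chain"),
]


def route(question: str) -> str:
    data = {"question": question}
    data["complexity"] = estimate_complexity(data)
    return first_match(branches, "detailed_chain", data)


short = "What is Python?"
medium = ("Explain how Python's garbage collector handles reference cycles "
          "in complex object graphs with weak references")
long_q = " ".join(["word"] * 25)

print(route(short))   # simple_chain
print(route(medium))  # detailed_chain (15 words counts as "moderate")
print(route(long_q))  # technical_chain
```

A question needs at least 20 words to be labelled "complex" by this heuristic, so mid-length technical questions fall through to the default branch.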
Pattern 6: Nested RunnableParallel for Multi-Step Fan-Out
Fan out, then fan out again:
# First level: generate draft and gather context in parallel
first_level = RunnableParallel(
draft=ChatPromptTemplate.from_template("Write a draft blog post about {topic}") | llm | StrOutputParser(),
seo_keywords=ChatPromptTemplate.from_template("List 8 SEO keywords for {topic}") | llm | StrOutputParser(),
target_audience=ChatPromptTemplate.from_template("Describe the target audience for {topic} content") | llm | StrOutputParser()
)
# Second level: use all three outputs to produce final content
def merge_for_revision(data: dict) -> dict:
    return {
        "draft": data["draft"],
        "keywords": data["seo_keywords"],
        "audience": data["target_audience"]
    }
revision_prompt = ChatPromptTemplate.from_template(
"""Revise this blog post draft:
Draft: {draft}
Incorporate these SEO keywords naturally: {keywords}
Write for this audience: {audience}
Produce the final polished post."""
)
content_pipeline = (
first_level
| RunnableLambda(merge_for_revision)
| revision_prompt
| llm
| StrOutputParser()
)
final_post = content_pipeline.invoke({"topic": "LangChain LCEL patterns"})
print(final_post[:500])
Pattern 7: Async Fan-Out with ainvoke
For async web applications, all LCEL primitives support async natively:
import asyncio
from langchain_core.runnables import RunnableParallel
async def run_parallel_analysis(topic: str) -> dict:
    """Run multiple analyses in parallel using async."""
    parallel_chain = RunnableParallel(
        summary=ChatPromptTemplate.from_template("Summarize: {topic}") | llm | StrOutputParser(),
        faqs=ChatPromptTemplate.from_template("Generate 5 FAQs about: {topic}") | llm | StrOutputParser(),
        tweet=ChatPromptTemplate.from_template("Write a tweet about: {topic}") | llm | StrOutputParser(),
        headline=ChatPromptTemplate.from_template("Write a headline for: {topic}") | llm | StrOutputParser()
    )
    # All 4 LLM calls run as concurrent asyncio tasks
    result = await parallel_chain.ainvoke({"topic": topic})
    return result
# In an async context (FastAPI endpoint, etc.)
async def main():
    result = await run_parallel_analysis("quantum computing breakthroughs 2026")
    print("Summary:", result["summary"][:150])
    print("Tweet:", result["tweet"])
    print("Headline:", result["headline"])
asyncio.run(main())
The async version has no thread overhead — each LLM call is a native asyncio task, making it ideal for high-throughput FastAPI endpoints.
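The task-based fan-out can be illustrated with the standard library alone. This sketch assumes each LLM call behaves like an awaitable that takes a fixed time; `fake_llm_call` and `fan_out` are hypothetical names and the delays are invented for the demonstration.

```python
import asyncio
import time


async def fake_llm_call(name: str, delay: float) -> str:
    """Stand-in for an awaitable LLM request (assumption: fixed delay)."""
    await asyncio.sleep(delay)
    return f"{name} ready"


async def fan_out(branches: dict[str, float]) -> dict[str, str]:
    """Run every branch concurrently on one event loop and key the results."""
    names = list(branches)
    results = await asyncio.gather(
        *(fake_llm_call(name, branches[name]) for name in names)
    )
    return dict(zip(names, results))


async def main():
    start = time.perf_counter()
    out = await fan_out({"summary": 0.3, "faqs": 0.2, "tweet": 0.1, "headline": 0.1})
    return out, time.perf_counter() - start


output, elapsed = asyncio.run(main())
print(output)
print(f"elapsed: {elapsed:.2f}s (sum of delays would be 0.70s)")
```

All four coroutines wait at the same time, so the elapsed time tracks the slowest branch (0.3s here) rather than the sum.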
Pattern 8: Streaming Through Parallel Chains
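Streaming behaves differently once a RunnableParallel feeds a later step. Calling .stream() on a RunnableParallel by itself yields partial dicts keyed by branch as chunks arrive, but a downstream prompt needs every key before it can format its template, so the parallel results are collected in full and only the steps after the merge stream token by token: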
from langchain_core.callbacks import StreamingStdOutCallbackHandler
streaming_llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True
)
# Stream a single chain
single_chain = (
ChatPromptTemplate.from_template("Write about {topic}")
| streaming_llm
| StrOutputParser()
)
print("Streaming single chain:")
for chunk in single_chain.stream({"topic": "the future of AI agents"}):
    print(chunk, end="", flush=True)
print()
# For parallel chains, stream after the merge step
parallel_then_stream = (
RunnableParallel(
pros=pros_chain,
cons=cons_chain
)
| ChatPromptTemplate.from_template("Synthesize: pros={pros}, cons={cons}")
| streaming_llm
| StrOutputParser()
)
print("\nStreaming merged parallel output:")
for chunk in parallel_then_stream.stream({"technology": "AI coding assistants"}):
    print(chunk, end="", flush=True)
Pattern 9: RunnableLambda with Side Effects
Sometimes you need to log, track costs, or write to a database mid-chain without changing the data flow:
from datetime import datetime, timezone
execution_log = []
def log_and_pass(data, step_name: str = "step"):
    """Log execution metadata without modifying data."""
    execution_log.append({
        "step": step_name,
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_type": type(data).__name__,
        "input_preview": str(data)[:100]
    })
    return data # Return unchanged
def create_logger(step_name: str) -> RunnableLambda:
    return RunnableLambda(lambda x: log_and_pass(x, step_name))
# Insert loggers between steps without changing behavior
monitored_chain = (
create_logger("input_received")
| pros_prompt
| create_logger("after_prompt_format")
| llm
| create_logger("after_llm_call")
| StrOutputParser()
| create_logger("final_output")
)
result = monitored_chain.invoke({"technology": "LangChain LCEL"})
print("Result:", result[:100])
print(f"\nExecution log ({len(execution_log)} steps recorded):")
for entry in execution_log:
    print(f" [{entry['timestamp'][:19]}] {entry['step']}: {entry['input_type']}")
This kind of instrumentation is cheaper to add in LCEL than in hand-written orchestration code because you can insert monitoring at any point in the chain without refactoring the surrounding steps. The guide to LangChain callbacks (logging, tracing, and streaming) covers this in depth.
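A module-level log list is shared by every invocation, which can interleave entries when several requests run at once. One possible variation, sketched here in plain Python, passes the log sink in explicitly so each request can own its list. `tap` and `make_tap` are hypothetical helper names; the callable returned by `make_tap` could be wrapped in RunnableLambda like any other function.

```python
from datetime import datetime, timezone
from functools import partial
from typing import Any


def tap(data: Any, *, step_name: str, sink: list) -> Any:
    """Record metadata about the value passing through, then return it unchanged."""
    sink.append({
        "step": step_name,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_type": type(data).__name__,
    })
    return data


def make_tap(step_name: str, sink: list):
    """Bind the step name and the sink so the result takes a single argument."""
    return partial(tap, step_name=step_name, sink=sink)


# A tiny hand-rolled pipeline: tap, transform, tap, transform
log: list = []
steps = [make_tap("input", log), str.upper, make_tap("after_upper", log), len]

value = "hello"
for step in steps:
    value = step(value)

print(value)                          # 5
print([entry["step"] for entry in log])  # ['input', 'after_upper']
```

Using functools.partial instead of a closure over a global keeps the tap free of shared state, which makes it easier to test in isolation.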
Pattern 10: Dynamic Chain Construction at Runtime
Sometimes the chain topology itself needs to change based on runtime data:
from langchain_core.runnables import RunnableLambda, RunnableParallel
from typing import Any
def build_dynamic_chain(config: dict):
    """Build a chain dynamically based on configuration."""
    branches = {}
    if config.get("include_summary"):
        branches["summary"] = (
            ChatPromptTemplate.from_template("Summarize: {input}")
            | llm | StrOutputParser()
        )
    if config.get("include_sentiment"):
        branches["sentiment"] = (
            ChatPromptTemplate.from_template("Classify sentiment (positive/negative/neutral): {input}")
            | llm | StrOutputParser()
        )
    if config.get("include_keywords"):
        branches["keywords"] = (
            ChatPromptTemplate.from_template("Extract 5 keywords from: {input}")
            | llm | StrOutputParser()
        )
    if not branches:
        return RunnableLambda(lambda x: {"error": "No analysis configured"})
    return RunnableParallel(**branches)
# Build different chains for different use cases
news_analyzer = build_dynamic_chain({
"include_summary": True,
"include_sentiment": True,
"include_keywords": True
})
simple_analyzer = build_dynamic_chain({
"include_summary": True,
"include_sentiment": False,
"include_keywords": False
})
text = "LangChain announced a major update to their LCEL system, adding new parallel execution features."
full_analysis = news_analyzer.invoke({"input": text})
print("Full analysis keys:", list(full_analysis.keys()))
simple_analysis = simple_analyzer.invoke({"input": text})
print("Simple analysis keys:", list(simple_analysis.keys()))
This dynamic construction pattern is used in production systems such as those described in the Build AI agent with LangChain guide, where different user tiers get different chain complexity.
Combining All Patterns: Production Content Pipeline
Here is a realistic production pipeline that uses multiple patterns together:
from langchain_core.runnables import (
RunnableParallel, RunnableLambda, RunnablePassthrough, RunnableBranch
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
import re
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
# Step 1: Classify content type
classify_prompt = ChatPromptTemplate.from_template(
"Classify this content request as 'blog', 'social', or 'email': {request}. Reply with just the type."
)
def clean_classification(text: str) -> str:
    for t in ["blog", "social", "email"]:
        if t in text.lower():
            return t
    return "blog"
classify_chain = classify_prompt | llm | StrOutputParser() | RunnableLambda(clean_classification)
# Step 2: Parallel content generation based on type
blog_chain = (
ChatPromptTemplate.from_template("Write a 300-word blog post about: {request}")
| llm | StrOutputParser()
)
social_chain = (
ChatPromptTemplate.from_template("Write 3 social media posts about: {request}")
| llm | StrOutputParser()
)
email_chain = (
ChatPromptTemplate.from_template("Write a marketing email about: {request}")
| llm | StrOutputParser()
)
content_router = RunnableBranch(
(lambda x: x["content_type"] == "social", social_chain),
(lambda x: x["content_type"] == "email", email_chain),
blog_chain # default
)
# Step 3: Parallel quality checks
def check_word_count(text: str) -> dict:
    count = len(text.split())
    return {"word_count": count, "meets_minimum": count >= 50}
quality_chain = RunnableParallel(
word_count_check=RunnableLambda(check_word_count),
has_cta=RunnableLambda(lambda t: {"has_cta": any(
phrase in t.lower() for phrase in ["click", "learn more", "sign up", "contact"]
)})
)
# Full pipeline
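full_pipeline = (
    RunnablePassthrough.assign(
        content_type=classify_chain # receives the full input dict and reads {request}
    )
    | RunnablePassthrough.assign(
        content=content_router # routes on content_type; the prompts read {request}
    )
    | RunnablePassthrough.assign(
        # quality_chain expects the content string, so pull it out of the dict first;
        # the dict itself stays intact so result["content"] is still available below
        quality=RunnableLambda(lambda x: x["content"]) | quality_chain
    )
)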
result = full_pipeline.invoke({"request": "our new AI writing assistant product launch"})
print("Content preview:", result["content"][:300])
print("Quality checks:", result["quality"])
Performance Comparison Table
| Pattern | Latency | Throughput | Use Case |
|---|---|---|---|
| Sequential chain (pipe operator) | Additive | Low | Simple linear transformations |
| RunnableParallel (sync) | Max(branches) | Medium | Multiple LLM calls, same input |
| RunnableParallel (async) | Max(branches) | High | High-concurrency web APIs |
| RunnableBranch | Single branch | Medium | Conditional routing |
| RunnableLambda (sync) | Microseconds | High | Data transformation, filtering |
| Nested parallel | Max(outer) | Medium | Multi-stage enrichment |
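These performance characteristics explain why the pipelines in the AI research agent build guide lean heavily on RunnableParallel: four parallel searches take roughly as long as the slowest one rather than four times longer, provided the provider's rate limits allow the concurrent requests.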
Error Handling in LCEL Chains
from langchain_core.runnables import RunnableLambda
def safe_llm_call(data: dict) -> str:
    """Wrap LLM call with error handling."""
    try:
        chain = pros_prompt | llm | StrOutputParser()
        return chain.invoke(data)
    except Exception as e:
        return f"Error generating content: {str(e)}"
# Or use .with_fallbacks()
primary_chain = pros_prompt | llm | StrOutputParser()
fallback_chain = (
ChatPromptTemplate.from_template("List advantages of {technology}")
| ChatOpenAI(model="gpt-3.5-turbo")
| StrOutputParser()
)
robust_chain = primary_chain.with_fallbacks([fallback_chain])
result = robust_chain.invoke({"technology": "Kubernetes"})
Key Takeaways
RunnableParallel is your tool for concurrent execution — use it whenever you need multiple independent LLM calls on the same input. RunnableLambda is your escape hatch — it wraps any Python logic into the chain model. RunnablePassthrough.assign() is underrated: it lets you enrich data flowing through a chain without replacing it.
The ten patterns above cover the vast majority of production LCEL use cases. They compose cleanly — you can nest any of them inside each other, and the resulting chain still supports .invoke(), .stream(), .ainvoke(), and .astream() uniformly.
For agents that use these chains as tools, see the Build AI agent with LangChain guide. For RAG pipelines that combine retrievers with parallel processing, the RAG system tutorial shows how retrieval steps plug into LCEL chains naturally.
Frequently Asked Questions
What is the difference between RunnableParallel and RunnableLambda? RunnableParallel runs multiple runnables simultaneously and merges their outputs into a dictionary. RunnableLambda wraps a plain Python function so it can participate in an LCEL chain — it runs sequentially, not in parallel.
When should I use RunnablePassthrough versus RunnableLambda? Use RunnablePassthrough to forward the input unchanged to the next step, or RunnablePassthrough.assign() to add new keys to the input dict. Use RunnableLambda when you need custom transformation logic that does not fit a built-in runnable.
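Does RunnableParallel actually run in parallel threads? Yes. By default, RunnableParallel uses a ThreadPoolExecutor to run its branches concurrently. This helps most when the branches are I/O-bound (LLM or HTTP calls), since CPU-bound Python code is still limited by the GIL. For async usage, ainvoke runs the branches as concurrent asyncio tasks on a single event loop, with no extra threads.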
AiTechWorlds Team
The AiTechWorlds team is passionate about AI, technology, and education. We create high-quality, research-backed content to help you learn, grow, and succeed in the modern digital world.