Python Async Programming Guide 2026 — asyncio, aiohttp & Concurrency
Master async programming in Python with asyncio. Learn concurrent programming, aiohttp for async HTTP, async database operations, and build high-performance Python applications.
Python Async Programming Guide 2026 — Concurrent Code That Actually Works
Your program makes 10 API requests. Each takes 500ms. Synchronous code: about 5 seconds. Async code: roughly 500ms, because the waits overlap.
That is the promise of async programming — and it delivers. When your code spends most of its time waiting (waiting for a network response, a database query, a file to load), it does not need to block the entire program. Other work can happen during that wait.
This guide teaches you asyncio from the ground up — what async actually does, when to use it, and how to write concurrent Python that is clear and correct.
The Problem: Blocking I/O
import requests
import time

def fetch_prices(symbols: list[str]) -> dict:
    """Fetch stock prices (synchronous, blocking)."""
    prices = {}
    for symbol in symbols:
        response = requests.get(f"https://api.example.com/price/{symbol}")
        prices[symbol] = response.json()["price"]
    return prices

start = time.perf_counter()
prices = fetch_prices(["AAPL", "GOOG", "MSFT", "AMZN", "META"])
print(f"Time: {time.perf_counter() - start:.2f}s")
# Time: 2.50s (5 requests × 500ms each)
While waiting for each request, the program does nothing — the CPU sits idle. Async fixes this.
The Core Concepts
Coroutines
An async def function is a coroutine function. Calling it returns a coroutine object, which can be paused and resumed.
import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(1)  # Simulate I/O wait (non-blocking)
    return f"Hello, {name}!"

# A coroutine only runs when it is awaited (or scheduled as a task)
async def main():
    result = await greet("Alice")
    print(result)

asyncio.run(main())
await tells Python: "pause this coroutine here, let other coroutines run, resume when the awaited thing finishes."
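To make the distinction concrete, here is a minimal sketch (the short sleep and the `main` helper are illustrative assumptions). It shows that calling a coroutine function only creates a coroutine object, and that the body runs once the object is awaited.

```python
import asyncio
import inspect

async def greet(name: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for a short I/O wait
    return f"Hello, {name}!"

async def main() -> tuple[bool, str]:
    coro = greet("Alice")                # creates a coroutine object; the body has not started
    is_coro = inspect.iscoroutine(coro)  # True: we hold an object, not a result
    result = await coro                  # the body runs here
    return is_coro, result

print(asyncio.run(main()))
```

Forgetting the await is a common mistake: Python emits a "coroutine ... was never awaited" RuntimeWarning and the function body never executes.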
The Event Loop
asyncio runs a single-threaded event loop that:
- Starts a coroutine
- Runs it until it hits an await
- Switches to another coroutine during the wait
- Comes back when the awaited operation completes
import asyncio

async def task(name: str, delay: float) -> str:
    print(f"{name}: starting")
    await asyncio.sleep(delay)  # Non-blocking wait
    print(f"{name}: done after {delay}s")
    return name

async def main():
    # Inside a coroutine, get_running_loop() is the preferred way to reach the loop
    loop = asyncio.get_running_loop()
    start = loop.time()
    # Sequential: each await finishes before the next starts, so 3 seconds total
    await task("Task A", 1)
    await task("Task B", 1)
    await task("Task C", 1)
    print(f"Sequential time: {loop.time() - start:.2f}s")

asyncio.run(main())
# Sequential time: 3.00s
Running Tasks Concurrently
asyncio.gather — Run Multiple Coroutines Together
import asyncio
import time

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} completed"

async def main():
    start = time.perf_counter()
    # All three run concurrently
    results = await asyncio.gather(
        task("Task A", 1.0),
        task("Task B", 1.5),
        task("Task C", 0.8),
    )
    elapsed = time.perf_counter() - start
    print(f"Results: {results}")
    print(f"Total time: {elapsed:.2f}s")  # ~1.5s instead of 3.3s

asyncio.run(main())
# Results: ['Task A completed', 'Task B completed', 'Task C completed']
# Total time: 1.50s
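One detail worth seeing in isolation: asyncio.gather returns results in the order the awaitables were passed in, not the order they finished. The sketch below uses a hypothetical `job` helper and arbitrary delays to record both orders.

```python
import asyncio

async def job(name: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(name)  # record the actual completion order
    return name

async def main() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    results = await asyncio.gather(
        job("slow", 0.15, finished),
        job("medium", 0.10, finished),
        job("fast", 0.05, finished),
    )
    return list(results), finished

results, finished = asyncio.run(main())
print(results)   # ['slow', 'medium', 'fast']  (argument order)
print(finished)  # ['fast', 'medium', 'slow']  (completion order)
```

This is why you can safely zip the inputs with the results after a gather call.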
asyncio.create_task — Background Tasks
import asyncio

# Reuses task() from the asyncio.gather example above

async def main():
    # create_task schedules each coroutine right away; it starts running
    # the next time main() yields control at an await
    task1 = asyncio.create_task(task("A", 2.0))
    task2 = asyncio.create_task(task("B", 1.0))
    task3 = asyncio.create_task(task("C", 1.5))
    # Do other work while tasks run in background
    print("Tasks started, doing other work...")
    await asyncio.sleep(0.5)
    print("Other work done, waiting for tasks...")
    # Wait for all tasks
    results = await asyncio.gather(task1, task2, task3)
    return results
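Because create_task hands back a Task object, you can also cancel background work you no longer need. The following is a minimal sketch with a hypothetical `slow_job` coroutine and illustrative delays, not a full cancellation strategy.

```python
import asyncio

async def slow_job() -> str:
    await asyncio.sleep(10)  # stand-in for a long I/O wait
    return "finished"

async def main() -> str:
    job = asyncio.create_task(slow_job())
    await asyncio.sleep(0.05)  # give the task a chance to start
    job.cancel()               # request cancellation at its next await point
    try:
        return await job
    except asyncio.CancelledError:
        return "cancelled"

print(asyncio.run(main()))  # cancelled
```

Cancellation is cooperative: the task is interrupted at its current await, so cleanup in try/finally blocks inside the task still runs.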
Async HTTP Requests with aiohttp
pip install aiohttp
import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch a single URL and return the JSON response."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            response.raise_for_status()
            data = await response.json()
            return {"url": url, "status": response.status, "data": data}
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A total timeout raises asyncio.TimeoutError, which is not a ClientError
        return {"url": url, "error": str(e)}

async def fetch_all(urls: list[str]) -> list:
    """Fetch multiple URLs concurrently.

    With return_exceptions=True, an unexpected failure is returned
    as an exception object in place of that URL's dict.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Real-world example: Check status of multiple APIs
async def health_check():
    endpoints = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/users/1",
        "https://jsonplaceholder.typicode.com/todos/1",
        "https://jsonplaceholder.typicode.com/albums/1",
        "https://jsonplaceholder.typicode.com/photos/1",
    ]
    start = time.perf_counter()
    results = await fetch_all(endpoints)
    elapsed = time.perf_counter() - start
    for endpoint, result in zip(endpoints, results):
        if isinstance(result, BaseException):
            status = "ERROR"  # gather handed back an exception object
        else:
            status = result.get("status", "ERROR")
        print(f"  {endpoint[-20:]}: {status}")
    print(f"\nChecked {len(endpoints)} endpoints in {elapsed:.2f}s")

asyncio.run(health_check())
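Since fetch_all passes return_exceptions=True, any exception that escapes a coroutine comes back as an object in the results list rather than being raised. A minimal sketch of separating successes from failures follows; `fetch_stub` is a hypothetical stand-in that performs no real HTTP, so it runs without a network.

```python
import asyncio

async def fetch_stub(name: str, fail: bool) -> dict:
    # Hypothetical stand-in for a network call; no real request is made.
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name} unavailable")
    return {"name": name, "status": 200}

async def check_all() -> tuple[list[dict], list[BaseException]]:
    results = await asyncio.gather(
        fetch_stub("posts", fail=False),
        fetch_stub("users", fail=True),
        fetch_stub("todos", fail=False),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed

ok, failed = asyncio.run(check_all())
print(f"{len(ok)} succeeded, {len(failed)} failed")  # 2 succeeded, 1 failed
```

Without return_exceptions=True, the first exception would propagate out of gather and you would lose easy access to the other results.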
Rate Limiting Async Requests
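When fetching hundreds of URLs, cap the number of in-flight requests so you do not overwhelm the server. A semaphore limits concurrency (how many requests run at once), not the number of requests per second: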
import asyncio
import aiohttp

async def fetch_with_limit(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    async with semaphore:  # Only N requests at a time
        async with session.get(url) as response:
            return {"url": url, "data": await response.json()}

async def fetch_batch(urls: list[str], concurrency: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)

# Now you can safely fetch 1000 URLs with max 10 concurrent requests
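To check that the semaphore really caps concurrency, you can count how many workers are inside the guarded section at once. This sketch replaces the HTTP call with asyncio.sleep; the `worker` and `measure_peak` names and the shared `state` dict are assumptions for illustration.

```python
import asyncio

async def worker(semaphore: asyncio.Semaphore, state: dict) -> None:
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP request
        state["active"] -= 1

async def measure_peak(jobs: int, concurrency: int) -> int:
    semaphore = asyncio.Semaphore(concurrency)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(jobs)))
    return state["peak"]

# 50 jobs are started, but no more than 10 ever hold the semaphore at once
print(asyncio.run(measure_peak(jobs=50, concurrency=10)))
```

The shared dict needs no lock here because all workers run on one thread and only switch at await points.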
Async Database Operations
pip install asyncpg # For PostgreSQL
pip install aiosqlite # For SQLite
import asyncio
import aiosqlite

async def setup_db(db_path: str) -> None:
    async with aiosqlite.connect(db_path) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        """)
        await db.commit()

async def insert_user(db_path: str, name: str, email: str) -> int:
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)", (name, email)
        )
        await db.commit()
        return cursor.lastrowid

async def get_users(db_path: str) -> list[dict]:
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute("SELECT * FROM users") as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

async def main():
    db = "users.db"
    await setup_db(db)
    # Insert users concurrently. Each call opens its own connection, and
    # SQLite allows one writer at a time, so the writes themselves are serialized.
    # Running this twice raises an IntegrityError because email is UNIQUE.
    await asyncio.gather(
        insert_user(db, "Alice", "alice@example.com"),
        insert_user(db, "Bob", "bob@example.com"),
        insert_user(db, "Charlie", "charlie@example.com"),
    )
    users = await get_users(db)
    for user in users:
        print(f"User: {user['name']} ({user['email']})")

asyncio.run(main())
Async Context Managers and Iterators
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource(name: str):
    print(f"Acquiring {name}...")
    await asyncio.sleep(0.1)  # Simulate async acquisition
    try:
        yield name
    finally:
        print(f"Releasing {name}...")
        await asyncio.sleep(0.1)  # Simulate async cleanup

async def use_resource():
    async with managed_resource("database") as resource:
        print(f"Using {resource}")

# Async generators
async def async_range(stop: int, delay: float = 0):
    for i in range(stop):
        await asyncio.sleep(delay)
        yield i

async def consume_range():
    async for value in async_range(5, delay=0.1):
        print(f"Got: {value}")

asyncio.run(use_resource())
asyncio.run(consume_range())
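The same two protocols can be implemented by hand or consumed in a comprehension. The sketch below, built around a hypothetical `Connection` class, shows a class-based async context manager using `__aenter__` and `__aexit__`, and an async list comprehension over an async generator.

```python
import asyncio

class Connection:
    """Hypothetical resource; stands in for a real client or DB handle."""

    def __init__(self) -> None:
        self.open = False

    async def __aenter__(self) -> "Connection":
        await asyncio.sleep(0.01)  # simulate async acquisition
        self.open = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await asyncio.sleep(0.01)  # simulate async cleanup
        self.open = False

async def async_range(stop: int):
    for i in range(stop):
        await asyncio.sleep(0)  # yield control to the event loop
        yield i

async def main() -> tuple[list[int], bool]:
    async with Connection() as conn:
        squares = [i * i async for i in async_range(5)]  # async comprehension
    return squares, conn.open  # conn.open is False once the block exits

print(asyncio.run(main()))  # ([0, 1, 4, 9, 16], False)
```

The decorator form is usually shorter; the class form is handy when the resource carries state or extra methods.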
Async vs Threading vs Multiprocessing
| Approach | Best For | Threads Used | GIL Affected |
|---|---|---|---|
| asyncio | I/O-bound (network, DB, files) | 1 | No (single thread, so no GIL contention) |
| threading | I/O-bound + blocking libs | Multiple | Yes (limits CPU work) |
| multiprocessing | CPU-bound (math, ML, image) | Multiple processes | No (each has own GIL) |
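For the threading row, asyncio can hand a blocking call to a worker thread so the event loop stays responsive. This is a minimal sketch assuming Python 3.9+ (where asyncio.to_thread exists); `blocking_read` is a hypothetical stand-in for a library without async support.

```python
import asyncio
import time

def blocking_read(delay: float) -> str:
    # Hypothetical blocking call; time.sleep would freeze the event loop
    # if it were called directly inside a coroutine.
    time.sleep(delay)
    return f"read after {delay}s"

async def main() -> list[str]:
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, 0.2),
        asyncio.to_thread(blocking_read, 0.2),
        asyncio.to_thread(blocking_read, 0.2),
    )
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
    return list(results)

print(asyncio.run(main()))
```

The three calls overlap because each sleeps in its own thread, and a sleeping thread releases the GIL. CPU-heavy functions would not get the same benefit from threads.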
# CPU-bound work → use multiprocessing
from concurrent.futures import ProcessPoolExecutor
import asyncio

def cpu_heavy_work(n: int) -> int:
    return sum(i * i for i in range(n))  # Pure computation

async def run_cpu_tasks():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        # Run CPU tasks in separate processes, await the results
        results = await asyncio.gather(
            loop.run_in_executor(executor, cpu_heavy_work, 1_000_000),
            loop.run_in_executor(executor, cpu_heavy_work, 2_000_000),
            loop.run_in_executor(executor, cpu_heavy_work, 3_000_000),
        )
    return results

# The guard is required on platforms that spawn worker processes (Windows, macOS)
if __name__ == "__main__":
    print(asyncio.run(run_cpu_tasks()))
Putting It All Together — Async Web Scraper
import asyncio
import json

import aiohttp
from bs4 import BeautifulSoup  # pip install beautifulsoup4 lxml

async def scrape_page(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}) as response:
        html = await response.text()
        soup = BeautifulSoup(html, "lxml")  # the "lxml" parser needs the lxml package
        heading = soup.find("h1")
        return {
            "url": url,
            "title": heading.text.strip() if heading else "",
            "links": [a.get("href") for a in soup.find_all("a", href=True)][:10],
        }

async def scrape_all(urls: list[str], concurrency: int = 5) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:

        async def bounded_scrape(url: str) -> dict:
            async with semaphore:  # at most `concurrency` pages in flight
                return await scrape_page(session, url)

        tasks = [bounded_scrape(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    # Failed pages come back as exception objects and are dropped here
    return [r for r in results if isinstance(r, dict)]

if __name__ == "__main__":
    urls = ["https://example.com", "https://httpbin.org/html"]
    results = asyncio.run(scrape_all(urls))
    print(json.dumps(results, indent=2))
Async programming integrates naturally with FastAPI — our FastAPI tutorial shows async endpoints throughout. For the decorators pattern used in this guide, see the decorators and generators guide.
The mental model that unlocks async Python: your program is not faster because it has more processing power. It is faster because it stops wasting time waiting. When one task waits, another runs. That is all async is.
AiTechWorlds Team