22 min · Lesson 20 of 34
Advanced Python
Async Python with asyncio
Async Python with asyncio: Handling Concurrent Operations
Regular Python code is synchronous — each line waits for the previous one to finish. That's fine for CPU work, but it's wasteful when waiting for network responses or file reads. While you're waiting for one API call, you could be making 50 more. asyncio makes this possible.
The Problem: Synchronous I/O Blocking
import time
import requests
# Synchronous: each request waits for the previous to complete
def fetch_all_sync(urls, timeout=10):
    """Fetch every URL one after another and return the decoded JSON bodies.

    Args:
        urls: Iterable of URL strings to request, in order.
        timeout: Seconds to wait for each individual request before
            ``requests`` gives up. Without a timeout a single stalled
            server would hang the whole loop indefinitely.

    Returns:
        A list with one decoded JSON payload per URL, in input order.
    """
    # Each requests.get() BLOCKS (e.g. ~200ms) before the next one starts,
    # so the total time is the sum of all the individual waits.
    return [requests.get(url, timeout=timeout).json() for url in urls]
# Three user endpoints, requested one after another by the sync version.
urls = [f"https://api.example.com/user/{user_id}" for user_id in (1, 2, 3)]

start = time.time()
results = fetch_all_sync(urls)  # Takes ~600ms (3 × 200ms)
print(f"Took: {time.time() - start:.2f}s")

# Asynchronous: all requests are in flight at the same time
# Total time ≈ 200ms (the waits overlap instead of adding up)
Core asyncio Concepts
import asyncio
# 'async def' defines a coroutine function — its body can be suspended
# at every 'await' and resumed later by the event loop.
async def greet(name, delay):
    """Print a greeting, wait ``delay`` seconds, then print a farewell."""
    greeting = f"Hello, {name}!"
    print(greeting)
    # Suspend here; other coroutines may run until the sleep is over.
    await asyncio.sleep(delay)
    farewell = f"Goodbye, {name}!"
    print(farewell)
# Run a single coroutine: asyncio.run() starts an event loop, drives
# greet("Alice", 1) to completion, and then closes the loop.
asyncio.run(greet("Alice", 1))
# Run multiple coroutines concurrently
async def main():
    """Greet three people at once and wait until all of them are done."""
    # (name, delay in seconds) for every greeting to run.
    schedule = (("Alice", 2), ("Bob", 1), ("Carol", 1.5))
    # asyncio.gather wraps each coroutine in a task and runs them concurrently.
    await asyncio.gather(*(greet(who, seconds) for who, seconds in schedule))
    # Alice starts → Bob starts → Carol starts
    # Bob finishes after 1s → Carol after 1.5s → Alice after 2s
    # Total: ~2s (the longest delay) instead of 4.5s (the sum of all delays)
# Entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())
Async HTTP Requests: The Real Use Case
import asyncio
import aiohttp # pip install aiohttp
import time
async def fetch(session, url):
    """Request ``url`` through ``session`` and return the decoded JSON body.

    The response is used as an async context manager, so it is exited
    once the body has been read.
    """
    async with session.get(url) as reply:
        return await reply.json()
async def fetch_all(urls):
    """Fetch all ``urls`` concurrently over one shared client session.

    Returns a list in the same order as ``urls``. Because
    ``return_exceptions=True`` is passed to ``gather``, a request that
    fails contributes its exception object instead of aborting the batch.
    """
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(fetch(session, url) for url in urls),
            return_exceptions=True,
        )
# Compare sync vs async: ten requests to an endpoint that answers after 0.5s
urls = ["https://httpbin.org/delay/0.5"] * 10

# Async version — all ten requests wait at the same time
start = time.time()
results = asyncio.run(fetch_all(urls))
print(f"Async: {time.time() - start:.2f}s")  # ~0.5s
# Sync version would take: ~5s (10 × 0.5s)
Error Handling in Async Code
import asyncio
import aiohttp
async def safe_fetch(session, url, retries=3):
    """Fetch ``url`` as JSON, retrying transient failures with backoff.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the request.
        url: The URL to request.
        retries: Maximum number of attempts. With ``retries < 1`` no
            request is made and ``None`` is returned.

    Returns:
        The decoded JSON body, or ``None`` when the server answers 404.

    Raises:
        aiohttp.ClientResponseError: For any 4xx/5xx status other than 404
            (not retried).
        aiohttp.ClientConnectionError: If the connection still fails on
            the last attempt.
        asyncio.TimeoutError: If the request still times out on the last
            attempt.
    """
    # The timeout is identical for every attempt, so build it once.
    request_timeout = aiohttp.ClientTimeout(total=10)
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=request_timeout) as resp:
                resp.raise_for_status()  # Raises for 4xx/5xx status codes
                return await resp.json()
        except aiohttp.ClientResponseError as err:
            if err.status == 404:
                return None  # Not found — not worth retrying
            raise
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            # The total-timeout expiry surfaces as asyncio.TimeoutError, not
            # as a ClientConnectionError, so it has to be listed explicitly
            # or a slow response would skip the retry loop entirely.
            if attempt == retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s…
    # Only reachable when the loop never ran (retries < 1).
    return None
async def fetch_with_timeout(urls, timeout_seconds=30):
    """Fetch all ``urls`` concurrently under one overall deadline.

    Args:
        urls: Iterable of URL strings, each fetched via ``safe_fetch``.
        timeout_seconds: Deadline in seconds for the whole batch.

    Returns:
        The values returned by the successful ``safe_fetch`` calls (this
        includes ``None`` for URLs that answered 404). If the overall
        deadline expires, a message is printed and ``[]`` is returned.
    """
    async with aiohttp.ClientSession() as session:
        try:
            tasks = [safe_fetch(session, url) for url in urls]
            # gather with return_exceptions=True: errors become values, not exceptions
            results = await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=timeout_seconds,
            )
        except asyncio.TimeoutError:
            print(f"Total timeout after {timeout_seconds}s")
            return []

    # Separate successes from failures in one pass. Test against
    # BaseException: a cancelled request comes back from gather() as a
    # CancelledError, which is not an Exception subclass on Python 3.8+
    # and would otherwise be reported as a successful result.
    successes, failures = [], []
    for outcome in results:
        if isinstance(outcome, BaseException):
            failures.append(outcome)
        else:
            successes.append(outcome)
    if failures:
        print(f"Failed: {len(failures)} requests")
    return successes
Creating and Managing Tasks
async def background_worker(name, interval):
    """Print a heartbeat every ``interval`` seconds until cancelled.

    Args:
        name: Label included in each printed line.
        interval: Seconds to sleep between heartbeats.

    The loop never ends on its own; the task running this coroutine has
    to be cancelled, which raises ``CancelledError`` at the ``await``.
    """
    # Inside a coroutine the loop is guaranteed to be running, so use
    # get_running_loop() (the recommended call) and look it up only once.
    loop = asyncio.get_running_loop()
    while True:
        print(f"Worker {name} running at {loop.time():.1f}")
        await asyncio.sleep(interval)
async def main():
    """Run two background workers for five seconds, then cancel them."""
    # create_task() schedules each worker on the event loop; they start
    # running as soon as main() yields control at its next await.
    workers = [
        asyncio.create_task(background_worker(label, period))
        for label, period in (("A", 1.0), ("B", 2.0))
    ]

    # Do other work while the workers run
    await asyncio.sleep(5)

    # Request cancellation of every worker
    for worker in workers:
        worker.cancel()

    # Wait for the cancellations to take effect; return_exceptions=True
    # collects each CancelledError as a value instead of re-raising it here.
    await asyncio.gather(*workers, return_exceptions=True)
    print("All tasks cancelled")
# Entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())
Async Context Managers and Iterators
# Async context manager
class AsyncDatabaseConnection:
    """Simulated database connection for use with ``async with``.

    Entering the context "connects" (after a short simulated delay) and
    leaving it resets ``connection`` to ``None``. No real database is
    involved; ``query`` returns canned rows.
    """

    def __init__(self):
        # Define the attribute up front so it exists (as None) before the
        # context is entered instead of raising AttributeError.
        self.connection = None

    async def __aenter__(self):
        """Open the simulated connection and return this object."""
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection delay
        self.connection = {"status": "connected"}
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the simulated connection; exceptions are not suppressed."""
        print("Closing database connection")
        self.connection = None
        return False  # Falsy: an exception from the with-body propagates

    async def query(self, sql):
        """Pretend to run ``sql`` and return a fixed one-row result."""
        await asyncio.sleep(0.05)  # Simulate query
        return [{"id": 1, "name": "Alice"}]
async def database_operation():
    """Open a connection, query the users table and report the row count."""
    async with AsyncDatabaseConnection() as db:
        user_count = len(await db.query("SELECT * FROM users"))
        print(f"Got {user_count} users")
# Async generator / async iterator
async def async_range(start, stop, delay=0):
    """Yield the integers of ``range(start, stop)`` asynchronously.

    Before each value is yielded the generator awaits
    ``asyncio.sleep(delay)``, handing control back to the event loop.
    """
    numbers = range(start, stop)
    for number in numbers:
        await asyncio.sleep(delay)
        yield number
async def use_async_range():
    """Demonstrate ``async for`` and an async list comprehension."""
    ticker = async_range(0, 5, delay=0.1)
    async for value in ticker:
        print(value)  # Prints 0,1,2,3,4 with a 100ms pause before each

    # Async list comprehension: drain the generator straight into a list
    collected = [value async for value in async_range(0, 10)]
    print(collected)
Queues: Async Producer-Consumer Pattern
async def producer(queue, items):
    """Feed ``items`` into ``queue`` one by one, then enqueue a stop marker.

    ``queue.put`` waits while a bounded queue is full. After the last
    item a single ``None`` is queued as the shutdown signal.
    """
    stop_signal = None
    for entry in items:
        await queue.put(entry)
        print(f"Produced: {entry}")
        await asyncio.sleep(0.1)
    # Tell the consumers that nothing more is coming
    await queue.put(stop_signal)
async def consumer(queue, name):
    """Take items off ``queue`` and process them until ``None`` arrives.

    When the ``None`` stop marker is received it is put back on the
    queue so that the other consumers see it as well, and this
    consumer returns.
    """
    while (job := await queue.get()) is not None:
        await asyncio.sleep(0.3)  # Simulate work
        print(f"Consumer {name} processed: {job}")
        queue.task_done()
    # Hand the stop marker on to the remaining consumers
    await queue.put(None)
async def run_pipeline():
    """Run one producer and three consumers over a bounded queue."""
    queue = asyncio.Queue(maxsize=5)

    # Start the consumers first so they are ready to drain the queue
    workers = []
    for worker_id in range(3):
        workers.append(asyncio.create_task(consumer(queue, worker_id)))

    # Produce the items 0..19 (the producer ends with the stop marker)
    await producer(queue, list(range(20)))

    # Wait until every consumer has seen the stop marker and returned
    await asyncio.gather(*workers)
# Entry point: run the whole producer/consumer pipeline to completion.
asyncio.run(run_pipeline())
asyncio with File I/O: aiofiles
import aiofiles # pip install aiofiles
async def process_log_files(filepaths):
    """Read every file in ``filepaths`` concurrently.

    Returns a list with the full text of each file, in the same order
    as ``filepaths``.
    """
    async def read_file(filepath):
        # Open in text mode and read the whole file without blocking the loop
        async with aiofiles.open(filepath, 'r') as handle:
            return await handle.read()

    # One read per path, all awaited together
    return await asyncio.gather(*(read_file(path) for path in filepaths))
async def write_results(filepath, data):
    """Write ``data`` to ``filepath`` asynchronously, replacing the file."""
    async with aiofiles.open(filepath, 'w') as out:
        await out.write(data)
When to Use asyncio
USE asyncio when:
✓ Making multiple HTTP/API requests
✓ Handling many WebSocket connections
✓ Building web servers (FastAPI uses asyncio)
✓ Database queries that can run concurrently
✓ I/O-bound tasks that spend time waiting
DON'T USE asyncio when:
✗ CPU-intensive computation (use multiprocessing instead)
✗ Simple scripts that make 1-2 requests
✗ Code that uses sync-only libraries with no async alternative
✗ You need to mix heavily with sync code (use threading instead)
asyncio is fundamental to modern Python web development — every major async web framework (FastAPI, Starlette, aiohttp server) is built on it.
Next lesson: HTTP Requests with the requests Library — calling APIs and web services.
📱
Get Notes Free → Get this course's notes on Telegram!
Free cheat sheets, summaries & practice exercises