Follow AiTechWorlds on LinkedIn for professional AI content!Follow Now →
22 minLesson 20 of 34
Advanced Python

Async Python with asyncio

Async Python with asyncio: Handling Concurrent Operations

Regular Python code is synchronous — each line waits for the previous one to finish. That's fine for CPU work, but it's wasteful when waiting for network responses or file reads. While you're waiting for one API call, you could be making 50 more. asyncio makes this possible.

The Problem: Synchronous I/O Blocking

import time
import requests

# Synchronous: each request waits for the previous to complete
def fetch_all_sync(urls):
    results = []
    for url in urls:
        response = requests.get(url)  # BLOCKS for 200ms each
        results.append(response.json())
    return results

urls = ["https://api.example.com/user/1", 
        "https://api.example.com/user/2",
        "https://api.example.com/user/3"]

start = time.time()
results = fetch_all_sync(urls)   # Takes ~600ms (3 × 200ms)
print(f"Took: {time.time() - start:.2f}s")

# Asynchronous: all requests run concurrently
# Total time ≈ 200ms (all waiting in parallel)

Core asyncio Concepts

import asyncio

# 'async def' creates a coroutine — a function that can be paused
async def greet(name, delay):
    print(f"Hello, {name}!")
    await asyncio.sleep(delay)   # 'await' pauses this coroutine
    print(f"Goodbye, {name}!")   # Resumes when sleep completes

# Run a single coroutine
asyncio.run(greet("Alice", 1))

# Run multiple coroutines concurrently
async def main():
    # asyncio.gather runs all coroutines concurrently
    await asyncio.gather(
        greet("Alice", 2),
        greet("Bob", 1),
        greet("Carol", 1.5)
    )
    # Alice starts → Bob starts → Carol starts
    # Bob finishes after 1s → Carol after 1.5s → Alice after 2s
    # Total: ~2s instead of 4.5s

asyncio.run(main())

Async HTTP Requests: The Real Use Case

import asyncio
import aiohttp  # pip install aiohttp
import time

async def fetch(session, url):
    """Fetch a URL asynchronously."""
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls):
    """Fetch multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Compare sync vs async
urls = [f"https://httpbin.org/delay/0.5" for _ in range(10)]

# Async version
start = time.time()
results = asyncio.run(fetch_all(urls))
print(f"Async: {time.time() - start:.2f}s")   # ~0.5s

# Sync version would take: ~5s (10 × 0.5s)

Error Handling in Async Code

import asyncio
import aiohttp

async def safe_fetch(session, url, retries=3):
    """Fetch with error handling and retries."""
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()  # Raises for 4xx/5xx status codes
                return await resp.json()
        
        except aiohttp.ClientConnectionError as e:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        
        except aiohttp.ClientResponseError as e:
            if e.status == 404:
                return None  # Not found — not worth retrying
            raise

async def fetch_with_timeout(urls, timeout_seconds=30):
    async with aiohttp.ClientSession() as session:
        try:
            tasks = [safe_fetch(session, url) for url in urls]
            # gather with return_exceptions=True: errors become values, not exceptions
            results = await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=timeout_seconds
            )
        except asyncio.TimeoutError:
            print(f"Total timeout after {timeout_seconds}s")
            return []
    
    # Separate successes from failures
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    
    if failures:
        print(f"Failed: {len(failures)} requests")
    
    return successes

Creating and Managing Tasks

async def background_worker(name, interval):
    """A task that runs in the background."""
    while True:
        print(f"Worker {name} running at {asyncio.get_event_loop().time():.1f}")
        await asyncio.sleep(interval)

async def main():
    # Create tasks — they run immediately in background
    task1 = asyncio.create_task(background_worker("A", 1.0))
    task2 = asyncio.create_task(background_worker("B", 2.0))
    
    # Do other work while tasks run
    await asyncio.sleep(5)
    
    # Cancel tasks when done
    task1.cancel()
    task2.cancel()
    
    # Wait for cancellation to complete
    await asyncio.gather(task1, task2, return_exceptions=True)
    print("All tasks cancelled")

asyncio.run(main())

Async Context Managers and Iterators

# Async context manager
class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection delay
        self.connection = {"status": "connected"}
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection")
        self.connection = None
        return False

    async def query(self, sql):
        await asyncio.sleep(0.05)  # Simulate query
        return [{"id": 1, "name": "Alice"}]

async def database_operation():
    async with AsyncDatabaseConnection() as db:
        results = await db.query("SELECT * FROM users")
        print(f"Got {len(results)} users")

# Async generator / async iterator
async def async_range(start, stop, delay=0):
    """Async version of range with optional delay."""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i

async def use_async_range():
    async for value in async_range(0, 5, delay=0.1):
        print(value)  # Prints 0,1,2,3,4 with 100ms between each
    
    # Async list comprehension
    results = [value async for value in async_range(0, 10)]
    print(results)

Queues: Async Producer-Consumer Pattern

async def producer(queue, items):
    """Add items to the queue."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    
    # Signal consumers to stop
    await queue.put(None)

async def consumer(queue, name):
    """Process items from the queue."""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Pass signal to other consumers
            break
        
        # Process item
        await asyncio.sleep(0.3)  # Simulate work
        print(f"Consumer {name} processed: {item}")
        queue.task_done()

async def run_pipeline():
    queue = asyncio.Queue(maxsize=5)
    items = list(range(20))
    
    # Start multiple consumers
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    
    # Produce items
    await producer(queue, items)
    
    # Wait for all consumers to finish
    await asyncio.gather(*consumers)

asyncio.run(run_pipeline())

asyncio with File I/O: aiofiles

import aiofiles  # pip install aiofiles

async def process_log_files(filepaths):
    """Process multiple log files concurrently."""
    
    async def read_file(filepath):
        async with aiofiles.open(filepath, 'r') as f:
            content = await f.read()
        return content
    
    # Read all files concurrently
    tasks = [read_file(fp) for fp in filepaths]
    contents = await asyncio.gather(*tasks)
    
    return contents

async def write_results(filepath, data):
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(data)

When to Use asyncio

USE asyncio when:
  ✓ Making multiple HTTP/API requests
  ✓ Handling many WebSocket connections
  ✓ Building web servers (FastAPI uses asyncio)
  ✓ Database queries that can run concurrently
  ✓ I/O-bound tasks that spend time waiting

DON'T USE asyncio when:
  ✗ CPU-intensive computation (use multiprocessing instead)
  ✗ Simple scripts that make 1-2 requests
  ✗ Code that uses sync-only libraries with no async alternative
  ✗ You need to mix heavily with sync code (use threading instead)

asyncio is fundamental to modern Python web development — every major async web framework (FastAPI, Starlette, aiohttp server) is built on it.

Next lesson: HTTP Requests with the requests Library — calling APIs and web services.

📱

Get this course's notes on Telegram!

Free cheat sheets, summaries & practice exercises

Get Notes Free →
!