Follow AiTechWorlds on LinkedIn for professional AI content!Follow Now →

Python Async Programming Guide 2026 — asyncio, aiohttp & Concurrency

Master async programming in Python with asyncio. Learn concurrent programming, aiohttp for async HTTP, async database operations, and build high-performance Python applications.

A
AiTechWorlds Team
May 12, 2026 7 min readUpdated May 15, 2026
📱

Get more content like this on Telegram!

Daily AI tips, notes & resources — free

Join Free →

Python Async Programming Guide 2026 — Concurrent Code That Actually Works

Your program makes 10 API requests. Each takes 500ms. Synchronous code: 5 seconds. Async code: 500ms.

That is the promise of async programming — and it delivers. When your code spends most of its time waiting (waiting for a network response, a database query, a file to load), it does not need to block the entire program. Other work can happen during that wait.

This guide teaches you asyncio from the ground up — what async actually does, when to use it, and how to write concurrent Python that is clear and correct.


The Problem: Blocking I/O

import requests
import time

def fetch_prices(symbols: list[str]) -> dict:
    """Fetch stock prices — synchronous, blocking."""
    prices = {}
    for symbol in symbols:
        response = requests.get(f"https://api.example.com/price/{symbol}")
        prices[symbol] = response.json()["price"]
    return prices

start = time.perf_counter()
prices = fetch_prices(["AAPL", "GOOG", "MSFT", "AMZN", "META"])
print(f"Time: {time.perf_counter() - start:.2f}s")
# Time: 2.50s (5 requests × 500ms each)

While waiting for each request, the program does nothing — the CPU sits idle. Async fixes this.


The Core Concepts

Coroutines

An async def function is a coroutine — a function that can be paused and resumed.

import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(1)  # Simulate I/O wait (non-blocking)
    return f"Hello, {name}!"

# You MUST await a coroutine to run it
async def main():
    result = await greet("Alice")
    print(result)

asyncio.run(main())

await tells Python: "pause this coroutine here, let other coroutines run, resume when the awaited thing finishes."

The Event Loop

asyncio runs a single-threaded event loop that:

  1. Starts a coroutine
  2. Runs it until it hits an await
  3. Switches to another coroutine during the wait
  4. Comes back when the awaited operation completes
import asyncio

async def task(name: str, delay: float) -> str:
    print(f"{name}: starting")
    await asyncio.sleep(delay)  # Non-blocking wait
    print(f"{name}: done after {delay}s")
    return name

async def main():
    start = asyncio.get_event_loop().time()
    
    # Sequential — takes 3 seconds total
    await task("Task A", 1)
    await task("Task B", 1)
    await task("Task C", 1)
    
    print(f"Sequential time: {asyncio.get_event_loop().time() - start:.2f}s")

asyncio.run(main())
# Sequential time: 3.00s

Running Tasks Concurrently

asyncio.gather — Run Multiple Coroutines Together

import asyncio
import time

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} completed"

async def main():
    start = time.perf_counter()
    
    # All three run concurrently
    results = await asyncio.gather(
        task("Task A", 1.0),
        task("Task B", 1.5),
        task("Task C", 0.8),
    )
    
    elapsed = time.perf_counter() - start
    print(f"Results: {results}")
    print(f"Total time: {elapsed:.2f}s")  # ~1.5s instead of 3.3s

asyncio.run(main())
# Results: ['Task A completed', 'Task B completed', 'Task C completed']
# Total time: 1.50s

asyncio.create_task — Background Tasks

async def main():
    # Create tasks immediately (start running)
    task1 = asyncio.create_task(task("A", 2.0))
    task2 = asyncio.create_task(task("B", 1.0))
    task3 = asyncio.create_task(task("C", 1.5))
    
    # Do other work while tasks run in background
    print("Tasks started, doing other work...")
    await asyncio.sleep(0.5)
    print("Other work done, waiting for tasks...")
    
    # Wait for all tasks
    results = await asyncio.gather(task1, task2, task3)
    return results

Async HTTP Requests with aiohttp

pip install aiohttp
import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch a single URL and return the JSON response."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            response.raise_for_status()
            data = await response.json()
            return {"url": url, "status": response.status, "data": data}
    except aiohttp.ClientError as e:
        return {"url": url, "error": str(e)}

async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Real-world example: Check status of multiple APIs
async def health_check():
    endpoints = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/users/1",
        "https://jsonplaceholder.typicode.com/todos/1",
        "https://jsonplaceholder.typicode.com/albums/1",
        "https://jsonplaceholder.typicode.com/photos/1",
    ]
    
    start = time.perf_counter()
    results = await fetch_all(endpoints)
    elapsed = time.perf_counter() - start
    
    for result in results:
        status = result.get("status", "ERROR")
        print(f"  {result['url'][-20:]}: {status}")
    
    print(f"\nChecked {len(endpoints)} endpoints in {elapsed:.2f}s")

asyncio.run(health_check())

Rate Limiting Async Requests

When fetching hundreds of URLs, you need to control concurrency to avoid overwhelming servers:

import asyncio
import aiohttp
from asyncio import Semaphore

async def fetch_with_limit(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: Semaphore
) -> dict:
    async with semaphore:  # Only N requests at a time
        async with session.get(url) as response:
            return {"url": url, "data": await response.json()}

async def fetch_batch(urls: list[str], concurrency: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)

# Now you can safely fetch 1000 URLs with max 10 concurrent requests

Async Database Operations

pip install asyncpg  # For PostgreSQL
pip install aiosqlite  # For SQLite
import asyncio
import aiosqlite

async def setup_db(db_path: str) -> None:
    async with aiosqlite.connect(db_path) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        """)
        await db.commit()

async def insert_user(db_path: str, name: str, email: str) -> int:
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)", (name, email)
        )
        await db.commit()
        return cursor.lastrowid

async def get_users(db_path: str) -> list[dict]:
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute("SELECT * FROM users") as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

async def main():
    db = "users.db"
    await setup_db(db)
    
    # Insert users concurrently
    await asyncio.gather(
        insert_user(db, "Alice", "alice@example.com"),
        insert_user(db, "Bob", "bob@example.com"),
        insert_user(db, "Charlie", "charlie@example.com"),
    )
    
    users = await get_users(db)
    for user in users:
        print(f"User: {user['name']} ({user['email']})")

asyncio.run(main())

Async Context Managers and Iterators

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource(name: str):
    print(f"Acquiring {name}...")
    await asyncio.sleep(0.1)  # Simulate async acquisition
    try:
        yield name
    finally:
        print(f"Releasing {name}...")
        await asyncio.sleep(0.1)  # Simulate async cleanup

async def main():
    async with managed_resource("database") as resource:
        print(f"Using {resource}")

# Async generators
async def async_range(stop: int, delay: float = 0):
    for i in range(stop):
        await asyncio.sleep(delay)
        yield i

async def main():
    async for value in async_range(5, delay=0.1):
        print(f"Got: {value}")

Async vs Threading vs Multiprocessing

ApproachBest ForThreads UsedGIL Affected
asyncioI/O-bound (network, DB, files)1No (never releases)
threadingI/O-bound + blocking libsMultipleYes (limits CPU work)
multiprocessingCPU-bound (math, ML, image)Multiple processesNo (each has own GIL)
# CPU-bound work → use multiprocessing
from concurrent.futures import ProcessPoolExecutor
import asyncio

def cpu_heavy_work(n: int) -> int:
    return sum(i * i for i in range(n))  # Pure computation

async def run_cpu_tasks():
    loop = asyncio.get_event_loop()
    
    with ProcessPoolExecutor() as executor:
        # Run CPU tasks in separate processes, await the results
        results = await asyncio.gather(
            loop.run_in_executor(executor, cpu_heavy_work, 1_000_000),
            loop.run_in_executor(executor, cpu_heavy_work, 2_000_000),
            loop.run_in_executor(executor, cpu_heavy_work, 3_000_000),
        )
    
    return results

Putting It All Together — Async Web Scraper

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import json

async def scrape_page(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}) as response:
        html = await response.text()
    
    soup = BeautifulSoup(html, "lxml")
    return {
        "url": url,
        "title": soup.find("h1").text.strip() if soup.find("h1") else "",
        "links": [a.get("href") for a in soup.find_all("a", href=True)][:10],
    }

async def scrape_all(urls: list[str], concurrency: int = 5) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    
    async def bounded_scrape(url: str) -> dict:
        async with semaphore:
            return await scrape_page(session, url)
    
    async with aiohttp.ClientSession() as session:
        tasks = [bounded_scrape(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if isinstance(r, dict)]

urls = ["https://example.com", "https://httpbin.org/html"]
results = asyncio.run(scrape_all(urls))
print(json.dumps(results, indent=2))

Async programming integrates naturally with FastAPI — our FastAPI tutorial shows async endpoints throughout. For the decorators pattern used in this guide, see the decorators and generators guide.

The mental model that unlocks async Python: your program is not faster because it has more processing power. It is faster because it stops wasting time waiting. When one task waits, another runs. That is all async is.

Async Python examples and performance benchmarks available in the AiTechWorlds Telegram channel!

Share this article:

Frequently Asked Questions

Use async when your program spends time waiting — waiting for network responses, database queries, file I/O. If you're waiting for 10 API calls, async can do them all concurrently instead of sequentially.
A

AiTechWorlds Team

✓ Verified Writer

The AiTechWorlds team is passionate about AI, technology, and education. We create high-quality, research-backed content to help you learn, grow, and succeed in the modern digital world.

Related Articles

10K+ Members Growing Daily

Get Free AI Notes Daily

Join AiTechWorlds on Telegram and get daily AI tips, prompt engineering templates, coding resources, and exclusive content — 100% free!

📚 Free Study Notes🤖 AI Tips Daily⚡ Prompt Templates💻 Coding Resources
Join Free Channel

No spam. Leave anytime.

!