Table of contents
- The Full asyncio Playbook
- Why Async? The Problem It Solves
- Core Concepts
- Tasks & Scheduling
- Timeouts & Error Handling
- Async Primitives
- Async Iterators & Context Managers
- Working with Real Libraries
- Common Patterns & Best Practices
- asyncio vs Threading vs Multiprocessing
- Real-World Project: Async API Client
- Summary
The Full asyncio Playbook: Coroutines, Tasks, and Real-World Patterns
Python's asyncio library is one of those tools that feels confusing at first and then clicks all at once. Once it clicks, you start seeing the potential everywhere — web scrapers that run 50× faster, API clients that handle hundreds of requests without spinning up a single extra thread, and servers that juggle thousands of connections on a single core.
This article is a complete, practical playbook. We'll start from first principles — why async exists at all — then build up through coroutines, tasks, synchronization primitives, real library integrations, and finally tie it all together in a real-world project. No hand-waving, no artificial toy examples. Just patterns you can actually use.
Why Async? The Problem It Solves
Most programs spend the majority of their time waiting — waiting for a database query to return, for a network response to arrive, for a file to be read off disk. During that waiting time, the CPU is idle. Traditional synchronous code wastes that idle time completely.
Concurrency vs Parallelism vs Asynchrony
These three terms often get conflated, so let's pin them down:
- Parallelism — multiple computations happening at the exact same instant on multiple CPU cores. True simultaneous execution.
- Concurrency — multiple tasks in progress at the same time, but not necessarily executing simultaneously. One task pauses while another runs.
- Asynchrony — a programming model where a task can pause itself voluntarily (usually while waiting for I/O) and let other tasks run in the meantime.
asyncio gives you concurrency and asynchrony. It does not give you parallelism.
The GIL and Why Threads Aren't Always the Answer
Python's Global Interpreter Lock (GIL) ensures only one thread executes Python bytecode at a time. For CPU-bound work, threads therefore buy you nothing — you're just paying thread scheduling overhead with no parallelism gain. For I/O-bound work, threads do help because the GIL is released during blocking I/O syscalls. But threads carry their own costs: memory per thread, context-switching overhead, and the complexity of shared-state bugs.
asyncio sidesteps all of this. A single thread, a single event loop, cooperative multitasking. The memory footprint of a coroutine is tiny compared to a thread, and there's no context switching — only explicit yield points you control.
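To make that footprint claim concrete, here's an illustrative sketch that runs 10,000 sleeping coroutines on a single thread. The total run time is roughly one sleep, not 10,000 of them — exact timings will vary by machine:

```python
import asyncio
import time

async def tick():
    await asyncio.sleep(0.1)

async def main() -> float:
    start = time.perf_counter()
    # 10,000 concurrent 0.1 s sleeps, all interleaved on one event loop
    await asyncio.gather(*(tick() for _ in range(10_000)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"10,000 coroutines finished in {elapsed:.2f}s")
```

Spawning 10,000 OS threads for the same job would cost orders of magnitude more memory and scheduler overhead.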
I/O-Bound vs CPU-Bound — When asyncio Shines
| Scenario | Best Tool |
|---|---|
| Fetching 500 URLs | asyncio + aiohttp |
| Image resizing | multiprocessing |
| Querying 100 DB rows concurrently | asyncio + asyncpg |
| Training an ML model | multiprocessing / GPU |
| Scraping with rate limits | asyncio + Semaphore |
Brief History: Callbacks → Generators → async/await
Python's async story evolved in stages:
- Callbacks (Twisted, ~2002) — functions registered to fire when I/O completes. Worked but led to deeply nested "callback hell."
- Generator-based coroutines (PEP 342, Python 2.5) — yield allowed pausing a function, enabling cooperative multitasking. asyncio in Python 3.4 built on this with @asyncio.coroutine and yield from.
- async/await (PEP 492, Python 3.5) — native syntax for coroutines. The yield from plumbing became invisible. This is the model we use today.
Core Concepts
Event Loop
The event loop is the scheduler at the heart of asyncio. It maintains a queue of tasks ready to run and a set of I/O callbacks waiting to fire. Its job is simple in concept:
- Pick a task from the ready queue and run it until it yields.
- Check if any I/O events (socket readable, timer expired) have fired.
- Wake up any tasks waiting on those events and add them to the ready queue.
- Repeat.
Because tasks voluntarily yield control (via await), there's no preemption and no need for locks around most shared state. The loop never interrupts a task mid-execution.
flowchart TD
A([Start]) --> B[Pick ready task from queue]
B --> C[Run task until it awaits]
C --> D{Any I/O events fired?}
D -- Yes --> E[Wake up waiting tasks]
E --> B
D -- No --> F{Queue empty?}
F -- No --> B
F -- Yes --> G([Done])
import asyncio
async def main():
print("Event loop is running")
await asyncio.sleep(0) # yield control back to the loop
print("Resumed after yield")
asyncio.run(main())
You can introspect the running loop at any time inside a coroutine:
async def inspect():
loop = asyncio.get_running_loop()
print(loop.is_running()) # True
Coroutines
A coroutine is a function defined with async def. Calling it does not execute it — it returns a coroutine object, which is a suspended computation waiting to be driven.
import asyncio
async def greet(name: str) -> str:
await asyncio.sleep(0.1) # simulate async I/O
return f"Hello, {name}!"
# This just creates the coroutine object — nothing runs yet:
coro = greet("Alice")
# To actually run it:
result = asyncio.run(greet("Alice"))
print(result) # Hello, Alice!
The await keyword suspends the current coroutine and hands control back to the event loop until the awaited thing completes.
Coroutine vs Task vs Future
These three things are related but distinct:
- Coroutine — a function defined with async def. It's a recipe for work, not work itself. Awaiting it runs the recipe inline.
- Future — a low-level object representing a value that doesn't exist yet. When the value is eventually set, any code awaiting the future is resumed. You rarely create these directly.
- Task — a Future subclass that wraps a coroutine and schedules it on the event loop. Creating a task starts the coroutine running concurrently with whatever else is running.
import asyncio
async def compute(n: int) -> int:
await asyncio.sleep(n * 0.1)
return n * n
async def main():
# Coroutine — runs inline, blocks until done
result_inline = await compute(2)
# Task — scheduled concurrently, runs alongside this coroutine
task = asyncio.create_task(compute(3))
result_task = await task # wait for it to finish
    print(result_inline, result_task)  # 4 9
asyncio.run(main())
asyncio.run()
asyncio.run() is the standard entry point introduced in Python 3.7. It creates a new event loop, runs the given coroutine to completion, cancels any remaining tasks, runs finalizers, and then closes the loop cleanly.
import asyncio
async def main():
print("Starting")
await asyncio.sleep(1)
print("Done")
asyncio.run(main())
Use asyncio.run() at the top level only — never call it from inside a running event loop. Within an already-running loop, just await your coroutines directly.
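A quick sketch of the rule — the commented-out call is what would fail with a RuntimeError:

```python
import asyncio

async def inner() -> int:
    await asyncio.sleep(0)
    return 42

async def outer() -> int:
    # asyncio.run(inner())  # RuntimeError: asyncio.run() cannot be called
    #                       # from a running event loop
    return await inner()    # correct: inside a running loop, just await

print(asyncio.run(outer()))  # 42
```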
Tasks & Scheduling
create_task() — Running Coroutines Concurrently
asyncio.create_task() wraps a coroutine in a Task and schedules it to run on the event loop. Unlike await coro, which runs the coroutine inline and blocks until it's done, create_task() returns immediately and lets the coroutine run in the background.
import asyncio
import time
async def fetch(url: str) -> str:
print(f" → Fetching {url}")
await asyncio.sleep(1) # simulate network delay
return f"Response from {url}"
async def main():
start = time.perf_counter()
# Sequential — takes ~3 seconds
# r1 = await fetch("https://api.example.com/users")
# r2 = await fetch("https://api.example.com/posts")
# r3 = await fetch("https://api.example.com/comments")
# Concurrent — takes ~1 second
t1 = asyncio.create_task(fetch("https://api.example.com/users"))
t2 = asyncio.create_task(fetch("https://api.example.com/posts"))
t3 = asyncio.create_task(fetch("https://api.example.com/comments"))
r1, r2, r3 = await t1, await t2, await t3
print(f"Completed in {time.perf_counter() - start:.2f}s")
asyncio.run(main())
gather() — Running Multiple Tasks at Once
asyncio.gather() is the go-to tool when you have a collection of coroutines to run concurrently and want all their results back in order.
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await asyncio.gather(
*[fetch_user(uid) for uid in user_ids]
)
for user in users:
print(user)
asyncio.run(main())
By default, if any coroutine raises an exception, gather() propagates it immediately to the code awaiting the gather — note that the sibling tasks keep running, but their results are discarded. Pass return_exceptions=True to collect exceptions as return values instead of raising:
results = await asyncio.gather(
fetch_user(1),
fetch_user(0), # raises ValueError
fetch_user(3),
return_exceptions=True
)
for r in results:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(r)
wait() — Fine-Grained Control
asyncio.wait() gives you more control than gather(). It returns two sets — done and pending — and lets you react as tasks complete rather than waiting for all of them.
import asyncio
async def job(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} finished"
async def main():
tasks = {
asyncio.create_task(job("A", 1.0)),
asyncio.create_task(job("B", 0.3)),
asyncio.create_task(job("C", 1.5)),
}
# React to the first task that finishes
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in done:
print("First done:", t.result())
# Cancel whatever's left
for t in pending:
t.cancel()
asyncio.run(main())
Return modes: ALL_COMPLETED (default), FIRST_COMPLETED, FIRST_EXCEPTION.
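When you want results as they arrive rather than all at once, asyncio.as_completed() is a middle ground between gather() and wait(): it yields awaitables in completion order. A minimal sketch:

```python
import asyncio

async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> list[str]:
    coros = [job("A", 0.3), job("B", 0.1), job("C", 0.2)]
    finished = []
    # yields one awaitable per input, in the order they finish
    for fut in asyncio.as_completed(coros):
        finished.append(await fut)
    return finished

print(asyncio.run(main()))  # ['B', 'C', 'A']
```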
Task Cancellation and CancelledError
Tasks can be cancelled at any await point. When cancelled, asyncio.CancelledError is injected at the next await inside the task.
import asyncio
async def long_running():
try:
print("Starting long task...")
await asyncio.sleep(10)
print("Finished long task!")
except asyncio.CancelledError:
print("Task was cancelled — running cleanup")
raise # always re-raise CancelledError
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(0.5) # let it start
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Main: confirmed task cancelled")
asyncio.run(main())
Always re-raise CancelledError after cleanup — swallowing it silently prevents proper shutdown propagation. Since Python 3.8, CancelledError inherits from BaseException rather than Exception, precisely so that a broad except Exception doesn't swallow it by accident.
Timeouts & Error Handling
wait_for() — Setting Timeouts
asyncio.wait_for() wraps a coroutine with a deadline. If it doesn't complete in time, the coroutine is cancelled and asyncio.TimeoutError is raised.
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(5)
return "finally done"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out after 2 seconds")
asyncio.run(main())
Python 3.11+ introduced asyncio.timeout() as a context manager, which is cleaner when you want to apply a deadline to a block of code:
async def main():
try:
async with asyncio.timeout(2.0):
result = await slow_operation()
print(result)
except asyncio.TimeoutError:
print("Block timed out")
Handling Exceptions in Tasks and gather()
Exceptions raised inside tasks don't propagate automatically — they're stored on the task object and surface only when you await the task (or call task.result()). A failed task that is never awaited triggers a "Task exception was never retrieved" warning at shutdown.
import asyncio
async def risky(n: int) -> int:
await asyncio.sleep(0.1)
if n == 0:
raise ValueError("Cannot process zero")
return 100 // n
async def main():
task = asyncio.create_task(risky(0))
await asyncio.sleep(0.2)
try:
result = await task
except ValueError as e:
print(f"Caught from task: {e}")
asyncio.run(main())
With gather() and return_exceptions=True, exceptions are collected as ordinary return values, so one failure doesn't abort the whole call — you get every result and every error back, in order:
results = await asyncio.gather(
risky(5),
risky(0),
risky(2),
return_exceptions=True
)
# [20, ValueError('Cannot process zero'), 50]
for r in results:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(f"OK: {r}")
shield() — Protecting Tasks from Cancellation
asyncio.shield() lets a task continue running even if the coroutine awaiting it is cancelled. Useful for critical operations like committing a database transaction that must complete regardless.
import asyncio
async def commit_transaction():
print("Committing transaction...")
await asyncio.sleep(1)
print("Transaction committed.")
async def main():
task = asyncio.create_task(commit_transaction())
try:
# Timeout cancels the outer wait, but the inner task keeps going
await asyncio.wait_for(asyncio.shield(task), timeout=0.3)
except asyncio.TimeoutError:
print("Timed out waiting — but commit is still running!")
await task # wait for it to actually finish
asyncio.run(main())
Async Primitives (Synchronization Between Tasks)
asyncio.Lock
An asyncio.Lock prevents concurrent access to a shared resource. Unlike a threading lock, acquiring it never blocks the event loop — it simply suspends the awaiting coroutine until the lock is available.
import asyncio
counter = 0
lock = asyncio.Lock()
async def increment(name: str):
global counter
async with lock:
current = counter
await asyncio.sleep(0) # simulate work between read and write
counter = current + 1
print(f"{name}: counter is now {counter}")
async def main():
await asyncio.gather(*[increment(f"task-{i}") for i in range(5)])
print(f"Final counter: {counter}") # always 5
asyncio.run(main())
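To see why the lock matters, here's the same read-modify-write sequence without it (an illustrative sketch). Because every task yields between reading and writing, all five read the counter before any has written it back, and the updates are lost:

```python
import asyncio

counter = 0

async def unsafe_increment():
    global counter
    current = counter
    await asyncio.sleep(0)   # yield between read and write — the race window
    counter = current + 1

async def main() -> int:
    global counter
    counter = 0
    await asyncio.gather(*[unsafe_increment() for _ in range(5)])
    return counter

print(asyncio.run(main()))  # 1 — all five tasks read 0 before any wrote
```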
asyncio.Event
An asyncio.Event is a broadcast signal. It starts unset; any number of coroutines can await event.wait() until another coroutine calls event.set(), at which point all of them resume. Calling event.clear() resets it for reuse.
import asyncio
ready = asyncio.Event()
async def worker(name: str):
print(f"{name}: waiting for start signal...")
await ready.wait()
print(f"{name}: got signal, starting work")
async def controller():
await asyncio.sleep(1)
print("Controller: signalling all workers")
ready.set()
async def main():
await asyncio.gather(
worker("W1"),
worker("W2"),
worker("W3"),
controller(),
)
asyncio.run(main())
asyncio.Queue — Producer/Consumer Pattern
asyncio.Queue is the standard way to implement producer/consumer pipelines. It coordinates work between tasks without explicit locking.
import asyncio
async def producer(queue: asyncio.Queue, items: list[str]):
for item in items:
await asyncio.sleep(0.2)
await queue.put(item)
print(f"Produced: {item}")
await queue.put(None) # sentinel to signal done
async def consumer(queue: asyncio.Queue, name: str):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
await asyncio.sleep(0.3) # simulate processing
print(f"{name} consumed: {item}")
queue.task_done()
async def main():
queue: asyncio.Queue = asyncio.Queue(maxsize=3)
items = ["job-1", "job-2", "job-3", "job-4", "job-5"]
await asyncio.gather(
producer(queue, items),
consumer(queue, "Consumer-A"),
)
asyncio.run(main())
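Sentinels work, but with several consumers you'd need one sentinel per consumer. An alternative is queue.join(), which resumes once every enqueued item has been marked task_done(); the consumers can then simply be cancelled. A sketch of that pattern:

```python
import asyncio

processed: list[str] = []

async def worker(q: asyncio.Queue) -> None:
    while True:
        item = await q.get()
        await asyncio.sleep(0.01)   # simulate processing
        processed.append(item)
        q.task_done()

async def main() -> int:
    processed.clear()
    q: asyncio.Queue = asyncio.Queue()
    for i in range(9):
        q.put_nowait(f"job-{i}")
    workers = [asyncio.create_task(worker(q)) for _ in range(3)]
    await q.join()      # resumes once every item has been task_done()
    for w in workers:
        w.cancel()      # no sentinels needed
    return len(processed)

print(asyncio.run(main()))  # 9
```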
asyncio.Semaphore — Limiting Concurrency
A Semaphore limits how many coroutines can run a section simultaneously. It's the right tool for rate-limiting outbound requests so you don't overwhelm an API or database.
import asyncio
sem = asyncio.Semaphore(3) # at most 3 concurrent fetches
async def fetch(session_id: int) -> str:
async with sem:
print(f" Session {session_id}: fetching")
await asyncio.sleep(0.5)
return f"data-{session_id}"
async def main():
results = await asyncio.gather(*[fetch(i) for i in range(10)])
print(f"Total results: {len(results)}")
asyncio.run(main())
Async Iterators & Context Managers
async for — Async Iteration
async for works with objects implementing __aiter__() and __anext__(). Each call to __anext__() is awaitable, so the event loop can process other work between iterations — ideal for streaming data sources.
import asyncio
class AsyncCounter:
def __init__(self, stop: int):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self) -> int:
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1) # simulate async data source
value = self.current
self.current += 1
return value
async def main():
async for value in AsyncCounter(5):
print(value)
asyncio.run(main())
async with — Async Context Managers
Async context managers implement __aenter__() and __aexit__(). They're used for resources that require async setup or teardown — database connections, HTTP sessions, file handles.
import asyncio
class AsyncDBConnection:
async def __aenter__(self):
print("Opening DB connection...")
await asyncio.sleep(0.1) # simulate async connect
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing DB connection...")
await asyncio.sleep(0.05)
async def query(self, sql: str) -> list[dict]:
await asyncio.sleep(0.1)
return [{"result": sql}]
async def main():
async with AsyncDBConnection() as db:
rows = await db.query("SELECT * FROM users")
print(rows)
asyncio.run(main())
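You don't always need a full class: contextlib.asynccontextmanager turns an async generator into an async context manager. A sketch, using a plain dict as a stand-in for a real connection object:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_connection(dsn: str):
    print(f"Connecting to {dsn}...")
    await asyncio.sleep(0.1)          # simulate async setup
    try:
        yield {"dsn": dsn}            # stand-in for a real connection object
    finally:
        print("Closing connection...")
        await asyncio.sleep(0.05)     # simulate async teardown

async def main() -> str:
    async with db_connection("sqlite://demo") as conn:
        return conn["dsn"]

print(asyncio.run(main()))  # sqlite://demo
```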
Async Generators
Async generators combine async def with yield. They're the most ergonomic way to produce a sequence of values asynchronously.
import asyncio
async def paginated_fetch(total_pages: int):
"""Simulates fetching paginated API results one page at a time."""
for page in range(1, total_pages + 1):
await asyncio.sleep(0.2) # simulate network delay per page
yield {"page": page, "items": [f"item-{page}-{i}" for i in range(3)]}
async def main():
async for page_data in paginated_fetch(4):
print(f"Page {page_data['page']}: {page_data['items']}")
asyncio.run(main())
Working with Real Libraries
HTTP Requests — aiohttp / httpx
For async HTTP, aiohttp and httpx (with AsyncClient) are the two most common choices. Both use async context managers to manage connection pools.
# pip install aiohttp
import asyncio
import aiohttp
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_json(session, url) for url in urls]
)
for r in results:
print(r["title"])
asyncio.run(main())
With httpx:
# pip install httpx
import asyncio
import httpx
async def main():
async with httpx.AsyncClient() as client:
response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
print(response.json()["title"])
asyncio.run(main())
Database Access — asyncpg / aiosqlite
For PostgreSQL use asyncpg; for SQLite use aiosqlite. Both expose async connection and cursor APIs.
# pip install asyncpg
import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect("postgresql://user:password@localhost/mydb")
try:
rows = await conn.fetch("SELECT id, name FROM users LIMIT 5")
for row in rows:
print(dict(row))
finally:
await conn.close()
asyncio.run(main())
# pip install aiosqlite
import asyncio
import aiosqlite
async def main():
async with aiosqlite.connect("data.db") as db:
await db.execute(
"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"
)
await db.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
await db.commit()
async with db.execute("SELECT * FROM users") as cursor:
async for row in cursor:
print(row)
asyncio.run(main())
File I/O — aiofiles
Standard file operations (open, read, write) block the event loop. aiofiles provides async wrappers that offload them to a thread pool transparently.
# pip install aiofiles
import asyncio
import aiofiles
async def write_report(path: str, content: str):
async with aiofiles.open(path, "w") as f:
await f.write(content)
async def read_report(path: str) -> str:
async with aiofiles.open(path, "r") as f:
return await f.read()
async def main():
await write_report("/tmp/report.txt", "Async I/O complete.\n")
content = await read_report("/tmp/report.txt")
print(content)
asyncio.run(main())
Running Sync Code in Async Context with run_in_executor
Sometimes you have to call a blocking library that has no async counterpart. loop.run_in_executor() runs the call in a thread pool and returns an awaitable, so the event loop stays unblocked.
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_task(n: int) -> int:
"""A synchronous, blocking function we can't change."""
time.sleep(n)
return n * n
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=4) as pool:
# Run two blocking calls concurrently in the thread pool
results = await asyncio.gather(
loop.run_in_executor(pool, blocking_task, 1),
loop.run_in_executor(pool, blocking_task, 2),
)
print(results) # [1, 4]
asyncio.run(main())
Pass None as the executor to use the default thread pool:
result = await loop.run_in_executor(None, blocking_task, 3)
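Python 3.9 added asyncio.to_thread(), a convenience wrapper around run_in_executor(None, ...) for one-off blocking calls:

```python
import asyncio
import time

def blocking_io(n: int) -> int:
    time.sleep(0.1)   # stands in for a blocking call we can't change
    return n * 2

async def main() -> list[int]:
    # Python 3.9+: each call runs in the default thread pool
    return await asyncio.gather(
        asyncio.to_thread(blocking_io, 1),
        asyncio.to_thread(blocking_io, 2),
    )

print(asyncio.run(main()))  # [2, 4]
```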
Common Patterns & Best Practices
Fan-Out / Fan-In Pattern
Fan-out dispatches work to multiple concurrent tasks; fan-in collects and aggregates their results. This is the fundamental pattern behind batch processing in async code.
import asyncio
from dataclasses import dataclass
@dataclass
class ProcessedItem:
id: int
value: str
async def process_item(item_id: int) -> ProcessedItem:
"""Fan-out: each item processed independently."""
await asyncio.sleep(0.1) # simulate async work
return ProcessedItem(id=item_id, value=f"result-{item_id}")
async def aggregate(results: list[ProcessedItem]) -> dict:
"""Fan-in: combine all results."""
return {r.id: r.value for r in results}
async def main():
item_ids = list(range(20))
# Fan-out: launch all concurrently
tasks = [asyncio.create_task(process_item(i)) for i in item_ids]
# Fan-in: collect results
results = await asyncio.gather(*tasks)
summary = await aggregate(results)
print(f"Processed {len(summary)} items")
asyncio.run(main())
Rate Limiting Async Tasks with Semaphore
When hitting an external API, you often need to stay under a requests-per-second limit. Combine a Semaphore with asyncio.sleep() to enforce spacing between requests.
import asyncio
import time
MAX_CONCURRENT = 5
DELAY_BETWEEN = 0.1 # seconds
sem = asyncio.Semaphore(MAX_CONCURRENT)
async def rate_limited_fetch(url: str) -> str:
async with sem:
result = f"data from {url}"
await asyncio.sleep(DELAY_BETWEEN)
return result
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(30)]
start = time.perf_counter()
results = await asyncio.gather(*[rate_limited_fetch(url) for url in urls])
elapsed = time.perf_counter() - start
print(f"Fetched {len(results)} items in {elapsed:.2f}s")
asyncio.run(main())
Avoiding Blocking the Event Loop
The single biggest mistake in async code is calling a blocking function from a coroutine. This freezes the entire event loop for every other task.
Common blocking culprits:
- time.sleep() → use await asyncio.sleep()
- requests.get() → use aiohttp or httpx.AsyncClient
- open() / f.read() → use aiofiles
- Heavy CPU computation in a coroutine → use run_in_executor with ProcessPoolExecutor
import asyncio
import time
# BAD — blocks the entire event loop
async def bad_delay():
time.sleep(2) # every other task is frozen for 2 seconds
# GOOD — yields control back to the event loop
async def good_delay():
await asyncio.sleep(2) # other tasks run during this wait
A quick way to check if something is blocking: if calling it in a synchronous context would block, it'll block the event loop too.
Debugging Async Code
asyncio has a built-in debug mode that logs slow callbacks, coroutines that were never awaited, and other common mistakes.
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
async def main():
await asyncio.sleep(0.1)
# Enable debug mode
asyncio.run(main(), debug=True)
Or via environment variable — no code change needed:
PYTHONASYNCIODEBUG=1 python my_script.py
Debug mode will warn you about:
- Coroutines created but never awaited
- Callbacks taking longer than 100ms (configurable)
- Tasks destroyed while still pending
asyncio vs Threading vs Multiprocessing
| Model | Best For | GIL Limited? | True Parallelism? | Overhead |
|---|---|---|---|---|
| asyncio | I/O-bound, many connections | No | No | Very low |
| threading | I/O-bound, legacy libs | Yes | No | Medium |
| multiprocessing | CPU-bound tasks | No | Yes | High |
When to use each:
- asyncio — you have many concurrent I/O operations (HTTP calls, DB queries, WebSocket connections). A single-threaded event loop handles thousands of open connections effortlessly.
- threading — you're working with a legacy blocking library that has no async alternative and can't easily be wrapped with run_in_executor. Also useful for background tasks that need to share state with the main thread.
- multiprocessing — the bottleneck is pure computation (image processing, ML inference, number crunching). Only true parallelism bypasses the GIL for CPU work.
Combining asyncio + ProcessPoolExecutor
For the pattern of coordinating many async I/O tasks that occasionally need CPU-intensive processing, combine both:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_heavy(data: list[int]) -> int:
"""Pure computation — runs in a separate process."""
return sum(x ** 2 for x in data)
async def pipeline(items: list[int]) -> int:
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
# Offload CPU work without blocking the event loop
result = await loop.run_in_executor(pool, cpu_heavy, items)
return result
async def main():
data = list(range(100_000))
result = await pipeline(data)
print(f"Sum of squares: {result}")
asyncio.run(main())
Real-World Project: Async API Client
Let's tie everything together. We'll build an async client that fetches posts and their authors from a public API concurrently, rate-limits requests, handles errors gracefully, and streams results through a queue.
import asyncio
import aiohttp
from dataclasses import dataclass
BASE_URL = "https://jsonplaceholder.typicode.com"
MAX_CONCURRENT = 5
@dataclass
class Post:
id: int
title: str
author_name: str
sem = asyncio.Semaphore(MAX_CONCURRENT)
async def get_json(session: aiohttp.ClientSession, url: str) -> dict:
async with sem:
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.json()
async def fetch_post_with_author(
session: aiohttp.ClientSession,
post_id: int,
result_queue: asyncio.Queue,
):
try:
        post, user = await asyncio.gather(
            get_json(session, f"{BASE_URL}/posts/{post_id}"),
            # demo shortcut: derive a deterministic user id so both requests
            # can run concurrently; a real client would read post["userId"]
            get_json(session, f"{BASE_URL}/users/{post_id % 10 + 1}"),
        )
await result_queue.put(Post(
id=post["id"],
title=post["title"],
author_name=user["name"],
))
except Exception as e:
await result_queue.put(e)
async def consumer(queue: asyncio.Queue, total: int):
received = 0
while received < total:
item = await queue.get()
if isinstance(item, Exception):
print(f" Error: {item}")
else:
print(f" [{item.id}] {item.title[:40]}... — {item.author_name}")
queue.task_done()
received += 1
async def main():
post_ids = list(range(1, 11)) # fetch posts 1–10
result_queue: asyncio.Queue = asyncio.Queue()
async with aiohttp.ClientSession() as session:
# Fan-out: create all fetch tasks
producers = [
asyncio.create_task(
fetch_post_with_author(session, pid, result_queue)
)
for pid in post_ids
]
# Fan-in: consume results as they arrive
consumer_task = asyncio.create_task(
consumer(result_queue, len(post_ids))
)
await asyncio.gather(*producers)
await consumer_task
print(f"\nDone. Fetched {len(post_ids)} posts with authors.")
asyncio.run(main())
This project demonstrates: gather() for concurrent fetching, Semaphore for rate limiting, Queue for producer/consumer streaming, structured error handling, and proper session lifecycle management.
Summary
asyncio is Python's answer to the question: how do we handle thousands of concurrent I/O operations without thousands of threads? The answer is a single-threaded event loop where coroutines cooperatively yield control at every await.
Here's the mental map to carry forward:
- Use async def and await to define and drive coroutines.
- Use asyncio.create_task() or asyncio.gather() when you want things to run concurrently.
- Use asyncio.wait() when you need to react to individual task completion.
- Protect shared state with Lock, coordinate tasks with Event, pipeline work with Queue, and cap concurrency with Semaphore.
- Use aiohttp/httpx for HTTP, asyncpg/aiosqlite for databases, aiofiles for file I/O.
- Use run_in_executor to integrate blocking libraries without freezing the loop.
- Reserve multiprocessing for CPU-bound work — asyncio shines at I/O, not computation.
The hardest part of learning asyncio is usually the first hour. After that, the patterns are consistent and composable. Start with gather() for concurrent fetches, add a Semaphore when you hit rate limits, and reach for Queue when you need a pipeline. Everything else builds on those three.
