Asynchronous programming allows your code to handle multiple operations concurrently without blocking. This is especially powerful for I/O-bound operations like API calls, database queries, and file operations.
Let's start with a visual comparison:
Synchronous Execution: ┌──────────────────────────────────────────────────────────────┐ │ Task 1 ██████████│ Task 2 ██████████│ Task 3 ██████████ │ │ (3 sec) │ (3 sec) │ (3 sec) │ │ │ │ Total Time: 9 seconds │ └──────────────────────────────────────────────────────────────┘ Asynchronous Execution: ┌──────────────────────────────────────────────────────────────┐ │ Task 1 ██████████ │ │ Task 2 ██████████ (running concurrently) │ │ Task 3 ██████████ │ │ │ │ Total Time: ~3 seconds │ └──────────────────────────────────────────────────────────────┘
import asyncio
import time  # required by sync_fetch_data; missing from the original snippet


# Regular synchronous function
def sync_fetch_data(delay: float = 2):
    """Fetch data synchronously.

    Args:
        delay: seconds to block (default matches the original example).

    Returns:
        A result dict.
    """
    print("Fetching data...")
    time.sleep(delay)  # Blocks the entire program
    return {"data": "result"}


# Asynchronous function
async def async_fetch_data(delay: float = 2):
    """Fetch data asynchronously.

    Args:
        delay: seconds to wait; awaiting the sleep lets other tasks run.

    Returns:
        A result dict.
    """
    print("Fetching data...")
    await asyncio.sleep(delay)  # Non-blocking, allows other tasks to run
    return {"data": "result"}


# Running async code
async def main():
    result = await async_fetch_data()
    print(result)


# Entry point (guarded so importing this module doesn't start the event loop)
if __name__ == "__main__":
    asyncio.run(main())
async def my_coroutine():
    # Defining a function with 'async def' makes it a coroutine function:
    # calling it builds a coroutine object instead of running the body.
    return "Hello"


async def caller():
    # 'await' suspends this coroutine until my_coroutine has finished,
    # then yields its return value.
    greeting = await my_coroutine()
    print(greeting)


def regular_function():
    # 'await' is only legal inside an 'async def' body; uncommenting the
    # next line in this plain function would be a SyntaxError.
    # await my_coroutine()
    pass
import asyncio
import time


async def fetch_user(user_id: int) -> dict:
    """Simulate fetching a user record from the database."""
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(1)  # stand-in for network latency
    return {"id": user_id, "name": f"User {user_id}"}


async def fetch_orders(user_id: int) -> list:
    """Simulate fetching the user's orders from the database."""
    print(f"Fetching orders for user {user_id}...")
    await asyncio.sleep(1.5)  # stand-in for network latency
    return [{"order_id": 1, "total": 99.99}]


async def fetch_recommendations(user_id: int) -> list:
    """Simulate fetching product recommendations."""
    print(f"Fetching recommendations for user {user_id}...")
    await asyncio.sleep(0.8)
    return ["Product A", "Product B"]


# Sequential execution - SLOW
async def get_user_data_sequential(user_id: int):
    start = time.time()
    # One await after another: total latency is the SUM of the delays.
    user = await fetch_user(user_id)
    orders = await fetch_orders(user_id)
    recommendations = await fetch_recommendations(user_id)
    print(f"Sequential time: {time.time() - start:.2f}s")  # ~3.3 seconds
    return {"user": user, "orders": orders, "recommendations": recommendations}


# Concurrent execution - FAST
async def get_user_data_concurrent(user_id: int):
    start = time.time()
    # gather() runs all three coroutines at once: total latency is the
    # MAX of the delays, not their sum.
    user, orders, recommendations = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_recommendations(user_id),
    )
    print(f"Concurrent time: {time.time() - start:.2f}s")  # ~1.5 seconds
    return {"user": user, "orders": orders, "recommendations": recommendations}


# Run both versions
asyncio.run(get_user_data_sequential(1))
asyncio.run(get_user_data_concurrent(1))
async def background_task(name: str, duration: int):
    """Announce start, sleep for `duration` seconds, announce completion."""
    print(f"Task {name} started")
    await asyncio.sleep(duration)
    print(f"Task {name} completed")
    return f"Result from {name}"


async def main():
    # create_task() schedules the coroutine on the loop immediately —
    # both tasks are already running before the first await below.
    task1 = asyncio.create_task(background_task("A", 2))
    task2 = asyncio.create_task(background_task("B", 1))

    print("Tasks created, doing other work...")

    # Awaiting a task later just collects its result.
    result1 = await task1
    result2 = await task2
    print(f"Results: {result1}, {result2}")


asyncio.run(main())
async def risky_operation(should_fail: bool):
    """Sleep briefly, then raise ValueError or return "Success"."""
    await asyncio.sleep(0.5)
    if not should_fail:
        return "Success"
    raise ValueError("Operation failed!")


async def main():
    # Exceptions cross 'await' boundaries like ordinary function calls,
    # so a plain try/except works.
    try:
        result = await risky_operation(True)
    except ValueError as e:
        print(f"Caught error: {e}")
async def task_that_fails():
    await asyncio.sleep(0.5)
    raise RuntimeError("I failed!")


async def task_that_succeeds():
    await asyncio.sleep(0.5)
    return "I succeeded!"


async def main():
    # Without return_exceptions, the first failure propagates straight
    # out of gather().
    try:
        results = await asyncio.gather(task_that_fails(), task_that_succeeds())
    except RuntimeError as e:
        print(f"One task failed: {e}")

    # With return_exceptions=True, exception objects appear in the result
    # list alongside normal values instead of being raised.
    results = await asyncio.gather(
        task_that_fails(),
        task_that_succeeds(),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")


asyncio.run(main())
import asyncio
import aiohttp
from typing import List, Dict


async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch one URL and normalize success or failure into a dict."""
    try:
        async with session.get(url) as response:
            payload = await response.json()
            return {"url": url, "status": response.status, "data": payload}
    except Exception as e:
        # Any error (DNS, timeout, bad JSON, ...) becomes an error record.
        return {"url": url, "error": str(e)}


async def fetch_all_urls(urls: List[str]) -> List[Dict]:
    """Fetch every URL concurrently over a single shared session."""
    async with aiohttp.ClientSession() as session:
        pending = [fetch_url(session, target) for target in urls]
        return await asyncio.gather(*pending, return_exceptions=True)


# Example usage
async def main():
    urls = [
        "https://api.github.com/users/python",
        "https://api.github.com/users/django",
        "https://api.github.com/users/fastapi",
    ]
    for result in await fetch_all_urls(urls):
        print(f"{result['url']}: {result.get('status', 'error')}")


asyncio.run(main())
import asyncio
import aiohttp  # was used but never imported in the original snippet
from asyncio import Semaphore


class RateLimitedClient:
    """HTTP client that caps concurrent requests and spaces them out.

    A Semaphore bounds how many fetches run at once; a fixed delay after
    each response provides simple rate limiting.
    """

    def __init__(self, max_concurrent: int = 5, delay: float = 0.1):
        self.semaphore = Semaphore(max_concurrent)
        self.delay = delay

    async def fetch(self, url: str, session: "aiohttp.ClientSession" = None) -> dict:
        """Fetch one URL as JSON.

        Args:
            url: target URL.
            session: optional shared session; when omitted a temporary one
                is created (backward compatible with the original API).
        """
        async with self.semaphore:  # Limit concurrent requests
            if session is None:
                async with aiohttp.ClientSession() as own_session:
                    return await self._get_json(own_session, url)
            return await self._get_json(session, url)

    async def _get_json(self, session, url: str) -> dict:
        """GET `url`, decode JSON, then apply the rate-limiting delay."""
        async with session.get(url) as response:
            result = await response.json()
        await asyncio.sleep(self.delay)  # Rate limiting delay
        return result

    async def fetch_many(self, urls: list) -> list:
        """Fetch many URLs over ONE shared session.

        The original opened a brand-new ClientSession (and TCP connection
        pool) per request — reusing a single session is the documented
        aiohttp best practice and far cheaper.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(url, session) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)


# Usage
async def main():
    client = RateLimitedClient(max_concurrent=3, delay=0.5)
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    results = await client.fetch_many(urls)
import asyncio
import asyncpg
from typing import List, Optional


class AsyncDatabase:
    """Thin async wrapper around an asyncpg connection pool."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        # Pool is created lazily by connect(); None until then.
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=5,
            max_size=20
        )

    async def disconnect(self):
        """Close connection pool"""
        if self.pool:
            await self.pool.close()

    async def fetch_user(self, user_id: int) -> Optional[dict]:
        """Fetch a single user row as a dict, or None when no row matches.

        (Annotation fixed: the original claimed `-> dict` but returns None
        on a miss.)
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM users WHERE id = $1", user_id
            )
            return dict(row) if row else None

    async def fetch_users(self, user_ids: List[int]) -> List[Optional[dict]]:
        """Fetch multiple users concurrently; missing ids yield None."""
        tasks = [self.fetch_user(uid) for uid in user_ids]
        return await asyncio.gather(*tasks)

    async def insert_user(self, name: str, email: str) -> int:
        """Insert a user and return the generated ID."""
        async with self.pool.acquire() as conn:
            return await conn.fetchval(
                "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
                name, email
            )


# Usage with context manager pattern
class DatabaseContext:
    """Async context manager: connect on enter, disconnect on exit."""

    def __init__(self, dsn: str):
        self.db = AsyncDatabase(dsn)

    async def __aenter__(self):
        await self.db.connect()
        return self.db

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.db.disconnect()


async def main():
    async with DatabaseContext("postgresql://user:pass@localhost/db") as db:
        user = await db.fetch_user(1)
        users = await db.fetch_users([1, 2, 3, 4, 5])
import asyncio
from asyncio import Queue
from typing import Any


async def producer(queue: Queue, items: list):
    """Produce items and put them in the queue.

    BUG FIX: the original also enqueued a None sentinel that consumers
    re-enqueued without ever calling task_done(), so queue.join() in
    main() could never complete (deadlock). With the join-then-cancel
    pattern below, no sentinel is needed.
    """
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)


async def consumer(queue: Queue, name: str):
    """Consume items forever; main() cancels us once the queue drains."""
    while True:
        item = await queue.get()
        print(f"Consumer {name} processing: {item}")
        await asyncio.sleep(0.3)  # Simulate processing
        # Every get() is balanced by a task_done(), so join() can finish.
        queue.task_done()


async def main():
    queue = Queue(maxsize=10)
    items = list(range(20))

    # Start producer and multiple consumers
    producer_task = asyncio.create_task(producer(queue, items))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(3)
    ]

    # Wait for producer to finish
    await producer_task

    # Wait until every produced item has been task_done()'d
    await queue.join()

    # Consumers block forever on an empty queue, so cancel them explicitly.
    for task in consumer_tasks:
        task.cancel()


if __name__ == "__main__":
    asyncio.run(main())
import asyncio


async def long_running_task():
    """A deliberately slow task used to demonstrate timeouts."""
    print("Starting long task...")
    await asyncio.sleep(10)
    print("Task completed!")
    return "Done"


async def main():
    # Option 1: bound the wait with wait_for(); on expiry the inner task
    # is cancelled and TimeoutError is raised here.
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Task timed out!")

    # Option 2: cancel a running task by hand.
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(1)  # Let task run for 1 second
    task.cancel()
    try:
        # Awaiting a cancelled task re-raises CancelledError.
        await task
    except asyncio.CancelledError:
        print("Task was cancelled!")


asyncio.run(main())
from fastapi import FastAPI, HTTPException
from typing import List
import asyncio
import httpx

app = FastAPI()


async def fetch_external_data(item_id: int) -> dict:
    """Fetch one item's data from the external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/items/{item_id}")
        return response.json()


@app.get("/items/{item_id}")
async def get_item(item_id: int):
    """Async endpoint that proxies a single external lookup."""
    try:
        payload = await fetch_external_data(item_id)
        return {"item_id": item_id, "data": payload}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/items/batch/")
async def get_items_batch(ids: str):
    """Fetch a comma-separated list of item ids concurrently."""
    item_ids = [int(raw) for raw in ids.split(",")]

    # One task per id; failed lookups come back as Exception objects.
    outcomes = await asyncio.gather(
        *(fetch_external_data(item_id) for item_id in item_ids),
        return_exceptions=True,
    )

    return {
        "items": [
            {"id": item_id, "data": outcome}
            for item_id, outcome in zip(item_ids, outcomes)
            if not isinstance(outcome, Exception)
        ]
    }
# views.py
from django.http import JsonResponse
import asyncio
import httpx


async def fetch_user_data(user_id: int) -> dict:
    """GET the user's profile from the external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()


async def fetch_user_orders(user_id: int) -> list:
    """GET the user's orders from the external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}/orders")
        return response.json()


async def user_dashboard(request, user_id):
    """Async Django view — both upstream calls run concurrently."""
    profile, orders = await asyncio.gather(
        fetch_user_data(user_id),
        fetch_user_orders(user_id),
    )
    return JsonResponse({
        "user": profile,
        "orders": orders
    })
import time
import asyncio


# BAD - This blocks the entire event loop
async def bad_example():
    time.sleep(5)  # Blocks! No other task can run for these 5 seconds.
    return "done"


# GOOD - Use asyncio.sleep or run in executor
async def good_example():
    await asyncio.sleep(5)  # Non-blocking
    return "done"


# For CPU-bound or blocking I/O, use run_in_executor
async def run_blocking_code(seconds: float = 5):
    """Run a blocking call in a thread so the event loop stays responsive.

    Args:
        seconds: how long the blocking call sleeps (default preserves the
            original example's behavior).

    Returns:
        Whatever the blocking callable returns (time.sleep returns None).
    """
    # get_running_loop() is the modern call inside a coroutine;
    # get_event_loop() is deprecated in this context (since Python 3.10)
    # and can create a second loop when misused.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,        # Use default executor (a ThreadPoolExecutor)
        time.sleep,  # Blocking function
        seconds,     # Arguments
    )
    return result
# BAD - May overwhelm the server or hit rate limits
async def bad_bulk_fetch(urls: list):
    # Every URL becomes an in-flight request at once — 10,000 concurrent
    # requests for a 10,000-element list!
    return await asyncio.gather(*(fetch(url) for url in urls))


# GOOD - Use semaphore to limit concurrency
async def good_bulk_fetch(urls: list, max_concurrent: int = 50):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(url):
        # At most `max_concurrent` bodies are inside this block at once.
        async with semaphore:
            return await fetch(url)

    pending = [fetch_with_limit(url) for url in urls]
    return await asyncio.gather(*pending)
# BAD - Coroutine never executed! async def main(): fetch_data() # Returns coroutine, doesn't execute! # RuntimeWarning: coroutine 'fetch_data' was never awaited # GOOD async def main(): await fetch_data() # Actually executes
import asyncio
import time
import aiohttp
import requests

URLS = ["https://httpbin.org/delay/1" for _ in range(10)]


# Synchronous version
def sync_fetch_all():
    """Fetch every URL one after another (each request blocks)."""
    start = time.time()
    results = []
    for url in URLS:
        response = requests.get(url)
        results.append(response.status_code)
    print(f"Sync time: {time.time() - start:.2f}s")
    return results


async def _fetch_status(session: "aiohttp.ClientSession", url: str) -> int:
    """Fetch one URL and return its HTTP status.

    FIX: the original awaited bare session.get() coroutines, which never
    released the responses; 'async with' closes each response so its
    connection returns to the pool.
    """
    async with session.get(url) as response:
        return response.status


# Asynchronous version
async def async_fetch_all():
    """Fetch every URL concurrently over one shared session."""
    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(_fetch_status(session, url) for url in URLS)
        )
    print(f"Async time: {time.time() - start:.2f}s")
    return results

# Results:
# Sync time: ~10.5s (sequential)
# Async time: ~1.2s (concurrent)
Asynchronous programming in Python is essential for building high-performance applications. Key takeaways:
Use `await` with `run_in_executor` for blocking code. With these patterns, you can build applications that handle thousands of concurrent operations efficiently.