Python's asyncio library lets you write concurrent code using a single thread and a single event loop. Instead of blocking while waiting for slow I/O (a network response, a database query, a file read), your program suspends the waiting task and immediately runs something else. When the I/O completes, execution resumes right where it left off.
This is the first post in a six-part series on Python concurrency. By the end of the series we'll build a complete agentic research bot that uses all three concurrency tools together. In this post we cover asyncio end-to-end:
- async def and await
- asyncio.create_task()
- asyncio.gather() and asyncio.TaskGroup
- Lock, Semaphore, Event, Condition

Full project code: research-agent
The event loop is the scheduler at the heart of asyncio. It maintains a queue of runnable coroutines and switches between them whenever one suspends at an await point. Crucially, there is only one thread: no OS-level context switches, no race conditions on shared memory, just cooperative multitasking.
When you call asyncio.run(main()), Python creates an event loop, schedules main() on it, and runs until main() completes. Every await inside is a checkpoint where the loop can switch to another coroutine.
```python
import asyncio

# An async function is called a coroutine function.
# Calling it returns a coroutine object; it does NOT execute yet.
async def main():
    print("Hello World")

# asyncio.run() creates the event loop and runs the coroutine.
asyncio.run(main())
# Output: Hello World

# In Jupyter notebooks, use top-level await instead:
await main()
```
The key insight: async def marks a function as a coroutine. Calling it produces a coroutine object but executes nothing. The event loop executes it only when the object is scheduled (via await, create_task, or asyncio.run).
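To make the "calling executes nothing" point concrete, here's a minimal sketch (greet is an illustrative coroutine, not from the project code):

```python
import asyncio

async def greet():
    print("running")
    return 42

async def main():
    coro = greet()              # nothing printed yet: just a coroutine object
    print(type(coro).__name__)  # coroutine
    result = await coro         # NOW the body executes
    print(result)               # 42

asyncio.run(main())
```

The first print fires before "running" because creating the coroutine object does not start its body; only the await does.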
The await keyword suspends the current coroutine and hands control back to the event loop until the awaited coroutine finishes. Other coroutines that are ready can run during this suspension window.
```python
async def fetch_data(delay):
    print("Fetching data...")
    await asyncio.sleep(delay)  # suspends here; the event loop runs other tasks
    print("Data fetched!")
    return {"data": "dummy data"}

async def main():
    print("Starting")
    result = await fetch_data(2)  # runs fetch_data, waits for it to finish
    print("Received:", result)

await main()
# Output:
# Starting
# Fetching data...
# Data fetched!
# Received: {'data': 'dummy data'}
```
await tells the event loop to do exactly this: hand control back, and return only when the result is ready.
Note that await asyncio.sleep(2) is non-blocking. The thread is not sleeping; the event loop can run other coroutines during those 2 seconds. This is completely unlike time.sleep(2), which actually blocks the thread.
await can only be used inside an async def function. Using it at the top level is only valid in interactive environments (Jupyter, Python REPL with asyncio support).
Plain await is sequential: one coroutine runs, finishes, then the next starts. To run coroutines concurrently, wrap them in Tasks. A Task schedules the coroutine on the event loop immediately, allowing the loop to interleave it with others.
```python
async def fetch_data(id, delay):
    print(f"Fetching id={id}...")
    await asyncio.sleep(delay)
    print(f"Done id={id}")
    return {"id": id, "data": "result"}

async def main():
    # create_task() schedules the coroutine immediately; all three start now
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))
    task3 = asyncio.create_task(fetch_data(3, 1))
    # await each task to get its result
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(result1, result2, result3)

await main()
# Output (notice all three start before any finishes):
# Fetching id=1...
# Fetching id=2...
# Fetching id=3...
# Done id=3   (delay=1, finishes first)
# Done id=1   (delay=2)
# Done id=2   (delay=3, finishes last)
```
With create_task(), all three coroutines start immediately and run concurrently.
Even though we await task1 first, tasks 2 and 3 are already running in the background. Total elapsed time is ~3 seconds (the longest delay), not 6 seconds (sum of all delays).
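You can verify the "longest delay, not the sum" claim with a quick timing check. This sketch uses a stripped-down fetch_data and scaled-down delays so it runs fast:

```python
import asyncio
import time

async def fetch_data(id, delay):
    await asyncio.sleep(delay)
    return id

async def main():
    start = time.perf_counter()
    tasks = [asyncio.create_task(fetch_data(i, d))
             for i, d in [(1, 0.2), (2, 0.3), (3, 0.1)]]
    results = [await t for t in tasks]
    elapsed = time.perf_counter() - start
    # elapsed is ~0.3s (the longest delay), not 0.6s (the sum)
    print(results, f"{elapsed:.1f}s")

asyncio.run(main())
```

Awaiting the tasks in order doesn't serialize them; they were all already running from the moment create_task() was called.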
asyncio.gather() is the idiomatic way to run multiple coroutines concurrently and collect all their results. It schedules all coroutines as tasks internally and returns a list of results in the same order as the inputs.
gather() lets you request all three at once, so total wait time is closer to the slowest API, not the sum of all three.

```python
async def main():
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 3),
        fetch_data(3, 1),
    )
    # results is a list in INPUT order, not completion order
    for r in results:
        print(r)

await main()
# Output:
# Fetching id=1...
# Fetching id=2...
# Fetching id=3...
# Done id=3
# Done id=1
# Done id=2
# {'id': 1, 'data': 'result'}
# {'id': 2, 'data': 'result'}
# {'id': 3, 'data': 'result'}
```
Important caveat: by default, if any coroutine raises an exception, gather() propagates it immediately, and the other coroutines are not automatically cancelled (though their results are lost). Pass return_exceptions=True to capture exceptions as values instead of raising them; this prevents one failed source from killing the whole fan-out.
```python
results = await asyncio.gather(
    fetch_data(1, 2),
    fetch_data(2, 3),
    some_failing_coroutine(),
    return_exceptions=True,  # exceptions become values, not crashes
)
# results[2] will be an Exception instance rather than raising
```
asyncio.TaskGroup is the modern, safer alternative to gather(). It uses the async with context manager pattern and provides structured concurrency: if any task inside the group raises an exception, all other tasks in the group are automatically cancelled, and the exception is propagated cleanly.
TaskGroup fits that "all parts succeed together or stop together" pattern.

```python
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1, 2))
        task2 = tg.create_task(fetch_data(2, 3))
        task3 = tg.create_task(fetch_data(3, 1))
    # Execution reaches here only when ALL tasks have finished (or one raised)
    print(task1.result())
    print(task2.result())
    print(task3.result())

await main()
```
Use TaskGroup when you want automatic cleanup on failure. Use gather(return_exceptions=True) when you want resilience: one source failing shouldn't abort the rest.
| Feature | gather() | TaskGroup |
|---|---|---|
| Concurrent execution | Yes | Yes |
| Results in input order | Yes | via task.result() |
| Auto-cancel on failure | No | Yes |
| Python version | 3.4+ | 3.11+ |
| Partial failure resilience | Yes (return_exceptions=True) | No |
A Future is a low-level object representing a value that will be available at some point. Unlike a coroutine, a Future has no body; its result is set externally via future.set_result(). You can await a Future just like a coroutine.
Futures are mostly used when bridging callback-based libraries into the async world. In everyday asyncio code you rarely create them manually: create_task() returns a Task (a subclass of Future), and many asyncio internals use Futures under the hood.
```python
async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def set_result():
        await asyncio.sleep(2)
        future.set_result("Ready!")  # signal the future from another coroutine

    asyncio.create_task(set_result())
    print("Waiting for future...")
    result = await future  # suspends until set_result() fires
    print("Got:", result)  # Got: Ready!

await main()
```
Even though asyncio is single-threaded, shared mutable state is still dangerous when multiple coroutines interleave. asyncio ships four synchronization tools that mirror the threading equivalents but are coroutine-safe.
A Lock ensures only one coroutine can access a protected block at a time. Use it when multiple coroutines read and write the same object and interleaving would corrupt it.
Lock forces those updates to happen one at a time so the final value stays correct.

```python
async def main():
    lock = asyncio.Lock()

    async def task(name):
        async with lock:  # acquire; other tasks wait here
            print(f"{name} acquired lock")
            await asyncio.sleep(1)  # simulate work
            print(f"{name} releasing lock")

    await asyncio.gather(task("A"), task("B"), task("C"))

await main()
# Output (tasks execute strictly one at a time):
# A acquired lock
# A releasing lock
# B acquired lock
# B releasing lock
# C acquired lock
# C releasing lock
```
Deadlock note: asyncio locks are not reentrant. If a coroutine that already holds the lock tries to acquire it again, it will deadlock. Always release the lock before re-acquiring, or use a different design.
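You can observe the non-reentrancy safely by bounding the second acquire with asyncio.wait_for, which turns the would-be deadlock into a visible timeout (a sketch; the 0.1 s timeout is arbitrary):

```python
import asyncio

async def main():
    lock = asyncio.Lock()
    async with lock:
        # A second acquire by the SAME coroutine would block forever;
        # wait_for cancels it after the timeout instead of hanging.
        try:
            await asyncio.wait_for(lock.acquire(), timeout=0.1)
        except asyncio.TimeoutError:
            print("second acquire timed out: asyncio.Lock is not reentrant")

asyncio.run(main())
```

In real code, the fix is structural: release the lock before calling anything that acquires it again, or restructure so only one layer takes the lock.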
A Semaphore is a generalised lock that allows up to N concurrent holders. It's perfect for rate-limiting: for example, capping the number of simultaneous outbound HTTP connections.
Semaphore lets you keep concurrency for speed while still respecting a limit like 5 or 10 requests at a time.

```python
async def main():
    semaphore = asyncio.Semaphore(2)  # at most 2 concurrent holders

    async def task(name):
        async with semaphore:
            print(f"{name} running")
            await asyncio.sleep(1)
            print(f"{name} done")

    await asyncio.gather(task("A"), task("B"), task("C"), task("D"))

await main()
# Output (A and B run concurrently; C and D wait until a slot frees):
# A running
# B running
# A done
# B done
# C running
# D running
# C done
# D done
```
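A timing sketch makes the batching visible (delays scaled down; limited is an illustrative helper):

```python
import asyncio
import time

async def limited(semaphore, delay):
    async with semaphore:
        await asyncio.sleep(delay)

async def main():
    semaphore = asyncio.Semaphore(2)  # 2 slots
    start = time.perf_counter()
    await asyncio.gather(*(limited(semaphore, 0.2) for _ in range(4)))
    elapsed = time.perf_counter() - start
    # 4 tasks in batches of 2 -> ~0.4s, vs 0.8s serial or 0.2s unbounded
    print(f"~{elapsed:.1f}s")

asyncio.run(main())
```

Swap the sleep for a real HTTP request and this is exactly the outbound-connection cap described above.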
An Event is a simple signal: coroutines can wait on it with await event.wait() and another coroutine can fire it with event.set(). All waiters unblock simultaneously when the event is set.
Multiple waiters block on one Event and resume together the moment setup completes.

```python
async def main():
    event = asyncio.Event()

    async def waiter(name):
        print(f"{name} waiting...")
        await event.wait()
        print(f"{name} unblocked!")

    async def setter():
        await asyncio.sleep(2)
        print("Firing event")
        event.set()

    await asyncio.gather(waiter("W1"), waiter("W2"), setter())

await main()
# Output:
# W1 waiting...
# W2 waiting...
# Firing event
# W1 unblocked!
# W2 unblocked!
```
A Condition combines a Lock with a signalling mechanism. Waiters call await condition.wait() inside async with condition; the setter calls condition.notify_all() after updating shared state. Waiters re-check the condition in a loop to handle spurious wakeups.
Condition is useful when you need to wait for a specific state, not just a generic "something happened" signal.

```python
async def main():
    condition = asyncio.Condition()
    shared = {"value": 0}

    async def waiter(name):
        async with condition:
            while shared["value"] < 5:
                await condition.wait()  # releases lock, waits for notify
            print(f"{name} saw value={shared['value']}")

    async def setter():
        for i in range(1, 6):
            await asyncio.sleep(0.5)
            async with condition:
                shared["value"] = i
                condition.notify_all()

    await asyncio.gather(waiter("W1"), waiter("W2"), setter())

await main()
```
asyncio excels at I/O-bound concurrency: situations where your program spends most of its time waiting for external resources rather than computing. Good fits include network requests, database queries, file I/O via async wrappers, and juggling many simultaneous connections.
asyncio does not help with CPU-bound work. If your code is doing heavy computation (matrix multiplication, image processing, ML inference), a single event-loop thread will still saturate one CPU core regardless of how many coroutines you run. For CPU-bound work, use ProcessPoolExecutor (covered in Part 3).
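When you're stuck with a blocking call from a sync library, you can push it onto a thread pool via loop.run_in_executor so the event loop stays responsive. A sketch, with blocking_io standing in for a sync call like requests.get:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    # Stand-in for a sync library call that would otherwise
    # freeze the event loop for its whole duration.
    time.sleep(0.2)
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # Both blocking calls run concurrently in worker threads;
        # the event loop is free to run other coroutines meanwhile.
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io),
            loop.run_in_executor(pool, blocking_io),
        )
    print(results)  # ['done', 'done']

asyncio.run(main())
```

This is the "threading via run_in_executor" row in the table below: the loop stays single-threaded, but blocking work is delegated to threads it can await.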
| Bottleneck | asyncio helps? | Use instead |
|---|---|---|
| Network I/O (async library) | Yes | - |
| Network I/O (sync library, e.g. requests) | Partially | threading via run_in_executor |
| File I/O | Yes, with aiofiles | threading for sync APIs |
| CPU computation (numpy, ML) | No | ProcessPoolExecutor |
| Spawning isolated scripts | Yes, via asyncio.create_subprocess_exec | subprocess for blocking use |
Throughout this series we'll build a Research Agent: an agentic chatbot that answers questions by fetching live data from Wikipedia and HackerNews, analysing it, generating embeddings, and synthesizing an answer via a local Ollama LLM.
asyncio powers the fetch layer. Three sources are queried simultaneously with asyncio.gather(), cutting total fetch time from ~2.5 s to ~1.2 s (the slowest source):
Full project code: agents/async_fetch/fetch_agent.py
```python
import asyncio
import aiohttp

async def run_fetch_agents(query: str) -> list:
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            _fetch_wikipedia_summary(session, query),
            _fetch_wikipedia_search(session, query),
            _fetch_hackernews(session, query),
            return_exceptions=True,  # one failing source won't crash the others
        )
    return [a for batch in results if isinstance(batch, list) for a in batch]
```
The return_exceptions=True is critical here: if Wikipedia is down, HackerNews results still come through. The orchestrator then runs the fetch agent alongside memory, analysis, and embedding agents, all inside a single asyncio.gather() call.
asyncio gives Python a powerful tool for concurrent I/O without the complexity of threads. The mental model is simple: an event loop runs one coroutine at a time, suspending at every await to let others make progress. The primitives build on top of this:
- async def + await: the building blocks
- create_task(): schedule a coroutine to run concurrently
- gather() / TaskGroup: fan out many coroutines and collect their results
- Lock, Semaphore, Event, Condition: coordinate coroutines around shared state

In the next post we cover subprocess: how to run code in a completely isolated OS process, communicate via stdin/stdout, and enforce hard timeouts. It's the right tool when you need true isolation rather than concurrency.
Concurrent doesn't mean parallel, and with asyncio, one well-structured thread can outperform many poorly coordinated ones.