Python Concurrency Part 1 (asyncio): Event Loops, Coroutines & Synchronization

Python's asyncio library lets you write concurrent code using a single thread and a single event loop. Instead of blocking while waiting for slow I/O (a network response, a database query, a file read), your program suspends the waiting task and immediately runs something else. When the I/O completes, execution resumes right where it left off.

This is the first post in a six-part series on Python concurrency. By the end of the series we'll build a complete agentic research bot that uses all three concurrency tools together. In this post we cover asyncio end-to-end: the event loop, coroutines, tasks, gather(), TaskGroup, futures, and synchronization primitives.

Full project code: research-agent

1. The Event Loop

The event loop is the scheduler at the heart of asyncio. It maintains a queue of runnable coroutines and switches between them whenever one suspends at an await point. Crucially, there is only one thread: no OS-level context switches, no race conditions on shared memory, just cooperative multitasking.

When you call asyncio.run(main()), Python creates an event loop, schedules main() on it, and runs until main() completes. Every await inside is a checkpoint where the loop can switch to another coroutine.

Real-world analogy: Think of a restaurant waiter. When you place your order, the waiter doesn't stand at your table waiting for the chef; they write down your order, walk to other tables, take more orders, and return only when your food is ready. The event loop is that waiter: a single thread that never stands idle, switching between tasks at every natural pause.

import asyncio

# An async function is called a coroutine function.
# Calling it returns a coroutine object; it does NOT execute yet.
async def main():
    print("Hello World")

# asyncio.run() creates the event loop and runs the coroutine.
asyncio.run(main())
# Output: Hello World

# In Jupyter notebooks, use top-level await instead:
await main()

The key insight: async def marks a function as a coroutine. Calling it produces a coroutine object but executes nothing. The event loop executes it only when the object is scheduled (via await, create_task, or asyncio.run).
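
You can see this for yourself (a minimal sketch; greet is a made-up coroutine):

async def greet():
    return "hi"

coro = greet()             # nothing has executed yet
print(type(coro))          # <class 'coroutine'>
print(asyncio.run(coro))   # the event loop finally runs it -> hi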

2. Coroutines: async def and await

The await keyword suspends the current coroutine and hands control back to the event loop until the awaited coroutine finishes. Other coroutines that are ready can run during this suspension window.

async def fetch_data(delay):
    print("Fetching data...")
    await asyncio.sleep(delay)   # suspends here; event loop runs other tasks
    print("Data fetched!")
    return {"data": "dummy data"}

async def main():
    print("Starting")
    result = await fetch_data(2)  # runs fetch_data, waits for it to finish
    print("Received:", result)

await main()
# Output:
Starting
Fetching data...
Data fetched!
Received: {'data': 'dummy data'}

Real-world analogy: You've submitted a support ticket and are waiting for a reply. Instead of staring at your inbox doing nothing, you close the tab and work on something else. When the email arrives, a notification brings you back. await tells the event loop to do exactly this: hand control back and return only when the result is ready.

Note that await asyncio.sleep(2) is non-blocking: the thread is not sleeping, and the event loop can run other coroutines during those 2 seconds. This is completely unlike time.sleep(2), which actually blocks the thread.
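
A minimal sketch makes the difference visible (it borrows asyncio.gather() from section 4 to run two coroutines side by side; heartbeat is a made-up helper):

import asyncio
import time

async def heartbeat():
    for _ in range(4):
        print("tick")
        await asyncio.sleep(0.5)

async def good_sleep():
    await asyncio.sleep(2)   # non-blocking: the loop keeps ticking

async def bad_sleep():
    time.sleep(2)            # blocks the thread: the loop freezes

async def main():
    await asyncio.gather(heartbeat(), good_sleep())  # ticks keep flowing
    await asyncio.gather(heartbeat(), bad_sleep())   # ticks stall for ~2 s

await main()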

await can only be used inside an async def function. Using it at the top level is only valid in interactive environments (Jupyter, Python REPL with asyncio support).

3. Tasks: asyncio.create_task()

Plain await is sequential: one coroutine runs, finishes, then the next starts. To run coroutines concurrently, wrap them in Tasks. A Task schedules the coroutine on the event loop immediately, allowing the loop to interleave it with others.

async def fetch_data(id, delay):
    print(f"Fetching id={id}...")
    await asyncio.sleep(delay)
    print(f"Done id={id}")
    return {"id": id, "data": "result"}

async def main():
    # create_task() schedules the coroutine immediately; all three start now
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))
    task3 = asyncio.create_task(fetch_data(3, 1))

    # await each task to get its result
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(result1, result2, result3)

await main()
# Output (notice all three start before any finishes):
Fetching id=1...
Fetching id=2...
Fetching id=3...
Done id=3      # delay=1, finishes first
Done id=1      # delay=2
Done id=2      # delay=3, finishes last

Real-world analogy: A news editor assigns breaking stories to three reporters simultaneously: sports, finance, and politics. All three start reporting at the same time. The editor doesn't wait for the first reporter to finish before assigning the second. create_task() does the same: all coroutines start immediately and run concurrently.

Even though we await task1 first, tasks 2 and 3 are already running in the background. Total elapsed time is ~3 seconds (the longest delay), not 6 seconds (sum of all delays).
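
You can confirm the timing with time.perf_counter() (a minimal sketch reusing fetch_data from above; timings are approximate):

import time

async def main():
    start = time.perf_counter()
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))
    task3 = asyncio.create_task(fetch_data(3, 1))
    await task1
    await task2
    await task3
    print(f"elapsed: {time.perf_counter() - start:.1f}s")  # ~3.0s, not 6.0s

await main()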

4. asyncio.gather(): Concurrent Fan-Out

asyncio.gather() is the idiomatic way to run multiple coroutines concurrently and collect all their results. It schedules all coroutines as tasks internally and returns a list of results in the same order as the inputs.

Imagine a dashboard that needs user profile data, notifications, and billing info before it can render. If you fetch them one after another, the page feels slow. gather() lets you request all three at once so total wait time is closer to the slowest API, not the sum of all three.

async def main():
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 3),
        fetch_data(3, 1),
    )
    # results is a list in INPUT order, not completion order
    for r in results:
        print(r)

await main()
# Output:
Fetching id=1...
Fetching id=2...
Fetching id=3...
Done id=3
Done id=1
Done id=2
{'id': 1, 'data': 'result'}
{'id': 2, 'data': 'result'}
{'id': 3, 'data': 'result'}

Important caveat: by default, if any coroutine raises an exception, gather() propagates it immediately and the other coroutines are not automatically cancelled (though their results are lost). Pass return_exceptions=True to capture exceptions as values instead of raising them; this prevents one failed source from killing the whole fan-out.

results = await asyncio.gather(
    fetch_data(1, 2),
    fetch_data(2, 3),
    some_failing_coroutine(),
    return_exceptions=True,   # exceptions become values, not crashes
)
# results[2] will be an Exception instance rather than raising

5. asyncio.TaskGroup: Structured Concurrency (Python 3.11+)

asyncio.TaskGroup is the modern, safer alternative to gather(). It uses the async with context-manager pattern and provides structured concurrency: if any task inside the group raises an exception, all other tasks in the group are automatically cancelled, and the failures are propagated together as an ExceptionGroup.

Suppose you are processing one customer checkout that depends on stock check, fraud check, and payment confirmation. If payment fails, you do not want the rest of the workflow continuing in the background. TaskGroup fits that "all parts succeed together or stop together" pattern.

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1, 2))
        task2 = tg.create_task(fetch_data(2, 3))
        task3 = tg.create_task(fetch_data(3, 1))
    # Execution reaches here only when ALL tasks have finished (or one raised)

    print(task1.result())
    print(task2.result())
    print(task3.result())

await main()
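
The failure path looks like this (a minimal sketch; failing_fetch is a made-up coroutine, and the except* syntax requires Python 3.11+):

async def failing_fetch():
    await asyncio.sleep(0.5)
    raise ValueError("source is down")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data(1, 2))   # cancelled when the group fails
            tg.create_task(failing_fetch())
    except* ValueError as eg:
        # TaskGroup bundles failures into an ExceptionGroup; except* unpacks it
        print("caught:", eg.exceptions)

await main()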

Use TaskGroup when you want automatic cleanup on failure. Use gather(return_exceptions=True) when you want resilience: one source failing shouldn't abort the rest.

Feature                      | gather()                     | TaskGroup
Concurrent execution         | Yes                          | Yes
Results in input order       | Yes                          | Via task.result()
Auto-cancel on failure       | No                           | Yes
Partial failure resilience   | Yes (return_exceptions=True) | No
Python version               | 3.4+                         | 3.11+

6. Futures

A Future is a low-level object representing a value that will be available at some point. Unlike a coroutine, a Future has no body; its result is set externally via future.set_result(). You can await a Future just like a coroutine.

Futures are mostly used when bridging callback-based libraries into the async world. In everyday asyncio code you rarely create them manually: create_task() returns a Task, which is a subclass of Future, and many asyncio internals use Futures under the hood.

Think about waiting for a file upload widget or a third-party SDK callback to say "done" before moving forward. A Future is the placeholder that lets the rest of your async code wait cleanly for that signal without blocking the thread.

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def set_result():
        await asyncio.sleep(2)
        future.set_result("Ready!")   # signal the future from another coroutine

    asyncio.create_task(set_result())

    print("Waiting for future...")
    result = await future             # suspends until set_result() fires
    print("Got:", result)             # Got: Ready!

await main()
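
Bridging a callback-style API looks like this (a minimal sketch; loop.call_later() stands in for a third-party SDK that reports completion via a callback):

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # call_later() is a plain callback API: it knows nothing about coroutines.
    # The callback resolves the future, which the async side can await.
    loop.call_later(1, future.set_result, "callback fired")

    print(await future)   # callback fired

await main()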

7. Synchronization Primitives

Even though asyncio is single-threaded, shared mutable state is still dangerous when multiple coroutines interleave at await points. asyncio ships four synchronization tools that mirror their threading equivalents but are coroutine-safe.

Lock

A Lock ensures only one coroutine can access a protected block at a time. Use it when multiple coroutines read and write the same object and interleaving would corrupt it.

If two coroutines are both updating the same in-memory shopping cart total, one update can overwrite the other. A Lock forces those updates to happen one at a time so the final value stays correct.

async def main():
    lock = asyncio.Lock()

    async def task(name):
        async with lock:                    # acquire; other tasks wait here
            print(f"{name} acquired lock")
            await asyncio.sleep(1)          # simulate work
            print(f"{name} releasing lock")

    await asyncio.gather(task("A"), task("B"), task("C"))

await main()
# Output (tasks execute strictly one at a time):
A acquired lock
A releasing lock
B acquired lock
B releasing lock
C acquired lock
C releasing lock

Deadlock note: asyncio locks are not reentrant. If a coroutine that already holds the lock tries to acquire it again, it will deadlock. Always release the lock before re-acquiring, or use a different design.
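
The anti-pattern looks like this (a minimal sketch; running it hangs forever, so treat it as a counter-example):

async def broken():
    lock = asyncio.Lock()
    async with lock:
        # This coroutine already holds the lock. asyncio.Lock is not
        # reentrant, so the second acquire waits on itself forever.
        async with lock:
            print("never reached")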

Semaphore

A Semaphore is a generalised lock that allows up to N concurrent holders. It's perfect for rate-limiting; for example, capping the number of simultaneous outbound HTTP connections.

If your script hits 200 product pages at once, the target site may throttle or block you. A Semaphore lets you keep concurrency for speed while still respecting a limit like 5 or 10 requests at a time.

async def main():
    semaphore = asyncio.Semaphore(2)   # at most 2 concurrent

    async def task(name):
        async with semaphore:
            print(f"{name} running")
            await asyncio.sleep(1)
            print(f"{name} done")

    await asyncio.gather(task("A"), task("B"), task("C"), task("D"))

await main()
# A and B run concurrently; C and D wait until one slot frees:
A running
B running
A done
B done
C running
D running
C done
D done
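
Applied to real HTTP traffic, the same pattern caps in-flight requests (a hedged sketch; assumes aiohttp is installed, and urls is a list you supply):

import aiohttp

async def fetch_all(urls, limit=5):
    semaphore = asyncio.Semaphore(limit)   # at most `limit` requests in flight

    async def fetch(session, url):
        async with semaphore:              # wait for a free slot
            async with session.get(url) as resp:
                return await resp.text()

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))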

Event

An Event is a simple signal: coroutines can wait on it with await event.wait() and another coroutine can fire it with event.set(). All waiters unblock simultaneously when the event is set.

Say several parts of your app must wait until configuration is loaded from a remote service. Instead of polling repeatedly, they can all wait on one Event and resume together the moment setup completes.

async def main():
    event = asyncio.Event()

    async def waiter(name):
        print(f"{name} waiting...")
        await event.wait()
        print(f"{name} unblocked!")

    async def setter():
        await asyncio.sleep(2)
        print("Firing event")
        event.set()

    await asyncio.gather(waiter("W1"), waiter("W2"), setter())

await main()
# Output:
W1 waiting...
W2 waiting...
Firing event
W1 unblocked!
W2 unblocked!

Condition

A Condition combines a Lock with a signalling mechanism. Waiters call await condition.wait() inside async with condition; the setter calls condition.notify_all() after updating shared state. Waiters re-check the condition in a loop to handle spurious wakeups.

Imagine an inventory worker waiting until stock for an item reaches at least 5 units before dispatching a batch order. A Condition is useful when you need to wait for a specific state, not just a generic "something happened" signal.

async def main():
    condition = asyncio.Condition()
    shared = {"value": 0}

    async def waiter(name):
        async with condition:
            while shared["value"] < 5:
                await condition.wait()    # releases lock, waits for notify
            print(f"{name} saw value={shared['value']}")

    async def setter():
        for i in range(1, 6):
            await asyncio.sleep(0.5)
            async with condition:
                shared["value"] = i
                condition.notify_all()

    await asyncio.gather(waiter("W1"), waiter("W2"), setter())

await main()

8. When to Use asyncio

asyncio excels at I/O-bound concurrency: situations where your program is spending most of its time waiting for external resources rather than computing. Good fits include network requests, database queries, web scraping, and servers that handle many simultaneous connections.

asyncio does not help with CPU-bound work. If your code is doing heavy computation (matrix multiplication, image processing, ML inference), a single event-loop thread will still saturate one CPU core regardless of how many coroutines you run. For CPU-bound work, use ProcessPoolExecutor (covered in Part 3).

If your API server spends most of its time waiting on databases or other APIs, asyncio can cut latency nicely. If it spends most of its time resizing images or scoring ML models, asyncio will not speed that up because the CPU is still the bottleneck.

Bottleneck                                | asyncio helps?                        | Use instead
Network I/O (async library)               | Yes                                   | -
Network I/O (sync library, e.g. requests) | Partially                             | threading via run_in_executor
File I/O                                  | Yes, with aiofiles                    | threading for sync APIs
CPU computation (numpy, ML)               | No                                    | ProcessPoolExecutor
Spawning isolated scripts                 | Yes (asyncio.create_subprocess_exec)  | subprocess for blocking use
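
For the "sync library" row, loop.run_in_executor() offloads the blocking call to a thread pool so the event loop stays responsive (a hedged sketch; assumes the requests library is installed):

import requests

async def fetch_sync(url):
    loop = asyncio.get_running_loop()
    # requests.get blocks, so run it in the default thread pool executor (None).
    response = await loop.run_in_executor(None, requests.get, url)
    return response.status_code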

9. asyncio in the Research Agent

Throughout this series we'll build a Research Agent: an agentic chatbot that answers questions by fetching live data from Wikipedia and HackerNews, analysing it, generating embeddings, and synthesising an answer via a local Ollama LLM.

asyncio powers the fetch layer. Three sources are queried simultaneously with asyncio.gather(), cutting total fetch time from ~2.5 s to ~1.2 s (the slowest source):

Full project code: agents/async_fetch/fetch_agent.py

import asyncio
import aiohttp

async def run_fetch_agents(query: str) -> list:
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            _fetch_wikipedia_summary(session, query),
            _fetch_wikipedia_search(session, query),
            _fetch_hackernews(session, query),
            return_exceptions=True,     # one failing source won't crash the others
        )
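    # Flatten the successful batches; exception values fail the isinstance check and are skipped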
    return [a for batch in results if isinstance(batch, list) for a in batch]

The return_exceptions=True is critical here: if Wikipedia is down, HackerNews results still come through. The orchestrator then runs the fetch agent alongside memory, analysis, and embedding agents, all inside a single asyncio.gather() call.

10. Conclusion

asyncio gives Python a powerful tool for concurrent I/O without the complexity of threads. The mental model is simple: an event loop runs one coroutine at a time, suspending at every await to let others make progress. Tasks, gather(), TaskGroup, futures, and the synchronization primitives all build on top of this single mechanism.

In the next post we cover subprocess how to run code in a completely isolated OS process, communicate via stdin/stdout, and enforce hard timeouts. It's the right tool when you need true isolation rather than concurrency.

Concurrent doesn't mean parallel, and with asyncio, one well-structured thread can outperform many poorly coordinated ones.

Next: Subprocess →