Python Concurrency Part 2: subprocess - Isolated Processes, Pipes & Async Execution


Python's subprocess module lets you spawn a new OS process, send data to it, read its output, and wait for it to complete, all from within your Python program. Unlike threads or asyncio coroutines, a subprocess runs in a completely separate OS process with its own memory, interpreter, and GIL.

This is Part 2 of a six-part series on Python concurrency. In Part 1 we covered asyncio. Here we cover subprocess.

Full project code: research-agent

1. Why Subprocess?

The key reason to use subprocess over threads or asyncio is isolation. A subprocess is a completely separate OS process: it has its own memory space, its own Python interpreter, and its own GIL. This gives you three guarantees you can't get from threads or coroutines: crash isolation (a bug in the child can't corrupt the parent), hard timeouts (you can always kill a process), and environmental independence (the child can run with its own dependencies, even a different runtime).

If you let users upload spreadsheets and run heavy parsing logic on them, one malformed file should not take down your whole web app. Running that parser in a subprocess keeps the main service alive even if the child crashes.
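A tiny sketch of that isolation: the child dies abruptly, the parent carries on. (The os._exit(42) call is just a stand-in for a crash.)

```python
import subprocess
import sys

# Child exits abruptly (simulated crash); the parent only sees a return code
result = subprocess.run(
    [sys.executable, "-c", "import os; os._exit(42)"],
    capture_output=True,
)
print("parent still alive, child exit code:", result.returncode)   # 42
```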

2. subprocess.run(): The Simple Way

subprocess.run() is the high-level, blocking interface. It runs a command, waits for it to complete, and returns a CompletedProcess object with the return code and output.

import subprocess

# Run a simple shell command
result = subprocess.run(
    ["echo", "Hello from subprocess"],
    capture_output=True,    # capture stdout and stderr
    text=True,              # decode bytes to str automatically
)
print(result.stdout)        # Hello from subprocess
print(result.returncode)    # 0 (success)

# Run Python code in a subprocess
result = subprocess.run(
    ["python3", "-c", "import sys; print(sys.version)"],
    capture_output=True,
    text=True,
)
print(result.stdout)

# Check return code: raises CalledProcessError if non-zero
result = subprocess.run(
    ["python3", "-c", "raise ValueError('oops')"],
    capture_output=True,
    text=True,
    check=True,             # raise on non-zero exit code
)
# Raises: subprocess.CalledProcessError

subprocess.run() blocks the calling thread until the command finishes. For a quick one-shot command where you don't need concurrency, this is fine. For long-running commands or use inside an async program, use the async variant covered in Section 4.

This is a good fit when your deployment script just needs to run git, ffmpeg, or a one-off CLI command and wait for the result before continuing. It keeps the code simple when parallelism is not the goal.
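When check=True is set, the failure case from the last snippet can be handled with an ordinary try/except; a minimal sketch (the exception object carries the return code and captured streams):

```python
import subprocess
import sys

# check=True turns a non-zero exit into an exception we can handle
try:
    subprocess.run(
        [sys.executable, "-c", "raise ValueError('oops')"],
        capture_output=True, text=True, check=True,
    )
except subprocess.CalledProcessError as exc:
    err = exc                    # keep a handle for inspection
    print(err.returncode)        # 1
    print(err.stderr)            # the child's traceback
```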

3. subprocess.Popen(): Full Process Control

subprocess.Popen() gives you a live handle to the running process. You can write to its stdin, read from its stdout/stderr, and check or wait on its status, all while it's still running.

import subprocess
import json

# Script that reads JSON from stdin, processes it, prints JSON to stdout
script = """
import sys, json
data = json.load(sys.stdin)
result = {"count": len(data), "doubled": [x * 2 for x in data]}
print(json.dumps(result))
"""

proc = subprocess.Popen(
    ["python3", "-c", script],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# Send data to stdin, read stdout; communicate() handles both safely
input_data = json.dumps([1, 2, 3, 4, 5]).encode()
stdout, stderr = proc.communicate(input=input_data)

result = json.loads(stdout.decode())
print(result)    # {'count': 5, 'doubled': [2, 4, 6, 8, 10]}

Always prefer communicate() over manually writing to proc.stdin and reading from proc.stdout. The manual approach can deadlock if the subprocess's stdout buffer fills up while you're still writing to stdin.
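When the parent only reads (and never writes to the child's stdin), that deadlock can't occur, so iterating stdout line by line is a safe way to stream output as it arrives; a minimal sketch:

```python
import subprocess
import sys

# Read-only pipe: no stdin writes, so the deadlock above cannot happen
script = "for i in range(3): print(f'line {i}', flush=True)"

proc = subprocess.Popen(
    [sys.executable, "-c", script],
    stdout=subprocess.PIPE,
    text=True,
)
lines = [line.rstrip() for line in proc.stdout]   # yields lines as they arrive
proc.wait()
print(lines)    # ['line 0', 'line 1', 'line 2']
```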

Suppose you launch a reporting script, stream JSON into it, and expect a large JSON result back. Popen gives you full control over that running process so your parent app can exchange data instead of only waiting for a final exit code.

Enforcing a Timeout

try:
    stdout, stderr = proc.communicate(input=input_data, timeout=10)
except subprocess.TimeoutExpired:
    proc.kill()                  # hard kill: SIGKILL on Unix
    stdout, stderr = proc.communicate()   # drain remaining output
    print("Process timed out and was killed")
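A gentler variant, sketched below, asks the child to exit before forcing it: terminate() sends SIGTERM on Unix, and kill() is the fallback if the child ignores it. (The sleeping child here is just a stand-in for a hung process.)

```python
import subprocess
import sys

# Escalating shutdown: ask politely first, then force
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
proc.terminate()                 # SIGTERM on Unix: a polite request to exit
try:
    proc.wait(timeout=2)
except subprocess.TimeoutExpired:
    proc.kill()                  # SIGKILL: cannot be caught or ignored
    proc.wait()
print(proc.returncode)           # negative signal number on Unix, e.g. -15
```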

4. asyncio.create_subprocess_exec(): Non-Blocking Subprocess

subprocess.run() and Popen.communicate() both block the calling thread. Inside an asyncio program this is a problem: a blocked thread freezes the event loop, preventing all other coroutines from running.

The solution is asyncio.create_subprocess_exec(). It spawns the subprocess the same way, but proc.communicate() becomes an awaitable coroutine, so the event loop can run other tasks while waiting for the subprocess to finish.

Imagine an async chatbot that needs to call an OCR script on an uploaded image. If that OCR call blocks the event loop, every other user waits too. The async subprocess version lets the bot keep serving other requests while OCR is running.

import asyncio
import json
import sys

async def run_in_subprocess(data: list) -> dict:
    script = """
import sys, json
data = json.load(sys.stdin)
print(json.dumps({"count": len(data), "sum": sum(data)}))
"""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", script,   # sys.executable = same Python as parent
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    payload = json.dumps(data).encode()

    # communicate() is awaitable: it does NOT block the event loop
    stdout, stderr = await proc.communicate(input=payload)

    if proc.returncode != 0:
        raise RuntimeError(f"Subprocess failed: {stderr.decode()}")

    return json.loads(stdout.decode())

async def main():
    result = await run_in_subprocess([10, 20, 30, 40])
    print(result)    # {'count': 4, 'sum': 100}

asyncio.run(main())
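Because communicate() yields to the event loop, several subprocesses can run concurrently with asyncio.gather; a sketch (run_sleep is a hypothetical helper, not part of the project):

```python
import asyncio
import sys
import time

async def run_sleep(seconds: float) -> str:
    # One child that sleeps, then prints; the await lets other tasks run
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c",
        f"import time; time.sleep({seconds}); print('done')",
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode().strip()

async def demo():
    start = time.perf_counter()
    # Three 0.5s children run concurrently; their waits overlap
    results = await asyncio.gather(
        run_sleep(0.5), run_sleep(0.5), run_sleep(0.5)
    )
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")   # finishes in roughly 0.5s, not 1.5s
    return results, elapsed

results, elapsed = asyncio.run(demo())
```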

5. Hard Timeouts with asyncio

Combining asyncio.wait_for() with the subprocess gives you a hard timeout that the event loop enforces. If the process doesn't finish in time, it's killed immediately.

async def run_with_timeout(data: list, timeout: float = 10.0) -> dict:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "analysis_script.py",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=json.dumps(data).encode()),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()   # reap the zombie process
        return {"error": f"Timed out after {timeout}s"}

    return json.loads(stdout.decode())

This pattern is one of subprocess's biggest advantages over threads: there is no reliable way to kill a hung thread in Python, but you can always kill a process.

If a PDF extraction script occasionally hangs forever on a corrupt file, a hard timeout prevents one bad input from tying up worker capacity for minutes or hours. That directly protects latency and resource usage in production.
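A self-contained check of the pattern, with an inline child that sleeps far past the deadline standing in for the hung script:

```python
import asyncio
import sys

async def timeout_demo() -> str:
    # Child sleeps 10s; the parent allows 0.5s, then hard-kills it
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(10)",
        stdout=asyncio.subprocess.PIPE,
    )
    try:
        await asyncio.wait_for(proc.communicate(), timeout=0.5)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()        # reap so no zombie is left behind
        return "killed"
    return "finished"

print(asyncio.run(timeout_demo()))   # killed
```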

6. The JSON Pipe Protocol

When parent and child processes need to exchange structured data, a clean pattern is to use JSON over stdin/stdout. The parent serialises its input, writes it to stdin, and reads JSON back from stdout. The child reads stdin, does its work, and prints JSON to stdout.

This is useful when one service prepares a batch of orders, sends them to a pricing script, and receives a structured summary back. JSON pipes keep the interface explicit and easy to debug without tightly coupling both programs.

This is the pattern used throughout the Research Agent project. Here's a minimal standalone example of both sides:

analysis_script.py (the child process):

import json
import sys
from collections import Counter

def analyse(items: list) -> dict:
    return {
        "count": len(items),
        "unique": len(set(items)),
        "top": Counter(items).most_common(3),
    }

if __name__ == "__main__":
    try:
        raw = sys.stdin.read()
        items = json.loads(raw)
        print(json.dumps(analyse(items)))   # stdout → parent reads this
    except Exception as e:
        print(json.dumps({"error": str(e)}), file=sys.stderr)
        sys.exit(1)

main.py (the parent process):

import asyncio, json, sys
from pathlib import Path

async def analyse_in_subprocess(items: list) -> dict:
    script = Path("analysis_script.py")
    proc = await asyncio.create_subprocess_exec(
        sys.executable, str(script),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await asyncio.wait_for(
        proc.communicate(input=json.dumps(items).encode()),
        timeout=15,
    )
    return json.loads(stdout.decode())

async def main():
    data = ["apple", "banana", "apple", "cherry", "banana", "apple"]
    result = await analyse_in_subprocess(data)
    print(result)

asyncio.run(main())
# {'count': 6, 'unique': 3, 'top': [['apple', 3], ['banana', 2], ['cherry', 1]]}

7. subprocess in the Research Agent

In the Research Agent project, the analysis_agent uses exactly this pattern. The orchestrator calls run_analysis(articles), which spawns analysis_script.py in an isolated subprocess, sends article data via stdin as JSON, and reads the stats report back from stdout.

Full project code: agents/subprocess_analysis/analysis_agent.py

# agents/subprocess_analysis/analysis_agent.py (excerpt)
import asyncio, json, sys
from pathlib import Path

_SCRIPT_PATH = Path(__file__).parent / "analysis_script.py"

async def run_analysis(articles: list) -> dict:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, str(_SCRIPT_PATH),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    payload = json.dumps([a.to_dict() for a in articles]).encode()
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=payload),
            timeout=15,           # hard kill if script hangs
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return {}                 # safe default: orchestrator continues
    return json.loads(stdout.decode())

The analysis script itself (analysis_script.py) is completely standalone: it imports nothing from the main project, reads from stdin, and writes to stdout. This isolation means a bug in the analysis logic can never corrupt the orchestrator's state.

8. When to Use subprocess

Use subprocess when:

- You need crash isolation (a bug can't corrupt the parent)
- You need a hard, enforceable timeout
- You're running untrusted or LLM-generated code safely
- You're calling a CLI tool or a different language runtime
- You're running a script that has its own dependencies/environment

Don't use subprocess when:

- You're just doing I/O (asyncio or threads are lighter)
- You need to share in-memory objects between caller and callee
- The overhead of process spawning (~50–100 ms) is too high for your use case
- The work is purely CPU-bound Python (use ProcessPoolExecutor instead)
- You just need parallel I/O (that's asyncio's job)
If you are simply making 50 HTTP calls, subprocess is usually unnecessary overhead. If you are executing a separate toolchain, untrusted code, or a job you may need to kill hard, subprocess is usually the safer boundary.
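The spawn-overhead figure above varies by platform; a quick timing sketch to gauge it on your own machine:

```python
import subprocess
import sys
import time

# Measure the average cost of spawning a do-nothing Python child
n = 5
start = time.perf_counter()
for _ in range(n):
    subprocess.run([sys.executable, "-c", "pass"], check=True)
per_spawn_ms = (time.perf_counter() - start) / n * 1000
print(f"~{per_spawn_ms:.0f} ms per spawn")
```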

9. Conclusion

subprocess is Python's tool for full process isolation. Its strengths (crash safety, hard timeouts, and the ability to kill a hung process) make it the right choice wherever you need the child process to be truly independent of the parent. The asyncio.create_subprocess_exec() variant integrates it cleanly into an async program without blocking the event loop.

In the next post we'll look at threading: the right tool when you need to run a synchronous, blocking library (like sqlite3) alongside async code without freezing the event loop.

Isolation is not overhead; it's a feature. A subprocess that can crash safely is worth far more than a thread that can bring down everything with it.

← Part 1: asyncio Next: Threading →