Python's subprocess module lets you spawn a new OS process, send data to it, read its output, and wait for it to complete, all from within your Python program. Unlike threads or asyncio coroutines, a subprocess runs in a completely separate OS process with its own memory, interpreter, and GIL.
This is Part 2 of a six-part series on Python concurrency. In Part 1 we covered asyncio. Here we cover subprocess:
- subprocess.run(): the simple blocking call
- subprocess.Popen(): full control over a running process
- asyncio.create_subprocess_exec(): a non-blocking subprocess inside an async program

Full project code: research-agent
The key reason to use subprocess over threads or asyncio is isolation. A subprocess is a completely separate OS process: it has its own memory space, its own Python interpreter, and its own GIL. This gives you three guarantees you can't get from threads or coroutines: crash safety (a bug in the child can't corrupt the parent), hard timeouts, and the ability to kill a hung process. You can call process.kill() to terminate the subprocess unconditionally, even if it's stuck in an infinite loop or blocking on I/O. There is no equivalent for a hung thread.
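To make the isolation concrete, here's a minimal sketch (not from the project) in which the child hard-crashes via os._exit() while the parent carries on unharmed:

```python
import subprocess
import sys

# os._exit(42) terminates the child abruptly, skipping all cleanup
# (a stand-in for a genuine crash)
result = subprocess.run(
    [sys.executable, "-c", "import os; os._exit(42)"],
    capture_output=True,
)
print(result.returncode)  # 42: the child died, the parent is unaffected
```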
subprocess.run() is the high-level, blocking interface. It runs a command, waits for it to complete, and returns a CompletedProcess object with the return code and output.
```python
import subprocess

# Run a simple command
result = subprocess.run(
    ["echo", "Hello from subprocess"],
    capture_output=True,  # capture stdout and stderr
    text=True,            # decode bytes to str automatically
)
print(result.stdout)      # Hello from subprocess
print(result.returncode)  # 0 (success)

# Run Python code in a subprocess
result = subprocess.run(
    ["python3", "-c", "import sys; print(sys.version)"],
    capture_output=True,
    text=True,
)
print(result.stdout)

# check=True raises CalledProcessError on a non-zero exit code
result = subprocess.run(
    ["python3", "-c", "raise ValueError('oops')"],
    capture_output=True,
    text=True,
    check=True,  # raise on non-zero exit code
)
# Raises: subprocess.CalledProcessError
```
subprocess.run() blocks the calling thread until the command finishes. For a quick one-shot command where you don't need concurrency, this is fine. For long-running commands or use inside an async program, use the async variant covered in Section 4.
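If all you need is a cap on how long a command may run, subprocess.run() also accepts a timeout argument; a minimal sketch, using sleep as a stand-in for slow work:

```python
import subprocess

try:
    # run() kills the child and raises TimeoutExpired if it overruns
    subprocess.run(["sleep", "30"], timeout=2)
except subprocess.TimeoutExpired:
    print("Command exceeded the timeout and was killed")
```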
Use subprocess.run() when you want to call git, ffmpeg, or a one-off CLI command and wait for the result before continuing. It keeps the code simple when parallelism is not the goal.
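For example, a quick sketch of shelling out to git (assuming git is on your PATH and the working directory is a repository):

```python
import subprocess

result = subprocess.run(
    ["git", "status", "--porcelain"],  # machine-readable status output
    capture_output=True,
    text=True,
)
if result.returncode == 0:
    print("working tree dirty:", bool(result.stdout.strip()))
```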
subprocess.Popen() gives you a live handle to the running process. You can write to its stdin, read from its stdout/stderr, and check or wait on its status, all while it's still running.
```python
import subprocess
import json

# Script that reads JSON from stdin, processes it, prints JSON to stdout
script = """
import sys, json
data = json.load(sys.stdin)
result = {"count": len(data), "doubled": [x * 2 for x in data]}
print(json.dumps(result))
"""

proc = subprocess.Popen(
    ["python3", "-c", script],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# Send data to stdin and read stdout; communicate() handles both safely
input_data = json.dumps([1, 2, 3, 4, 5]).encode()
stdout, stderr = proc.communicate(input=input_data)

result = json.loads(stdout.decode())
print(result)  # {'count': 5, 'doubled': [2, 4, 6, 8, 10]}
```
Always prefer communicate() over manually writing to proc.stdin and reading from proc.stdout. The manual approach can deadlock if the subprocess's stdout buffer fills up while you're still writing to stdin.
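The deadlock risk only exists when you hold both ends of the pipe. If the child only writes and you never feed stdin, reading incrementally is safe; a minimal sketch:

```python
import subprocess

# No stdin pipe here, so iterating stdout line by line cannot deadlock.
proc = subprocess.Popen(
    ["python3", "-c", "for i in range(3): print(i)"],
    stdout=subprocess.PIPE,
    text=True,
)
for line in proc.stdout:
    print("child said:", line.strip())
proc.wait()  # collect the exit status
```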
Popen gives you full control over that running process, so your parent app can exchange data instead of only waiting for a final exit code. To guard against a child that hangs, pass a timeout to communicate() and kill the process if it expires:

```python
try:
    stdout, stderr = proc.communicate(input=input_data, timeout=10)
except subprocess.TimeoutExpired:
    proc.kill()  # hard kill (SIGKILL on Unix)
    stdout, stderr = proc.communicate()  # drain remaining output
    print("Process timed out and was killed")
```
subprocess.run() and Popen.communicate() both block the calling thread. Inside an asyncio program this is a problem: a blocked thread freezes the event loop, preventing all other coroutines from running.
The solution is asyncio.create_subprocess_exec(). It spawns the subprocess the same way, but proc.communicate() becomes an awaitable coroutine, so the event loop can run other tasks while waiting for the subprocess to finish.
```python
import asyncio
import json
import sys

async def run_in_subprocess(data: list) -> dict:
    script = """
import sys, json
data = json.load(sys.stdin)
print(json.dumps({"count": len(data), "sum": sum(data)}))
"""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", script,  # sys.executable = same Python as parent
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    payload = json.dumps(data).encode()
    # communicate() is awaitable; it does NOT block the event loop
    stdout, stderr = await proc.communicate(input=payload)
    if proc.returncode != 0:
        raise RuntimeError(f"Subprocess failed: {stderr.decode()}")
    return json.loads(stdout.decode())

async def main():
    result = await run_in_subprocess([10, 20, 30, 40])
    print(result)  # {'count': 4, 'sum': 100}

asyncio.run(main())
```
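Because nothing blocks, you can fan out several children at once. A quick sketch reusing run_in_subprocess() from above:

```python
async def main_parallel():
    # Three child processes run concurrently under one event loop.
    results = await asyncio.gather(
        run_in_subprocess([1, 2, 3]),
        run_in_subprocess([4, 5]),
        run_in_subprocess([6, 7, 8, 9]),
    )
    print(results)

asyncio.run(main_parallel())
```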
Combining asyncio.wait_for() with the subprocess gives you a hard timeout that the event loop enforces. If the process doesn't finish in time, it's killed immediately.
```python
async def run_with_timeout(data: list, timeout: float = 10.0) -> dict:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "analysis_script.py",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=json.dumps(data).encode()),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the zombie process
        return {"error": f"Timed out after {timeout}s"}
    return json.loads(stdout.decode())
```
This pattern is one of subprocess's biggest advantages over threads: there is no reliable way to kill a hung thread in Python, but you can always kill a process.
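If an immediate SIGKILL feels too blunt, a common escalation pattern (sketched here with the real terminate() and kill() methods) is to ask politely first:

```python
async def stop_process(proc: asyncio.subprocess.Process, grace: float = 2.0) -> None:
    proc.terminate()  # polite request (SIGTERM on Unix)
    try:
        await asyncio.wait_for(proc.wait(), timeout=grace)
    except asyncio.TimeoutError:
        proc.kill()        # unconditional (SIGKILL on Unix)
        await proc.wait()  # reap the process
```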
When parent and child processes need to exchange structured data, a clean pattern is to use JSON over stdin/stdout. The parent serialises its input, writes it to stdin, and reads JSON back from stdout. The child reads stdin, does its work, and prints JSON to stdout.
This is the pattern used throughout the Research Agent project. Here's a minimal standalone example of both sides:
analysis_script.py (the child process):
```python
import json
import sys
from collections import Counter

def analyse(items: list) -> dict:
    return {
        "count": len(items),
        "unique": len(set(items)),
        "top": Counter(items).most_common(3),
    }

if __name__ == "__main__":
    try:
        raw = sys.stdin.read()
        items = json.loads(raw)
        print(json.dumps(analyse(items)))  # stdout → parent reads this
    except Exception as e:
        print(json.dumps({"error": str(e)}), file=sys.stderr)
        sys.exit(1)
```
main.py (the parent process):
```python
import asyncio, json, sys
from pathlib import Path

async def analyse_in_subprocess(items: list) -> dict:
    script = Path("analysis_script.py")
    proc = await asyncio.create_subprocess_exec(
        sys.executable, str(script),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await asyncio.wait_for(
        proc.communicate(input=json.dumps(items).encode()),
        timeout=15,
    )
    return json.loads(stdout.decode())

async def main():
    data = ["apple", "banana", "apple", "cherry", "banana", "apple"]
    result = await analyse_in_subprocess(data)
    print(result)

asyncio.run(main())
# {'count': 6, 'unique': 3, 'top': [['apple', 3], ['banana', 2], ['cherry', 1]]}
```
In the Research Agent project, the analysis_agent uses exactly this pattern. The orchestrator calls run_analysis(articles), which spawns analysis_script.py in an isolated subprocess, sends article data via stdin as JSON, and reads the stats report back from stdout.
Full project code: agents/subprocess_analysis/analysis_agent.py
```python
# agents/subprocess_analysis/analysis_agent.py (excerpt)
import asyncio, json, sys
from pathlib import Path

_SCRIPT_PATH = Path(__file__).parent / "analysis_script.py"

async def run_analysis(articles: list) -> dict:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, str(_SCRIPT_PATH),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    payload = json.dumps([a.to_dict() for a in articles]).encode()
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=payload),
            timeout=15,  # hard kill if the script hangs
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return {}  # safe default; the orchestrator continues
    return json.loads(stdout.decode())
```
The analysis script itself (analysis_script.py) is completely standalone: it imports nothing from the main project, reads from stdin, and writes to stdout. This isolation means a bug in the analysis logic can never corrupt the orchestrator's state.
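One possible extension, not in the project code, is to surface the JSON error the child prints to stderr instead of discarding it; parse_child_result below is a hypothetical helper:

```python
import json

def parse_child_result(returncode: int, stdout: bytes, stderr: bytes) -> dict:
    # Hypothetical helper: analysis_script.py prints its result to stdout on
    # success and a JSON {"error": ...} object to stderr on failure.
    if returncode == 0:
        return json.loads(stdout.decode())
    try:
        return {"error": json.loads(stderr.decode())["error"]}
    except (json.JSONDecodeError, KeyError):
        return {"error": stderr.decode().strip() or "unknown failure"}
```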
| Use subprocess when… | Don't use subprocess when… |
|---|---|
| You need crash isolation (a bug can't corrupt the parent) | You're just doing I/O (asyncio or threads are lighter) |
| You need a hard, enforceable timeout | You need to share in-memory objects between caller and callee |
| Running untrusted or LLM-generated code safely | The overhead of process spawning (~50–100ms) is too high for your use case |
| Calling a CLI tool or a different language runtime | The work is purely CPU-bound Python (use ProcessPoolExecutor instead) |
| Running a script that has its own dependencies/environment | You just need parallel I/O (that's asyncio's job) |
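For the purely CPU-bound row in the table, a minimal ProcessPoolExecutor sketch looks like this:

```python
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    return sum(i * i for i in range(n))  # stand-in for CPU-heavy work

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:  # one worker process per CPU by default
        print(list(pool.map(crunch, [10_000, 20_000, 30_000])))
```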
subprocess is Python's tool for full process isolation. Its strengths (crash safety, hard timeouts, and the ability to kill a hung process) make it the right choice wherever you need the child process to be truly independent of the parent. The asyncio.create_subprocess_exec() variant integrates it cleanly into an async program without blocking the event loop.
In the next post we'll look at threading, the right tool when you need to run a synchronous, blocking library (like sqlite3) alongside async code without freezing the event loop.
Isolation is not overhead; it's a feature. A subprocess that can crash safely is worth far more than a thread that can bring down everything with it.