We've built the fetch agent (asyncio), the memory agent (threading), and the analysis agent (subprocess). In this final part we add the fourth concurrency primitive, ProcessPoolExecutor, for CPU-bound embedding generation, and then wire everything together in the LangChain + Ollama orchestrator. We close with a complete end-to-end walkthrough of a single user turn.
Full project code: research-agent
Embedding generation, converting text into dense numeric vectors, is CPU-bound: it involves matrix multiplications that peg the CPU at 100%. Python's GIL (Global Interpreter Lock) means that even if you put this work in a thread, only one thread executes Python bytecode at a time. For CPU-bound work, threads give you concurrency but not true parallelism; they still take turns on a single core.
ProcessPoolExecutor spawns separate Python interpreter processes, each with its own GIL. Numpy operations in a worker process run truly in parallel with the event loop in the main process: the main process can continue awaiting network responses while a worker crunches numbers on a different core.
The worker function must be defined at module level (not a lambda or closure). This is a hard requirement of the "spawn" start method used by default on macOS: the function must be picklable so it can be sent to the worker process, and lambdas and nested functions are not picklable.
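As a quick illustration (a standalone sketch, not part of the project), here is what happens if you hand ProcessPoolExecutor a lambda instead of a module-level function: the pickling step fails and the error surfaces when you ask the future for its result.

# Standalone sketch: only module-level functions survive the pickling step.
from concurrent.futures import ProcessPoolExecutor

def double(x: int) -> int:      # module-level, therefore picklable
    return x * 2

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        print(pool.submit(double, 21).result())            # -> 42
        try:
            pool.submit(lambda x: x * 2, 21).result()      # lambda is not picklable
        except Exception as exc:                           # typically a PicklingError
            print(f"rejected: {exc!r}")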
import numpy as np
from core.config import config

# Must be a top-level function: required for pickle serialisation
def _compute_embeddings(texts: list[str]) -> list[list[float]]:
    """
    CPU-bound embedding computation. Runs inside a worker process.

    In production, replace with a real model:
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer("all-MiniLM-L6-v2")
        return model.encode(texts).tolist()

    Here we use deterministic hash-based vectors for a self-contained demo.
    """
    dim = config.embedding_dim
    embeddings = []
    for text in texts:
        vec = np.zeros(dim, dtype=np.float32)
        words = text.lower().split()
        for i, word in enumerate(words[:dim]):
            h = hash(word) % (10 ** 6)
            vec[i % dim] += (h / 10 ** 6) - 0.5
        # L2 normalise so cosine similarity = dot product
        norm = np.linalg.norm(vec) + 1e-8
        embeddings.append((vec / norm).tolist())
    return embeddings
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def generate_embeddings(texts: list[str]) -> list[list[float]]:
    """
    Dispatch CPU-bound work to a process pool.
    The event loop is free while workers are crunching numbers.
    """
    if not texts:
        return []
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=config.embedding_workers) as pool:
        embeddings = await loop.run_in_executor(pool, _compute_embeddings, texts)
    return embeddings
loop.run_in_executor(pool, fn, *args) submits the function to the process pool and returns a Future. The event loop can await this Future without blocking, so other coroutines continue to run. When the worker process completes the computation, the Future resolves and execution returns to generate_embeddings().
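To see the "loop stays free" claim in action, here is a hypothetical usage sketch (it assumes generate_embeddings from the snippet above is importable in the same module): a heartbeat coroutine keeps printing while the pool does the heavy lifting.

# Hypothetical usage sketch: the event loop services other coroutines while
# the process pool computes embeddings.
import asyncio

async def heartbeat() -> None:
    for _ in range(3):
        print("event loop still responsive")
        await asyncio.sleep(0.2)

async def demo() -> None:
    vectors, _ = await asyncio.gather(
        generate_embeddings(["transformer architecture", "attention mechanism"]),
        heartbeat(),
    )
    print(f"{len(vectors)} embeddings computed")

if __name__ == "__main__":   # the guard matters under the "spawn" start method
    asyncio.run(demo())

Note that generate_embeddings creates a fresh pool per call, which keeps the example self-contained; in a long-running service you would usually create one ProcessPoolExecutor at startup and reuse it, avoiding the process start-up cost on every request.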
Once we have embeddings for all articles and for the query, we rank articles by cosine similarity to find the most relevant ones to inject into the LLM's context. Because all vectors are L2-normalised, cosine similarity reduces to a dot product.
def cosine_similarity(v1: list[float], v2: list[float]) -> float:
    """Dot product of two pre-normalised vectors = cosine similarity."""
    a = np.array(v1, dtype=np.float32)
    b = np.array(v2, dtype=np.float32)
    return float(np.dot(a, b))

def find_most_relevant(
    query_embedding: list[float],
    article_embeddings: list[list[float]],
    top_k: int = 3,
) -> list[int]:
    """Return indices of the top-k most similar articles to the query."""
    scores = [
        (i, cosine_similarity(query_embedding, emb))
        for i, emb in enumerate(article_embeddings)
    ]
    scores.sort(key=lambda x: x[1], reverse=True)
    return [i for i, _ in scores[:top_k]]
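A quick, hypothetical sanity check of the ranking path (assuming _compute_embeddings, cosine_similarity, and find_most_relevant live in the same module): embed a few titles plus a query, then print the top matches with their scores.

# Hypothetical sanity check for the ranking helpers (runs in-process, no pool).
titles = [
    "Transformer (deep learning architecture)",
    "Show HN: a sourdough starter tracker",
    "Attention is all you need",
]
doc_vectors = _compute_embeddings(titles)
query_vector = _compute_embeddings(["transformer attention architecture"])[0]
for idx in find_most_relevant(query_vector, doc_vectors, top_k=2):
    print(idx, titles[idx], cosine_similarity(query_vector, doc_vectors[idx]))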
The orchestrator is the brain of the system. It owns two LangChain chains built with ChatOllama and uses asyncio.gather() to run all agents in parallel. It is stateless: all session data lives in ConversationState objects passed in by the caller, so one ResearchPipeline instance can serve multiple concurrent sessions.
import asyncio

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import ChatOllama

class ResearchPipeline:
    def __init__(self) -> None:
        self.llm = ChatOllama(
            model=config.ollama_model,          # e.g. "llama3.2"
            base_url=config.ollama_base_url,    # "http://localhost:11434"
            temperature=config.llm_temperature,
        )
        # Chain 1: extract 1-3 search keywords from the user's question
        self._topic_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "Extract 1-3 concise search keywords from the user's question. "
             "Return ONLY a comma-separated list; no explanation, no numbering. "
             "Example: 'transformer architecture, attention mechanism'"),
            ("human", "{question}"),
        ])
        self._topic_chain = self._topic_prompt | self.llm | StrOutputParser()

        # Chain 2: synthesize final answer from context + history + analysis
        self._synthesis_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a research assistant. Answer the user's question using "
             "the provided source context. Be accurate, concise, and cite sources "
             "where relevant. If the context is insufficient, say so honestly."),
            ("human",
             "Question: {question}\n\n"
             "--- Source Context ---\n{context}\n\n"
             "--- Conversation History ---\n{history}\n\n"
             "--- Analysis Report ---\n{analysis}\n\n"
             "Provide a clear, well-structured answer:"),
        ])
        self._synthesis_chain = (
            self._synthesis_prompt | self.llm | StrOutputParser()
        )
    async def run(self, question: str, state: ConversationState) -> str:
        # 1. Record user message
        state.messages.append(Message(role="user", content=question))

        # 2. Sequential: extract search topics (the fetch agents need these keywords)
        topics_raw = await self._topic_chain.ainvoke({"question": question})
        topics = [t.strip() for t in topics_raw.split(",") if t.strip()]
        primary_topic = topics[0] if topics else question  # fall back to the raw question

        # 3. Parallel phase 1: fetch articles + restore memory
        #    (analysis and embedding need the articles, so they run in the next gather)
        (articles, _, _, _) = await asyncio.gather(
            run_fetch_agents(primary_topic),      # asyncio: Wikipedia + HackerNews
            _noop_analysis(),                     # placeholder
            _noop_embeddings(),                   # placeholder
            load_state(state.session_id, state),  # threading: SQLite read
        )

        # 4. Parallel phase 2: analysis + embedding on the freshly fetched articles
        analysis, embeddings = await asyncio.gather(
            run_analysis(articles),               # subprocess: isolated stats
            generate_embeddings(                  # ProcessPoolExecutor: CPU-bound
                [f"{a.title} {a.content}" for a in articles]
            ),
        )

        # 5. Embed the query, find most relevant articles
        q_embeddings = await generate_embeddings([question])
        top_indices = find_most_relevant(q_embeddings[0], embeddings, top_k=4)
        relevant_articles = [articles[i] for i in top_indices]

        # 6. Sequential: synthesize the final answer
        answer = await self._synthesis_chain.ainvoke({
            "question": question,
            "context": _format_context(relevant_articles),
            "history": build_history_context(state),
            "analysis": analysis.summary_text() if analysis else "N/A",
        })

        # 7. Record answer + fire background tasks
        state.messages.append(Message(role="assistant", content=answer))
        asyncio.create_task(maybe_summarize(state, self.llm))  # async background
        asyncio.create_task(save_state(state))                 # threading background
        return answer
The two-phase gather is a deliberate design. Analysis and embedding depend on having the articles from the fetch agent, so they can't all run in a single gather() call. The first gather fetches articles and restores memory simultaneously. The second gather then runs analysis and embedding on those articles simultaneously. Within each phase, all work is parallel.
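Because the pipeline holds no per-session state, a single instance can serve several conversations at once. A hypothetical sketch (assuming state_store and ResearchPipeline are importable as in the entry point below):

# Hypothetical sketch: one stateless pipeline, two concurrent sessions.
async def serve_two_sessions() -> None:
    pipeline = ResearchPipeline()
    alice = state_store.get_or_create("alice")
    bob = state_store.get_or_create("bob")
    answers = await asyncio.gather(
        pipeline.run("What is the transformer architecture?", alice),
        pipeline.run("How does attention differ from recurrence?", bob),
    )
    for answer in answers:
        print(answer[:120], "...")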
The entry point has two modes: interactive (multi-turn chat) and demo (three preset questions that also trigger token optimization). One important setup line comes first: multiprocessing.set_start_method("fork"). On macOS, the default start method is "spawn", which requires all worker functions to be picklable; "fork" is faster and avoids re-importing modules in each worker.
import asyncio, argparse, multiprocessing, sys

# Must be set before any ProcessPoolExecutor is created
if sys.platform != "win32":
    multiprocessing.set_start_method("fork", force=True)

async def _chat_loop(session_id: str | None) -> None:
    """Interactive multi-turn conversation loop."""
    from agents.threaded_memory.memory_agent import init_db
    from orchestrator.pipeline import ResearchPipeline

    await init_db()
    pipeline = ResearchPipeline()
    state = state_store.get_or_create(session_id)
    while True:
        try:
            question = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not question or question.lower() in {"quit", "exit"}:
            break
        answer = await pipeline.run(question, state)
        print(f"\nAssistant:\n{answer}\n")

async def _demo_run() -> None:
    """3-turn demo that triggers token optimization."""
    from agents.threaded_memory.memory_agent import init_db
    from orchestrator.pipeline import ResearchPipeline

    config.summarize_after_n_messages = 4  # lower threshold for demo
    await init_db()
    pipeline = ResearchPipeline()
    state = state_store.get_or_create("demo-session")
    questions = [
        "What is the transformer architecture in machine learning?",
        "How does attention mechanism work in transformers?",
        "What are the main differences between GPT and BERT?",
    ]
    for q in questions:
        answer = await pipeline.run(q, state)
        print(f"\nAssistant:\n{answer}\n")
        await asyncio.sleep(0.5)  # allow background tasks to flush

def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--session", default=None)
    parser.add_argument("--demo", action="store_true")
    args = parser.parse_args()
    if args.demo:
        asyncio.run(_demo_run())
    else:
        asyncio.run(_chat_loop(args.session))

if __name__ == "__main__":
    main()
Here's exactly what happens when the user types "What is the transformer architecture in machine learning?":
═══════════════════════════════════════════════
User: "What is the transformer architecture?"
═══════════════════════════════════════════════
[Step 1] LLM extracts topics (sequential)
    → "transformer architecture, attention mechanism"

[Step 2] First gather: fetch + memory restore in parallel
    ├── [ASYNC]  fetch_agent → Wikipedia: "Transformer (deep learning)"
    ├── [ASYNC]  fetch_agent → Wikipedia search: 3 related articles
    ├── [ASYNC]  fetch_agent → HackerNews: top posts matching topic
    └── [THREAD] memory_agent.load_state() → restores prior session from SQLite
    All four finish in ~1.5s (vs ~4.5s sequential)

[Step 2b] Second gather: analysis + embedding in parallel
    ├── [SUBPROC] analysis_script.py → receives 8 articles via stdin JSON
    │             → computes avg relevance, keyword freq, top articles
    │             → returns JSON via stdout
    └── [PROCESS] _compute_embeddings(8 article texts)
                  → numpy matrix ops across 128-dim vectors
                  → returns 8 normalised vectors

[Step 3] Embed query + find top-4 most relevant articles
    [PROCESS] _compute_embeddings(["What is the transformer..."])
    → cosine similarity ranking → top 4 article indices

[Step 4] LLM synthesizes final answer (sequential)
    → synthesis_chain.ainvoke({question, context, history, analysis})
    → streams response from Ollama

[Step 5] Background tasks (fire-and-forget)
    asyncio.create_task(maybe_summarize(state, llm))  ← async
    asyncio.create_task(save_state(state))            ← threading
    ↑ Both scheduled but NOT awaited; the user sees the answer immediately (see note below)
═══════════════════════════════════════════════
Total wall time: ~3-5s (sequential ≈ 10-15s)
═══════════════════════════════════════════════
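One caveat about step 5: asyncio keeps only a weak reference to tasks created with create_task(), so a fire-and-forget task can in principle be garbage-collected before it finishes. A hedged sketch of the usual fix, holding a reference until the task completes (the fire_and_forget helper is hypothetical, not part of the project):

# Hypothetical helper: keep a strong reference to background tasks so they
# are not garbage-collected before they finish.
import asyncio

_background_tasks: set[asyncio.Task] = set()

def fire_and_forget(coro) -> None:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)                        # strong reference
    task.add_done_callback(_background_tasks.discard)  # drop it when done

# Usage inside run():
#     fire_and_forget(maybe_summarize(state, self.llm))
#     fire_and_forget(save_state(state))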
The speedup comes entirely from the parallel phases. Network I/O (Wikipedia + HackerNews) normally dominates latency. By running all three fetch coroutines simultaneously, plus the memory restore, what would take 3-4 seconds sequentially takes about 1.5 seconds: the wall time of the slowest single fetch, not the sum.
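The effect is easy to reproduce with simulated latencies; here is a small self-contained sketch (fake delays standing in for real HTTP calls):

# Self-contained timing sketch: gather's wall time tracks the slowest
# coroutine, not the sum of all of them.
import asyncio, time

async def fake_fetch(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)   # stands in for an HTTP round trip
    return name

async def main() -> None:
    start = time.perf_counter()
    await asyncio.gather(
        fake_fetch("wikipedia", 1.5),
        fake_fetch("hackernews", 1.0),
        fake_fetch("sqlite-restore", 0.3),
    )
    print(f"wall time: {time.perf_counter() - start:.1f}s")  # ~1.5s, not 2.8s

asyncio.run(main())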
# 1. Install Ollama and pull the model
# https://ollama.com
ollama pull llama3.2
# 2. Install dependencies
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -r requirements.txt
# 3. Interactive mode
python main.py
# 4. Resume a previous session
python main.py --session my-session-id
# 5. Non-interactive demo (3 turns, triggers token optimization)
python main.py --demo
The table below summarises every concurrency decision made in the project. Use it as a reference when designing your own agent systems.
┌─────────────────────────────┬──────────────────────────┬──────────────────────────────────────────┐
│ Workload │ Primitive │ Why │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ Multiple HTTP requests │ asyncio.gather() │ I/O-bound; event loop multiplexes sockets│
│ (Wikipedia, HackerNews) │ │ No threads or processes needed │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ SQLite reads/writes │ ThreadPoolExecutor │ sqlite3 is sync; I/O-bound so GIL ok; │
│ (memory agent) │ + run_in_executor │ threads avoid blocking the event loop │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ Stats over article data │ subprocess │ Isolation + hard timeout + future │
│ (analysis agent) │ + wait_for timeout │ sandboxing of dynamic/LLM code │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ numpy embedding computation │ ProcessPoolExecutor │ CPU-bound; needs real parallelism; │
│ (embedding agent) │ + run_in_executor │ GIL bypassed via separate process │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ LLM calls (Ollama) │ asyncio await │ ChatOllama.ainvoke() is native async; │
│ (both chains) │ │ no additional wrapping needed │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ Token optimization │ asyncio.create_task() │ Fire-and-forget; no latency impact; │
│ (post-response) │ │ background LLM call │
├─────────────────────────────┼──────────────────────────┼──────────────────────────────────────────┤
│ Memory sync │ asyncio.create_task() │ Fire-and-forget; threading handled │
│ (post-response) │ + ThreadPoolExecutor │ inside save_state() │
└─────────────────────────────┴──────────────────────────┴──────────────────────────────────────────┘
The core principle: match the primitive to the workload. asyncio for anything that waits on I/O. Threads for synchronous blocking libraries that do I/O. Subprocess when you need crash isolation or hard timeouts. Process pools when you need true CPU parallelism. Mixing all four in the same pipeline, each in the right place, is what makes the research agent fast and resilient.
This six-part series started from theory and ended with a running multi-agent system: asyncio fetch agents for network I/O, a threaded memory agent for SQLite, a subprocess analysis agent for isolated stats, a process pool for CPU-bound embeddings, and a LangChain + Ollama orchestrator tying the pieces together.
The full runnable code, including the GitHub push script and README, is in the repository below.
Full project code: research-agent