Python Concurrency Part 6: Embeddings, Orchestrator & End-to-End Walkthrough


We've built the fetch agent (asyncio), the memory agent (threading), and the analysis agent (subprocess). In this final part we add the fourth concurrency primitive, ProcessPoolExecutor, for CPU-bound embedding generation, and then wire everything together in the LangChain + Ollama orchestrator. We close with a complete end-to-end walkthrough of a single user turn.

Full project code: research-agent

1. Embedding Agent: ProcessPoolExecutor for CPU-Bound Work

Embedding generation (converting text into dense numeric vectors) is CPU-bound: it involves matrix multiplications that peg the CPU at 100%. Python's GIL (Global Interpreter Lock) means that even if you put this work in a thread, only one thread executes Python bytecode at a time. For CPU-bound work, threads give you concurrency but not true parallelism: the threads still take turns on a single core.

ProcessPoolExecutor spawns separate Python interpreter processes, each with its own GIL. NumPy operations in a worker process run truly in parallel with the event loop in the main process: the main process can continue awaiting network responses while a worker is crunching numbers on a different core.
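A minimal, self-contained sketch of the same pattern (all names here are illustrative; `busy_sum` stands in for the embedding maths): two CPU-bound jobs run in worker processes while a coroutine keeps ticking on the main event loop.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def busy_sum(n: int) -> int:
    """CPU-bound stand-in for embedding maths; holds only its own process's GIL."""
    return sum(i * i for i in range(n))


async def heartbeat() -> list[str]:
    """Keeps ticking while the workers crunch, proving the loop isn't blocked."""
    ticks = []
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append("tick")
    return ticks


async def main() -> int:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Two CPU-bound jobs run on separate cores, concurrently with heartbeat()
        a, b, _ticks = await asyncio.gather(
            loop.run_in_executor(pool, busy_sum, 200_000),
            loop.run_in_executor(pool, busy_sum, 200_000),
            heartbeat(),
        )
    return a + b


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If `busy_sum` were run in two threads instead, the two jobs would serialise on the GIL; in two processes they genuinely overlap.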

utils/embeddings.py

The worker function runs in a separate process

The worker function must be at module level (not a lambda or closure). This is a hard requirement of the "spawn" start method used by default on macOS: the function must be picklable so it can be sent to the worker process, and lambdas and nested functions are not picklable.
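You can see the constraint directly with pickle. A quick aside (toy example, with hypothetical names `top_level` and `picklable`): module-level functions pickle by reference to their importable name, while lambdas have no such name and fail.

```python
import pickle


def top_level(x: int) -> int:
    """Module-level function: pickled by reference to its importable name."""
    return x * 2


def picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps()."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False
```

`picklable(top_level)` returns True, while `picklable(lambda x: x * 2)` returns False, which is exactly why the worker below is defined at the top of the module.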

import numpy as np
from core.config import config

# Must be a top-level function: required for pickle serialisation
def _compute_embeddings(texts: list[str]) -> list[list[float]]:
    """
    CPU-bound embedding computation. Runs inside a worker process.

    In production, replace with a real model:
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer("all-MiniLM-L6-v2")
        return model.encode(texts).tolist()

    Here we use deterministic hash-based vectors for a self-contained demo.
    """
    dim = config.embedding_dim
    embeddings = []

    for text in texts:
        vec = np.zeros(dim, dtype=np.float32)
        words = text.lower().split()
        for i, word in enumerate(words[:dim]):
            h = hash(word) % (10 ** 6)
            vec[i % dim] += (h / 10 ** 6) - 0.5
        # L2 normalise so cosine similarity = dot product
        norm = np.linalg.norm(vec) + 1e-8
        embeddings.append((vec / norm).tolist())

    return embeddings
The async interface: bridging the process pool into the event loop
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def generate_embeddings(texts: list[str]) -> list[list[float]]:
    """
    Dispatch CPU-bound work to a process pool.
    The event loop is free while workers are crunching numbers.
    """
    if not texts:
        return []

    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=config.embedding_workers) as pool:
        embeddings = await loop.run_in_executor(pool, _compute_embeddings, texts)

    return embeddings

loop.run_in_executor(pool, fn, *args) submits the function to the process pool and returns an awaitable Future. The event loop can await this Future without blocking, so other coroutines continue to run. When the worker process completes the computation, the Future resolves and execution returns to generate_embeddings().

Similarity search

Once we have embeddings for all articles and for the query, we rank articles by cosine similarity to find the most relevant ones to inject into the LLM's context. Because all vectors are L2-normalised, cosine similarity reduces to a dot product.

def cosine_similarity(v1: list[float], v2: list[float]) -> float:
    """Dot product of two pre-normalised vectors = cosine similarity."""
    a = np.array(v1, dtype=np.float32)
    b = np.array(v2, dtype=np.float32)
    return float(np.dot(a, b))


def find_most_relevant(
    query_embedding: list[float],
    article_embeddings: list[list[float]],
    top_k: int = 3,
) -> list[int]:
    """Return indices of the top-k most similar articles to the query."""
    scores = [
        (i, cosine_similarity(query_embedding, emb))
        for i, emb in enumerate(article_embeddings)
    ]
    scores.sort(key=lambda x: x[1], reverse=True)
    return [i for i, _ in scores[:top_k]]
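A quick sanity check of the ranking with toy 2-D unit vectors (the two helpers are repeated here so the snippet is self-contained):

```python
import numpy as np


def cosine_similarity(v1: list[float], v2: list[float]) -> float:
    """Dot product of two pre-normalised vectors = cosine similarity."""
    return float(np.dot(np.array(v1, dtype=np.float32),
                        np.array(v2, dtype=np.float32)))


def find_most_relevant(query_embedding, article_embeddings, top_k=3):
    """Return indices of the top-k most similar articles to the query."""
    scores = [(i, cosine_similarity(query_embedding, emb))
              for i, emb in enumerate(article_embeddings)]
    scores.sort(key=lambda s: s[1], reverse=True)
    return [i for i, _ in scores[:top_k]]


# Toy 2-D unit vectors: article 2 points the same way as the query,
# article 0 is orthogonal, article 1 points the opposite way.
query = [1.0, 0.0]
articles = [[0.0, 1.0], [-1.0, 0.0], [1.0, 0.0]]
print(find_most_relevant(query, articles, top_k=2))  # → [2, 0]
```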

2. Orchestrator: Coordinating Everything with LangChain

The orchestrator is the brain of the system. It owns two LangChain chains built with ChatOllama and uses asyncio.gather() to run all agents in parallel. It is stateless: all session data lives in ConversationState objects passed in by the caller, so one ResearchPipeline instance can serve multiple concurrent sessions.

orchestrator/pipeline.py

The two LangChain chains
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import ChatOllama

class ResearchPipeline:
    def __init__(self) -> None:
        self.llm = ChatOllama(
            model=config.ollama_model,       # e.g. "llama3.2"
            base_url=config.ollama_base_url, # "http://localhost:11434"
            temperature=config.llm_temperature,
        )

        # Chain 1: extract 1-3 search keywords from the user's question
        self._topic_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "Extract 1-3 concise search keywords from the user's question. "
             "Return ONLY a comma-separated list: no explanation, no numbering. "
             "Example: 'transformer architecture, attention mechanism'"),
            ("human", "{question}"),
        ])
        self._topic_chain = self._topic_prompt | self.llm | StrOutputParser()

        # Chain 2: synthesize final answer from context + history + analysis
        self._synthesis_prompt = ChatPromptTemplate.from_messages([
            ("system",
             "You are a research assistant. Answer the user's question using "
             "the provided source context. Be accurate, concise, and cite sources "
             "where relevant. If the context is insufficient, say so honestly."),
            ("human",
             "Question: {question}\n\n"
             "--- Source Context ---\n{context}\n\n"
             "--- Conversation History ---\n{history}\n\n"
             "--- Analysis Report ---\n{analysis}\n\n"
             "Provide a clear, well-structured answer:"),
        ])
        self._synthesis_chain = (
            self._synthesis_prompt | self.llm | StrOutputParser()
        )
The pipeline run() method, step by step
async def run(self, question: str, state: ConversationState) -> str:
    # 1. Record user message
    state.messages.append(Message(role="user", content=question))

    # 2. Sequential: extract search topics (the agents need these keywords)
    topics_raw = await self._topic_chain.ainvoke({"question": question})
    topics = [t.strip() for t in topics_raw.split(",") if t.strip()]
    primary_topic = topics[0] if topics else question

    # 3. Parallel: fetch articles + restore memory (analysis and embedding
    #    need the articles, so they run in the next phase)
    articles, _ = await asyncio.gather(
        run_fetch_agents(primary_topic),      # asyncio: Wikipedia + HackerNews
        load_state(state.session_id, state),  # threading: SQLite read
    )

    # 4. Parallel: analysis + embedding on the freshly fetched articles
    analysis, embeddings = await asyncio.gather(
        run_analysis(articles),               # subprocess: isolated stats
        generate_embeddings(                  # ProcessPoolExecutor: CPU-bound
            [f"{a.title} {a.content}" for a in articles]
        ),
    )

    # 5. Embed the query, find most relevant articles
    q_embeddings = await generate_embeddings([question])
    top_indices = find_most_relevant(q_embeddings[0], embeddings, top_k=4)
    relevant_articles = [articles[i] for i in top_indices]

    # 6. Sequential: synthesize the final answer
    answer = await self._synthesis_chain.ainvoke({
        "question": question,
        "context": _format_context(relevant_articles),
        "history": build_history_context(state),
        "analysis": analysis.summary_text() if analysis else "N/A",
    })

    # 7. Record answer + fire background tasks
    state.messages.append(Message(role="assistant", content=answer))
    asyncio.create_task(maybe_summarize(state, self.llm))  # async background
    asyncio.create_task(save_state(state))                 # threading background

    return answer

The two-phase gather is a deliberate design. Analysis and embedding depend on having the articles from the fetch agent, so everything can't run in a single gather() call. The first gather fetches articles and restores memory simultaneously; the second then runs analysis and embedding on those articles simultaneously. Within each phase, all work is parallel.
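The two-phase shape can be sketched in miniature with stand-in coroutines (all names here are illustrative, with asyncio.sleep() standing in for real work):

```python
import asyncio


async def fetch_articles() -> list[str]:
    await asyncio.sleep(0.05)          # stand-in for network I/O
    return ["article-a", "article-b"]


async def load_memory() -> str:
    await asyncio.sleep(0.05)          # stand-in for the SQLite restore
    return "restored"


async def analyse(articles: list[str]) -> int:
    await asyncio.sleep(0.05)          # stand-in for the subprocess stats
    return len(articles)


async def embed(articles: list[str]) -> list[int]:
    await asyncio.sleep(0.05)          # stand-in for the process-pool embeddings
    return [len(a) for a in articles]


async def run() -> tuple[int, list[int]]:
    # Phase 1: work with no mutual dependencies runs together
    articles, _memory = await asyncio.gather(fetch_articles(), load_memory())
    # Phase 2: work that needs phase 1's output runs together
    count, vectors = await asyncio.gather(analyse(articles), embed(articles))
    return count, vectors
```

Each phase costs roughly one sleep of wall time instead of two, because its members overlap.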

3. main.py: CLI Entry Point

main.py

The entry point has two modes: interactive (multi-turn chat) and demo (three preset questions that also trigger token optimization). One important setup line comes first: multiprocessing.set_start_method("fork"). On macOS the default start method is "spawn", which requires all worker functions to be picklable; "fork" is faster and avoids re-importing modules in each worker. ("fork" does not exist on Windows, hence the platform guard.)

import asyncio, argparse, multiprocessing, sys

# Must be set before any ProcessPoolExecutor is created
if sys.platform != "win32":
    multiprocessing.set_start_method("fork", force=True)


async def _chat_loop(session_id: str | None) -> None:
    """Interactive multi-turn conversation loop."""
    from agents.threaded_memory.memory_agent import init_db
    from orchestrator.pipeline import ResearchPipeline

    await init_db()
    pipeline = ResearchPipeline()
    state = state_store.get_or_create(session_id)

    while True:
        try:
            question = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not question or question.lower() in {"quit", "exit"}:
            break
        answer = await pipeline.run(question, state)
        print(f"\nAssistant:\n{answer}\n")


async def _demo_run() -> None:
    """3-turn demo that triggers token optimization."""
    from agents.threaded_memory.memory_agent import init_db
    from core.config import config
    from orchestrator.pipeline import ResearchPipeline

    config.summarize_after_n_messages = 4   # lower threshold for demo

    await init_db()
    pipeline = ResearchPipeline()
    state = state_store.get_or_create("demo-session")

    questions = [
        "What is the transformer architecture in machine learning?",
        "How does attention mechanism work in transformers?",
        "What are the main differences between GPT and BERT?",
    ]
    for q in questions:
        answer = await pipeline.run(q, state)
        print(f"\nAssistant:\n{answer}\n")
        await asyncio.sleep(0.5)   # allow background tasks to flush


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--session", default=None)
    parser.add_argument("--demo", action="store_true")
    args = parser.parse_args()

    if args.demo:
        asyncio.run(_demo_run())
    else:
        asyncio.run(_chat_loop(args.session))

if __name__ == "__main__":
    main()

4. End-to-End Walkthrough

Here's exactly what happens when the user types "What is the transformer architecture in machine learning?":

═══════════════════════════════════════════════
  User: "What is the transformer architecture?"
═══════════════════════════════════════════════

[Step 1] LLM extracts topics (sequential)
  → "transformer architecture, attention mechanism"

[Step 2] First gather: fetch + memory restore in parallel
  ├── [ASYNC]  fetch_agent → Wikipedia: "Transformer (deep learning)"
  ├── [ASYNC]  fetch_agent → Wikipedia search: 3 related articles
  ├── [ASYNC]  fetch_agent → HackerNews: top posts matching topic
  └── [THREAD] memory_agent.load_state() → restores prior session from SQLite

  All four finish in ~1.5s (vs ~4.5s sequential)

[Step 2b] Second gather: analysis + embedding in parallel
  ├── [SUBPROC] analysis_script.py → receives 8 articles via stdin JSON
  │             → computes avg relevance, keyword freq, top articles
  │             → returns JSON via stdout
  └── [PROCESS] _compute_embeddings(8 article texts)
                → numpy matrix ops across 128-dim vectors
                → returns 8 normalised vectors

[Step 3] Embed query + find top-4 most relevant articles
  [PROCESS] _compute_embeddings(["What is the transformer..."])
  → cosine similarity ranking → top 4 article indices

[Step 4] LLM synthesizes final answer (sequential)
  → synthesis_chain.ainvoke({question, context, history, analysis})
  → streams response from Ollama

[Step 5] Background tasks (fire-and-forget)
  asyncio.create_task(maybe_summarize(state, llm))   ← async
  asyncio.create_task(save_state(state))             ← threading
  ↑ Both scheduled but NOT awaited: user sees answer immediately

═══════════════════════════════════════════════
  Total wall time: ~3-5s  (sequential ≈ 10-15s)
═══════════════════════════════════════════════

The speedup comes entirely from the parallel phases. Network I/O (Wikipedia + HackerNews) normally dominates latency. By running all three fetch coroutines simultaneously, plus the memory restore, what would take 3-4 seconds sequentially takes about 1.5 seconds: the wall time of the slowest single fetch, not the sum.
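The max-not-sum effect is easy to demonstrate with simulated fetches (fake_fetch is an illustrative stand-in for an HTTP round-trip):

```python
import asyncio
import time


async def fake_fetch(delay: float) -> float:
    await asyncio.sleep(delay)   # stand-in for an HTTP round-trip
    return delay


async def timed_gather() -> float:
    """Three 'fetches' in parallel: wall time ≈ the slowest one, not 0.6s total."""
    start = time.perf_counter()
    await asyncio.gather(fake_fetch(0.3), fake_fetch(0.2), fake_fetch(0.1))
    return time.perf_counter() - start
```

Running `asyncio.run(timed_gather())` takes roughly 0.3 seconds, the duration of the slowest fetch, rather than the 0.6-second sum of all three.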

5. Running the Project

# 1. Install Ollama and pull the model
# https://ollama.com
ollama pull llama3.2

# 2. Install dependencies
python -m venv .venv
source .venv/bin/activate     # Windows: .venv\Scripts\activate
pip install -r requirements.txt

# 3. Interactive mode
python main.py

# 4. Resume a previous session
python main.py --session my-session-id

# 5. Non-interactive demo (3 turns, triggers token optimization)
python main.py --demo

6. Concurrency Decision Guide

The table below summarises every concurrency decision made in the project. Use it as a reference when designing your own agent systems.

┌─────────────────────────────┬───────────────────────┬────────────────────────────────────────────┐
│ Workload                    │ Primitive             │ Why                                        │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ Multiple HTTP requests      │ asyncio.gather()      │ I/O-bound; event loop multiplexes sockets; │
│ (Wikipedia, HackerNews)     │                       │ no threads or processes needed             │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ SQLite reads/writes         │ ThreadPoolExecutor    │ sqlite3 is sync; I/O-bound so GIL is ok;   │
│ (memory agent)              │ + run_in_executor     │ threads avoid blocking the event loop      │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ Stats over article data     │ subprocess            │ isolation + hard timeout + future          │
│ (analysis agent)            │ + wait_for timeout    │ sandboxing of dynamic/LLM code             │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ NumPy embedding computation │ ProcessPoolExecutor   │ CPU-bound; needs real parallelism;         │
│ (embedding agent)           │ + run_in_executor     │ GIL bypassed via separate processes        │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ LLM calls (Ollama)          │ asyncio await         │ ChatOllama.ainvoke() is native async;      │
│ (both chains)               │                       │ no additional wrapping needed              │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ Token optimization          │ asyncio.create_task() │ fire-and-forget; no latency impact;        │
│ (post-response)             │                       │ background LLM call                        │
├─────────────────────────────┼───────────────────────┼────────────────────────────────────────────┤
│ Memory sync                 │ asyncio.create_task() │ fire-and-forget; threading handled         │
│ (post-response)             │ + ThreadPoolExecutor  │ inside save_state()                        │
└─────────────────────────────┴───────────────────────┴────────────────────────────────────────────┘

The core principle: match the primitive to the workload. asyncio for anything that waits on I/O. Threads for synchronous blocking libraries that do I/O. Subprocess when you need crash isolation or hard timeouts. Process pools when you need true CPU parallelism. Mixing all four in the same pipeline, each in the right place, is what makes the research agent fast and resilient.

7. Series Recap

This six-part series started from theory and ended with a running multi-agent system. Here's how the pieces connect: the fetch agent (asyncio) gathers articles from Wikipedia and HackerNews, the memory agent (threading) persists conversation state in SQLite, the analysis agent (subprocess) computes statistics in isolation, the embedding agent (ProcessPoolExecutor) ranks articles by relevance, and the orchestrator (LangChain + Ollama) wires them all into a single pipeline.

The full runnable code, including the GitHub push script and README, is in the repository below.

Full project code: research-agent

← Part 5: Memory & Analysis Agents Back to Home →