Python Concurrency Part 4 Research Agent: Architecture & Async Fetch Agent

← Back to Home

In the first three parts of this series we covered the theory: asyncio for I/O concurrency, subprocess for isolation, and threading for bridging sync libraries. Now we put all three to work in a single project.

We're building a Research Agent an agentic chatbot that answers research questions by fetching live data from Wikipedia and HackerNews, running stats analysis, generating embeddings, and synthesizing an answer via a local Ollama LLM. Each part of the pipeline uses a different concurrency tool, chosen deliberately for its workload type.

Full project code: research-agent

1. What We're Building

Ask the Research Agent a question like "What is the transformer architecture?" and it will:

  1. Extract search keywords from the question using an LLM (LangChain + Ollama)
  2. Simultaneously fetch articles from Wikipedia and HackerNews asyncio
  3. Save the conversation to SQLite in the background threading
  4. Run keyword and relevance analysis in an isolated script subprocess
  5. Generate embeddings to find the most relevant articles ProcessPoolExecutor
  6. Synthesize a final answer with the LLM
research_agent/
├── main.py                              # CLI entry point
├── requirements.txt
├── core/
│   ├── config.py                        # All tunable parameters
│   ├── models.py                        # Shared dataclasses
│   └── state.py                         # In-memory session registry
├── agents/
│   ├── async_fetch/
│   │   └── fetch_agent.py               # asyncio  Wikipedia + HackerNews
│   ├── threaded_memory/
│   │   └── memory_agent.py              # threading  SQLite memory
│   └── subprocess_analysis/
│       ├── analysis_agent.py            # subprocess caller
│       └── analysis_script.py           # isolated analysis script
├── utils/
│   ├── embeddings.py                    # ProcessPoolExecutor
│   └── token_optimizer.py              # LLM summarization
└── orchestrator/
    └── pipeline.py                      # LangChain + Ollama orchestrator

2. The Architecture

The orchestrator sits at the top and coordinates everything. Two things happen in sequence (they depend on each other), and four things happen in parallel (they're independent):

User Question
      │
      ▼
┌──────────────────────────────────────────────────┐
│                  ORCHESTRATOR                    │
│                                                  │
│  Step 1: LLM extracts search topics (sequential) │
│                                                  │
│  Step 2: asyncio.gather()  all in parallel:     │
│    ├── fetch_agent      ← asyncio                │
│    ├── memory restore   ← threading              │
│    ├── analysis_agent   ← subprocess             │
│    └── embedding_agent  ← ProcessPoolExecutor    │
│                                                  │
│  Step 3: LLM synthesizes answer (sequential)     │
│                                                  │
│  Background (fire-and-forget):                   │
│    ├── token_optimizer  ← asyncio task           │
│    └── memory save      ← threading task         │
└──────────────────────────────────────────────────┘

The decision of what runs in parallel vs sequence is the orchestrator's core job. The fetch, memory, analysis, and embedding agents are all independent of each other for a given question they can all start simultaneously. The LLM synthesis must wait because it needs the articles, embeddings, and analysis report first.

3. Core Layer Config, Models, State

config.py

All tunable parameters in one place no magic numbers scattered across files.

core/config.py

from dataclasses import dataclass

@dataclass
class Config:
    # LLM
    ollama_model: str = "llama3.2"
    ollama_base_url: str = "http://localhost:11434"
    llm_temperature: float = 0.3

    # Memory
    db_path: str = "agent_memory.db"
    max_thread_workers: int = 2

    # Token optimization
    summarize_after_n_messages: int = 6
    keep_recent_n_messages: int = 3

    # Fetch
    fetch_timeout_seconds: int = 8
    max_articles_per_source: int = 3

    # Subprocess
    subprocess_timeout_seconds: int = 15

    # Embeddings
    embedding_dim: int = 32
    embedding_workers: int = 2

config = Config()
models.py Shared Dataclasses

Pure dataclasses with no business logic. Every agent operates on these types, keeping the interfaces consistent.

core/models.py

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class Article:
    source: str          # "Wikipedia", "HackerNews", etc.
    title: str
    content: str
    url: str
    relevance: float = 0.8

    def to_dict(self) -> dict:
        return {"source": self.source, "title": self.title,
                "content": self.content, "url": self.url, "relevance": self.relevance}

@dataclass
class Message:
    role: str            # "user" | "assistant"
    content: str
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class ConversationState:
    """
    Two-tier memory:
      Short-term → messages list (in RAM, instant)
      Long-term  → synced to SQLite via threading
    """
    session_id: str
    messages: list[Message] = field(default_factory=list)
    summary: str = ""                    # rolling summary of older messages
    articles: list[Article] = field(default_factory=list)
    embeddings: list[list[float]] = field(default_factory=list)
    last_analysis: Optional[object] = None
state.py Session Registry

A lightweight in-memory registry. In production this would be Redis or a distributed store; for a single-process bot a dict is sufficient.

import uuid
from core.models import ConversationState

_sessions: dict[str, ConversationState] = {}

def get_or_create(session_id: str | None = None) -> ConversationState:
    if session_id is None:
        session_id = str(uuid.uuid4())[:8]
    if session_id not in _sessions:
        _sessions[session_id] = ConversationState(session_id=session_id)
    return _sessions[session_id]

4. The Fetch Agent asyncio in Practice

The fetch agent hits three sources simultaneously. All three are network I/O the ideal workload for asyncio. We use aiohttp (an async HTTP client) so every await session.get(url) suspends only that coroutine, letting the others make progress.

agents/async_fetch/fetch_agent.py

import asyncio
import aiohttp
from core.config import config
from core.models import Article

_TIMEOUT = aiohttp.ClientTimeout(total=config.fetch_timeout_seconds)

async def _fetch_wikipedia_summary(session, query: str) -> list[Article]:
    url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{query.replace(' ', '_')}"
    try:
        async with session.get(url, timeout=_TIMEOUT) as resp:
            if resp.status != 200:
                return []
            data = await resp.json()
            return [Article(
                source="Wikipedia",
                title=data.get("title", ""),
                content=data.get("extract", "")[:600],
                url=data.get("content_urls", {}).get("desktop", {}).get("page", ""),
                relevance=0.92,
            )]
    except Exception as exc:
        print(f"  [FETCH] wikipedia_summary error: {exc}")
        return []

async def _fetch_hackernews(session, query: str) -> list[Article]:
    url = f"https://hn.algolia.com/api/v1/search?query={query.replace(' ', '+')}&tags=story"
    try:
        async with session.get(url, timeout=_TIMEOUT) as resp:
            if resp.status != 200:
                return []
            data = await resp.json()
            return [
                Article(source="HackerNews", title=h["title"],
                        content=h.get("story_text", h.get("title", ""))[:400],
                        url=h.get("url", ""), relevance=0.70)
                for h in data.get("hits", [])[:3] if h.get("title")
            ]
    except Exception as exc:
        print(f"  [FETCH] hackernews error: {exc}")
        return []

The public entry point fires all three sources at once:

async def run_fetch_agents(query: str) -> list[Article]:
    """
    All three fetches start simultaneously.
    Total time = slowest source, not sum of all sources.
    return_exceptions=True: one failing source doesn't abort the others.
    """
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            _fetch_wikipedia_summary(session, query),
            _fetch_wikipedia_search(session, query),
            _fetch_hackernews(session, query),
            return_exceptions=True,
        )
    return [a for batch in results if isinstance(batch, list) for a in batch]

Three sources with latencies of 0.8s, 1.2s, and 0.5s complete in ~1.2 s total, not 2.5 s. The return_exceptions=True flag is critical for resilience if Wikipedia is rate-limiting, HackerNews results still arrive.

5. Setup & Dependencies

# requirements.txt
aiohttp>=3.9.0
langchain-core>=0.2.0
langchain-ollama>=0.1.0
numpy>=1.26.0
# Install Ollama and pull the model
# https://ollama.com
ollama pull llama3.2

# Install Python dependencies
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

6. Conclusion

This post established the project foundation: the file structure, the shared data models, the session registry, and the first agent the asyncio-powered fetch layer. The key takeaway from the fetch agent is that asyncio.gather() is not just a convenience it's a fundamental shift in how wall time scales with the number of I/O sources.

In Part 5 we build the memory agent (threading + sqlite3) and the analysis agent (subprocess) the two agents that handle persistence and isolated computation.

← Part 3: Threading Next: Memory & Analysis Agents →