Python Concurrency Part 5: Memory Agent, Token Optimizer & Analysis Agent


In Part 4 we built the project skeleton and the asyncio fetch agent. Now we build the two agents that complement it: the memory agent which uses threading to persist conversations to SQLite without blocking the event loop, and the analysis agent which runs in a subprocess for complete isolation and hard timeout guarantees. Between them sits the token optimizer, which keeps the conversation history from growing indefinitely.

Full project code: research-agent

1. The Memory Architecture: Two Tiers

The research agent stores conversation history in two places simultaneously. Short-term memory is a ConversationState object held in RAM: lookups are instant and no serialisation is needed. Long-term memory is a SQLite database on disk: it survives process restarts and allows users to reconnect to prior sessions.
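For reference, these state objects are defined in core/models.py in the repo. The sketch below reconstructs plausible field definitions from how they are used throughout this post; treat it as an assumption, not the repo's exact code.

from dataclasses import dataclass, field, asdict
from datetime import datetime

@dataclass
class Message:
    role: str                # "user" or "assistant"
    content: str
    timestamp: datetime

@dataclass
class Article:
    source: str
    title: str
    content: str
    url: str
    relevance: float

    def to_dict(self) -> dict:          # used when piping to the analysis agent
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "Article":
        return cls(**d)

@dataclass
class ConversationState:
    session_id: str
    summary: str = ""
    messages: list[Message] = field(default_factory=list)
    articles: list[Article] = field(default_factory=list)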

The challenge is synchronisation. Every assistant response must be persisted to SQLite, but SQLite has no async API. Calling conn.execute() directly inside an async def blocks the entire event loop: all other agents freeze while the disk write completes. The solution is to push every SQLite operation into a ThreadPoolExecutor so the event loop can continue while the write happens on a background thread. A runnable demonstration follows the diagram below.

Short-term (RAM):
    ConversationState.messages     ← instant O(1) access
    ConversationState.articles     ← same Python process, no serialisation
    ConversationState.summary      ← updated in-place after each turn

Long-term (SQLite on disk):
    sessions table   → session_id, summary, updated_at
    messages table   → role, content, timestamp (append-only)
    articles table   → source, title, content, url, relevance

Sync mechanism:
    After every turn: asyncio.create_task(save_state(state))
    ↓
    run_in_executor(ThreadPoolExecutor) → _save_state_sync() on a thread
    ↓
    Returns immediately; the event loop is never blocked
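To see the problem concretely, here is a small self-contained demo (not part of the repo): slow_write() stands in for a real SQLite commit. Run it as-is and the heartbeat keeps ticking during the write; call slow_write() directly in the coroutine instead and everything stalls for the full second.

import asyncio
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

def slow_write(db_path: str) -> None:
    # Stand-in for a real commit: a blocking sync call taking ~1 second.
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS t (x)")
    time.sleep(1.0)                     # simulate a slow disk flush
    conn.commit()
    conn.close()

async def heartbeat() -> None:
    for _ in range(4):
        print("event loop alive")
        await asyncio.sleep(0.3)

async def main() -> None:
    loop = asyncio.get_running_loop()
    # Offloaded to a thread: heartbeat() keeps printing during the write.
    # Calling slow_write("demo.db") directly here would freeze it instead.
    await asyncio.gather(
        loop.run_in_executor(executor, slow_write, "demo.db"),
        heartbeat(),
    )

asyncio.run(main())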

2. Memory Agent: Full Walkthrough

agents/threaded_memory/memory_agent.py

The shared executor

One ThreadPoolExecutor is created at module import time and shared by all DB operations in the process. This avoids the overhead of creating a new pool for every read or write.

from concurrent.futures import ThreadPoolExecutor
from core.config import config

# One shared executor for all DB operations in the process.
_db_executor = ThreadPoolExecutor(
    max_workers=config.max_thread_workers,
    thread_name_prefix="memory-agent",
)
Database initialisation

_init_db() is a plain synchronous function: it creates the three tables if they don't already exist. The public async wrapper init_db() pushes it into the executor using loop.run_in_executor(), which returns an awaitable Future without blocking anything.

def _init_db(db_path: str) -> None:
    """Create tables if they don't exist. Called once at startup."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS sessions (
            session_id  TEXT PRIMARY KEY,
            summary     TEXT DEFAULT '',
            updated_at  TEXT
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id  TEXT,
            role        TEXT,
            content     TEXT,
            timestamp   TEXT
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS articles (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id  TEXT,
            source      TEXT,
            title       TEXT,
            content     TEXT,
            url         TEXT,
            relevance   REAL
        )
    """)
    conn.commit()
    conn.close()


async def init_db() -> None:
    """Initialise the database schema (called once at startup)."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_db_executor, _init_db, config.db_path)
    print(f"  [THREAD] memory_agent → DB initialised at '{config.db_path}'")
Saving state: the sync worker

_save_state_sync() does the actual SQLite work. It upserts the session summary, appends only new messages (avoiding re-inserting rows already persisted), and replaces the articles for the session. This function always runs inside a worker thread, never on the event loop.

def _save_state_sync(state: ConversationState, db_path: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        # Upsert session summary
        conn.execute("""
            INSERT INTO sessions (session_id, summary, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(session_id) DO UPDATE SET
                summary    = excluded.summary,
                updated_at = excluded.updated_at
        """, (state.session_id, state.summary, datetime.now().isoformat()))

        # Append only NEW messages (skip already persisted ones)
        existing = conn.execute(
            "SELECT COUNT(*) FROM messages WHERE session_id = ?",
            (state.session_id,),
        ).fetchone()[0]

        new_messages = state.messages[existing:]
        conn.executemany("""
            INSERT INTO messages (session_id, role, content, timestamp)
            VALUES (?, ?, ?, ?)
        """, [
            (state.session_id, m.role, m.content, m.timestamp.isoformat())
            for m in new_messages
        ])

        # Replace articles for this session
        conn.execute("DELETE FROM articles WHERE session_id = ?", (state.session_id,))
        conn.executemany("""
            INSERT INTO articles (session_id, source, title, content, url, relevance)
            VALUES (?, ?, ?, ?, ?, ?)
        """, [
            (state.session_id, a.source, a.title, a.content, a.url, a.relevance)
            for a in state.articles
        ])

        conn.commit()
    finally:
        conn.close()
Saving state: the fire-and-forget async wrapper

This is where the threading and asyncio patterns combine. save_state() is an async def that the orchestrator calls, but it returns immediately. It wraps the sync work in a coroutine and schedules it as a background task. The caller doesn't wait for the disk write.

async def save_state(state: ConversationState) -> None:
    """
    Fire-and-forget state sync to SQLite.

    Uses asyncio.create_task() so the caller doesn't wait for the write.
    The write runs in a thread; the event loop stays unblocked.
    """
    loop = asyncio.get_running_loop()

    async def _do_save():
        await loop.run_in_executor(_db_executor, _save_state_sync, state, config.db_path)
        print(f"  [THREAD] memory_agent → synced session '{state.session_id}' "
              f"({len(state.messages)} msgs, {len(state.articles)} articles)")

    # Non-blocking: schedule and return immediately
    asyncio.create_task(_do_save())

The key insight: asyncio.create_task() schedules the coroutine to run on the event loop but does not wait for it. The orchestrator continues immediately to the next step. When the event loop has a spare moment between I/O waits, it picks up _do_save() and runs it, which itself offloads to a thread via run_in_executor(). The entire pipeline is non-blocking end to end.
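A stripped-down, standalone illustration of that ordering (timings approximate):

import asyncio
import time

async def main() -> None:
    t0 = time.monotonic()

    async def background_save() -> None:
        await asyncio.sleep(0.5)        # stands in for the run_in_executor() call
        print(f"{time.monotonic() - t0:.2f}s  save finished")

    asyncio.create_task(background_save())   # scheduled, not awaited
    print(f"{time.monotonic() - t0:.2f}s  caller continues immediately")
    await asyncio.sleep(1.0)                 # keep the loop alive for the task

asyncio.run(main())
# 0.00s  caller continues immediately
# 0.50s  save finished

One caveat from the asyncio docs: the loop holds only a weak reference to tasks, so a long-running service should keep a reference to fire-and-forget tasks (e.g. in a set, discarded on completion) to prevent a pending save from being garbage-collected mid-flight.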

Loading state

Session restore follows the same pattern but is awaited directly (we need the data before processing the question). _load_state_sync() runs in the thread pool and returns a raw dict, which is then deserialised back into ConversationState on the event loop.

async def load_state(session_id: str, state: ConversationState) -> None:
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(
        _db_executor, _load_state_sync, session_id, config.db_path
    )
    if data is None:
        return

    state.summary = data["summary"]
    state.messages = [
        Message(role=m["role"], content=m["content"],
                timestamp=datetime.fromisoformat(m["timestamp"]))
        for m in data["messages"]
    ]
    state.articles = [Article.from_dict(a) for a in data["articles"]]
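_load_state_sync() itself isn't reproduced here. Below is a minimal sketch of what it plausibly looks like, assuming the schema created by _init_db() above; the authoritative version lives in the repo.

def _load_state_sync(session_id: str, db_path: str) -> dict | None:
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row      # rows become dict-like
    try:
        session = conn.execute(
            "SELECT summary FROM sessions WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        if session is None:
            return None                 # unknown session: nothing to restore

        messages = conn.execute(
            "SELECT role, content, timestamp FROM messages "
            "WHERE session_id = ? ORDER BY id",
            (session_id,),
        ).fetchall()
        articles = conn.execute(
            "SELECT source, title, content, url, relevance FROM articles "
            "WHERE session_id = ?",
            (session_id,),
        ).fetchall()
        return {
            "summary": session["summary"],
            "messages": [dict(m) for m in messages],
            "articles": [dict(a) for a in articles],
        }
    finally:
        conn.close()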

3. Token Optimizer: Rolling LLM Summaries

Every LLM has a context window limit. Without a memory management strategy, a long conversation eventually overflows it. The token optimizer solves this by compressing older messages into a rolling summary using the LLM itself, keeping only the most recent turns verbatim.

utils/token_optimizer.py

async def maybe_summarize(
    state: ConversationState,
    llm: "ChatOllama",
) -> bool:
    threshold = config.summarize_after_n_messages   # e.g. 10
    keep = config.keep_recent_n_messages             # e.g. 4

    if len(state.messages) < threshold:
        return False

    # Split history: messages to compress vs. messages to keep verbatim
    to_compress = state.messages[:-keep]
    to_keep = state.messages[-keep:]

    # Build the LLM prompt; include prior summary if one exists
    history_text = "\n".join(
        f"{m.role.upper()}: {m.content}" for m in to_compress
    )
    if state.summary:
        history_text = (
            f"[Previous summary]: {state.summary}\n\n"
            f"[New messages to add]:\n{history_text}"
        )

    from langchain_core.messages import HumanMessage, SystemMessage

    prompt = [
        SystemMessage(content=(
            "You are a conversation summarizer. "
            "Produce a concise but complete summary of the conversation below. "
            "Preserve key facts, decisions, and context. "
            "Write in third person, past tense. Maximum 150 words."
        )),
        HumanMessage(content=history_text),
    ]

    response = await llm.ainvoke(prompt)
    new_summary = response.content.strip()

    # Update in-memory state: replace old messages with summary
    state.summary = new_summary
    state.messages = list(to_keep)

    # Fire-and-forget: persist the compressed state to SQLite
    asyncio.create_task(save_state(state))
    return True

Notice the flow: maybe_summarize() is itself called as a fire-and-forget task from the orchestrator (asyncio.create_task(maybe_summarize(state, self.llm))). This means the LLM summarisation doesn't add latency to the user's response time; it runs while the answer is being displayed or while the user is reading it. The user only experiences the compression on their next turn, when the context is already smaller.
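Pulled together, the end of an orchestrator turn looks roughly like this. The method name finish_turn and its surrounding class are illustrative; only save_state() and the maybe_summarize() call are taken from the code above.

async def finish_turn(self, state: ConversationState, answer: str) -> None:
    # Short-term memory first: instant, in-process append.
    state.messages.append(
        Message(role="assistant", content=answer, timestamp=datetime.now())
    )
    # Both lines below return immediately; the user sees the answer right away.
    await save_state(state)                                # DB write on a thread
    asyncio.create_task(maybe_summarize(state, self.llm))  # LLM compression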

The build_history_context() helper formats what gets injected into each synthesis prompt: the rolling summary (if any) plus the last few verbatim messages:

def build_history_context(state: ConversationState) -> str:
    parts = []
    if state.summary:
        parts.append(f"[Conversation summary so far]:\n{state.summary}")
    if state.messages:
        recent = "\n".join(
            f"{m.role.upper()}: {m.content}" for m in state.messages[-4:]
        )
        parts.append(f"[Recent messages]:\n{recent}")
    return "\n\n".join(parts) if parts else "No prior conversation."

4. Analysis Agent: Subprocess for Isolation

The analysis agent computes statistics over the fetched articles: source breakdown, average relevance scores, top keywords by frequency, and top articles ranked by relevance. None of this requires a network call or the LLM; it's a self-contained computation. The choice to run it in a subprocess rather than as a regular async function comes down to three guarantees: isolation, timeout, and safety.

If the analysis logic crashes (divide-by-zero, memory overflow, a bug in a future version), it doesn't bring down the orchestrator. If it hangs, we can kill it with a hard deadline. And in production, if you replace this with LLM-generated code for dynamic analysis, the subprocess sandbox ensures that code can't access the main process's memory or file handles.

The analysis script (child process)

agents/subprocess_analysis/analysis_script.py

The script reads a JSON array from stdin, does all computation in pure Python (no dependencies), and prints a JSON object to stdout. That's the entire protocol: simple, robust, and testable in isolation.

"""
Protocol:
  stdin  → JSON array of article dicts
  stdout → JSON object with analysis results
  stderr → error messages (if any)
"""
import json, sys
from collections import Counter
from statistics import mean, stdev

def analyse(articles: list[dict]) -> dict:
    # Per-source stats
    by_source: dict[str, list[float]] = {}
    for a in articles:
        by_source.setdefault(a["source"], []).append(a["relevance"])

    source_stats = {
        src: {"count": len(scores), "avg_relevance": round(mean(scores), 3)}
        for src, scores in by_source.items()
    }

    # Top articles by relevance
    sorted_articles = sorted(articles, key=lambda x: x["relevance"], reverse=True)
    top_articles = [
        {"title": a["title"], "source": a["source"], "relevance": a["relevance"]}
        for a in sorted_articles[:3]
    ]

    # Keyword frequency
    STOPWORDS = {
        "the", "and", "for", "this", "that", "with", "from", "have",
        "been", "will", "are", "was", "its", "also", "their", "into",
    }
    all_text = " ".join(
        f"{a['title']} {a['content']}" for a in articles
    ).lower()
    words = [
        w.strip(".,!?;:'\"()[]")
        for w in all_text.split()
        if len(w) > 4 and w not in STOPWORDS
    ]
    keyword_freq = Counter(words).most_common(10)

    all_scores = [a["relevance"] for a in articles]
    overall = {
        "count": len(articles),
        "avg_relevance": round(mean(all_scores), 3),
        "std_relevance": round(stdev(all_scores), 3) if len(all_scores) > 1 else 0.0,
    }
    return {
        "overall": overall, "by_source": source_stats,
        "top_articles": top_articles, "top_keywords": keyword_freq,
    }

if __name__ == "__main__":
    try:
        raw = sys.stdin.read()
        articles = json.loads(raw)
        result = analyse(articles)
        print(json.dumps(result))     # parent process reads this from stdout
    except Exception as exc:
        print(json.dumps({"error": str(exc)}), file=sys.stderr)
        sys.exit(1)
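Because the whole contract is JSON over stdin/stdout, the script can be smoke-tested with nothing but the standard library; a quick sketch (the sample articles are made up):

import json
import subprocess
import sys

sample = [
    {"source": "hn", "title": "Async Python at scale",
     "content": "...", "relevance": 0.9},
    {"source": "arxiv", "title": "A survey of concurrency patterns",
     "content": "...", "relevance": 0.7},
]
proc = subprocess.run(
    [sys.executable, "agents/subprocess_analysis/analysis_script.py"],
    input=json.dumps(sample).encode(),
    capture_output=True,
)
print(json.loads(proc.stdout))   # {'overall': {'count': 2, ...}, ...}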
The analysis agent (parent process)

agents/subprocess_analysis/analysis_agent.py

The parent uses asyncio.create_subprocess_exec(), the async-friendly way to spawn processes. Unlike subprocess.Popen(), proc.communicate() is a coroutine that can be awaited without blocking the event loop. The asyncio.wait_for() wrapper enforces a hard deadline: if the script takes longer than config.subprocess_timeout_seconds, the process is killed and a fallback AnalysisReport is returned.

async def run_analysis(articles: list[Article]) -> AnalysisReport:
    payload = json.dumps([a.to_dict() for a in articles]).encode()

    # Spawn the subprocess (non-blocking)
    proc = await asyncio.create_subprocess_exec(
        sys.executable,           # same Python interpreter as the parent
        str(_SCRIPT_PATH),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    try:
        # Send JSON via stdin, receive JSON via stdout; awaitable
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=payload),
            timeout=config.subprocess_timeout_seconds,   # hard deadline (e.g. 15s)
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        print(f"  [SUBPROC] TIMEOUT  process killed")
        return AnalysisReport(article_count=len(articles), ...)

    if proc.returncode != 0:
        err = stderr.decode().strip()
        print(f"  [SUBPROC] script error: {err}")
        return AnalysisReport(article_count=len(articles), ...)

    data = json.loads(stdout.decode())
    overall = data.get("overall", {})
    return AnalysisReport(
        article_count=overall.get("count", len(articles)),
        by_source={src: stats["count"] for src, stats in data.get("by_source", {}).items()},
        avg_relevance=overall.get("avg_relevance", 0.0),
        top_articles=data.get("top_articles", []),
        top_keywords=data.get("top_keywords", []),
    )

5. Why Each Pattern Was Chosen Here

The threading and subprocess choices here are not arbitrary. The table below shows the decision matrix for these two agents:

┌─────────────────────┬────────────────────┬────────────────────────────┐
│ Agent               │ Pattern            │ Reason                     │
├─────────────────────┼────────────────────┼────────────────────────────┤
│ memory_agent        │ ThreadPoolExecutor │ sqlite3 has no async API;  │
│ (DB reads/writes)   │                    │ I/O-bound → GIL releases;  │
│                     │                    │ threads are sufficient     │
├─────────────────────┼────────────────────┼────────────────────────────┤
│ token_optimizer     │ asyncio task       │ LLM call is already async; │
│ (LLM summarization) │ (fire-and-forget)  │ no blocking; pure await    │
├─────────────────────┼────────────────────┼────────────────────────────┤
│ analysis_agent      │ subprocess         │ crash isolation needed;    │
│ (stats computation) │                    │ hard timeout enforced;     │
│                     │                    │ could run sandboxed code   │
└─────────────────────┴────────────────────┴────────────────────────────┘

The memory agent does not use a subprocess because it would add serialisation overhead (you'd have to marshal the entire ConversationState back and forth as JSON on every write), and crash isolation is not needed for stable library code like sqlite3. Threads are the right weight class for offloading a blocking sync call while staying in the same process.

The analysis script does not use threads because we want crash isolation and a kill-able hard timeout. A crashing thread takes down the whole process; a killed subprocess does not.
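That asymmetry is easy to demonstrate in isolation. In the standalone sketch below, asyncio.wait_for() abandons the executor Future after one second, but the worker thread cannot be killed and runs to completion anyway; a subprocess in the same position would be dead the moment proc.kill() returned.

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

def stuck() -> None:
    time.sleep(5)                       # stands in for a hung blocking call
    print("worker thread finished anyway")

async def main() -> None:
    loop = asyncio.get_running_loop()
    try:
        await asyncio.wait_for(loop.run_in_executor(pool, stuck), timeout=1.0)
    except asyncio.TimeoutError:
        # The await gave up, but there is no way to kill the thread itself.
        print("gave up waiting; the thread is still running")
    await asyncio.sleep(5)              # keep the loop alive to see the proof

asyncio.run(main())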

6. Conclusion

We now have three of the four concurrency layers working: asyncio for fetching, threading for persistence, and subprocess for analysis. Each agent can run concurrently with the others because they communicate through well-defined interfaces (SQLite schema, JSON pipe protocol, in-memory state) rather than shared mutable data structures.

In Part 6 we complete the pipeline with the ProcessPoolExecutor embedding agent (CPU-bound numpy work), the LangChain + Ollama orchestrator that ties everything together, and a full end-to-end walkthrough of what happens when you ask the agent a question.

← Part 4: Architecture & Fetch Agent | Next: Embeddings & Orchestrator →