01. Why this system exists
This system runs ten independent sales workflows on shared infrastructure. Each one discovers contacts, enriches data, scores fit, reaches out, and learns from the result. A new workflow ships as a graph plus a prompt, not a fresh service. Three goals drove the design: uniform observability everywhere, no single flow able to sink the rest, and no cross-team coordination to add a flow.
The architecture rests on three planes. The control plane owns graph identity and the routing contract. It stays cheap to load, with no model or database dependency. The data plane runs one worker pool per capability, covering email, enrichment, classification, and discovery. A jammed graph stays inside its own pool, so the blast radius equals the pool, never the platform. The observability plane builds a distributed run tree. That tree crosses the hop from TypeScript to Python, turning one user action into one debuggable trace.
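The pool-level isolation described above can be illustrated with a minimal sketch. It assumes each capability pool is modeled as a bounded asyncio semaphore; the class name CapabilityPools, the pool sizes, and the demo jobs are all hypothetical, and the real data plane may use separate processes or queues instead.

```python
import asyncio


class CapabilityPools:
    """One bounded pool (semaphore) per capability, so saturation stays local."""

    def __init__(self, sizes: dict[str, int]) -> None:
        self._slots = {name: asyncio.Semaphore(n) for name, n in sizes.items()}

    async def run(self, capability: str, job):
        # Waiting for a slot only contends with jobs of the same capability.
        async with self._slots[capability]:
            return await job()


async def demo() -> dict[str, bool]:
    # Hypothetical sizes: one slot each, to make the jam easy to see.
    pools = CapabilityPools({"email": 1, "discovery": 1})
    release = asyncio.Event()

    async def stuck():
        await release.wait()  # simulates a jammed email graph
        return "email done"

    async def quick():
        return "discovery done"

    jammed = asyncio.create_task(pools.run("email", stuck))
    queued = asyncio.create_task(pools.run("email", quick))
    await asyncio.sleep(0)  # let both email jobs start; the second waits for a slot

    # The discovery pool is untouched by the jam in the email pool.
    result = await pools.run("discovery", quick)
    status = {
        "discovery_ok": result == "discovery done",
        "email_queued": not queued.done(),
    }

    release.set()  # unjam the email pool so the demo can finish cleanly
    await asyncio.gather(jammed, queued)
    return status


status = asyncio.run(demo())
```

In this sketch the second email job stays queued behind the stuck one while the discovery job completes, which is the "blast radius equals the pool" property in miniature.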
The trade-off decided the shape. A monolith with branching prompts has the smallest footprint, but its failure domains collapse into one, so any change risks every flow. Microservices per flow give clean isolation, yet each flow pays a platform tax, so operations cost grows with every addition. The chosen design is a shared runtime fronted by a registry plus per-capability workers. Isolation sits at the pool level, and growth is purely additive. The deciding axis was failure-domain cost, not lines of code saved.
Adding a capability means three small edits: one registry row, one builder, and one route. It needs no service scaffolding, no per-team wiring, and no coordination call.
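As a rough illustration of those three edits, the sketch below uses simplified stand-ins rather than the real registry types. The names Row, build_lead_score, ROUTES, and dispatch, the lead_score capability, and the /api/lead-score path are all hypothetical, and the "route" is reduced to a dictionary entry because the source does not say how routes are declared.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Row:  # simplified stand-in for a registry entry
    assistant_id: str
    builder: Callable[[], Callable[[dict], dict]]


def build_lead_score() -> Callable[[dict], dict]:
    """Edit 2: the builder for a hypothetical capability."""

    def run(state: dict) -> dict:
        return {**state, "score": 0.5}  # placeholder logic, not a real scorer

    return run


REGISTRY: tuple[Row, ...] = (
    Row("lead_score", build_lead_score),  # edit 1: the registry row
)

ROUTES: dict[str, str] = {
    "/api/lead-score": "lead_score",  # edit 3: the route, mapped to the assistant id
}


def dispatch(path: str, state: dict) -> dict:
    """Resolve a route to its registry row, build the graph, and run it."""
    by_id = {row.assistant_id: row for row in REGISTRY}
    return by_id[ROUTES[path]].builder()(state)


result = dispatch("/api/lead-score", {"contact": "a"})
```

Nothing outside these three places changes, which is the sense in which growth is additive.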
One failure mode is worth naming. The registry can become a merge bottleneck as the team count climbs. The first symptom shows up as conflicting pull requests on a single shared file.
On the registry side, adding a sales workflow is one row in a tuple of frozen dataclasses: no new service, no new wiring. That tuple is the whole fleet.
apps/agentic-sales/backend/infra/registry.py
Graph identity lives in exactly one frozen dataclass. Both runtimes (FastAPI + langgraph dev) read it; the resumable flag decides whether a graph gets the shared checkpointer.
from dataclasses import dataclass


@dataclass(frozen=True)
class GraphSpec:
    assistant_id: str  # public id used in /runs/wait, langgraph.json, TS client
    module: str  # dotted import path, e.g. "graphs.email_compose_graph"
    compiled_attr: str = "graph"  # module-level symbol referenced in langgraph.json
    # Callable(checkpointer) -> CompiledGraph. ``None`` means the module only
    # exposes a pre-compiled ``compiled_attr`` instance (built at import time
    # with no checkpointer); the FastAPI runtime uses that instance directly
    # and the graph runs without persistence. Most graphs implement
    # ``build_graph(checkpointer)``; the ones that don't need durability use
    # the precompiled form.
    builder_attr: str | None = "build_graph"
    # True only for graphs that genuinely need to resume from a checkpoint,
    # i.e. ones invoked with a stable ``thread_id`` so a SIGKILL mid-run can
    # pick up where it left off. Every other graph is invoked with a random
    # UUID thread_id (cron + /runs/wait), so its checkpoint rows are written
    # and never read. ``core/app.py:_compile_one`` passes ``checkpointer=None``
    # when this is False, which keeps ``checkpoint_blobs`` / ``checkpoint_writes``
    # from blowing past the Neon storage cap.
    resumable: bool = False
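The way a compile step might honor builder_attr and the resumable flag can be sketched as follows. This is an assumed shape based only on the comments above, not the actual core/app.py code: the function compile_one, the fake_graphs module, and the tuple return values are hypothetical stand-ins, and the GraphSpec here is a trimmed copy of the fields described in the text.

```python
import importlib
import sys
import types
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class GraphSpec:  # trimmed copy of the fields described above
    assistant_id: str
    module: str
    compiled_attr: str = "graph"
    builder_attr: Optional[str] = "build_graph"
    resumable: bool = False


def compile_one(spec: GraphSpec, checkpointer: Any) -> Any:
    """Assumed shape of the compile step; the real _compile_one may differ."""
    mod = importlib.import_module(spec.module)
    if spec.builder_attr is None:
        # Precompiled form: use the module-level instance, no persistence.
        return getattr(mod, spec.compiled_attr)
    builder = getattr(mod, spec.builder_attr)
    # Only resumable graphs receive the shared checkpointer.
    return builder(checkpointer if spec.resumable else None)


# Demo with an in-memory fake module, so nothing here touches real graphs.
fake = types.ModuleType("fake_graphs")
fake.build_graph = lambda cp: ("compiled", cp)
fake.graph = ("precompiled", None)
sys.modules["fake_graphs"] = fake

shared = object()  # stands in for the shared checkpointer
stateless = compile_one(GraphSpec("a", "fake_graphs"), shared)
durable = compile_one(GraphSpec("b", "fake_graphs", resumable=True), shared)
prebuilt = compile_one(GraphSpec("c", "fake_graphs", builder_attr=None), shared)
```

Under these assumptions, only the resumable spec ever sees the shared checkpointer, so graphs invoked with throwaway thread ids write no checkpoint rows at all.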
apps/agentic-sales/backend/infra/registry.py
The fleet is one tuple of 35 GraphSpecs (first rows shown). To add a capability you add a row; that is the only edit to the registry.
GRAPHS: tuple[GraphSpec, ...] = (
    # ── Email graphs ────────────────────────────────────────
    GraphSpec("email_compose", "graphs.email_compose_graph"),
    GraphSpec("email_opportunity", "graphs.email_opportunity_graph"),
    GraphSpec("email_reply", "graphs.email_reply_graph"),
    GraphSpec("email_outreach", "graphs.email_outreach_graph"),
    # Durable-thread campaign engine (reactive, CF-cron driven). One thread per
    # (campaign, contact) with a stable ``campaign-<cid>-<contactId>`` thread_id:
    # check_reply → guard → generate_touch (email_outreach) → send_touch → record
    # → schedule_next → interrupt(wake_at). resumable=True: the D1 checkpointer
    # (infra/checkpointer.py) persists state between touches so a CF cron can
    # resume the thread days later via Command(resume). See
    # specs/2026-06-04-durable-campaign-engine/.
    GraphSpec(
        "campaign",
        "graphs.campaign_graph",
        resumable=True,
        builder_attr="build_campaign_graph",
    ),