Mental Model
The opening scene: one FastAPI surface, sixty-plus specialized StateGraph instances, one shared Neon-hosted AsyncPostgresSaver checkpoint store, and one Langfuse tracing pipeline. That is the lead-gen sales-intelligence backend. It is built almost entirely on LangGraph, but it looks nothing like the three-node tutorial graph you probably learned on. Most introductions show a single graph with three nodes, a linear flow, a clean state. Lead-gen is the opposite end of the spectrum. Dozens of graphs compose, fan out, fan in, and call each other across container boundaries. And still, every node writes to a Postgres-backed shared state, and every large language model call lands in Langfuse with an actual US dollar cost attached.
The system lives in three containers. The core container is the hub. Its harness lives at apps slash lead-gen slash backend slash core slash app dot py. That file, lines one through forty-eight, declares a FastAPI application that compiles every in-process graph against that shared Neon AsyncPostgresSaver. For the eight cross-container boundaries enumerated in core dot remote graphs dot underscore ADAPTER BUILDERS, it exposes thin RemoteGraph adapters under app dot state dot remote adapters. That means individual graphs can register a remote node with a simple import and call, like from core dot remote graphs import get underscore jobbert underscore ner underscore adapter, then builder dot add node with the string extract underscore skills and the adapter function. The HTTP surface at slash runs slash wait plus slash threads asterisk matches exactly what the client-side TypeScript library in src slash lib slash langgraph-client dot ts already calls. So flipping the LANGGRAPH URL environment variable on the client side is the only cut-over required. There is a bearer-token middleware that gates every non-public path when LANGGRAPH AUTH TOKEN is set. The dispatcher worker forwards that header. And the thirteen LinkedIn-extension REST routes that used to live in scripts slash linkedin posts server dot py are now mounted at slash linkedin asterisk, so the core container shares its Neon pool with the extension traffic: one connection pool, one deploy.
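To make the bearer-token gate concrete, here is a minimal sketch of the decision it has to make for each request. It is not the code in app dot py. The function name, the PUBLIC_PATHS set, and the assumption that slash health counts as a public path are all hypothetical; the text above only says that non-public paths are gated when the token variable is set.

```python
from __future__ import annotations

import hmac

# Hypothetical allow-list. Which paths are actually public is an assumption.
PUBLIC_PATHS = frozenset({"/health"})


def is_authorized(path: str, headers: dict[str, str], expected_token: str | None) -> bool:
    """Return True when the request may proceed past the gate."""
    if not expected_token:
        # Token variable unset: the gate is disabled entirely.
        return True
    if path in PUBLIC_PATHS:
        return True
    scheme, _, presented = headers.get("authorization", "").partition(" ")
    if scheme.lower() != "bearer":
        return False
    # Constant-time comparison, so timing does not leak token contents.
    return hmac.compare_digest(presented.encode(), expected_token.encode())
```

A dispatcher that forwards the same Authorization header passes this check unchanged, which matches the behavior described above.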
The endpoints are familiar. GET slash health is a cheap liveness probe that does not touch the database or any language model. POST slash runs slash wait takes an assistant identifier, input, and an optional thread identifier and returns the final graph state. POST slash threads mints a new thread identifier. POST slash threads slash thread ID slash runs is fire-and-forget; a webhook carries the result. GET slash threads slash thread ID slash runs slash run ID lets you poll run status. There are two dispatch endpoints: POST slash dispatch slash positioning-all fans out positioning over all products, and POST slash dispatch slash lead-gen-teams fans out the lead gen team over products times segments. And then there is the slash linkedin wildcard for the Chrome extension API.
You can run it locally with uvicorn app:app --host 0.0.0.0 --port 8000.
Now, picture the mental model. The system is structured around four loose layers. The first is atomic graphs. These are small, single-purpose StateGraphs like classify underscore paper, score underscore contact, or country underscore classify. One to three nodes each. They produce deterministic JSON output. No human in the loop. They are the building blocks.
The second layer is pipeline graphs. These are composite graphs like pipeline, product underscore intel, email underscore outreach. They fan out to many parallel workers using the Send API and collect results through reducer-merged state slots. So you start a pipeline, it dispatches work into parallel branches, and then gathers everything back together with reducer functions that merge the state.
The third layer is supervisor graphs. These are graphs that ainvoke other compiled graphs as subgraphs. Take product underscore intel. It runs deep underscore ICP, pricing, GTM, and deep underscore competitor in turn. But it tolerates partial failure. Instead of crashing when one subgraph fails, it records errors in a subgraph underscore errors dictionary and continues with the remaining work.
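The partial-failure behavior is easier to see in code. What follows is a sketch under assumptions, not the product underscore intel source: StubGraph stands in for a compiled graph and exposes only ainvoke, and run_supervisor is a hypothetical name. Only the subgraph underscore errors dictionary and the run-in-turn order come from the description above.

```python
from __future__ import annotations

import asyncio
from typing import Any


class StubGraph:
    """Stand-in for a compiled graph; only ainvoke(state) is modelled."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail

    async def ainvoke(self, state: dict[str, Any]) -> dict[str, Any]:
        if self.fail:
            raise RuntimeError(f"{self.name} unavailable")
        return {self.name: f"report for {state['product_id']}"}


async def run_supervisor(state: dict[str, Any], subgraphs: list[StubGraph]) -> dict[str, Any]:
    """Run each subgraph in turn; record failures instead of raising."""
    results: dict[str, Any] = {}
    subgraph_errors: dict[str, str] = {}
    for graph in subgraphs:
        try:
            results.update(await graph.ainvoke(state))
        except Exception as exc:  # one failing subgraph must not stop the rest
            subgraph_errors[graph.name] = f"{type(exc).__name__}: {exc}"
    return {**state, **results, "subgraph_errors": subgraph_errors}
```

If the pricing stub fails, the returned state still carries the other three reports, and the failure shows up as one entry in subgraph_errors.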
The fourth layer is remote graphs. These live in other containers: leadgen dash ml, leadgen dash research, leadgen dash scrape. You reach them through the RemoteGraph adapter, and there is a circuit breaker in front of every adapter call. So if a remote container is down, the circuit breaker opens and the system fails fast rather than hanging on a network timeout.
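If you have not built a circuit breaker before, this is roughly the shape. Treat it as a generic sketch of the technique rather than the breaker used in lead-gen: the class name, the thresholds, and the cool-down behavior are all assumptions.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when the breaker rejects a call without touching the network."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_after_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = failure_threshold
        self._reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def _before_call(self) -> None:
        if self._opened_at is None:
            return  # closed: calls pass through
        if self._clock() - self._opened_at >= self._reset_after_s:
            # Cool-down elapsed: allow a trial call; one more failure re-opens.
            self._opened_at = None
            self._failures = self._threshold - 1
            return
        raise CircuitOpenError("remote marked down; failing fast")

    async def call(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        self._before_call()
        try:
            result = await fn()
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._opened_at = self._clock()
            raise
        self._failures = 0
        return result
```

Wrapping each adapter call in breaker.call means a dead container costs the configured number of failed attempts, after which callers get an immediate CircuitOpenError until the cool-down passes.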
That is the high-level architecture. The rest of this guide is a tour of the wiring. It assumes you already know LangGraph fundamentals. You are not here to learn what a StateGraph is or how a node function works. You are here to see how sixty-plus graphs compose under a single FastAPI surface, how they share state through one Postgres-backed checkpoint store, how they trace every call through one Langfuse pipeline, and how they tolerate failures across container boundaries.
There is no diagram you need to see. Just hold this picture in your mind. A FastAPI router receives a run request. It looks up the graph in a registry of about sixty entries. The graph might be atomic, a single-purpose node that classifies a paper. Or it might be a pipeline that fans out to twenty parallel enrichment workers. Or a supervisor that runs four subgraphs in sequence, catching errors as it goes. Or a graph that reaches into the ML container via a remote adapter, crossing a container boundary with a circuit breaker in the path. Every step writes checkpoints to Neon. Every language model call sends a trace to Langfuse. And every thread progresses through its state without you having to manage queues, retries, or manual storage.
The goal is to give you the patterns so you can apply them in your own systems. Not the tutorial examples. The real ones.
Three-Container Split
You have three containers: core, ML, and research. They form the deployment topology for lead-gen, a LangGraph system. Each container has a distinct responsibility, and they communicate over HTTP, never sharing memory or a filesystem. The core container is the brain. It holds the FastAPI harness, the registry, and every graph that needs in-process Postgres checkpointing. FastAPI serves the LangGraph API at port 7860, as you see in the app dot py setup. The registry lives in a file at leadgen_agent/registry.py, which is the single source of truth for graph identity. The core container also runs AsyncPostgresSaver backed by Neon, so threads survive the sleep-wake cycle of a Hugging Face Spaces free tier. Only the core container writes checkpoints; the other two containers never write a checkpoint directly.
The ML container is dedicated to two heavy models. It runs JobBERT for named entity recognition, and BGE M3 for embeddings. In the code, you find these as get_jobbert_ner_adapter and BgeM3EmbedInput. The research container handles data acquisition. It runs scholar fetchers, Common Crawl scrapers, and Playwright crawlers. The source code shows CommonCrawlInput and other related inputs. Both ML and research containers are stateless from the checkpoint perspective; they produce outputs that the core container consumes.
How do they talk? Every cross-container call from core into leadgen-ml or leadgen-research is wrapped inside a remote graph adapter in the file apps/lead-gen/backend/core/remote_graphs.py, lines 1 through 60. Each adapter validates its input dict against the matching input Pydantic contract from leadgen_agent.contracts before the HTTP round trip, and validates the returned dict against the matching output contract after it. If the remote side responds with a schema version the caller does not recognize, the adapter raises ContractsVersionMismatch. This surfaces shape drift at the very first call instead of at the leaf of some deep response stream. The public surface of every adapter is ainvoke with state and config, the same shape a compiled in-process StateGraph exposes. So a core graph can register a remote node by calling, for example, get_jobbert_ner_adapter and passing the result to builder.add_node. URLs are read from environment variables at adapter build time: ML_URL for leadgen-ml and RESEARCH_URL for leadgen-research. Bearer auth is forwarded via ML_INTERNAL_AUTH_TOKEN and RESEARCH_INTERNAL_AUTH_TOKEN.
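The validate, call, validate sequence can be sketched without any HTTP at all. This is an illustration under stated assumptions: the real adapters use Pydantic contracts and RemoteGraph, while this sketch substitutes plain functions and an injected transport so it runs on the standard library alone. The field names text, skills, and schema_version, the supported version number, and the ValidatingAdapter class are hypothetical. Only ContractsVersionMismatch and the ainvoke surface are named in the description above.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

SUPPORTED_SCHEMA_VERSION = 1  # hypothetical value


class ContractsVersionMismatch(RuntimeError):
    """The remote answered with a schema version this caller does not know."""


def validate_ner_input(payload: dict[str, Any]) -> dict[str, Any]:
    if not isinstance(payload.get("text"), str):
        raise ValueError("input contract: 'text' must be a string")
    return payload


def validate_ner_output(payload: dict[str, Any]) -> dict[str, Any]:
    if payload.get("schema_version") != SUPPORTED_SCHEMA_VERSION:
        raise ContractsVersionMismatch(f"got {payload.get('schema_version')!r}")
    if not isinstance(payload.get("skills"), list):
        raise ValueError("output contract: 'skills' must be a list")
    return payload


class ValidatingAdapter:
    """Exposes ainvoke(state, config) like a compiled in-process graph."""

    def __init__(self, transport: Callable[[dict[str, Any]], Awaitable[dict[str, Any]]],
                 validate_in: Callable[[dict[str, Any]], dict[str, Any]],
                 validate_out: Callable[[dict[str, Any]], dict[str, Any]]) -> None:
        self._transport = transport
        self._validate_in = validate_in
        self._validate_out = validate_out

    async def ainvoke(self, state: dict[str, Any], config: dict[str, Any] | None = None) -> dict[str, Any]:
        request = self._validate_in(state)          # reject bad input before the round trip
        response = await self._transport(request)   # the HTTP call would happen here
        return self._validate_out(response)         # reject drift right after it
```

Because invalid input is rejected before the transport runs, a malformed request never reaches the remote container.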
The core container uses LangGraph's RemoteGraph class from langgraph.pregel.remote. The remote graph adapter in remote_graphs.py imports RemoteGraph and uses it under the hood. It also uses httpx and asyncio to perform the HTTP calls. The adapter is built once at import time, not per request, so the URL and auth token are fixed at container startup.
Why does this split exist? Three reasons. First, independent scaling. The core container handles request routing and checkpointing; it may need more replicas than the ML or research containers. The ML container runs a heavy PyTorch model for JobBERT; scaling that independently means you can allocate GPU or dedicated CPU resources without affecting the core hot path. The research container runs Playwright, which is a full browser automation tool; that brings heavy native dependencies and high memory usage. Isolating those dependencies off the core hot path prevents them from bloating the core container's image or slowing its cold start. Second, separate cold-start budgets. Hugging Face Spaces free tier has a cold start every time the Space wakes from sleep. The README describes the sleep-wake cycle for the core container. The ML and research containers may have different cold-start tolerances. By splitting them, you can tune each container's startup time and resource allocation independently. Third, security and isolation. The core container holds the Postgres checkpoint writer credential. By ensuring that ML and research containers never write checkpoints, you limit the blast radius of a potential compromise in those containers. They talk only through validated HTTP contracts, not through shared database connections.
The file apps/lead-gen/backend/README.md lines 1 through 90 gives you the context for the two runtimes that share the same graph code. One is langgraph dev with in-memory checkpointer for local development on port 8002. The other is app dot py with FastAPI and uvicorn for containerized deployment on port 7860. The ML and research containers also run their own FastAPI servers, but those servers expose only the remote graph endpoints, not the full LangGraph API. They do not need a registry or checkpointing.
In summary, the three-container split separates the core runtime from the compute-heavy and dependency-heavy services. Core owns state persistence and routing. ML owns entity extraction and embedding. Research owns web data gathering. They communicate through a clean contract layer that catches version mismatches early. You get independent scaling, isolated cold starts, and a reduced attack surface. The code in remote_graphs.py makes this transparent to the core graph logic: each remote node looks like any other node, with its data validated before and after the HTTP round trip.
The Graph Registry
The registry file lives at apps/lead-gen/backend/leadgen_agent/registry.py, and its opening comment declares it the single source of truth for the lead-gen LangGraph registry. Both runtimes read graph identity from this file: the local langgraph dev server on port 8002 and the FastAPI Cloudflare Containers app at core/app.py. To add a graph you drop a row in the GRAPHS tuple and run make gen-langgraph-json. That is the only edit needed. The module is deliberately dependency-free. It imports nothing from any leadgen_agent.*_graph module at the top level, so the JSON generator can build the registry without compiling the more than sixty graphs, and without dragging in optional dependencies that some graph modules import at import time.
The backbone of the registry is a frozen dataclass called GraphSpec. It has five fields. assistant_id is a string, the public identifier used in the /runs/wait endpoint, in langgraph.json, and by the TypeScript client. module is a dotted import path, for example leadgen_agent.email_compose_graph. compiled_attr defaults to the string "graph"; it names the module-level symbol that holds the compiled graph in langgraph.json. builder_attr is either None or a string defaulting to "build_graph". When it is None, the module exports only a precompiled instance via the compiled_attr attribute, built at import time without a checkpointer. The FastAPI runtime uses that instance directly, and the graph runs without persistence. Most graphs implement a build_graph(checkpointer) callable, so they use the default builder attribute. The fifth field is the crucial resumable flag, a boolean that defaults to False.
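Put together, the five fields and their defaults suggest a dataclass along these lines. This is a reconstruction from the description, not a copy of registry.py. The two example rows use an assistant id and module paths that the guide mentions, and the BY_ID lookup is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class GraphSpec:
    assistant_id: str                            # public id used by /runs/wait and langgraph.json
    module: str                                  # dotted import path
    compiled_attr: str = "graph"                 # module-level symbol holding the compiled graph
    builder_attr: Optional[str] = "build_graph"  # None means precompiled instance only
    resumable: bool = False                      # True only for graphs with a stable thread id


GRAPHS = (
    GraphSpec("email_compose", "leadgen_agent.email_compose_graph"),
    GraphSpec("analyze_product_v2", "leadgen_agent.product_intel_v2_graph"),
)

# Hypothetical helper: resolution at runtime is by assistant_id, not tuple order.
BY_ID = {spec.assistant_id: spec for spec in GRAPHS}
```

Freezing the dataclass keeps a row from being mutated after import, which suits a table that two runtimes treat as their shared source of truth.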
That resumable flag is where the design gets interesting. It is set to True only for graphs that genuinely need to resume from a checkpoint, meaning graphs invoked with a stable thread identifier so that a SIGKILL mid-run can pick up where it left off. Every other graph is invoked with a random UUID thread identifier, both from the cron job and from the /runs/wait endpoint. Because the thread identifier changes each time, the checkpoint rows written for those runs are never read. They accumulate. To prevent the checkpoint_blobs and checkpoint_writes tables from blowing past the Neon storage cap, core/app.py:_compile_one passes checkpointer=None when resumable is False. So the vast majority of graphs, more than sixty of them, set resumable to False. They write no checkpoint data at all, and the Neon database stays lean. Only one graph keeps a stable thread identifier: the research loop graph. Its job requires it to survive container restarts, so a cron job can resume the same thread. That graph therefore sets resumable to True.
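The rule that _compile_one applies comes down to a single conditional. The sketch below shows only that decision. Spec is a trimmed stand-in for GraphSpec, checkpointer_for is a hypothetical name, and research_loop is an assumed assistant id for the research loop graph.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Spec:
    """Trimmed stand-in for GraphSpec: only the fields this decision needs."""
    assistant_id: str
    resumable: bool = False


def checkpointer_for(spec: Spec, saver: Any) -> Optional[Any]:
    """Hand the shared saver only to graphs that will actually resume."""
    return saver if spec.resumable else None


shared_saver = object()  # placeholder for the shared AsyncPostgresSaver

no_persistence = checkpointer_for(Spec("classify_paper"), shared_saver)
with_persistence = checkpointer_for(Spec("research_loop", resumable=True), shared_saver)
```

Here no_persistence is None, so that graph would compile without a checkpointer and write no rows, while with_persistence is the shared saver itself.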
The GRAPHS tuple itself is ordered for presentation only; runtime resolution is by assistant_id. The tuple starts with email graphs: email_compose, email_opportunity, email_reply, email_outreach, and paper_author_interview. Then come chat and SQL graphs: admin_chat and text_to_sql. Product intelligence follows: deep_icp, icp_team, competitors_team, deep_competitor, pricing, gtm, product_intel, and a key module-stem mismatch – analyze_product_v2 registers under a different id but points to leadgen_agent.product_intel_v2_graph. Then freshness, positioning, and lead_gen_team. Contact enrichment graphs appear next: contact_enrich, contact_enrich_sales, contact_enrich_paper_author, and a batch fan-out variant, contact_enrich_paper_authors_batch, which uses the same module but different exported symbols: compiled_attr="batch_graph" and builder_attr="build_batch_graph". Classification graphs follow: classify_paper, classify_recruitment, and classify_recruitment_bulk – the excerpt cuts off there, but the full tuple runs well past sixty entries.
Once the registry is defined, the /runs/wait endpoint in the FastAPI application dispatches runs. That endpoint, declared in core/app.py, accepts assistant_id, input, and an optional thread_id. It looks up the assistant identifier in the GRAPHS registry, retrieves the corresponding GraphSpec, compiles the graph if necessary, and calls ainvoke on the compiled graph. The endpoint surface matches what the client library at src/lib/langgraph-client.ts already calls. So flipping the LANGGRAPH_URL client-side is the only cut-over needed when you switch environments.
Finally, the file core/langgraph.json is generated from the GRAPHS tuple by the script backend/scripts/gen_langgraph_json.py. Because the registry is the single source of truth, the local langgraph dev server and the production harness see the exact same dispatch table. They both read the same module paths, the same compiled attribute names, the same builder functions. There is no duplication, no drift. You add a graph to the tuple, run the generation script, and both runtimes instantly know about it. That is the power of a single registry.
State and Reducers
Reducers are the most under-appreciated part of running a real LangGraph fleet. When you design a state machine that fans out work to multiple parallel nodes, you immediately bump into the question: what happens when two nodes both try to write to the same key in the state dictionary? LangGraph, by default, refuses to guess. It raises an InvalidUpdateError carrying the error code INVALID_CONCURRENT_GRAPH_UPDATE, because the framework sees two conflicting updates and has no built-in rule to decide which one to keep. Without a reducer, your pipeline crashes at runtime. That is why every parallel fan-out graph in the fleet needs reducers, and specifically the two you see in apps/lead-gen/backend/leadgen_agent/state.py starting at line one.
Let us walk through the first reducer, _merge_dict. Open your mental editor to lines four through fifteen of that file. The function signature is def _merge_dict(left: dict[str, Any] | None, right: dict[str, Any] | None) -> dict[str, Any]:. The two arguments are optional dictionaries — left is the current state value for that key, and right is the new value coming from a node update. The return type is a plain dictionary. The purpose, as the docstring says, is to be a reducer for dict state keys that multiple parallel nodes write to. Without it, LangGraph raises INVALID_CONCURRENT_GRAPH_UPDATE when two fan-out nodes both emit e.g. agent_timings — the graph cannot pick one.
Now read the body line by line. The first line inside the function: out: dict[str, Any] = dict(left or {}). This creates a new dictionary out from the existing left dictionary, or an empty dictionary if left is None. The or operator is crucial: if the left value is None, you don't want to pass None into the dict constructor, because that raises a TypeError. Instead, you default to an empty dict. Next comes if right:, which means that if the right argument is truthy, i.e. not None and not an empty dict, you call out.update(right). This merges all keys from the new update into the existing dictionary. Keys already present in out are overwritten by values from right; that is standard dictionary update behavior. Finally, return out. That is the entire logic. Simple, but without it, LangGraph sees two updates arriving at the same step and has no way to combine them. The reducer tells LangGraph: when there is a conflict, merge the dictionaries by taking the union, with later writes winning on a per-key basis. That is enough to satisfy the framework and let your parallel nodes coexist.
But what if the dictionary itself contains nested sub-dictionaries, and you want to merge those too without losing data? That is where the second reducer comes in: _merge_graph_meta. Find it at lines seventeen through thirty-two in the same file. The docstring explains that this reducer is for the graph_meta key on graphs that also write per-node telemetry. When parallel fan-out nodes each emit a dictionary like {"graph_meta": {"telemetry": {"<node_name>": {...}}}}, a plain update() would clobber the entire telemetry sub-dict when two branches arrive simultaneously. The last write would win, and you would lose all telemetry from the other branch. That is unacceptable for observability.
Read the function line by line. It starts the same way: out: dict[str, Any] = dict(left or {}). Then if not right: return out — if there is no update, just return the existing state. Then a for k, v in right.items(): loop iterates over every key in the incoming update. Inside the loop, there is an if condition: if k == "telemetry" and isinstance(v, dict):. When the key is exactly "telemetry" and the value is a dictionary, you do a key-wise merge. merged = dict(out.get("telemetry") or {}) — you get the existing telemetry sub-dict, or an empty one. Then merged.update(v) — you add all the per-node entries from the new update. Then out["telemetry"] = merged — you assign the merged sub-dict back. For any other key — version, graph, model, run_at, agent_timings, totals — you take the else branch and do out[k] = v. That is last-write-wins: the latest update for that key simply overwrites the previous value. The result is a shallow merge on top-level keys, plus a careful deep-ish merge on the telemetry sub-dict so that telemetry from different parallel Send workers accumulates instead of overwriting each other.
You might wonder how LangGraph knows to call these functions. The answer is the Annotated type hint. Python's typing.Annotated lets you attach metadata to a type. LangGraph reads that metadata — specifically the second argument — and uses it as the reducer for that state field. You import Annotated from typing as you see on line seven of the source file. Then on a state field like agent_timings, you would write agent_timings: Annotated[dict, _merge_dict]. That tells LangGraph: whenever multiple nodes try to write to agent_timings in the same step, use _merge_dict to combine the values. Similarly, for a field named graph_meta you would write graph_meta: Annotated[dict, _merge_graph_meta]. The reducer function becomes part of the type annotation and is discovered at graph construction time.
This same pattern, an Annotated field with a custom reducer, recurs across every parallel fan-out graph in the fleet. The lead generation pipeline this guide walks through is just one example. Any graph that uses Send to spawn parallel workers and expects those workers to write to a shared state key must supply a reducer, or accept the INVALID_CONCURRENT_GRAPH_UPDATE error. You will see _merge_dict used for flat dictionaries like agent_timings or totals, and _merge_graph_meta for the richer nested telemetry structure. The shape is always the same: a reducer function that takes left and right optional dicts and returns a merged dict. And LangGraph calls the reducer each time an update arrives for that key, concurrent or not.
So when you are debugging a parallel graph and see that error, you know exactly where to look: the state definition. Find the key that both branches are writing to. It either lacks a reducer entirely, or the reducer is too shallow and is clobbering nested data. The solution is to write a reducer like _merge_graph_meta that understands the nested structure. It is not glamorous work, but it is the backbone that keeps your fleet from falling apart under concurrency. Reducers are the unsung heroes of state management in LangGraph, and now you know exactly how they tick.
Send-Based Fan-Out
Picture a single-threaded pipeline. One node runs, finishes, and passes its output to the next node in line. LangGraph handles that natively — each node reads the shared state, writes its result, and the graph scheduler moves on. But what happens when the output of one node is a list of a hundred companies, and each company needs its own independent enrichment run? You could loop over them sequentially inside a single node, but that defeats concurrency. You could spawn threads manually, but then you have to manage shared state, locks, and error propagation yourself. LangGraph offers something better: the Send API.
Send is LangGraph's built-in mechanism for dynamic fan-out. It works like a message queue inside the graph. A routing function on a conditional edge, or a node that returns a Command, produces a list of Send objects. Each Send object contains the name of a target node and a payload, which is the state that this one invocation of the target node receives. LangGraph's scheduler sees that list and spawns one invocation of the target node per Send item, running them concurrently in the same step. Each invocation runs independently on its own payload and writes its results back to the shared state. The key question is: how do you collect those results without race conditions? The answer is reducers.
The canonical pattern is queue, dispatch, worker, collect. A queue node prepares a list of work items and dispatches them via Send. The worker nodes, each with a specific slice of the work, run concurrently. The collector node fans them back in. The collector does not run until all concurrent workers have finished. LangGraph's reducer fan-in mechanism ensures that. When multiple workers write to the same state field, the reducer function merges their contributions deterministically. No locking needed.
Now let's ground this in the real pipeline. The file at apps slash lead-gen slash backend slash leadgen_agent slash pipeline_graph dot py contains the topology in a code comment. Let me read it to you as a roadmap. The graph starts at a node called discover. That feeds into enrich_queue. From enrich_queue, a Send action fans out to multiple enrich_one workers. Each enrich_one worker converges on enrich_collect. Below that, contacts_queue fans out to contact_one and then to contact_collect. Next comes qa_counts, leading to qa_critic. Then outreach_queue, an interrupt gate unless auto_confirm is true, then outreach_run, and finally END. The comment says explicitly that each Send-fanout phase is queue, dispatch, worker, collect.
Focus on the enrichment leg. The node named enrich_queue receives the output of the discover node: a list of company identifiers. Inside enrich_queue, the code builds a list of Send objects. Each Send is keyed by a company identifier. The target node is enrich_one, and the payload is a state update containing that single company ID. So if the discover node found fifty companies, enrich_queue returns fifty Send objects. LangGraph schedules every enrich_one invocation concurrently. You get fifty parallel invocations of enrich_one, each enriching a different company.
Those fifty invocations write to the same state field. The field is called underscore enrich results, and it has a reducer. The reducer is typically a merge function, such as operator dot add for list fields or a custom dict merge. In this pipeline's state module at state dot py, lines one through sixty, you will find two reducer functions: underscore merge dict and underscore merge graph meta. The first one, underscore merge dict, does a straightforward dictionary update. The second, underscore merge graph meta, handles nested telemetry data. These reducers are what make concurrent writes safe.
Here is how it works. Each enrich_one worker writes to underscore enrich results with its own company identifier as the key. The workers write disjoint keys. No two workers ever write to the same key. When LangGraph detects that multiple workers are trying to write to the same state field simultaneously, it applies the reducer. The reducer updates the dictionary with the new key-value pairs. Since the keys are disjoint, there is no conflict. The reducer simply composes them. No locking required. No race condition. No lost data.
The collector node, enrich_collect, is a fixed node in the graph. It runs after all enrich_one workers have finished. The reducer fan-in has already aggregated all enrich_one outputs into a single dictionary. The collector node simply reads that dictionary from the state and proceeds with the next pipeline stage — in this case, it passes the aggregated enrichment data downstream.
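To see the queue, dispatch, worker, collect shape end to end, here is a plain asyncio imitation of the enrichment leg. It deliberately does not use LangGraph: asyncio.gather stands in for the scheduler, tuples stand in for Send objects, and functools.reduce stands in for the reducer fan-in. The field name _enrich_results is my reading of the spoken name, and the worker body is a placeholder.

```python
from __future__ import annotations

import asyncio
from functools import reduce
from typing import Any


def merge_dict(left: dict | None, right: dict | None) -> dict:
    out = dict(left or {})
    if right:
        out.update(right)
    return out


def enrich_queue(state: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
    """Queue + dispatch: one (target_node, payload) pair per company."""
    return [("enrich_one", {"company_id": cid}) for cid in state["company_ids"]]


async def enrich_one(payload: dict[str, Any]) -> dict[str, Any]:
    """Worker: sees only its own payload, writes under its own key."""
    cid = payload["company_id"]
    await asyncio.sleep(0)  # placeholder for the real enrichment call
    return {"_enrich_results": {cid: {"enriched": True}}}


def enrich_collect(state: dict[str, Any]) -> dict[str, Any]:
    """Collector: runs once, after every worker's update has been merged."""
    return {"enriched_count": len(state["_enrich_results"])}


async def run_enrich_leg(state: dict[str, Any]) -> dict[str, Any]:
    sends = enrich_queue(state)
    updates = await asyncio.gather(*(enrich_one(p) for _, p in sends))
    merged = reduce(merge_dict, (u["_enrich_results"] for u in updates),
                    dict(state.get("_enrich_results") or {}))
    state = {**state, "_enrich_results": merged}
    return {**state, **enrich_collect(state)}
```

Running run_enrich_leg with three company ids yields three disjoint entries in _enrich_results, and the collector sees them all at once.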
Now, the contacts leg mirrors the same pattern exactly. The node contacts_queue builds a list of Send objects, each targeted at contact_one. The workers run concurrently. The collector is contact_collect. The same reducer mechanism on the state field ensures safe merging. The topology comment shows this with the arrow from contacts_queue to contact_one to contact_collect, exactly parallel to the enrichment leg.
What makes this design elegant is that the Send-based fan-out is not limited to these two legs. The source excerpt notes that the Send pattern mirrors the in-house deep_competitor_graph dot underscore fan out — same shape, applied at orchestration level. In the product_intel supervisor, you see the same shape powering parallel pricing and go-to-market analysis. A queue node dispatches multiple workers, each exploring a different facet, and the reducer collects their outputs.
Let me emphasize the safety aspect again, because it is the most common concern. Engineers coming from threaded or async environments worry about data races. In LangGraph's Send fan-out, the runtime ensures that each worker invocation is isolated. They share no mutable state except the graph's state object, and that state object is protected by reducers. The workers write to keys that are disjoint by design. The reducer is a pure function that combines the contributed values. There is no shared memory. There is no concurrent access to the same variable. It is deterministic and safe.
You can see this design explicitly in the state schemas defined in state dot py. The underscore merge dict reducer is annotated as the merge function for dictionary fields. When multiple parallel nodes write to the same field, LangGraph calls underscore merge dict with the existing left value and the incoming right value. It creates a new dictionary, copies the left, updates with the right, and returns the merged result. With disjoint keys this behaves like a conflict-free replicated data type merge: the result is the same whichever worker arrives first, and applying the same update twice changes nothing. If two workers ever wrote the same key, the order would matter again, because the later write would win.
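The order-independence claim is easy to check by hand. The snippet below is a small demonstration with made-up company keys; merge_dict is a local copy of the reducer logic described earlier.

```python
from __future__ import annotations


def merge_dict(left: dict | None, right: dict | None) -> dict:
    out = dict(left or {})
    if right:
        out.update(right)
    return out


a = {"acme": 1}
b = {"globex": 2}

# Disjoint keys: arrival order does not change the result.
disjoint_commutes = merge_dict(a, b) == merge_dict(b, a)

# Re-applying an update that was already merged changes nothing.
idempotent = merge_dict(merge_dict(a, b), b) == merge_dict(a, b)

# Overlapping keys: the later write wins, so order matters.
x = {"acme": 1}
y = {"acme": 2}
overlap_commutes = merge_dict(x, y) == merge_dict(y, x)
```

The first two checks hold and the third does not, which is why the workers are designed to write disjoint keys.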
The same logic applies to underscore merge graph meta, but that reducer is slightly more careful. It does a shallow merge at the top level and a key-wise merge on a nested telemetry sub-dictionary. The comment explains that parallel fan-out nodes each emit a telemetry entry under their own node name. A plain dictionary update would clobber the telemetry sub-dict when two branches arrive simultaneously. So the merge does a deep merge only on telemetry, leaving other keys with last-write-wins semantics. This is a practical concession — telemetry keys are disjoint by node name, so a deep merge preserves them.
Now, consider the big picture. The entire B2B lead generation pipeline at apps slash lead-gen slash backend slash leadgen_agent slash pipeline_graph dot py is a single LangGraph. It replaces a Rust orchestrator that lived in crates slash metal slash src slash teams slash mod dot rs. Instead of re-implementing the phase logic, this graph invokes the existing LangGraph subgraphs that already own each phase. The discover phase uses company_discovery_graph dot graph. The enrich phase uses company_enrichment_graph dot graph, with the Send fan-out applied. The contacts phase uses contact_discovery_graph dot graph and contact_enrich_graph dot graph, again with the Send fan-out. Then qa uses SQL counts and an LLM critic. Outreach uses email_outreach_graph dot graph, with a human-in-the-loop interrupt gate.
The Send fans are the glue. They take a list of inputs — company IDs, contact IDs — and turn them into parallel work. They do not care whether the underlying subgraph is a simple API call or a complex multi-step reasoning chain. The same pattern scales from ten items to a thousand, limited only by the max_per_stage cap and the underlying API rate limits.
So when you read the topology comment at the top of pipeline_graph dot py, you see the queue arrow to dispatch arrow to worker arrow to collect repeating twice. It is not a coincidence. It is the fundamental concurrency primitive. Enrich queue sends to enrich one, collects at enrich collect. Contacts queue sends to contact one, collects at contact collect. The same shape powers deep_competitor and the product_intel supervisor's parallel pricing and go-to-market leg. Once you internalize this pattern, you can apply it to any stage that needs to fan out and fan in. No locks, no mutexes, no channel buffers. Just Send and a reducer.
Telemetry That Merges
Every large language model call in lead-gen goes through a function named ainvoke underscore json underscore with underscore telemetry. This function, defined at line 620 of apps slash lead-gen slash backend slash leadgen_agent slash llm dot py, is the single chokepoint for all language model invocations in the lead generation pipeline. Its signature reads: async def ainvoke underscore json underscore with underscore telemetry, with the first parameter llm of type ChatOpenAI, then a list of message dictionaries, then two keyword-only arguments: provider which is an optional string, and max underscore tokens which is an optional integer. The return type annotation says tuple of any and dictionary of string to any. In practice the first element is the parsed JSON object, and the second element is the telemetry dictionary you care about.
Read that signature aloud one more time, but slower. Async def ainvoke underscore json underscore with underscore telemetry. llm colon ChatOpenAI. messages colon list of dict of string to string. star, provider colon string or None equals None. max underscore tokens colon int or None equals None. The arrow says tuple of any, dict of string to any. Every caller in the system that needs a parsed JSON result plus observability data uses this one function.
Now walk through what lives inside that telemetry dictionary. The function builds it right after the language model call succeeds, just after the retry loop. Look at lines 680 through 690. First, it reads the model name. It uses getattr on the llm object, trying model underscore name first, then model, then falls back to the string unknown. That value is stored under the key model. Next come input underscore tokens and output underscore tokens, extracted by a private helper called underscore extract underscore usage. Then total underscore tokens is simply input underscore tokens plus output underscore tokens. The key cost underscore usd is computed by a helper named underscore cost underscore usd, which takes the model name and the two token counts and returns a float representing the dollar cost of that call. Finally, latency underscore ms is an integer: the wall clock time in milliseconds from the moment the function started until just after the response arrived, including any retry delays. The function also includes a comment noting that emitting calls equals 1 makes the single call case explicit.
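To make the shape of that dictionary concrete, here is a minimal sketch of how it could be assembled. It is not the code from llm dot py: the price table, the example-model name, and the build_telemetry helper are assumptions made for illustration, and the calls key is inferred from the comment about calls equals 1.

```python
import time
from typing import Any, Dict

# Hypothetical prices in USD per million tokens: (input, output).
# The real pricing table is not shown in the source.
_PRICES_PER_MTOK = {"example-model": (0.50, 1.50)}


def _cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one call; unknown models cost zero in this sketch."""
    in_price, out_price = _PRICES_PER_MTOK.get(model, (0.0, 0.0))
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000


def build_telemetry(
    llm: Any, input_tokens: int, output_tokens: int, started: float
) -> Dict[str, Any]:
    """Assemble the per-call telemetry dict; `started` is a perf_counter reading."""
    # Try model_name first, then model, then fall back to "unknown".
    model = getattr(llm, "model_name", None) or getattr(llm, "model", None) or "unknown"
    return {
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
        "cost_usd": _cost_usd(model, input_tokens, output_tokens),
        "latency_ms": int((time.perf_counter() - started) * 1000),
        "calls": 1,  # makes the single-call case explicit
    }
```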
All of those fields are set directly in the telemetry dictionary. The retry logic above them uses exponential backoff with full jitter. You see that in lines 620 to 640: the loop calls underscore sleep underscore with underscore backoff, passing the attempt number, a base delay, a cap, and the optional exception. The latency measurement starts with a call to time dot perf underscore counter before the loop, and ends after the successful attempt, so the telemetry reflects the total wall clock time across all retries. A slow node that required multiple retries will have a high latency underscore ms value, flagging it for diagnostics.
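Full jitter means each delay is drawn uniformly between zero and an exponentially growing ceiling. The sketch below shows that rule together with a latency clock that spans every attempt. The names backoff_delay and call_with_retries and the default values are hypothetical; the real helper's defaults are not given in the source.

```python
import asyncio
import random
import time
from typing import Any, Awaitable, Callable, Tuple


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 8.0) -> float:
    """Full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, ceiling)


async def call_with_retries(
    fn: Callable[[], Awaitable[Any]],
    *,
    attempts: int = 3,
    base: float = 0.5,
    cap: float = 8.0,
) -> Tuple[Any, int]:
    """Retry `fn`, returning (result, latency_ms) measured across all attempts."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    started = time.perf_counter()  # the clock starts before the retry loop
    last_exc: Exception = RuntimeError("unreachable")
    for attempt in range(attempts):
        try:
            result = await fn()
            # Latency includes every failed attempt and every backoff sleep.
            return result, int((time.perf_counter() - started) * 1000)
        except Exception as exc:
            last_exc = exc
            if attempt + 1 < attempts:
                await asyncio.sleep(backoff_delay(attempt, base, cap))
    raise last_exc
```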
Now imagine you are building a graph node that calls this function. In the lead-gen system, every node that makes a language model call follows this pattern: it invokes ainvoke underscore json underscore with underscore telemetry, gets back the parsed result and the telemetry dictionary, and then returns a dictionary that includes a graph underscore meta key. Inside graph underscore meta, there is a telemetry key, and inside that, the node name maps to the telemetry dictionary. The docstring of ainvoke underscore json underscore with underscore telemetry shows the exact shape: parsed comma tel equals await ainvoke underscore json underscore with underscore telemetry of llm comma msgs comma provider equals deepseek. Then it returns a dictionary with an entry like graph underscore meta colon brace telemetry colon brace node underscore name colon tel brace brace.
The critical piece is how multiple such returns from parallel fan-out workers are merged into a single state dictionary without losing data. Ordinary dict dot update would clobber the telemetry sub dictionary when two branches write at the same time: the second writer would overwrite the entire telemetry dictionary, losing the first writer's data. That is why the state definition in apps slash lead-gen slash backend slash leadgen_agent slash state dot py includes a custom reducer for the graph underscore meta key. That reducer, named underscore merge underscore graph underscore meta, is defined at line 20. Its signature is straightforward: it takes two optional dictionaries, left and right, and returns a dictionary. The logic is a careful shallow merge. For each key in the right dictionary, if the key is telemetry and the value is itself a dictionary, the reducer does a key wise merge inside the telemetry sub dictionary: it takes any existing telemetry from left, merges in the new node specific entries from right, and writes the result back. Non telemetry keys like version, graph, model, run underscore at, agent underscore timings, and totals use last write wins. That means if two branches return different values for, say, the model key, whichever branch is processed last will win, which is fine because those keys are meant to be node independent metadata, not per node measurements.
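The merge rule just described fits in a few lines. This is a sketch reconstructed from the description above, not a copy of state dot py; the function is named merge_graph_meta here to mark it as an illustration.

```python
from typing import Any, Dict, Optional


def merge_graph_meta(
    left: Optional[Dict[str, Any]], right: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Shallow merge with one exception: telemetry is merged key by key."""
    out: Dict[str, Any] = dict(left or {})
    for key, value in (right or {}).items():
        if key == "telemetry" and isinstance(value, dict):
            existing = out.get("telemetry")
            merged = dict(existing) if isinstance(existing, dict) else {}
            merged.update(value)  # per-node entries accumulate side by side
            out["telemetry"] = merged
        else:
            out[key] = value  # last write wins for everything else
    return out
```

Two parallel branches that each report their own node keep both entries, whichever order the reducer sees them in:

```python
a = {"telemetry": {"research": {"total_tokens": 10}}}
b = {"telemetry": {"compose": {"total_tokens": 7}}, "model": "m"}
state = merge_graph_meta(merge_graph_meta(None, a), b)
```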
This design means that observability data flows through the same reducer pattern that merges the graph state itself. There is no side channel, no separate logging system that accumulates telemetry outside the state. The telemetry becomes a first class citizen inside the state graph, merged just like any other piece of shared state. When you see a state object after a parallel node execution, the telemetry dictionary under graph underscore meta contains entries for every node that produced a telemetry record, each keyed by the node name.
After the graph finishes all node executions, a terminal step rolls up these per node numbers into aggregate totals. The totals key under graph underscore meta contains fields like total underscore tokens, summed across all nodes, and total underscore cost underscore usd, also summed across all nodes. The exact function that performs this roll up is not shown in the excerpts, but its purpose is clear: it iterates over every node telemetry entry, adds up input and output tokens, and sums the dollar costs. The reducer treats totals as a last write wins key, so the terminal step simply writes the aggregated totals into that key, overwriting any prior value.
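Since the roll-up function is not shown, the following is only a guess at its shape. The name compute_totals_sketch and the exact field names in the returned dictionary are assumptions; the only thing taken from the text is that tokens and dollar costs are summed over every telemetry entry.

```python
from typing import Any, Dict


def compute_totals_sketch(telemetry: Dict[str, Any]) -> Dict[str, Any]:
    """Sum tokens and cost over every per-node telemetry entry."""
    totals = {
        "input_tokens": 0,
        "output_tokens": 0,
        "total_tokens": 0,
        "total_cost_usd": 0.0,
    }
    for entry in telemetry.values():
        if not isinstance(entry, dict):
            continue  # ignore anything that is not a telemetry record
        totals["input_tokens"] += int(entry.get("input_tokens", 0))
        totals["output_tokens"] += int(entry.get("output_tokens", 0))
        totals["total_cost_usd"] += float(entry.get("cost_usd", 0.0))
    totals["total_tokens"] = totals["input_tokens"] + totals["output_tokens"]
    return totals
```

A terminal node would then return something like graph_meta with a totals key set to this result, and the last-write-wins rule in the reducer puts it in place.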
By the end of the graph run, your fully merged state dictionary holds per node granularity under graph underscore meta dot telemetry, and a high level summary under graph underscore meta dot totals. You can inspect the latency of the research node versus the compose node, compare the cost of the deepseek provider versus the openai provider, and confirm that total tokens match the sum across all invocations. All of this is possible because the observability path is hardwired into the state reduction, not bolted on as an afterthought.
The key takeaway for you as an engineer building similar systems is this: when you need to collect telemetry from parallel workers, put it in the state and write a reducer that merges sub dictionaries by key. That is exactly what underscore merge underscore graph underscore meta does. It is the same reducer you met two chapters ago. Now you see how it applies to observability data. The telemetry dictionary itself is not stored in some external database or emitted as a side effect. It lives in graph underscore meta, right alongside the other metadata that flows through the graph. That makes your observability as reliable as the state merge itself, and it guarantees that parallel writes never silently drop data.
Supervisor Composition
The product_intel supervisor lives in a file called product_intel_graph.py, under apps slash lead-gen slash backend slash leadgen_agent, starting at line one. It is a state graph that embodies canonical sequential composition with a parallel fan-out in the middle. You can think of it as a pipeline with five major stages: load_and_profile, ensure_icp, ensure_competitors, a fan-out into two parallel subgraph legs, run_pricing and run_gtm, and finally synthesize_report. The entire flow is a directed acyclic graph, or DAG, that produces a ProductIntelReport.
Each of those subgraphs (deep_icp, the pricing graph, the gtm graph) is itself a fully compiled state graph. When you step through the product_intel supervisor, it calls them using .ainvoke, which lets them run as independent units that share the same AsyncPostgresSaver when the whole thing is compiled inside app.py. They each have their own state, their own nodes, and their own telemetry. But the supervisor needs to fold that telemetry back into its own bookkeeping so that the parent function compute_totals can accurately capture every token spent and every dollar billed across every leg of the pipeline.
That folding is handled by the helper function _fold_subgraph_telemetry, defined at line ninety-two of product_intel_graph.py. Let me read its docstring and body aloud so you can hear exactly how the namespace nesting works.
The function signature is: it takes three arguments. First, supervisor_telemetry, which is a dictionary or None. Second, node_name, which is a string — that's the name the supervisor uses for this subgraph, like "run_pricing". Third, sub_state, which is the state returned by the subgraph after its .ainvoke call. It returns a dictionary that becomes the supervisor's updated telemetry.
Inside the function, it starts by making a copy of the supervisor telemetry, or an empty dict if it was None. Then it checks that sub_state is actually a dictionary; if not, it returns the copy unchanged. That null safety means you never have to add a guard before calling it. Next, it extracts sub_meta from sub_state.get of "graph_meta", defaulting to an empty dict if missing. From that, it pulls sub_tel, which is sub_meta.get of "telemetry". If that is not a dictionary, again it returns the copy unchanged. The real work happens in a loop: for each inner_node and entry in sub_tel.items, if the entry is a dictionary, it sets a key in the telemetry copy. The key is constructed by an f-string: f"{node_name}/{inner_node}". That's the namespace nesting. For example, if node_name is "run_pricing" and the inner node is "load_inputs", the resulting key becomes "run_pricing/load_inputs". If you also have a "run_gtm" subgraph with its own "load_inputs", the key becomes "run_gtm/load_inputs". They don't collide because the first part of the key is the supervisor-side node name. This preserves per-inner-node granularity while keeping each subgraph's telemetry in its own namespace.
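Written out, the walk-through above comes to roughly the following. This is a reconstruction from the description, not the file's actual text, so treat the exact guards as an approximation; the function is named fold_subgraph_telemetry here to mark it as a sketch.

```python
from typing import Any, Dict, Optional


def fold_subgraph_telemetry(
    supervisor_telemetry: Optional[Dict[str, Any]],
    node_name: str,
    sub_state: Any,
) -> Dict[str, Any]:
    """Copy subgraph telemetry into the supervisor's dict under namespaced keys."""
    out: Dict[str, Any] = dict(supervisor_telemetry or {})
    if not isinstance(sub_state, dict):
        return out  # null-safe: nothing to fold
    sub_meta = sub_state.get("graph_meta") or {}
    if not isinstance(sub_meta, dict):
        return out
    sub_tel = sub_meta.get("telemetry")
    if not isinstance(sub_tel, dict):
        return out
    for inner_node, entry in sub_tel.items():
        if isinstance(entry, dict):
            # Namespace nesting: "<supervisor node>/<subgraph node>".
            out[f"{node_name}/{inner_node}"] = entry
    return out
```

Folding two subgraphs that both contain a load_inputs node produces two distinct keys, run_pricing/load_inputs and run_gtm/load_inputs, and the supervisor's original dictionary is left untouched because the function works on a copy.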
Once that dictionary is returned to the calling node — say the run_pricing node in the supervisor — the supervisor's state merges that telemetry into its own graph_meta.telemetry. Later, when the pipeline reaches the END, the compute_totals function runs over the supervisor's telemetry dictionary. It sums up every token count and every dollar cost across all keys, including those nested ones like "run_pricing/load_inputs". That's how a single token spent inside the pricing subgraph's first node gets attributed all the way up to the parent run.
Let's step back and see how this fits into the overall architecture. The product_intel supervisor is defined as a StateGraph subclass, imports from the subgraph modules at the top, and uses a custom state class called ProductIntelState. The state includes a field called graph_meta, which itself is an instance of product_intel_graph_meta schema; that's where telemetry lives. After each subgraph call, the node function passes the subgraph's returned state to _fold_subgraph_telemetry, which pulls graph_meta.telemetry out of it. The result is then merged into the supervisor's graph_meta.telemetry using the reducer merge_node_telemetry.
There's also a sophisticated partial-failure tolerance built in. Look at the docstring from line one to line forty-seven: it explains that subgraph failures are recorded in state["subgraph_errors"] and surfaced in the final report's graph_meta.partial_failures list. So if the pricing subgraph fails but the GTM subgraph succeeds, the supervisor does not abort. Instead, it continues, and the final report will note that the pricing section is missing. That is achieved by reducers like _merge_subgraph_errors and _merge_progress. The only fatal errors are supervisor-level ones, such as load_and_profile failing to read the product row or synthesize_report failing to produce JSON, and those set the terminal _error channel, routing to notify_error_node.
So when you run the product_intel supervisor, you get a robust pipeline that composes multiple independent subgraphs, each with its own state and telemetry, folds all costs into a single namespace, and tolerates flaky subgraphs without losing the entire run. That is the essence of supervisor composition in production: orchestrate, fold, and continue.
Partial Failure Recovery
You are building a production-grade LangGraph pipeline, and you’ve probably spent most of your time thinking about sequential composition — which node runs after which, how data flows from one step to the next. But sequential composition matters less than the failure model that wraps it. Anyone can chain nodes together. What separates a toy prototype from a system that survives in the wild is how you handle the moment when one leg of a parallel fan-out throws an exception while the other leg succeeds. That is the subject of this chapter: partial failure recovery.
Let’s situate the scene. Your supervisor graph fans out work to two parallel subgraphs: one for pricing, one for GTM — go-to-market. In the ideal world both complete cleanly and the synthesis node merges their outputs. In the real world, maybe the pricing subgraph hits a rate limit on a competitor benchmark API, or the GTM subgraph’s channel-picking node receives malformed data. The old approach would catch any subgraph exception and surface a single _error key on the state, causing the entire supervisor run to abort. That is a hard failure: one bad leg takes down the whole report.
With partial failure recovery, the supervisor catches per-subgraph exceptions individually and records them in a subgraph_errors dictionary instead. The state type that carries this is _ProductIntelStateWithError, defined locally in product_intel_graph.py. It extends the base ProductIntelState with ad-hoc channels: _error annotated with _first_error for the first hard error that should abort everything, and crucially subgraph_errors annotated with _merge_subgraph_errors. The reducer function that merges those error records is the heart of this design.
Open your mental editor to apps/lead-gen/backend/leadgen_agent/product_intel_graph.py around line 90. There you will find _merge_subgraph_errors. It takes two optional dictionaries of string to string and returns a dictionary of string to string. The docstring tells you exactly what is going on: “Parallel fan-out (run_pricing ∥ run_gtm) can both emit a failure in the same step; each writes its own key (subgraph name) so merging is disjoint and last-write-wins is safe. Lets us record partial failures without collapsing them into a single _error and aborting the run.”
The function body is refreshingly simple. It copies the left dictionary, then if right is truthy it calls out.update(right). Because each subgraph writes under its own key — for example "pricing" or "gtm" — there is never a key collision. If the pricing subgraph fails and the GTM subgraph succeeds, the subgraph_errors dictionary contains one entry: {"pricing": "some error message"}. If both fail, it contains two entries. If neither fails, it remains an empty dictionary. Last-write-wins is safe because the keys are disjoint — no two subgraphs will ever try to overwrite the same entry. That simplicity buys you something powerful: pricing and GTM can race without any explicit coordination. They are free to finish in any order, and the reducer will merge their error reports correctly regardless.
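The body described above is short enough to write out in full. This sketch follows the description (copy left, update with right when right is truthy) and is named merge_subgraph_errors to mark it as an illustration.

```python
from typing import Dict, Optional


def merge_subgraph_errors(
    left: Optional[Dict[str, str]], right: Optional[Dict[str, str]]
) -> Dict[str, str]:
    """Union of two error maps; safe because each subgraph owns its own key."""
    out: Dict[str, str] = dict(left or {})
    if right:
        out.update(right)
    return out
```

Because the keys are disjoint, the merge gives the same result in either order, which is what lets the two legs finish whenever they like:

```python
pricing = {"pricing": "rate limited"}
gtm = {"gtm": "malformed channel data"}
same = merge_subgraph_errors(pricing, gtm) == merge_subgraph_errors(gtm, pricing)
```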
Now picture the supervisor nodes that invoke these subgraphs. They are called run_pricing and run_gtm: async callables backed by _PRICING_GRAPH and _GTM_GRAPH, which are compiled at module scope with checkpointer=None. Note the performance detail: these subgraphs are compiled exactly once at import time. The source comments explain that recompiling on every run used to cost about two hundred milliseconds per supervisor invocation and also churned an async context for the saver. By compiling once and letting the subgraph inherit the parent checkpointer, you save both time and complexity. The subgraph-invocation nodes also update progress channels: pricing_subgraph_progress and gtm_subgraph_progress accumulate per-node execution events so the notify_complete node can render a surface like "4 out of 7 nodes done" without the UI needing any knowledge of graph topology. Those progress dictionaries use a separate reducer, _merge_progress, that extends lists, but that is a detail for another chapter.
When run_pricing or run_gtm encounters an exception, the supervisor catches it inside the node logic (not shown in the excerpt, but you can infer the pattern). Instead of letting the exception propagate and crash the entire run, it writes an entry into the subgraph_errors dictionary with the subgraph name as key and the error string as value. The subgraph node itself may also set an _error channel for hard failures, but the partial-failure path populates subgraph_errors. The state reducer _merge_subgraph_errors then merges these writes across steps. Because each subgraph writes exactly one key, and because the reducer does a simple dict.update, the order of execution does not matter.
After both parallel legs complete, the supervisor graph arrives at the synthesize node. That node has a conditional edge that checks whether subgraph_errors is non-empty. If the dictionary is empty, meaning no partial failures, the edge routes to notify_complete. If the dictionary is non-empty, the edge routes to notify_error. The conditional edge is the decision point: it does not abort at the first sign of trouble; it lets the subgraphs finish, collects all partial failures, and then decides how to inform the caller. This is the difference between brittle orchestration and resilient orchestration. The notify path can then include the partial failure details in the webhook payload, so the caller knows that pricing succeeded but GTM failed, or vice versa.
One more subtlety: the _ProductIntelStateWithError type also includes progress channels for pricing and GTM. Those progress keys are used by the notify_complete node to build rich progress messages. Even when a subgraph fails partially, the supervisor may still have useful telemetry from the subgraphs that succeeded. The _fold_subgraph_telemetry function (also in the same file) folds subgraph telemetry into a supervisor dictionary, preserving per-inner-node granularity under namespaced keys like "run_pricing/load_inputs" so that the compute_totals node can aggregate costs and tokens without collisions.
The entire design hinges on that dictionary reducer. Disjoint keys make the merge trivial. That trivial merge lets two concurrent subgraphs race without locks, without error queues, without any coordination. And that race-enabling property is what lets you keep your pipeline alive even when one path fails.
Take this line home and share it with your team: production-grade orchestration is mostly partial-failure design. You can chain nodes all day, but the resilience comes from how you handle the moments when a subgraph stumbles. Catching per-subgraph errors in a dictionary, merging with a key-disjoint reducer, and routing conditionally on its emptiness — that pattern turns an all-or-nothing abort into a recoverable partial failure. That is the difference between a demo and a deployment.
Remote Graph Adapters
The module lives at apps slash lead-gen slash backend slash core slash remote_graphs dot py with a docstring that spans from line 1 to about line 100. You can hear that docstring now.
It says: RemoteGraph adapters for cross-container boundaries in leadgen-core. Every cross-container call from core into leadgen-ml or leadgen-research is wrapped here. Each adapter validates its input dict against the matching star Input Pydantic contract from leadgen_agent dot contracts before the HTTP round-trip. It validates the returned dict against the matching star Output contract after the round-trip. It raises contracts dot ContractsVersionMismatch when the remote side responds with a schema underscore version the caller does not recognize. This surfaces shape drift at the very first call instead of at the leaf of some deep response stream. The public surface of every adapter is ainvoke with arguments state and config equals dot dot dot, the same shape a compiled in-process StateGraph exposes, so a core graph can register a remote node using import statements like from core dot remote_graphs import get underscore jobbert underscore ner underscore adapter and then builder dot add node with arguments extract underscore skills and get underscore jobbert underscore ner underscore adapter with parentheses.
URLs are read from the environment at adapter build time: ML underscore URL for the leadgen-ml base URL, and RESEARCH underscore URL for leadgen-research. The outer dispatcher Worker or service binding handles routing; the core container only needs an HTTP-reachable hostname. Bearer auth is forwarded via ML underscore INTERNAL underscore AUTH underscore TOKEN and RESEARCH underscore INTERNAL underscore AUTH underscore TOKEN.
Those are the adapter contracts in the docstring. Now step into the mechanics. Every adapter implements a three stage pipeline. Stage one: before any network call, you pass a plain dictionary into the adapter. That dictionary is validated against a Pydantic Input model. The import list shows the full set of Input models: AgenticSearchInput, BgeM3EmbedInput, CommonCrawlInput, ConsultancyPipelineInput, GhPatternsInput, JobbertNerInput, LeadPapersInput, ResearchAgentInput, ScholarInput. Each of these is derived from Pydantic BaseModel. Stage two: after validation passes, the adapter sends the data over HTTP to the remote container. The remote container runs its own transform logic and replies with a JSON dictionary. Stage three: the adapter takes that response dictionary and validates it against the corresponding Output model, for example AgenticSearchOutput, BgeM3EmbedOutput, and so on. If the response dictionary does not match the Output shape, the adapter raises a ValidationError.
But there is a special failure path. If the remote side includes a field called schema underscore version in its response, and that value does not match the SCHEMA underscore VERSION constant known to the caller, the adapter raises ContractsVersionMismatch. That exception is imported from leadgen_agent dot contracts. The docstring already told you the rationale: shape drift gets caught at the very first call, not at some leaf deep in a response stream. In practice this means if you deploy a new version of the remote container that changes the contract, the core graph will fail fast on the very next invocation, instead of silently corrupting state downstream.
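The three stages and the version check can be sketched with the standard library alone. Everything concrete here is an assumption: the NerInput and NerOutput dataclasses stand in for the Pydantic contracts, their fields are invented, the SCHEMA_VERSION value is made up, and the transport argument replaces the real HTTP call. Dataclasses only check field names, not types, so this is a weaker check than Pydantic performs.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List

SCHEMA_VERSION = "1"  # hypothetical value; the real constant lives in the contracts module


class ContractsVersionMismatch(RuntimeError):
    """Raised when the remote reports a schema_version the caller does not know."""


@dataclass
class NerInput:  # hypothetical stand-in for an Input contract
    text: str


@dataclass
class NerOutput:  # hypothetical stand-in for an Output contract
    skills: List[str]
    schema_version: str = SCHEMA_VERSION


Transport = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def call_remote(state: Dict[str, Any], transport: Transport) -> Dict[str, Any]:
    # Stage one: validate the input before any network traffic.
    request = NerInput(**state)  # TypeError on missing or unknown fields
    # Stage two: the round-trip (a real adapter would make an HTTP request here).
    raw = await transport({"text": request.text})
    # Special path: fail fast on contract drift.
    version = raw.get("schema_version")
    if version is not None and version != SCHEMA_VERSION:
        raise ContractsVersionMismatch(f"remote={version!r} local={SCHEMA_VERSION!r}")
    # Stage three: validate the output before the graph ever sees it.
    response = NerOutput(**raw)
    return {"skills": response.skills, "schema_version": response.schema_version}
```

Passing the transport in as an argument is a choice made for this sketch so that the validation layer can be exercised without a network.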
Now consider the implications for the broader graph. The adapter layer is the only place where cross-container types are checked. Once the Input contract clears and the Output contract clears, the rest of the graph treats the remote call as just another node return. You wire up a remote adapter exactly like you wire up a local node. The builder dot add node call does not care whether the node is a compiled state graph running in the same process or a RemoteGraph pointing at a different container. The adapter does all the heavy lifting of serialization, transport, authentication, and contract validation.
Let us look at the timeout defaults defined in the same file. The module defines a type alias TimeoutTuple as a four element tuple of floats representing connect, read, write, and pool seconds. The default is assigned to DEFAULT underscore TIMEOUT with value parentheses ten point zero, twenty eight point zero, ten point zero, five point zero. The twenty eight second read timeout is deliberately set just below the Cloudflare Worker thirty second wall clock limit. That way, if a remote node hangs, the client times out with a controlled httpx ReadTimeout instead of receiving an opaque five-two-four from the edge. The connect timeout is widened to ten seconds to accommodate the five to fifteen second cold start of a Cloudflare Container. Research adapters, which perform multi minute agentic crawls inside a Container, keep a large read timeout but stay under the five minute ceiling.
Environment variables control the base URL and auth tokens. The two URL variables are ML underscore URL and RESEARCH underscore URL. The two token variables are ML underscore INTERNAL underscore AUTH underscore TOKEN and RESEARCH underscore INTERNAL underscore AUTH underscore TOKEN. The import from leadgen_agent dot contracts also brings in a SCHEMA underscore VERSION constant. That version number is compared on every response to detect contract drift.
When you write a new remote adapter, you follow a simple recipe. Create a class that inherits from RemoteGraph or from a custom base. Override ainvoke. In that method, import the correct Input and Output models. Call validate underscore remote underscore call from contracts, which is a helper that performs both input and output validation and raises ContractsVersionMismatch if needed. Then use an httpx client to make the HTTP request. The module sets up a client with the default timeout and the appropriate bearer token. The rest of the method is just HTTP semantics.
Every adapter in this file follows that pattern. The eight cross-container boundaries, which the docstring lists implicitly through the import names, all use the same validation machinery. The core graph never sees raw HTTP data. It only sees validated Python dictionaries that match the contracts defined in leadgen_agent dot contracts. That means your developer experience when building a remote node is nearly identical to building an in-process node. You write the input dictionary, call ainvoke, and get back an output dictionary. The adapter handles the rest.
This design isolates contract enforcement to a thin, testable layer. If a remote service changes its schema, the first call fails with ContractsVersionMismatch and you can inspect the stack trace. You do not need to trace through a dozen nodes to find where data started deviating. And because every adapter exposes the same ainvoke interface, you can swap a remote node for a local one without changing the graph wiring. The only change is the import and the node registration.
To summarize the adapter contract in a single thought: the docstring you heard at the start. Input validated before transport. Output validated after transport. Contract version mismatch raised on schema drift. That is the remote graph adapter pattern. It is the only layer where cross container types are checked. After the contract clears, the remote call is just another node return in the state graph.
Circuit Breakers and Timeouts
Let’s walk through the timeout defaults first. You find them in the file apps/lead-gen/backend/core/remote_graphs.py, around line 50. The constant DEFAULT_TIMEOUT is a four-tuple of type TimeoutTuple. Its elements stand for connect, read, write, and pool, all in seconds. The values are ten point zero for connect, twenty-eight point zero for read, ten point zero for write, and five point zero for pool. That read value, twenty-eight seconds, is the key number. Cloudflare Workers have a thirty-second wall clock limit. If your upstream runs behind a Worker and takes longer than thirty seconds, Cloudflare emits a five-two-four, a generic gateway error that tells you nothing about what went wrong. By capping read at twenty-eight seconds, you give yourself a two-second margin. When a read takes too long, the httpx client raises a clear httpx.ReadTimeout exception. You catch that, log it with adapter context, and your caller knows exactly what happened. The alternative is a confusing five-two-four that leaves everyone guessing.
Now the other timeout tuples. ML_TIMEOUT is ten point zero, ninety point zero, ten point zero, five point zero. The ninety-second read is generous for streaming embedding batches from a Cloudflare Container, which has a five-minute ceiling. RESEARCH_TIMEOUT and SCRAPE_TIMEOUT both use ten point zero, two hundred forty point zero, ten point zero, five point zero. Research adapters run agentic web crawls inside a Container; two hundred forty seconds stays well under the five-minute container cap. The scrape adapter, which walks over twenty-two thousand profile URLs through Playwright, gets the same ceiling. Every timeout is a conscious trade-off: give the remote enough room to do legitimate work, but not so much that a hung socket stalls your graph node for minutes.
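Laid out as constants, the four tuples and their margins look like this. The tuple values and the two platform limits are the ones quoted above; the timeout_kwargs helper and the two limit constant names are hypothetical, added only to show how a tuple could be unpacked into the connect, read, write, and pool keyword arguments that httpx.Timeout accepts.

```python
from typing import Dict, Tuple

# (connect, read, write, pool), all in seconds.
TimeoutTuple = Tuple[float, float, float, float]

DEFAULT_TIMEOUT: TimeoutTuple = (10.0, 28.0, 10.0, 5.0)
ML_TIMEOUT: TimeoutTuple = (10.0, 90.0, 10.0, 5.0)
RESEARCH_TIMEOUT: TimeoutTuple = (10.0, 240.0, 10.0, 5.0)
SCRAPE_TIMEOUT: TimeoutTuple = (10.0, 240.0, 10.0, 5.0)

# Platform ceilings as quoted in the text (names are illustrative).
WORKER_WALL_CLOCK_S = 30.0
CONTAINER_CEILING_S = 300.0


def timeout_kwargs(t: TimeoutTuple) -> Dict[str, float]:
    """Name the four positions, e.g. for httpx.Timeout(**timeout_kwargs(t))."""
    connect, read, write, pool = t
    return {"connect": connect, "read": read, "write": write, "pool": pool}


def read_margin(t: TimeoutTuple, ceiling_s: float) -> float:
    """Seconds of headroom between the read timeout and a platform ceiling."""
    return ceiling_s - t[1]
```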
Now move to the circuit breaker. Look at the _CircuitBreaker class, also in remote_graphs.py. This is a per-adapter breaker. The state is stored on the class itself in a dictionary called _registry, keyed by adapter name. Multiple adapter instances pointing at the same remote share one breaker. That matters because a cold-start outage tends to affect every adapter routed at the same container. You don't want ten adapter instances each independently trying the same dead socket.
The breaker opens after failure_threshold consecutive failures. In your code, the threshold is three. Once open, it stays open for cool_down_s seconds. That value is sixty seconds. While the breaker is open, every call to allow() returns False. The caller never attempts a real network request. Instead, it raises a RemoteUnavailable exception. That exception is a subclass of RuntimeError, defined right above the breaker. It is distinct from network errors or HTTP errors. That distinction lets your callers choose to degrade gracefully rather than retry. They can fall back to a cache, return a partial result, or just log the short-circuit and move on.
When the cool-down period elapses, the next call to allow() returns True. That is a half-open probe. But note the implementation: it sets opened_at to None and returns True. It does not reset consecutive_failures. Only a successful response, recorded via record_success(), resets both consecutive_failures and opened_at. If the probe fails, record_failure() increments the counter. Because the counter was never reset, it is already at the threshold, so a single failed probe is enough to re-open the breaker immediately. The method for_name creates or retrieves a breaker from the registry and also allows newer adapter instances to update the thresholds. There is a reset_all class method, marked as a test hook, that clears the entire registry.
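Here is a compact sketch of a breaker with the behavior described above. It is an approximation, not the class from remote_graphs.py: the injectable clock argument is an addition for testability, and the use of greater-than-or-equal in record_failure is an assumption about how the threshold is compared.

```python
import time
from typing import Callable, Dict, Optional


class RemoteUnavailable(RuntimeError):
    """Raised by callers when the breaker refuses a call."""


class CircuitBreaker:
    _registry: Dict[str, "CircuitBreaker"] = {}  # shared per adapter name

    def __init__(self, failure_threshold: int, cool_down_s: float,
                 clock: Callable[[], float]) -> None:
        self.failure_threshold = failure_threshold
        self.cool_down_s = cool_down_s
        self._clock = clock
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None

    @classmethod
    def for_name(cls, name: str, failure_threshold: int = 3, cool_down_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> "CircuitBreaker":
        breaker = cls._registry.get(name)
        if breaker is None:
            breaker = cls(failure_threshold, cool_down_s, clock)
            cls._registry[name] = breaker
        else:  # newer adapter instances may update the thresholds
            breaker.failure_threshold = failure_threshold
            breaker.cool_down_s = cool_down_s
        return breaker

    @classmethod
    def reset_all(cls) -> None:  # test hook
        cls._registry.clear()

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if self._clock() - self.opened_at >= self.cool_down_s:
            self.opened_at = None  # half-open probe; failure count is kept
            return True
        return False

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.opened_at = self._clock()
```

An adapter would check allow() before each request, raise RemoteUnavailable when it returns False, and call record_success() or record_failure() once the request settles.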
Finally, the retry budget. Think about what operations are safe to retry. Idempotent operations — those that produce the same result no matter how many times you execute them — can be retried automatically. Non-idempotent operations, like submitting a lead or creating a record, must not be retried blindly because you could duplicate side effects. Your circuit breaker does not enforce that distinction; it just short-circuits all calls when open. The caller is responsible for deciding whether to retry based on the exception type. If the breaker is open, you get a RemoteUnavailable — treat that as a signal to back off, not to retry. If the breaker is closed but a timeout occurs, you get an httpx.ReadTimeout. For idempotent calls, you might retry once after a brief delay. For non-idempotent calls, you surface the error to the user. The budget is implicit: you have only as many attempts as the breaker's failure threshold before the circuit opens. After that, you wait sixty seconds before you can even try again.
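The caller-side decision can be sketched as a small wrapper. This is one possible policy, not code from the repository: ReadTimeout below is a local stand-in for httpx.ReadTimeout so the example runs without httpx installed, and call_with_policy, its idempotent flag, and the single-retry choice are assumptions drawn from the paragraph above.

```python
import asyncio
from typing import Any, Awaitable, Callable


class RemoteUnavailable(RuntimeError):
    """Breaker is open: back off, do not retry."""


class ReadTimeout(Exception):
    """Local stand-in for httpx.ReadTimeout in this sketch."""


async def call_with_policy(
    call: Callable[[], Awaitable[Any]],
    *,
    idempotent: bool,
    retry_delay_s: float = 0.0,
) -> Any:
    """Retry once on a read timeout, and only when the call is idempotent."""
    try:
        return await call()
    except RemoteUnavailable:
        raise  # short-circuited by the breaker: surface it so the caller can degrade
    except ReadTimeout:
        if not idempotent:
            raise  # a retry could duplicate a side effect
        await asyncio.sleep(retry_delay_s)
        return await call()  # one retry only; a second timeout propagates
```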
So the whole system works in layers. Timeouts give you a controlled failure mode at the transport level. The circuit breaker gives you fast failure isolation when the remote is clearly down. And the retry logic, combined with the RemoteUnavailable exception, lets your application degrade sensibly without hammering a dead endpoint. Every number is chosen to align with Cloudflare infrastructure: the twenty-eight-second read avoids the five-two-four, the ten-second connect allows for container cold starts, the sixty-second cool-down gives the remote time to recover, and the three-failure threshold catches transient glitches without being too trigger-happy. You have a resilient mesh of knobs, each tuned to a specific failure mode.
Human in the Loop
Two graphs are gated by a human: pipeline_graph and email_outreach. The gate lives inside a node called outreach_queue, defined in the file apps slash lead-gen slash backend slash leadgen_agent slash pipeline_graph.py, on lines seven sixty through eight thirty. You are going to walk through that node step by step.
First, picture the pipeline_graph. Its state contains a key called auto_confirm. That's a feature flag, not a separate graph. When auto_confirm is true, the gate opens automatically; when false, it hands control to a person. Synthetic load tests and background dispatch paths short-circuit the gate by setting auto_confirm to true, but the production graph is identical. There is no alternate graph, no duplicated logic. The same code path just takes a different branch inside the node.
Now enter the outreach_queue function. It is async and accepts a PipelineGraphState parameter. Inside, it calls underscore blocked underscore domains underscore set via asyncio.to thread to build a set of domain names you never want to contact. It reads limit from state dot get max_per underscore stage, and it reads auto_confirm as a boolean from state dot get auto_confirm. If auto_confirm is true, the function returns immediately with two keys: outreach_candidates set to the full queue, and approved_outreach_ids built by pulling the contact_id integer from each candidate. No human sees this step. The graph continues.
But if auto_confirm is false, the real gate mechanism engages. The function first tries to build the queue by calling underscore queue underscore outreach asynchronously. If the database call raises a psycopg dot Error, the queue becomes an empty list and the function appends an error message. After that, if the queue is empty, the node records a StageReport with stage outreach_gate, status SKIP, processed zero, created zero, an empty errors list, and duration underscore ms zero. It appends that report to the existing reports list and returns with empty outreach_candidates and approved_outreach_ids. No human intervention needed because there is nothing to approve.
If the queue is not empty, the function calls the langgraph.types.interrupt function. This is the core of the human-in-the-loop pattern. You pass a payload—a dictionary with three keys: stage set to the string outreach_approval, queue holding the full list of candidates, and instructions providing a human-readable resume prompt. That prompt says: Resume with one of: approve_all, reject, a list of approved contact_ids, or a dictionary with a key approved_ids mapping to a list of integers. When LangGraph encounters this interrupt, it checkpoints the entire thread state to Neon, your Postgres-compatible database. The thread pauses, waiting for a Command.
Meanwhile, the frontend polls the thread status continuously. It sees the interrupt, surfaces the queue to a human operator in a UI, and collects their decision. That decision is packaged into a Command object with a resume field set to the approval payload—for example Command open parenthesis resume equals approve_all close parenthesis. The frontend sends that as a POST request to the graph’s API. The stored checkpoint is loaded, and the interrupt call returns the exact value you passed as resume.
Back inside outreach_queue, the returned value lands in a variable called decision. You then call underscore resolve underscore approval with decision and the original queue. That function, defined within the same span of lines seven sixty through eight thirty, builds a set of allowed contact IDs from the decision and returns a filtered list plus a note string like approved 5 out of 10. The returned tuple fills approved and note. After that, the function constructs a StageReport with stage outreach_gate, status set to OK if approved is non-empty or SKIP if not, processed set to the length of the original queue, created set to the length of approved, and the errors list kept from earlier. It appends that report to the state’s reports list and returns outreach_candidates as the full queue, so rejected contacts remain in the stored list, and approved_outreach_ids as the approved integer IDs. A log line records the note.
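To make the four resume shapes concrete, here is a rough sketch of what a resolver along these lines could look like. It is an assumption about the logic, not the source of underscore resolve underscore approval: the function name without the leading underscore, the handling of unrecognised values as a rejection, and the candidate dictionaries with a contact_id key are all illustrative.

```python
from __future__ import annotations

from typing import Any


def resolve_approval(decision: Any, queue: list[dict]) -> tuple[list[dict], str]:
    """Map a resume payload onto the queue and describe the outcome."""
    if decision == "approve_all":
        allowed = {int(c["contact_id"]) for c in queue}
    elif isinstance(decision, dict):
        # Shape: {"approved_ids": [1, 2, 3]}
        allowed = {int(i) for i in decision.get("approved_ids", [])}
    elif isinstance(decision, (list, tuple, set)):
        # Shape: a bare list of contact ids
        allowed = {int(i) for i in decision}
    else:
        # "reject" and anything unrecognised approve nothing (assumption)
        allowed = set()
    approved = [c for c in queue if int(c["contact_id"]) in allowed]
    note = f"approved {len(approved)} out of {len(queue)}"
    return approved, note
```

Filtering against the queue, rather than trusting the payload, means an operator cannot approve a contact ID that was never offered.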
This pattern only works because pipeline_graph runs in core. There is a contract rule, defined in contracts.py, that forbids interrupts inside any graph reachable through RemoteGraph. RemoteGraph is the mechanism that lets one graph call another across service boundaries. If a subgraph is invoked via RemoteGraph, it cannot contain an interrupt because the caller has no way to surface the approval UI to a human. Interrupts are allowed only in the root graph that runs in core—the graph where the frontend directly interacts. For email_outreach, the interrupt is either placed in the same core graph or the gate is moved to a separate pipeline graph that the user’s system invokes directly. The rule is absolute: no interrupt in RemoteGraph-reachable graphs.
So remember: auto_confirm is a flip of a boolean, not a rewrite of the graph. The same pipeline_graph code serves both automated and human-gated flows. When a person must approve, the interrupt checkpoints the thread to Neon, the frontend polls, and the human’s decision resumes the graph. That is the human in the loop.
Connection Pool Engineering
You are building a lead generation system. The core of it is a fleet of over sixty graphs, and each graph needs to checkpoint its state to a Postgres database. That database is Neon, a serverless Postgres service. But Neon does something tricky: it sits behind PgBouncer, a lightweight connection pooler that enforces a strict idle timeout. After roughly five minutes of silence, PgBouncer tears down the TCP connection. If your application’s own pool is not engineered to account for that behavior, you will get an OperationalError on the next checkout.
Let me read the pool construction from the source code, exactly as it appears in apps/lead-gen/backend/core/app.py, lines 200 through 240. The code says:
    async with AsyncConnectionPool(
        db_url,
        min_size=1,
        max_size=8,
        open=False,
        kwargs={"autocommit": True, "prepare_threshold": 0},
        check=AsyncConnectionPool.check_connection,
        max_idle=180,
    ) as pool:
        await pool.open(wait=True, timeout=15)
        checkpointer = AsyncPostgresSaver(pool)
        app.state.checkpointer_alive = True
        _boot("checkpointer_context_entered")
        await checkpointer.setup()
        _boot("checkpointer_setup_ok")
        graphs, compile_failures = _compile_all(checkpointer)
        app.state.graphs = graphs
        app.state.graph_compile_failures = compile_failures
        _boot(
            f"graphs_compiled count={len(graphs)} "
            f"failed={len(compile_failures)}"
        )
        if compile_failures:
            log.warning(
                "%d graphs failed to compile: %s",
                len(compile_failures),
                compile_failures,
            )
Now, step through each knob and understand why it is set where it is.
First, min_size=1. That means the pool will always keep at least one connection alive, even when no graph is actively using it. You might think a single idle connection is wasteful. But the alternative is worse: a naive pool that lets the count drop to zero. After the pool drains, the very next checkout must create a fresh connection from scratch. That create involves a TLS handshake, authentication, and the overhead of establishing a brand new TCP socket. Meanwhile, your graph’s checkpointer.aget_tuple call is waiting. On a cold path, you have observed failures exactly like that: OperationalError: consuming input failed: SSL error: unexpected eof while reading. So one idle connection is the price you pay to keep the path warm.
Next, max_size=8. This is an upper bound on the number of concurrent connections the pool will hold. Neon’s pooler itself caps a single project at around twenty connections by default. Here, you share that pool with the per-branch psycopg.connect calls that happen inside individual graph nodes. Those are one-offs, not part of the checkpointer’s pool. So you keep the checkpointer’s pool conservative. Eight connections means you have headroom for bursty checkpointing traffic across dozens of graph executions, but you never risk starving the rest of your application’s connections to the same Neon project.
The open=False parameter, paired with the explicit await pool.open(wait=True, timeout=15), is a workaround for a deprecation warning. Without it, the pool constructor would try to open itself automatically, which triggers a warning in newer versions of the psycopg_pool library. You suppress that warning by delaying the open and calling it yourself. The wait=True means the call will block until the pool has filled its minimum size, which here is one healthy connection. The timeout=15 means you give it fifteen seconds; if it cannot establish a single connection by then, you will get a timeout error, which is better than silently starting with an empty pool.
Now the most critical setting: max_idle=180. That is one hundred eighty seconds. It tells the pool to close any surplus connection that has sat unused for three minutes. Why three minutes? Because Neon’s PgBouncer drops TCP connections after roughly five minutes of idle time. With max_idle at 180 seconds, you proactively evict idle connections before PgBouncer can tear them down. If you set max_idle to something longer than five minutes, say 400 seconds, you would occasionally hand out a connection that PgBouncer already killed. The next query would fail with that SSL error. So 180 seconds is a deliberate safety margin. One caveat: psycopg_pool applies max_idle only to connections above min_size, so the single baseline connection is not recycled this way and depends on the checkout health check instead.
The check=AsyncConnectionPool.check_connection is your insurance policy. Every time you ask the pool for a connection, it runs a trivial probe query, on the order of a SELECT 1, to verify that the connection is still alive. Without this check, you would rely solely on the idle timeout. But the idle timeout is not a guarantee: a connection can sit idle for several minutes, still appear valid in the pool, and then fail at checkout because PgBouncer has closed it or is about to. The check_connection function catches those cases and evicts the stale connection before you ever try to use it. The overhead is trivial, a single round trip to the database, and it eliminates an entire class of randomly failing checkpoints.
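The kwargs={"autocommit": True, "prepare_threshold": 0} dictionary is passed to every connection the pool creates. With autocommit set to True, each statement commits as soon as it runs instead of opening an implicit transaction, so a checkpoint write never leaves a transaction dangling on a pooled connection. The prepare_threshold key governs server-side prepared statements. Prepared statements are useful for repeated queries, but they hold server-side state, and in a pool that shares connections across graphs behind PgBouncer you do not want that state leaking from one execution context into another. One caution when reading this setting: in psycopg 3, a threshold of zero means a query is prepared the first time it runs, and None is the value that switches prepared statements off. The zero here is the value LangGraph’s own Postgres checkpointer examples use, so check it against your psycopg version if disabling prepared statements is the goal.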
After the pool is created, you call await checkpointer.setup(). This creates the checkpointer tables if they do not exist. It is idempotent: on first run it creates them, on subsequent runs it is a no-op. You log the success with _boot("checkpointer_setup_ok").
Now, here is the crucial fact that makes this pool invisible until something flaps: the whole fleet of over sixty graphs shares this single pool. Every graph instance, every execution, every checkpoint — all of them call checkpointer.aget_tuple or checkpointer.aput through the same AsyncConnectionPool object. As long as connections remain fresh and valid, you never notice the pool. It just works. But the moment a connection goes stale, every graph that tries to use that connection fails. The error propagates up as an unhandled OperationalError, and your graph execution crashes.
That is why the pool is the only thing that matters when something flaps. You can tune graph logic all day, but if the pool hands out a dead connection, your entire system becomes unreliable. The engineering you see in these thirty lines of code is defensive by design: a conservative minimum, a modest maximum, a proactive idle timeout shorter than the external pooler’s cutoff, and a health check on every checkout. Together they make the checkpointer’s backing pool resilient to the quirks of Neon’s PgBouncer.
Lifespan and Shutdown
The FastAPI lifespan context manager owns the entire harness lifecycle. It is a single async generator decorated with at sign asynccontextmanager, and it lives in the file apps/lead-gen/backend/core/app.py starting around line 120. When your application boots, ASGI, the Asynchronous Server Gateway Interface, hands control to this generator, and it stays open until the container receives a shutdown signal. You never call it directly; the framework drives it. Understanding what it does on entry and on exit is the difference between a system that survives a Cloudflare deployment and one that silently drops work.
On the way in, the lifespan body initialises diagnostic state so the debug endpoint, slash debug slash state, is reachable even if something fails later. It sets three attributes on app dot state: boot underscore log, an empty list; lifespan underscore error, initialised to None; and checkpointer underscore alive, set to False. These live through the entire process, and they let you interrogate the boot sequence remotely. A helper function called underscore boot appends a timestamped message to the log and writes it to the application logger. You will see calls like underscore boot open quote lifespan underscore enter close quote as the very first entry.
Then comes LangSmith configuration. The code tries to import init underscore langsmith from leadgen underscore agent dot langsmith underscore setup and calls it. It records either langsmith underscore ready or langsmith underscore disabled in the boot log. If the import or call raises an exception, it logs langsmith underscore init underscore failed followed by the exception type and message. The reason is simple: LangSmith tracing activates per call through the environment variable LANGSMITH underscore TRACING, but the API key and endpoint must be valid before the first agent run. You want that failure visible in the boot log, not buried in a runtime trace.
Next the lifespan body creates the connection pool. In production, on Cloudflare Containers, the database URL arrives from the entrypoint JavaScript constructor in core slash src slash index dot js. That entrypoint reads the secret NEON underscore DATABASE URL from the wrangler plain secret, or from the Cloudflare Secrets Store under store ID ec nine two eight f four seven seven one f b four five seven seven a six zero seven a zero b one two two e eight zero eight seven e. Locally it comes from dot env local, which is gitignored. The lifespan body opens a connection pool to that Neon PostgreSQL database, and it does so inside a try block that can capture failures and surface them without crashing Uvicorn.
After the pool is ready, the checkpointer setup runs. The checkpointer is an AsyncPostgresSaver. Its setup is idempotent: you can call it many times, and it only creates the schema if it does not already exist. This matters during rolling deployments where two instances might initialise simultaneously. The call returns quickly and the lifespan body continues.
Then each graph is compiled. The code iterates over every assistant specification, compiles the corresponding state graph, and stashes the compiled graph objects on app dot state dot graphs. If a graph fails to compile, it logs an error with the assistant ID and the exception: graph compile failed colon whatever the spec dot assistant ID is. The failures are collected and returned separately, but the container does not crash. You can inspect the boot log later to see which graphs are missing.
Finally, the adapter handles for cached RemoteGraph clients are stashed on app dot state dot remote underscore adapters. These are httpx-based pooled connections to remote graph servers. If building the adapters fails, the code catches the exception, logs remote underscore adapters underscore failed, and sets the dictionary to empty so subsequent lookups fail gracefully instead of raising on a missing attribute.
At this point the logs record graphs compiled with AsyncPostgresSaver and list the graph names. The boot log gets lifespan underscore ready. Then the generator yields. That yield is the live window of your application. While it runs, incoming HTTP requests arrive, agents execute, checkpoints write to PostgreSQL, and background tasks accumulate in three collections: underscore async underscore run tasks, underscore positioning underscore inflight, and underscore lead gen underscore inflight. Those collections hold asyncio Task objects that represent operations launched by the agents. They run concurrently with your request handling.
Now the critical part: shutdown. When the ASGI server decides it is time to stop, it resumes the lifespan generator after the yield, entering the finally block. The first thing this block does is drain every in-flight background task. It reads the environment variable GRACEFUL underscore SHUTDOWN underscore S and converts it to a float. The default is twenty seconds. That value is not arbitrary. Cloudflare Containers send a SIGTERM signal on abandonment, and Uvicorn translates that signal into a lifespan shutdown. If you simply yielded back without draining, those pending asyncio tasks would be cancelled mid execution. A task that is halfway through an AsyncPostgresSaver write would raise CancelledError before its commit completes. The partially-applied graph state would be lost. You would see no error, no warning, just missing data.
The drain logic is explicit. It builds a list of every task in the three collections that is not yet done. If the list is non-empty, it logs the count and starts gathering them with asyncio dot wait underscore for and a timeout equal to the shutdown grace period. If all tasks complete before the timeout, the gather returns and the code proceeds. If the timeout fires, it logs a warning with the number of tasks that remain incomplete. Then it iterates over those leftover tasks and calls t dot cancel on each one. After cancellation it awaits them again with return exceptions set to True. That second await lets the CancelledError propagate inside each task, which triggers any finally clauses in the graph code to release pool connections. You do not want connection leaks dangling behind a cancelled task.
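Here is a compact sketch of that drain-then-cancel sequence. Two things are assumptions: the function names drain and demo_drain are invented for the example, and the wait step uses asyncio.wait rather than the asyncio.wait_for call described above, because asyncio.wait hands back the unfinished tasks directly without cancelling them, which keeps the leftover set explicit.

```python
import asyncio


async def drain(tasks, grace_s: float) -> int:
    """Wait up to grace_s for tasks, cancel the rest, return the cancel count."""
    pending = {t for t in tasks if not t.done()}
    if not pending:
        return 0
    # asyncio.wait returns (done, pending) and leaves pending tasks running.
    _done, leftover = await asyncio.wait(pending, timeout=grace_s)
    for t in leftover:
        t.cancel()
    # Await the cancelled tasks so their finally clauses actually run.
    await asyncio.gather(*leftover, return_exceptions=True)
    return len(leftover)


async def demo_drain():
    released = []

    async def slow():
        try:
            await asyncio.sleep(10)
        finally:
            released.append("slow")  # stands in for returning a pool connection

    async def fast():
        await asyncio.sleep(0.01)

    tasks = {asyncio.create_task(slow()), asyncio.create_task(fast())}
    cancelled = await drain(tasks, grace_s=0.1)
    return cancelled, released
```

Running asyncio.run(demo_drain()) lets the fast task finish inside the grace period, cancels the slow one, and still executes the slow task's finally clause, which is the property that protects pool connections.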
Once the tasks are handled, the code closes the cached RemoteGraph adapters by calling await aclose underscore all adapters. This is best-effort. Each adapter close already swallows per-client errors, so a misbehaving remote cannot block your shutdown. If something still raises, the exception is logged but not re-raised.
The outermost try block ensures that if any part of the startup body fails, the exception is captured in app dot state dot lifespan underscore error and the container stays alive. Without that catch, Uvicorn would see an ASGI lifespan startup failed event and crash the process. On Cloudflare that would give you a bare Internal Server Error in the browser with no log line. With the try block, the debug endpoint remains reachable, you can inspect the boot log, and you can see exactly what went wrong.
So the pattern is deliberate. On startup you open resources, compile graphs, and verify configuration. On shutdown you drain work with a hard deadline, close pooled connections, and never let an exception escape. The twenty-second default fits inside Cloudflare's grace period of roughly thirty seconds, leaving a buffer of about ten seconds for final logging and process exit. Without the explicit drain, you cancel graph runs mid commit. With it, every pending write that finishes within the grace period lands in PostgreSQL before the checkpointer context exits. That is how you keep your data safe in a serverless environment where containers can vanish at any moment.
Cost Accounting
You have a cost dictionary called MODEL_PRICING, defined in apps slash lead-gen slash backend slash leadgen_agent slash llm dot py, lines 380 through 430. This dictionary is your single source of truth for United States dollar accounting. Every entry maps a model identifier to a dictionary of per-million-token rates. The dictionary begins by unpacking rates from a catalog named DEEPSEEK_MODELS, then adds several hard-coded entries for legacy and third-party models.
You have four legacy model entries retained for historical compatibility, and two of them are worth spelling out. DeepSeek chat has an input rate of twenty-seven cents per million tokens and an output rate of one dollar ten per million. DeepSeek reasoner has fifty-five cents per million input and two dollars nineteen per million output. These identifiers were deprecated in July 2026 but remain so that any in-flight calls and historical records cost-account correctly.
The constant SONNET_4_6 holds the rates for Anthropic Claude Sonnet 4.6. The base input rate is three dollars per million tokens and the base output rate is fifteen dollars per million. Additional cache-tier rates let you discount or premium-price depending on cache behavior. For a cache write with a five-minute time to live, the rate is three dollars seventy-five per million. For a cache write with a one-hour time to live, it is six dollars per million. Cache reads are heavily discounted at thirty cents per million. The code comments note that cache-hit and off-peak discounts are not factored in; this is the pessimistic price, so real spend is less than or equal to the computed cost underscore USD. That trade-off is close enough for answering which node is expensive, and you can swap in actual billed amounts when you ingest the provider's billing API.
You also have rates for Cloudflare Workers AI. The model identifier mistral email lora carries an input rate of eleven cents per million and an output rate of nineteen cents per million. Finally, OpenRouter-routed Claude models are included. Anthropic slash claude sonnet 4 dash 6 matches the Anthropic list price at three dollars input and fifteen dollars output per million. Anthropic slash claude haiku 4 dash 5 has an input rate of eighty cents and an output rate of four dollars per million.
The function that computes cost from these rates is called underscore cost underscore USD. It takes the model identifier, the number of input tokens, the number of output tokens, and optionally cache creation input tokens and cache read input tokens. It rounds the result to seven decimal places so that accumulated totals do not carry float noise into the database. If the model is unknown, it returns zero dollars and logs a single warning.
Now look at the main async function that makes large language model calls and records cost. It is called ainvoke underscore json underscore with telemetry, defined at line 620 of the same file. It wraps a standard LLM invoke call and returns a tuple containing the parsed response and a telemetry dictionary. Before the call, it sets up retry configuration. It measures the wall-clock time and enters a loop with up to LLM_MAX_RETRIES attempts. If the call succeeds, it breaks. If it encounters a transient failure, such as a connection reset or a five hundred error, it calls a helper named underscore sleep underscore with backoff. This helper first checks if the exception provides a retry-after hint. If so, it sleeps for that many seconds. Otherwise, it computes a delay with exponential backoff and full jitter: for each attempt, it calculates a high bound as the minimum of a cap and the base multiplied by two to the power of attempt minus one, then sleeps for a random duration between zero and that high bound. The latency recorded in the telemetry reflects the total wall-clock time across all retries.
After the response is received, it extracts the input and output token counts using a helper called underscore extract underscore usage. It also retrieves the model name from the large language model object. It then builds the telemetry dictionary with these fields: model, input tokens, output tokens, total tokens, cost underscore USD, and latency underscore milliseconds. The cost underscore USD field is computed by calling underscore cost underscore USD with the model and token counts. The dictionary also emits calls equal to one to make the single-call case explicit.
Consider a typical pipeline run in your lead generation flow. For each company, your node processes roughly two thousand input tokens and eight hundred output tokens. If you are using the DeepSeek chat model with its rates of twenty-seven cents per million input and one dollar ten per million output, the input costs 0.00054 dollars and the output costs 0.00088 dollars, for a total of 0.00142 dollars. That is about fourteen hundredths of a cent, well under one United States cent.
Now contrast this with a heavier graph that uses Anthropic Claude Sonnet 4.6. If the same node processes twenty thousand input tokens and five thousand output tokens, the cost at the base rate is about thirteen and a half cents. Depending on whether you use cache writes or cache reads, the rate changes, but this gives you a sense of the range. The telemetry dictionary for each node includes the exact cost in dollars, allowing you to compare expenses across different models and pipeline stages.
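Both figures can be checked with a few lines of arithmetic. This is a simplified sketch, not the real underscore cost underscore USD: it ignores the cache-tier rates, the RATES table holds only the two base prices quoted above, and the dictionary keys are an assumed spelling of the model identifiers.

```python
# USD per million tokens, as quoted in the text. Key spellings are assumptions.
RATES = {
    "deepseek-chat": {"input": 0.27, "output": 1.10},
    "anthropic/claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
}


def cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Base-rate cost in dollars, rounded to seven decimal places."""
    rates = RATES.get(model)
    if rates is None:
        return 0.0  # unknown model: zero cost (the real helper also logs a warning)
    total = (
        input_tokens * rates["input"] + output_tokens * rates["output"]
    ) / 1_000_000
    return round(total, 7)


light = cost_usd("deepseek-chat", 2_000, 800)
heavy = cost_usd("anthropic/claude-sonnet-4-6", 20_000, 5_000)
```

The light run comes to 0.00142 dollars and the heavy run to 0.135 dollars, so the Sonnet node costs roughly ninety-five times as much per company.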
Concurrency Knobs
LangGraph itself does not cap the number of Sends, so unbounded fan-out is the default. To prevent a single dispatch from overwhelming your system, lead-gen gates the upstream Hypertext Transfer Protocol layer instead, using asyncio Semaphores tuned by environment variables. You configure these concurrency limits at deployment time, not in the graph definition.
Open the file apps/lead-gen/backend/core/app.py around lines 895 to 930. There you find a Pydantic model called DispatchPositioningRequest. It has two fields: force which defaults to false, and limit which is an optional integer. This model is the entry point for a positioning request. But the real concurrency control lives a few lines earlier.
Two module-level constants control the positioning fan-out. First, _POSITIONING_CONCURRENCY reads the environment variable POSITIONING_CONCURRENCY, defaulting to the string "3", and casts it to an integer. Second, _POSITIONING_TIMEOUT_S reads POSITIONING_TIMEOUT_S and defaults to the string "600", which you then parse as a float. Six hundred seconds means ten minutes – a generous timeout for a positioning run.
From _POSITIONING_CONCURRENCY you build an asyncio.Semaphore called _positioning_sem. This semaphore ensures that at most three positioning invocations run concurrently. The comment in the code explains why: deepseek-v4-pro tolerates a few concurrent requests, but a one-hundred-product dispatch with no cap would rate-limit your large language model pool and starve other graphs that share the same pool. So you impose an explicit throttle.
Now look at the set that appears right after the semaphore: _positioning_inflight. This is a Python set that holds strong references to asyncio.Task objects. Why is it necessary? The comment references CPython three-point-eleven-plus behavior and bug report BPO-44665. Here are the mechanics. When you call asyncio.create_task, the event loop keeps only a weak reference to the returned task. If nothing else holds a strong reference, the garbage collector may free the task while it is still running, so a collection pass mid-dispatch can silently end a fire-and-forget run. The set _positioning_inflight prevents that. Every background positioning task you create gets added to this set, a plain module-level set of asyncio.Task[None]. As long as a task is in the set, it stays alive. When the task finishes, you must remove it yourself, typically with task.add_done_callback and the set's discard method. Without this set, a garbage collection cycle could kill your dispatched runs without any error.
Let us follow the code of _run_positioning_bg. This is an async function that takes a product_id integer and a thread_id string. Inside, it first retrieves the positioning graph from the application state. It checks if app.state has an attribute graphs, and if so, gets the graph keyed by the string "positioning". If no graph is found, it logs an error and returns early – a graceful no-op rather than a crash.
Next, you build a configuration dictionary with a configurable key that sets thread_id. Then comes the core concurrency control block: async with _positioning_sem. This acquires the semaphore before entering the critical section. Inside the critical section, you call asyncio.wait_for with the graph invocation. asyncio.wait_for takes a coroutine and a timeout. Here the timeout is _POSITIONING_TIMEOUT_S, the six hundred seconds. If the graph invocation takes longer than that, asyncio.wait_for raises a TimeoutError, which you must handle elsewhere. The graph invocation itself is graph.ainvoke({"product_id": product_id}, config=config). So the product identifier is passed as part of the input state.
Notice that the semaphore acquisition happens inside _run_positioning_bg, not outside. This means each background task acquires the semaphore on its own, and multiple tasks can queue up at the semaphore. The environment variable POSITIONING_CONCURRENCY directly determines how many tasks can run simultaneously. For positioning, that default is three. A similar semaphore exists for lead-gen, built from the module-level _LEAD_GEN_CONCURRENCY setting, but that code lives in a different part of the same file.
The timeout knob is the second environment variable here: POSITIONING_TIMEOUT_S. You control how long each individual positioning run can take before being aborted. If you expect slower responses from the large language model, you can increase this timeout. If you want faster failure, you decrease it. This timeout works together with the semaphore: the semaphore limits how many tasks can be in-flight, and the timeout limits how long each task can hold the semaphore. Together they prevent resource starvation.
One more detail: the DispatchPositioningRequest model has a limit field that can override the default concurrency on a per-request basis. You might use that for ad-hoc testing. But the normal flow relies on the environment-variable-driven semaphore.
In summary, you have learned how lead-gen controls concurrency without relying on LangGraph's built-in mechanisms. You set POSITIONING_CONCURRENCY and POSITIONING_TIMEOUT_S in your environment. You use an asyncio.Semaphore to gate the number of concurrent positioning runs. You maintain a strong reference set to prevent garbage collection from killing fire-and-forget tasks. And you wrap each run in asyncio.wait_for to enforce a timeout. These knobs give you fine-grained control over your system's load profile.
Langfuse Integration
Lead-gen plugs Langfuse in at two layers. The first layer is the per-large-language-model-call layer. It is the telemetry helper from earlier chapters. The runnable configuration carries a Langfuse callback that attaches to every chat invocation, so traces appear in the Langfuse Application Programming Interface without per-graph code changes. Open the file apps/lead-gen/backend/leadgen_agent/llm.py at lines six hundred twenty through seven hundred. You see the function ainvoke_json_with_telemetry. Inside, the variable invoke_config optionally holds the runnable configuration. It starts as None; in production it is a dictionary that includes a callbacks key, whose value is a list containing the Langfuse callback object. When the function calls llm.ainvoke(messages, config=invoke_config, **kwargs), the callback is invoked automatically. Every language model call is traced.
The docstring of ainvoke_json_with_telemetry explains how callers use it. They call the function with the language model and messages, optionally specifying provider and max_tokens. The function returns a tuple containing the parsed response and a telemetry dictionary. The caller then includes that telemetry in the node's output under graph_meta. The _merge_graph_meta_telemetry reducer in state.py folds per-node entries together without clobbering parallel writes. This is crucial for correctness in parallel node execution.
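To see why a reducer matters for parallel writes, consider a sketch of one. The shape is an assumption, since the source of underscore merge underscore graph underscore meta underscore telemetry is not shown here: this version supposes that graph_meta carries a telemetry dictionary keyed by node name, and the function name merge_graph_meta is illustrative.

```python
from typing import Optional


def merge_graph_meta(left: Optional[dict], right: Optional[dict]) -> dict:
    """Combine two graph_meta updates without dropping either node's entry."""
    left = left or {}
    right = right or {}
    merged = {**left, **right}  # non-telemetry keys: last writer wins
    # Telemetry is merged one level deeper so parallel nodes both survive.
    merged["telemetry"] = {
        **left.get("telemetry", {}),
        **right.get("telemetry", {}),
    }
    return merged


a = {"telemetry": {"score": {"cost_usd": 0.00142, "calls": 1}}}
b = {"telemetry": {"enrich": {"cost_usd": 0.135, "calls": 1}}}
combined = merge_graph_meta(a, b)
```

A plain overwrite would keep only whichever node wrote last; merging at the telemetry level keeps both the score and enrich entries.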
The same excerpt shows the retry logic. The function _sleep_with_backoff implements exponential backoff with full jitter. It first checks if an exception is not None. If so, it calls _retry_after_seconds to see if a specific retry hint exists. If a hint is provided, it sleeps for that many seconds and returns. If base is less than or equal to zero, it returns immediately. Otherwise, it computes high as the minimum of cap and base multiplied by two raised to the power of attempt minus one. Then delay is a random value between zero and high. It sleeps for that many seconds.
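The steps above translate into roughly the following sketch. The names drop the leading underscore to signal that this is an illustration. The retry_after attribute lookup is a hypothetical stand-in for whatever _retry_after_seconds actually inspects, and making the helper async is an assumption.

```python
import asyncio
import random
from typing import Optional


def retry_after_seconds(exc: BaseException) -> Optional[float]:
    """Hypothetical hint lookup: reads a retry_after attribute if one exists."""
    hint = getattr(exc, "retry_after", None)
    return float(hint) if hint is not None else None


def backoff_delay(attempt: int, base: float, cap: float) -> float:
    """Full jitter: a uniform draw between 0 and min(cap, base * 2**(attempt - 1))."""
    if base <= 0:
        return 0.0
    high = min(cap, base * 2 ** (attempt - 1))
    return random.uniform(0.0, high)


async def sleep_with_backoff(
    attempt: int, base: float, cap: float, exc: Optional[BaseException] = None
) -> float:
    """Sleep before the next attempt and return the number of seconds slept."""
    if exc is not None:
        hint = retry_after_seconds(exc)
        if hint is not None:
            # A server-provided hint wins over the computed backoff.
            await asyncio.sleep(hint)
            return hint
    delay = backoff_delay(attempt, base, cap)
    if delay > 0:
        await asyncio.sleep(delay)
    return delay
```

With base 0.5 and cap 8, the upper bound grows 0.5, 1, 2, 4, 8 and then stays at 8. The random draw below that bound is what keeps many clients from retrying in lockstep.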
The retry configuration comes from _retry_config. It returns three values: max_attempts, base, and cap. The loop iterates from one to max_attempts. Inside, if a BaseException is caught and it is not retryable, or if the attempt count equals max_attempts, the exception is re-raised. Otherwise, the code logs a warning. The log message reads: "LLM call transient failure (attempt %d/%d): %s: %s — retrying". The placeholders are the current attempt, the total max_attempts, the exception type name, and the string representation truncated to two hundred characters. This gives you full visibility into transient failures during trace inspection.
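The loop itself can be sketched as follows. TransientError and is_retryable are hypothetical placeholders for the real retryability check, and the backoff sleep is reduced to a yield so the example stays short.

```python
import asyncio
import logging

log = logging.getLogger("llm")


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def is_retryable(exc: BaseException) -> bool:
    return isinstance(exc, TransientError)


async def call_with_retries(call, max_attempts: int = 3):
    """Await call() up to max_attempts times, re-raising on the final failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except BaseException as exc:
            # Non-retryable errors and the last attempt both propagate unchanged.
            if not is_retryable(exc) or attempt == max_attempts:
                raise
            # Message format quoted from the text above.
            log.warning(
                "LLM call transient failure (attempt %d/%d): %s: %s — retrying",
                attempt,
                max_attempts,
                type(exc).__name__,
                str(exc)[:200],
            )
            await asyncio.sleep(0)  # placeholder for the backoff sleep
```

Truncating the exception text to two hundred characters keeps a verbose provider error from flooding the log while still leaving enough to recognize the failure.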
The telemetry record captures the total latency_ms as the wall-clock time difference between time.perf_counter() at the start and after the loop, multiplied by one thousand and cast to integer. This total includes all retries. So even if a call is slow due to retries, the diagnostics surface the true latency. The function _extract_usage pulls input_tokens and output_tokens from the response. The model name is obtained from llm.model_name or llm.model, falling back to the string "unknown". The telemetry dictionary includes model, input_tokens, output_tokens, total_tokens, cost_usd, and latency_ms. The cost_usd is computed by _cost_usd using the model name and token counts. The docstring mentions that emitting calls=1 makes the single-call case explicit. All of these details are emitted per call because the Langfuse callback is in the invoke config.
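Here is a rough sketch of how such a record could be assembled. The price table holds made-up placeholder numbers, not real vendor pricing, and reading usage from a usage_metadata attribute is an assumption about the response object. The helper names are illustrative.

```python
import time
from typing import Any, Dict, Tuple

# Placeholder prices in USD per million tokens (input, output). NOT real pricing.
_PRICES: Dict[str, Tuple[float, float]] = {"example-model": (1.0, 2.0)}


def cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    in_price, out_price = _PRICES.get(model, (0.0, 0.0))
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000


def extract_usage(response: Any) -> Tuple[int, int]:
    # Assumption: the response carries a usage_metadata mapping with token counts.
    usage = getattr(response, "usage_metadata", None) or {}
    return int(usage.get("input_tokens", 0)), int(usage.get("output_tokens", 0))


def build_telemetry(llm: Any, response: Any, started: float) -> Dict[str, Any]:
    """started is the time.perf_counter() reading taken before the first attempt."""
    latency_ms = int((time.perf_counter() - started) * 1000)  # includes all retries
    model = getattr(llm, "model_name", None) or getattr(llm, "model", None) or "unknown"
    input_tokens, output_tokens = extract_usage(response)
    return {
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
        "cost_usd": cost_usd(model, input_tokens, output_tokens),
        "latency_ms": latency_ms,
        "calls": 1,  # the docstring mentions calls=1; its placement here is assumed
    }
```

Because the clock starts before the retry loop and stops after it, a call that succeeded on its third attempt reports the full wait, not just the final round trip.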
That is the first layer. Now consider the second layer. It operates at the graph level. At the FastAPI entry point, when you invoke the graph using graph.ainvoke, you can pass an optional Langfuse callback in the runnable configuration. This elevates tracing from only the language model calls to the entire data flow. State transitions, tool calls, fan-in, and fan-out are all recorded. The per-LLM layer gives you granular cost and latency per node. The graph layer gives you end-to-end visibility. They are complementary and both contribute to observability.
A key piece of infrastructure for this second layer is the singleton function get_langfuse_callback. It returns None when the public key is unset. That is intentional: when the integration is not configured, the callback is simply absent, no error is raised, and no tracing overhead is incurred. The function is cheap enough to call from every node. When the public key is set, the callback is present and traces flow. When it is unset, the system runs without Langfuse. Your nodes need no conditional logic of their own, which keeps the graph-level integration optional and resilient.
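The singleton pattern can be sketched like this. To keep the example runnable without the Langfuse SDK, StubCallback stands in for the real handler. The environment variable name LANGFUSE_PUBLIC_KEY and the build_invoke_config helper are assumptions, not confirmed details of the lead-gen code.

```python
import os
from functools import lru_cache
from typing import Any, Optional


class StubCallback:
    """Stand-in for the real Langfuse callback handler (assumption for this sketch)."""


@lru_cache(maxsize=1)
def get_langfuse_callback() -> Optional[Any]:
    """Return one shared callback, or None when Langfuse is not configured."""
    if not os.environ.get("LANGFUSE_PUBLIC_KEY"):
        return None
    return StubCallback()


def build_invoke_config(thread_id: str) -> dict:
    """Build a runnable config; the callbacks key is added only when tracing is on."""
    config: dict = {"configurable": {"thread_id": thread_id}}
    callback = get_langfuse_callback()
    if callback is not None:
        config["callbacks"] = [callback]
    return config
```

At the entry point you would then call graph.ainvoke(inputs, config=build_invoke_config(thread_id)). The single None check lives in the config builder, so no node ever has to branch on whether tracing is enabled. Note that lru_cache also caches the None result, so in this sketch the key has to be set before the first call.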
So you have two layers. The per-LLM layer uses the runnable configuration to attach the Langfuse callback automatically. The graph layer uses an explicit callback passed into the graph invocation. Both rely on the same underlying Langfuse object, managed by the singleton. When you implement a new node, you do not need to add any Langfuse-specific code. The per-LLM layer is automatic because the invoke config is built centrally. The graph layer is optional but available whenever you need end-to-end tracing. This two-pronged approach gives you cost and latency visibility at the call level and full execution traces at the graph level.
Observability and Tracing
Imagine you’re debugging a multi-container incident. A user reports a bad response. You pull logs from the Cloudflare dispatcher Worker, then from the core container, then from the ml container, then from the research container. Every container speaks its own dialect of correlation. One uses a timestamp plus a random number, another uses a legacy transaction ID, a third logs nothing at all. You cannot connect the dots. What should have been a one-hour investigation turns into a day of guesswork. You’re left grepping for timestamps within a one-second window and praying the containers are synced to the same clock.
The fix is boring, deterministic, and worth doing on day one. It’s a shared middleware that stamps every request with a single identifier that all your containers agree on. The implementation lives in a file called apps/lead-gen/backend/leadgen_agent/observability.py, lines 1 through 90. Its central artifact is a factory function named make_request_id_middleware. Every HTTP binary in your fleet — leadgen_agent/custom_app.py, core/app.py, ml/app.py, research/app.py — calls app.add_middleware(make_request_id_middleware()). That one line of wiring ensures that every inbound request carries an x-request-id header, and every outbound response echoes it back. Log lines from that request, in every container, will share that same identifier.
The contract is simple. If the upstream caller already sent an x-request-id header, you trust it. You never overwrite a parent trace ID. If no header arrived, you mint one using uuid.uuid4().hex. That’s a standard library call — no new dependencies. The middleware then stashes the ID on request.state.request_id, so your handlers can read it without re-parsing the headers. And it sets the response header to the same value, so the calling Worker can stitch its log line to yours.
But raw trust is dangerous. A hostile or buggy upstream could send a multi-kilobyte header and bloat your logs. The middleware includes a coercion function, _coerce_request_id, which caps the length at 128 characters and strips whitespace. If the inbound raw value is None or fails the sanity check, the function replaces it with a fresh UUID hex. Observability never breaks the request itself; a bad ID is replaced, not rejected.
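A coercion helper along these lines could look like the sketch below. The text does not say whether an over-long value is truncated or rejected; this version truncates, which is an assumption, and it treats an empty value as the failed sanity check.

```python
import uuid
from typing import Optional

MAX_REQUEST_ID_LEN = 128


def coerce_request_id(raw: Optional[str]) -> str:
    """Return a safe request ID, minting a fresh one when the input is unusable."""
    if raw is not None:
        # Strip surrounding whitespace, then cap the length.
        candidate = raw.strip()[:MAX_REQUEST_ID_LEN]
        if candidate:
            return candidate
    # Missing or empty input: replace it rather than reject the request.
    return uuid.uuid4().hex
```

A caller that sends " abc " gets "abc" back, a five-kilobyte header is cut to 128 characters, and a missing header yields a 32-character hex string.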
Under the hood, make_request_id_middleware returns a new class each time it’s called. That’s intentional: it avoids mutable state when multiple FastAPI apps live in the same process, such as during tests. The factory shape mirrors make_bearer_token_middleware, so the wiring sites look symmetric. The returned class is a subclass of Starlette’s BaseHTTPMiddleware. Its dispatch method is the heart of the operation. It reads the incoming header, coerces the ID, stores it on request.state, calls the next middleware or handler, and sets the response header. If an unhandled exception occurs, the middleware logs the failure with the request ID and the URL path, so even an exception has a join key back to the caller.
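The dispatch flow can be illustrated with a small sketch. One deliberate swap: instead of subclassing BaseHTTPMiddleware as the text describes, this version is a plain ASGI middleware, so it runs with only the standard library. The inline coercion and the log message wording are assumptions.

```python
import logging
import uuid

log = logging.getLogger("observability")
HEADER = b"x-request-id"


def make_request_id_middleware():
    """Return a new middleware class on every call, so apps share no class state."""

    class RequestIdMiddleware:
        def __init__(self, app):
            self.app = app

        async def __call__(self, scope, receive, send):
            if scope["type"] != "http":
                await self.app(scope, receive, send)
                return

            # Trust a parent ID when one arrived; otherwise mint a new one.
            inbound = dict(scope.get("headers") or []).get(HEADER)
            raw = inbound.decode("latin-1").strip()[:128] if inbound else ""
            request_id = raw or uuid.uuid4().hex

            # Starlette's request.state is backed by this dict.
            scope.setdefault("state", {})["request_id"] = request_id

            async def send_with_id(message):
                if message["type"] == "http.response.start":
                    headers = [
                        (k, v) for k, v in message.get("headers", [])
                        if k.lower() != HEADER
                    ]
                    headers.append((HEADER, request_id.encode("latin-1")))
                    message = {**message, "headers": headers}
                await send(message)

            try:
                await self.app(scope, receive, send_with_id)
            except Exception:
                # Even a crash keeps its join key back to the caller.
                log.exception(
                    "unhandled error request_id=%s path=%s",
                    request_id, scope.get("path"),
                )
                raise

    return RequestIdMiddleware
```

The wiring is the same as in the text: app.add_middleware(make_request_id_middleware()). Handlers then read request.state.request_id instead of parsing headers again.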
Because the implementation adds only stdlib imports (uuid and logging) on top of Starlette, which FastAPI already ships, it adds zero operational complexity. No new packages, no orchestrator magic, no service mesh dependency. You import one helper and wire it in four places. The result: every HTTP request that enters your system, no matter which container handles it first, gets exactly one trace identifier that every container agrees on. The identifier itself is random; what is deterministic is the rule for choosing it. That identifier appears in every log line emitted by that container, and in the response header that the next container reads. The chain propagates from the dispatcher Worker through core, ml, and research.
Now picture the incident again. A user reports a bad response. You search your logging system for x-request-id values near the time of the failure. Every line from every container with that ID surfaces in one query. You can see the full lifecycle: the dispatcher received the request, forwarded it to core, core called ml, ml returned an error, core raised an exception, and the dispatcher returned a 500. You can pinpoint the exact ml endpoint that misbehaved and the input that triggered it — all because every container spoke the same request-id contract. The incident takes minutes, not hours.
The middleware is boring on purpose. It doesn’t need to be clever. It follows three rules: never invent your own ID when a parent one exists, always generate one when none exists, and always propagate it downstream. Chasing a missing trace across three containers without this layer is the kind of thing that turns a one-hour incident into a one-day incident. Do it on day one, and you’ll never know the pain you avoided.
When This Pays Off
So when does this architecture pay off? You need four conditions to be true, and if any one of them is missing, the complexity you just walked through — the shared connection pool, the background task drain, the per-node telemetry accumulators — becomes dead weight.
First condition: you have many small graphs that share one persistence layer and one telemetry plane. In the lead-gen system, over sixty graphs share a single AsyncPostgresSaver attached to a Neon database. The pool is configured in core/app.py around lines 212 to 222 with min_size=1, max_size=8, max_idle=180 (seconds), and check=AsyncConnectionPool.check_connection, which verifies each connection at checkout. That pool is invisible until something flaps, and then it is the only thing that matters. If you only have one graph, you do not need this engineering.
Second condition: your workloads fan out across many companies or contacts and merge through reducer dictionaries. The lead-gen graph sends parallel workers to discover and enrich dozens of companies, each producing a graph_meta.telemetry dict. Because a merge reducer is attached, telemetry from every node accumulates without conflict. A terminal node calls compute_totals() to roll the per-node numbers into total_tokens and total_cost_usd, which then land in the product_intel_runs table (migration 0066) and the API response. If your workload is a single linear chain that finishes in under a minute, this kind of fan-out and merge is overkill.
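The roll-up step is simple enough to sketch. The signature below is hypothetical, since the text only names compute_totals(), and it assumes each per-node telemetry entry carries total_tokens and cost_usd fields.

```python
from typing import Any, Dict


def compute_totals(telemetry: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Sum per-node telemetry entries into run-level totals."""
    total_tokens = sum(int(e.get("total_tokens", 0)) for e in telemetry.values())
    total_cost = sum(float(e.get("cost_usd", 0.0)) for e in telemetry.values())
    return {
        "total_tokens": total_tokens,
        # Rounded so float noise does not leak into the stored value.
        "total_cost_usd": round(total_cost, 6),
    }
```

A terminal node would call this once on the merged telemetry dictionary, after every parallel branch has written its entry.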
Third condition: some steps live in another container behind a circuit breaker. The lead-gen system delegates remote work to containers like leadgen-research and leadgen-scrape, each with a configurable timeout — default 240 seconds — controlled by environment variables like REMOTE_GRAPH_TIMEOUT_*. The circuit breaker pattern means that if one of those containers goes silent, the parent graph does not hang indefinitely. It logs, it retries, it moves on. You need this when your agent fleet has to keep running while one of its arms is on fire.
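For readers who have not built one, a circuit breaker around a remote call can be sketched as below. This is a generic illustration of the pattern, not the lead-gen code: the class, its thresholds, and the half-open behavior are assumptions. Only the 240-second default timeout comes from the text.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when calls are being skipped because the remote looks unhealthy."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_after_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def _is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if self._clock() - self._opened_at >= self.reset_after_s:
            # Half-open: let one trial call through; one more failure reopens.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
            return False
        return True

    async def call(self, fn: Callable[[], Awaitable], *, timeout_s: float = 240.0):
        if self._is_open():
            raise CircuitOpenError("remote container marked unhealthy")
        try:
            # The timeout keeps a silent container from hanging the parent graph.
            result = await asyncio.wait_for(fn(), timeout=timeout_s)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        self._failures = 0
        return result
```

The parent node catches CircuitOpenError, logs it, and continues with whatever partial result it has, which is what lets the rest of the fleet keep running.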
Fourth condition: a human approves a subset of the work before money or messages move. The lead-gen system has a human-in-the-loop stage — after enrichment, before email generation, someone reviews the contact list and the personalized draft. That approval gate is what makes the architecture trustworthy for production revenue flows.
Now the anti-patterns. Do not reach for this playbook if you have a single linear graph that runs in under a minute. Do not use it if your workload does not need shared state across runs: if every invocation is stateless and independent, the persistence layer and the checkpoint machinery add latency with zero benefit. And do not use it for a one-developer prototype that has not hit production yet. Prematurely building a multi-agent fleet with circuit breakers, background drains, and Langfuse integration will slow you down before you need to be fast.
The cost to operate is real. Per discovered company, a full pipeline run typically lands at roughly two thousand input tokens and eight hundred output tokens, split across discovery, enrichment, scoring, and contact extraction. At DeepSeek V4 Flash prices that is well under one US cent. The pricier graphs (deep_competitor, product_intel, email_compose) sit on DeepSeek V4 Pro with thinking mode enabled and cost ten to twenty US cents per run depending on how much the model deliberates. The per-node telemetry is what makes that observable.
The right time for this architecture is when your agent fleet has to keep running while one of its arms is on fire. That is the playbook.