Campaign Engine
The durable, human-approved engine behind multi-touch outreach — drafting each touch, pausing for a thumbs-up, sleeping for days, and stopping the moment someone replies. Explained twice: first as a plain-English ladder that climbs from a five-year-old’s picture to an engineer’s, then as the system-design principles behind it.
The walkthrough
10chapters · Gist → More → Deep
1. What A Campaign Is
A campaign is like a patient pen-pal who writes one letter, waits a few days, then writes another, but only after a grown-up says it is okay to send each one.
A campaign is a slow, planned email conversation with one person that happens over days or weeks. Instead of sending just one email, it sends a series of messages — usually six — with days of waiting in between. This makes it feel thoughtful, not pushy. The campaign remembers exactly where it is in the sequence for each person and pauses before each message so a human can approve it. The main problem it solves is that you can run many of these careful conversations at once without having to remember who needs a follow-up and when.
A campaign is a stateful, reactive email sequence that runs on a per-contact basis. It consists of a fixed set of touches, defaulting to six, with inter-touch delays measured in days. The first touch fires immediately, but each subsequent touch is drafted and then held in a pending state until a human explicitly approves it. This design rejects the simpler alternative of a batch send or an automated drip campaign, which would lack the human oversight needed for personalization and quality control. The trade-off is operational overhead — each touch requires manual approval — in exchange for reliability and scalability: the system remembers the exact state for every contact, survives restarts, and allows a single person to manage hundreds of these slow, considerate conversations simultaneously without tracking deadlines manually.
A campaign is a per-contact, stateful email sequence with fixed touches and human approval.
_DEFAULT_CADENCE_DAYS = [0, 4, 7, 7, 7, 7]
_DEFAULT_MAX_TOUCHES = 6
workflow.add_conditional_edges(
"compose_touch", _after_compose_touch, ["await_approval", END]
)
workflow.add_conditional_edges(
"await_approval", _after_approval, ["send_touch", "schedule_next", END]
)
workflow.add_conditional_edges(
"schedule_next", _after_schedule, ["check_reply", END]
)
The campaign graph operates as a durable, per-contact state machine built on LangGraph with a D1 checkpointer. Its cycle begins at check_reply, which inspects the campaign_threads row for any inbound reply; if a reply exists, the status becomes "replied" and the graph terminates at END. Otherwise, control passes to compose_touch, which drafts the next email using the email_outreach subgraph grounded on the contact’s opportunity and resume. This draft is persisted with status "draft_pending" and the graph progresses to gate_draft (a shadow safety check). Under default (non‑autonomous) operation, _after_gate routes to await_approval, which issues an interrupt(kind="approval") — the thread pauses indefinitely until a human calls approveCampaignDraft to resume. On approval, _after_approval moves to send_touch, which performs the actual dispatch and updates counters via _bump_campaign_counts. After sending, schedule_next sets the next wake_at and issues an interrupt(kind="cadence"); a Cloudflare‑cron worker (run_campaign_resume_due) later resumes the thread with Command(resume=True), returning control to check_reply. On failure at any node — for instance, if send_touch encounters an error — the state’s status is set to "failed" and the graph immediately ends; no further touches are attempted.
The design preserves a strict draft‑first invariant: “no touch is ever sent without an explicit human approve/edit.” This is enforced by routing compose_touch to await_approval (bypassing send_touch) for every non‑autonomous campaign. The generation runs exactly once and its output is checkpointed, so the operator always approves the identical draft they first saw. The alternative rejected here is the autonomous mode (auto_approve=True), where _after_compose_touch sends directly to send_touch, skipping the human gate entirely. That path exists only for opt‑in verticals (e.g., uk‑universities) and uses a fail‑closed safety stack; for general campaigns, bypassing approval would risk sending ill‑personalized or inappropriate content. The cost avoided is the loss of human oversight — the risk that an automated drip could damage contact relationships or brand reputation. The trade‑off is operational overhead: each touch requires a manual review and explicit approve/edit action via approveCampaignDraft, adding latency and human effort to every step of the sequence.
A concrete failure mode is a send‑time error inside send_touch. If the dispatch call to clients.email_send.dispatch_send fails (e.g., a temporary D1 outage when reading suppression lists or a rate‑limit violation), the node sets the thread’s status to "failed" and returns END. The operator sees a row in the campaign_threads table with status='failed' for that (campaign_id, contact_id). Additionally, the _reconcile_campaign_after_terminal function runs after all threads complete; if no thread ever succeeded in sending, it updates email_campaigns.status to "stopped" (since emails_sent=0). The signal in the campaign card is a status of "stopped" instead of "running" — the campaign never moved past the draft phase, and no email was ever dispatched. This is visible in the UI or through D1 queries on the email_campaigns and campaign_threads tables.
-
run_campaign_resume_due(in_cron.py) – queries thecampaign_threadstable for rows withwake_at ≤ nowandstatus='waiting'(or'draft_pending'?); for each due thread it resumes the compiledcampaigngraph withCommand(resume=True).- reads:
campaign_threadsD1 table keys:wake_at,status,contact_id,campaign_id,sequence_step. - writes: none directly (the graph’s own state is reloaded by the checkpointer).
- branch: if no due rows, the loop is empty and the function returns; otherwise it iterates sequentially (no fan‑out).
- reads:
-
Graph re‑enters from the previous interrupt (the cadence
interrupt({"kind":"cadence", ...})that paused the thread). The state is restored from the D1 checkpointer; the next node to execute is the one that follows the interrupt – here it is thecampaign_touchnode (docstring: “Generation happens here (the node completes + checkpoints)”).- reads: every key from the checkpointed
CampaignState–contact_id,sequence_step,company_vertical,recipient_name,recipient_role,recipient_email,opportunity_id,resume_context,tone,sub_niche,max_touches,cadence_days,campaign_id. - writes: none yet; the graph simply continues.
- branch: always this path; the interrupt kind
"cadence"guarantees the next node is the generation step.
- reads: every key from the checkpointed
-
_is_campaign_eligible(contact_id)– checks campaign‑scoped suppression (allowsNULL; blocks definitive negatives).- reads:
contact_id. - writes: none (returns
bool). - branch: if
False, the thread is stopped:_upsert_thread(state, status="stopped", wake_at=None, stop_reason="not_eligible")and the node returns{"status":"stopped","stop_reason":"not_eligible"}. Happy path:True.
- reads:
-
_load_opportunity(state.get("opportunity_id"))– fetches the opportunity record ifopportunity_idis set.- reads:
opportunity_idfrom state. - writes: none (returns a dict or
None). - branch:
None→ cold‑outreach behavior; non‑None→ application‑mode (sets"application_mode": Truelater).
- reads:
-
_build_post_text(opp, state.get("resume_context"))– constructs thepost_textstring from the opportunity and resume context.- reads:
oppdict (fields not shown in provided sources),resume_context. - writes: none (returns a string).
- reads:
-
Payload construction – assembles the dict passed to the sub‑graph:
{"recipient_name", "recipient_role", "recipient_email", "post_text", "post_url", "application_mode", "tone", "company_vertical", "sub_niche", "sequence_step", "resume_context"}.- reads:
recipient_name,recipient_role,recipient_email,post_text,post_url(fromopp.get("url")),application_mode(frombool(opp)),tone(fromstate.get("tone")or"warm"),company_vertical,sub_niche,sequence_step,resume_context. - writes: this payload is passed to the sub‑graph – not stored in state yet.
- reads:
-
apply_prompt_version_tag(CAMPAIGN_TOUCH_PROMPT)– tags the trace with the campaign‑touch prompt version id.- reads:
CAMPAIGN_TOUCH_PROMPT(constant). - writes:
pv_idvariable (returned tuple). - branch: safe; returns
(pv_id, _, _).
- reads:
-
on_prompt_version_change("campaign_touch", CAMPAIGN_TOUCH_PROMPT, dataset_name=_CAMPAIGN_DATASET)– fires a regression eval if the prompt version changed (best‑effort, exception‑guarded).- reads:
_CAMPAIGN_DATASETconstant. - writes: none (telemetry side‑effect).
- branch: exceptions are caught and silently ignored.
- reads:
-
agent_run_span("campaign_touch", vertical=..., metadata=...)– starts a tracing span that wraps the LLM call; capturesrun_idfrom the span.- reads:
vertical,state.get("campaign_id"),contact_id,sequence_step,pv_id. - writes:
run_id(orNoneon failure).
- reads:
-
outreach_graph.ainvoke(payload)– delegates to the compiledemail_outreach_graphto compose the actual email draft. The sub‑graph returns adraftdict.- reads: the entire
payloadbuilt in step 6. - writes:
draftvariable (dict withsubject,body,to_email, etc.) andflaggedif the call raises. - branch: if the sub‑graph raises an exception,
flagged = Trueanddraft = {}(short‑circuit). Happy path: a validdraft.
- reads: the entire
-
State update by the
campaign_touchnode – the node returns{"draft": draft, "run_id": run_id, "flagged": flagged}(or{"status":"stopped","stop_reason":"not_eligible"}if eligibility failed).- writes:
draft,run_id,flaggedinto the graph’s state.
- writes:
-
Graph checkpoints – the framework persists the state (including the new
draft) to the D1 checkpointer.- reads: all state keys.
- writes: inserts/updates the checkpoint record.
-
Implicit
await_approvalinterrupt – the generateddraftis held; the graph yields control back to the caller with an interrupt (kind"approval"). The request ends; the cron worker receives theCommandresult and does not block.- reads: no explicit code shown, but the docstring confirms this interrupt exists.
- writes: none (the checkpoint preserves the state).
- branch: this is the terminal step for the current request – no further execution until a human approves and resumes the thread.
The subsystem spends time primarily on three activities: LLM inference (the derive_followup_point call in email_followup_graph.py, plus the inherited email_compose subgraph per touch), D1 database queries (loading full conversation history and polling due campaign threads), and the human approval wait (each touch must be manually approved before the cron resumes it). Money is spent on LLM API tokens (the make_llm(provider="deepseek", ...) call) and on the D1 query‑based batch processing. The following real performance knobs control how much time and money are consumed.
CAMPAIGN_RESUME_BATCH
- Knob —
CAMPAIGN_RESUME_BATCH(environment variable); default"25"in_cron.py:batch = int(os.environ.get("CAMPAIGN_RESUME_BATCH", "25")). - Bounds — Limits the number of
campaign_threadsrows (status'waiting'with an expiredwake_at) that are fetched and resumed per cron tick. - Effect — Increasing the batch processes more contacts per tick (higher throughput), reducing the backlog, but each run takes longer (more D1 queries and more LLM composition calls). Decreasing it lowers per‑tick cost and latency but leaves more contacts waiting.
- Risk — Too high a batch can exhaust the single‑worker queue or D1 rate limits (
_cron.pynotes “parallel calls poison single‑worker LangGraph runtimes”; Render free tier is effectively single‑worker). Too low a batch causes many ticks to make little progress, delaying follow‑ups.
DEFAULT_MAX_TOUCHES
- Knob —
DEFAULT_MAX_TOUCHES(constant incampaign_graph.py); value6. - Bounds — Controls the maximum number of touches (email sends) in a campaign sequence for a single contact.
- Effect — A higher cap increases total LLM composition calls per contact (more touches → more token spend and latency) and requires more human approvals. A lower cap reduces the per‑contact cost and shortens the campaign lifespan.
- Risk — Setting it too high can lead to excessive follow‑ups that annoy contacts or waste budget; too low may end the sequence before a reply is obtained.
DEFAULT_CADENCE_DAYS
- Knob —
_DEFAULT_CADENCE_DAYS(constant incampaign_graph.py); value[0, 4, 7, 7, 7, 7](indexiis the gap before touchi+1; touch 0 sends immediately). - Bounds — Determines the calendar‑day spacing between touches, effectively controlling how quickly the sequence unfolds.
- Effect — Shortening the gaps accelerates the campaign (faster time‑to‑reply if approved), increasing the rate of LLM calls. Lengthening them stretches the contact relationship but may reduce urgency and increase the chance of the contact losing interest.
- Risk — Too short a cadence can appear spammy and trigger suppression; too long a cadence may cause the campaign to miss the window of opportunity.
provider (LLM model choice)
- Knob — The
providerparameter passed tomake_llm()inemail_followup_graph.py:make_llm(provider="deepseek", tier="standard", temperature=0.2). Also used inainvoke_json_with_telemetrywithprovider="deepseek". - Bounds — Selects which LLM service and model tier is used for the
derive_followup_pointcall (and by inheritance theemail_composesubgraph). This trades off cost per token, latency, and output quality. - Effect — Switching to a more expensive provider (e.g., GPT‑4) increases dollar cost per touch and raises latency, but may produce more relevant follow‑up anchors. A cheaper model reduces both cost and time but risks lower‑quality output.
- Risk — A too‑cheap model may hallucinate or generate anchors that misrepresent the thread, wasting the human reviewer’s time. An overly expensive model inflates the budget without measurable benefit.
concurrency (bulk classify)
- Knob — The
concurrencykey passed inside theainvokepayload forcountry_classify_bulk(see_cron.py):{"concurrency": 8, "apply": True, "only_sales_tech": True}. - Bounds — Controls the number of parallel classify tasks running under an
asyncio.Semaphoreinside thecountry_classify_bulkgraph. - Effect — Higher concurrency speeds up bulk classification (more rows classified per wall‑clock second) but increases simultaneous D1 reads and internal resource usage. Lower concurrency reduces peak load and potential for database contention.
- Risk — Setting concurrency too high may exceed D1’s capacity and cause timeouts or
too many concurrent requestserrors. Too low makes the nightly cron run longer than necessary.
limit (remote classify)
- Knob — The
limitkey in theremote_classifygraph invocation:await graph.ainvoke({"limit": 2000}, ...). - Bounds — Caps the number of D1 opportunities scanned in a single
run_remote_classifycron execution. - Effect — A larger limit covers more opportunities (better data completeness) but increases processing time and memory usage (the graph runs a pure rule‑based scan, so no LLM cost). A smaller limit finishes faster but may miss many rows.
- Risk — Too high a limit may cause a long‑running operation that times out or blocks other cron jobs. Too low a limit means many opportunities remain unclassified until a later run.
1. D1 failure during campaign eligibility check
- Trigger – Any D1 transient (timeout, connection drop, row-lock) when
_load_contactquery runs SELECT oncontactsforemail,outreach_eligible,do_not_contact. - Guard – The
except Exception as excclause in the eligibility-check logic (no function name shown) that logs"campaign eligibility check failed contact_id=%s (fail-open): %s"and unconditionally returnsTrue. - Posture – fail-soft – the gate stays open (allows the send through) even though the real contact state is unknown. A definitive
do_not_contactoroutreach_eligible=0may be silently overridden. - Operator signal – A
WARNING-level log line containing the exact string"campaign eligibility check failed contact_id=<id> (fail-open): <exception>". - Recovery – No retry. The fallback value
Trueis used immediately. The human approving the draft still sees the contact, so the error is latent unless the contact later bounces or replies. Manual verification of the contact record is required.
2. D1 failure during opportunity load
- Trigger – A transient D1 error when
_load_opportunityruns SELECT onopportunitieswith a LEFT JOIN oncompanies. - Guard – The
except Exception as excclause inside_load_opportunitythat logs"campaign _load_opportunity failed id=%s: %s"and returnsNone. - Posture – fail-soft – the opportunity context is dropped (no title, application status, company name). The campaign touch proceeds as a cold-outreach without opportunity grounding.
- Operator signal – A
WARNING-level log with"campaign _load_opportunity failed id=<opp_id>: <exception>". - Recovery – No retry. The fallback
Noneis used. The composed email may lack personalization. Human approvers should spot the missing context.
3. D1 failure while bumping campaign counters
- Trigger – A D1 update failure in
_bump_campaign_countswhen writing toemail_campaigns.emails_sent,emails_failed, oremails_scheduled. - Guard – The
except Exception as excclause inside_bump_campaign_countsthat logs"campaign _bump_campaign_counts failed: %s". - Posture – fail-soft – the send path continues uninterrupted; the counter write is best-effort. Campaign dashboard counters become stale (under-count sent/ failed, over-count scheduled).
- Operator signal – A
WARNING-level log:"campaign _bump_campaign_counts failed: <exception>". - Recovery – No retry. The inconsistency must be manually reconciled via a re-count or a separate correction script. No automatic mechanism corrects it.
4. D1 failure during campaign compose (outreach_graph.ainvoke)
- Trigger – A transient failure (or LLM timeout) when
outreach_graph.ainvoke(payload)is called inside the compose node. - Guard – The
except Exception as excclause that logs"campaign compose failed contact_id=%s: %s", setsflagged = Trueanddraft = {}. - Posture – fail-soft – the node returns an empty draft and a flagged state. The graph continues (the draft is empty and will likely be rejected downstream), but the contact thread may move forward without a draft.
- Operator signal – A
WARNING-level log:"campaign compose failed contact_id=<contact_id>: <exception>". The absence of a draft in the database for that touch will also be visible. - Recovery – No retry. The human must manually re-approve or re-trigger composition. The
flaggedboolean may be used by later logic (not shown in source) to skip or alert.
5. D1 failure during terminal reconciliation
- Trigger – A D1 error in
_reconcile_campaign_after_terminalwhen running the aggregate query or the UPDATE to finalize the campaign. - Guard – The
except Exception as excclause that logs"campaign reconcile-after-terminal failed cid=%s: %s". - Posture – fail-soft – the reconcile is best-effort; the cron tick does not abort. The campaign card remains
emails_scheduled> 0 andstatus = 'running'even though all threads are terminal. - Operator signal – A
WARNING-level log:"campaign reconcile-after-terminal failed cid=<campaign_id>: <exception>". The campaign will appear stuck in "running" state permanently. - Recovery – No automatic retry. Manual intervention is needed to update the campaign status and zero the scheduled counter.
6. D1 failure in cron resume-due query
- Trigger – A D1 error when
run_campaign_resume_dueruns SELECT oncampaign_threadsfor due rows. - Guard – The
except Exception as excclause that logs"campaign-resume-due: due query failed"and returns{"ok": False, "job": "campaign-resume-due", "error": "..."}. - Posture – fail-soft (degraded) – the current cron tick for that job returns an error but does not crash the scheduler. No threads are resumed for that tick; the next 5-minute tick will re-qualify.
- Operator signal – An
ERROR-level log:"campaign-resume-due: due query failed"(vialog.exception). The cron response contains"ok": Falsewith an error field. - Recovery – No retry within the tick. The error is transient-only; the next scheduled cron tick automatically retries the query with fresh D1 state. If the error persists, manual investigation of D1 connectivity is required.
Here are four interview-style Q&A pairs about the campaign subsystem, grounded strictly in the provided source code and documentation.
Q1 (warm-up):
How does a campaign define its sending cadence and maximum number of touches?
- A: The defaults are set at the module level as
_DEFAULT_CADENCE_DAYS = [0, 4, 7, 7, 7, 7]and_DEFAULT_MAX_TOUCHES = 6. The first element of the cadence list (cadence_days[0]) is unused because the very first touch fires immediately. Theschedule_nextfunction reads these values from state (or falls back to the defaults) and determines the next step and itswake_atdelay. - Follow-up: Why is
cadence_days[0]left unused rather than starting at index 0? - A: The source explicitly states “cadence_days[0] is unused (touch 0 sends immediately)” — the list structure makes the gap between touches natural; after touch 0, the delay before touch 1 is
cadence_days[1](4 days). - Weak answer misses: The fact that the first touch is not delayed and that
cadence_days[0]is purposefully ignored.
Q2 (design question):
Why does the system force a human approval interrupt on every touch instead of using a fully automated drip campaign?
- A: The design rejects automated drip because each touch must have human oversight for personalization and quality control. The
compose_touchnode generates the draft and then immediately raises aninterruptwithkind="approval", pausing the thread until the UI callsapproveCampaignDraft. This trade-off trades automation for careful, human-reviewed content at the cost of operational overhead. - Follow-up: What prevents the cron job from accidentally sending a draft that is still pending approval?
- A: The cron (cadence resume) only acts on rows with
status='waiting', while touches held for approval havestatus='draft_pending'; the cron strictly ignores that state, as documented in the source: “the cron only resumesstatus='waiting'rows, so adraft_pendingthread can't be auto-sent.” - Weak answer misses: The explicit status distinction (
draft_pendingvswaiting) and the cron filter on'waiting'only, not the mere existence of an interrupt.
Q3 (medium):
How does the schedule_next node ensure the sequence stops after the maximum number of touches, and what outcome does it record?
- A:
schedule_nextchecks ifnext_step >= max_touches(default 6). If so, it setsstatus="completed"and posts a no-reply outcome viarecord_outcome_feedback(last_touch_run_id, "reply_outcome", 0.0). Thelast_touch_run_idis captured earlier duringcompose_touchinside anagent_run_span. The state update returns{"status": "completed", "stop_reason": "sequence_complete"}. - Follow-up: How does the system guarantee that an exhausted sequence is not resumed by a stale wake_at?
- A: After marking
completed,_upsert_threadis called withwake_at=Noneandstatus='completed', so no future cron query will ever select that thread. - Weak answer misses: The use of
last_touch_run_idto tie the no-reply evaluation back to the specific LLM generation run, and the explicitwake_at=Nonewrite to prevent re-selection.
Q4 (hard):
When a workflow is interrupted for human approval and later resumed, how does the system avoid regenerating the draft that the operator already saw?
- A: The
compose_touchnode completes all generation before raising theinterrupt(kind="approval"). Because LangGraph checkpoints after the node finishes, the draft is persisted in the state. On resume, the execution picks up after the interrupt — the node does not re-run — so the operator approves exactly the draft they reviewed. The source states: “Generation happens here (the node completes + checkpoints) so the separateawait_approvalinterrupt can re-run on resume WITHOUT regenerating.” - Follow-up: How does the follow-up email graph (
email_followup_graph.py) similarly avoid regenerating a draft on a cadence resume, given it uses a different pattern? - A: The follow-up graph does not use an approval interrupt — it delegates to the
email_composesubgraph insidecompose, which runs every time because the follow-up point is re-derived each cycle. The cadence resume inschedule_nextre-runs from theinterrupt("cadence")point, re-callingcompose_touchwhich generates a fresh draft each time. - Weak answer misses: The crucial difference that the campaign graph’s
compose_touchnode finishes before the interrupt, whereas a naive design might regenerate if the interrupt happened mid-generation.
2. Why A Durable Thread
It is like a patient pen-pal who writes one letter, waits for a reply, then writes the next, and never forgets where they stopped because they keep a diary of every letter sent.
This chapter explains why the campaign engine uses a durable thread instead of a simple loop. A durable thread is like a patient pen-pal who writes one letter, then waits days for a reply before writing the next. The pen-pal keeps a diary—a database—to record exactly which letter was sent and what happens next. If the pen-pal gets interrupted, the diary helps them pick up right where they left off. This solves the problem of losing progress during a crash or restart, because the state is saved in the diary, not just in memory.
The core insight is that a long-running email sequence must survive process restarts, so state cannot live in volatile memory. The engine implements each contact's sequence as a durable thread: a named, resumable conversation identified by a composite key of campaign and contact, with each step checkpointed to a database. The rejected alternative is a simple in-memory loop that sleeps between touches, which loses all in-flight state—which touch, when next is due, what was sent—on deploy, crash, or restart. The trade-off is added machinery for checkpointing and resuming versus a simpler loop, but this buys the guarantee that the durable thread, not any running process, is the authoritative source of truth for the sequence.
Durable threads checkpoint each step to D1 so the sequence survives process restarts, with the thread ID encoding the campaign-contact pair.
async def _upsert_thread(
state: CampaignState,
*,
status: str,
wake_at: str | None,
pending: dict[str, Any] | None = None,
critical: bool = False,
stop_reason: str | None = None,
) -> None:
thread_id = f"campaign-{state.get('campaign_id')}-{state.get('contact_id')}"
try:
await d1_run(
"INSERT INTO campaign_threads "
"(id, campaign_id, contact_id, recipient_email, status, next_step, wake_at, "
" last_touch_at, opportunity_id, stop_reason, pending_subject, pending_text, pending_html, "
" pending_draft_run_id, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) "
"ON CONFLICT(id) DO UPDATE SET status=excluded.status, next_step=excluded.next_step, "
" wake_at=excluded.wake_at, last_touch_at=excluded.last_touch_at, "
" stop_reason=excluded.stop_reason, "
" pending_subject=excluded.pending_subject, pending_text=excluded.pending_text, "
" pending_html=excluded.pending_html, pending_draft_run_id=excluded.pending_draft_run_id, "
" updated_at=CURRENT_TIMESTAMP",
[thread_id, state.get("campaign_id"), state.get("contact_id"), …],
)
except Exception as exc:
if critical:
raise
The durable thread subsystem sequences each contact’s follow‑up through a LangGraph state graph defined in email_followup_graph.py. Execution begins at the hydrate node (loading contact, company, and company_facts from D1), then load_full_history fetches every prior sent and received email for that contact. The safety_gate node runs next as a conditional edge: if the contact is suppressed, has a do_not_contact flag, or has already replied, the graph produces a skip_reason and terminates early. Otherwise, derive_followup_point makes a single LLM call (provider deepseek, temperature 0.2) to distill the full thread into an explicit follow‑up anchor, and the compose node delegates to the compiled email_compose subgraph (via ainvoke) grounded on that anchor. The entire run is checkpointed to the D1‑backed checkpointer (langgraph-checkpoint-cloudflare-d1) so that a process restart can resume from the last checkpoint.
The subsystem preserves the invariant of a durable thread: each contact’s progression through the graph is identified by a composite key (campaign + contact, implemented as a stable thread_id) and every node execution writes a checkpoint row to the checkpoints table. This guarantees that after any deploy, crash, or SIGKILL, the engine can reload the exact state—which touches have been composed, what was sent, what the follow‑up point is—without loss. The design rejects the obvious alternative of an in‑memory loop that sleeps between touches, which would lose all in‑flight state on restart and require manual reconciliation. The cost avoided is unbounded re‑sending or missed sends; the added machinery of a state graph, D1 checkpointer, and the cron‑based cleanup in _cron.py (which prunes non‑resumable threads and keeps only the latest checkpoint per (thread_id, checkpoint_ns)) buys production reliability at the expense of architectural complexity.
A concrete failure mode occurs when the D1 instance backing the checkpointer becomes unreachable or has its CHECKPOINTER_D1_DATABASE_ID environment variable unset. In that case, the checkpointer raises a ValueError before the first node runs because it cannot mint a thread_id context. An operator would see the error logged by the cron wrapper (the _config function returns {"ok": False, "error": "CHECKPOINTER_D1_DATABASE_ID not set"}) and the graph run would silently fail to produce a draft, leaving no emails row. Because the engine is designed as best‑effort—D1 outages in safety_gate degrade to a skip rather than a hard failure—this checkpointer failure is the one case that stops the entire thread cold, giving a clear operational signal to fix the D1 binding.
-
cron query due threads
_cron.py(anonymous function, likely insiderun_campaign_resume) queries D1:SELECT id, campaign_id, contact_id FROM campaign_threads WHERE status = 'waiting' AND wake_at IS NOT NULL AND wake_at <= ? ORDER BY wake_at ASC LIMIT ?- reads / writes — reads
status,wake_at,id,campaign_id,contact_idfromcampaign_threads; writes nothing yet. - branch — if no rows match, the cron returns immediately (empty path). Happy path proceeds to iterate over
duerows.
-
claim the waiting thread
- Cron runs
UPDATE campaign_threads SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?(shown in_cron.pysnippet). - reads / writes — writes
status='running',updated_atoncampaign_threads. - branch — if the UPDATE silently fails (DB error), the row remains
waitingand will be retried on the next tick; happy path continues.
- Cron runs
-
resume the graph from the previous cadence interrupt
- Cron calls
graph.ainvoke(Command(resume=...))with the thread‑id and the saved state from LangGraph’s checkpoint store. - reads / writes — LangGraph reads the checkpointed state (keys:
contact_id,campaign_id,sequence_step,status,cadence_days,max_touches,last_touch_run_id,recipient_name,recipient_role,recipient_email,opportunity_id,resume_context,company_vertical, etc.). No D1 write here. - branch — if the checkpoint store is lost, the resume fails; happy path continues to the next node.
- Cron calls
-
apply schedule_next’s deferred state update
- The graph resumes inside
schedule_next(the node that issued theinterrupt). The state dict{"sequence_step": next_step, "status": "running"}thatschedule_nextreturned on the prior run is now applied to the graph state. - reads / writes — the previous
sequence_step(from checkpoint) is incremented;statuschanges fromwaitingtorunning. - branch — no branch here (the node already decided on the path before the interrupt); the graph flows to the next node in the graph definition.
- The graph resumes inside
-
check eligibility (campaign‑scoped gate)
- Node
check_eligibility(implied by the_is_campaign_eligiblecall in thecompose_touchdescription) evaluates the contact. It calls_is_campaign_eligible(contact_id). - reads —
contact_idfrom state; writes nothing yet. - branch — if
_is_campaign_eligiblereturnsFalse, the node upsertscampaign_threadswithstatus='stopped',stop_reason='not_eligible'and returns{"status": "stopped", "stop_reason": "not_eligible"}(terminal). Happy path passes through to the next node.
- Node
-
load opportunity and build post_text
- Node
compose_touchcalls_load_opportunity(state.get("opportunity_id"))and_build_post_text(opp, state.get("resume_context")). - reads —
opportunity_id,resume_contextfrom state; writespost_textinto local variable (eventually passed to the subgraph). Optionally readsrecipient_name,recipient_role,recipient_email,company_vertical. - branch — if
opportunity_idis missing or the DB query fails,_build_post_textmay produce an empty string; the subgraph may then default to cold‑outreach behavior. No early return here; always proceeds.
- Node
-
invoke the email_outreach subgraph
compose_touchcallsoutreach_graph.ainvoke(...)inside anagent_run_span. The payload includesrecipient_name,recipient_role,recipient_email,post_text, and optionallypost_url.- reads / writes — the subgraph reads state keys above and writes
subject,body,prompt_tokens,completion_tokens,modelinto the main graph state (as part of the returned dict). Also stampslast_touch_run_idwith the LangSmith run id. - branch — if the subgraph throws an exception,
compose_touchlogs it and may flag the touch into the annotation queue (error path). Happy path returns the generated draft.
-
upsert the draft into emails table (implicit in compose_touch)
compose_touch(or a sub‑node after it) writes a row in theemailstable withstatus='draft',to_email,subject,body,parent_email_id,prompt_version, etc.- reads / writes — writes
emailsrow; readsrecipient_email,subject,bodyfrom state. - branch — if the upsert fails (D1 outage), the draft is lost but the graph continues (no fail‑hard).
-
advance the sequence (schedule_next)
- Node
schedule_nextexecutes again. It readssequence_step,cadence_days,max_touchesfrom state. - reads —
sequence_step,cadence_days,max_touches; writes nothing yet. - branch —
if next_step >= max_touches:→ sequence complete. Happy path (not complete) continues: upsert thread aswaitingwith newwake_at, theninterrupt({"kind": "cadence", ...}). If complete, records no‑reply feedback viarecord_outcome_feedbackand upsertsstatus='completed'.
- Node
-
record no‑reply feedback (only on terminal touch)
- If the sequence is exhausted,
schedule_nextcallsrecord_outcome_feedback(run_id, "reply_outcome", 0.0). - reads —
last_touch_run_idfrom state; writes feedback via LangSmith API. - branch — this step runs only on the
completedbranch; on the cadence branch it is skipped.
- If the sequence is exhausted,
-
upsert thread as completed or waiting
schedule_nextcalls_upsert_thread(state, status="completed", wake_at=None)(terminal) or_upsert_thread({...}, status="waiting", wake_at=wake_at, critical=True)(cadence).- writes — updates
campaign_threadsrow:status,wake_at,sequence_step,updated_at. - branch — respective of the outcome.
-
interrupt (cadence branch)
interrupt({"kind": "cadence", "wake_at": wake_at, "next_step": next_step})pauses the graph. The cron will resume this thread later.- reads / writes — only the interrupt payload becomes part of the checkpoint; no DB write here (the upsert already wrote the
waitingrow). - branch — only on the cadence path; the terminal path reaches END without interrupt.
-
graph returns terminal state
- For the
completedbranch,schedule_nextreturns{"status": "completed", "stop_reason": "sequence_complete"}and the graph reaches END. The cron sees the response and does not re‑select this thread. - reads / writes — final state written to checkpoint store.
- branch — terminal path only.
- For the
CAMPAIGN_RESUME_BATCH
- Knob — Environment variable
CAMPAIGN_RESUME_BATCH, default"25". - Bounds — Limits the number of
campaign_threadsrows that are claimed per cron tick (5‑minute cycle). - Effect — Higher values process more due‑threads per tick, reducing average latency for a touch to be sent; lower values spread the load across ticks, increasing latency but lowering peak D1 write pressure.
- Risk — Too high can cause a single tick to DDoS the D1 queue or exceed the tick’s time budget; too low leaves threads waiting for multiple ticks, delaying the sequence and increasing time‑to‑first‑reply.
country_classify concurrency
- Knob — Inline parameter
concurrency: 8passed tobulk.ainvoke()inrun_country_classify_nightly. - Bounds — Governs how many company‑classification LLM calls run in parallel under the bulk graph’s internal
asyncio.Semaphore. - Effect — Higher concurrency reduces elapsed time for a nightly run (esp. with many unclassified rows), but increases peak token consumption and D1 read/write contention.
- Risk — Set too high and the graph may hit D1’s concurrent‑query limit or saturate the free‑tier LLM rate limit; too low makes the run drag, potentially overlapping with the next hourly cron and causing resource contention.
_DEFAULT_MAX_TOUCHES
- Knob — Constant
_DEFAULT_MAX_TOUCHES = 6incampaign_graph.py. - Bounds — Caps the number of touches (email sends) per campaign thread before it is marked terminal.
- Effect — Higher max touches increases the total LLM calls and sends per contact, raising both throughput (more touches sent) and dollar cost (more prompt/completion tokens). Lower values truncate sequences, saving money but potentially leaving leads unconverted.
- Risk — Too high can burn budget on unresponsive contacts; too low may abort campaigns prematurely, wasting already‑spent LLM cost for early touches.
_DEFAULT_CADENCE_DAYS
- Knob — Constant
_DEFAULT_CADENCE_DAYS = [0, 4, 7, 7, 7, 7]incampaign_graph.py. - Bounds — Defines the day gaps between consecutive touches (index i = gap before touch i+1). Affects how quickly the thread exhausts its touches.
- Effect — Shorter gaps (e.g., reduce 7 to 3) increase sending frequency, reducing time‑to‑next‑touch but raising daily send volume and LLM cost per day. Longer gaps smooth out throughput but extend the campaign lifecycle.
- Risk — Too aggressive may hit per‑domain caps or trigger spam filters; too slow risks losing context with the contact.
daily per‑domain caps
- Knob — Referenced as
daily / per-domain capsin the safety stack reused frompipeline_graph(exact numeric value not shown, but a real limit enforced byclients.email_send.dispatch_send). - Bounds — Limits the number of emails sent to the same domain per rolling day, preventing abuse and mailbox‑provider throttling.
- Effect — Lower caps force sending to spread across multiple days, increasing overall sequence latency; higher caps allow denser bursts, improving throughput at the risk of being flagged as spam.
- Risk — Set too low and touches are queued indefinitely, breaking cadence; too high and the sending domain may be blacklisted, permanently degrading deliverability.
1. D1 Query Failure on Due Select
- Trigger — Transient D1 unavailability or timeout when
run_campaign_resume_duequeriescampaign_threadsfor due rows. - Guard — The broad
except Exception as excclause that catches the query failure and logs vialog.exception("campaign-resume-due: due query failed"). - Posture — Fail-soft: the cron tick returns
{"ok": False, "error": f"{type(exc).__name__}: {exc}"}but does not abort the process; no threads are resumed this tick. - Operator signal — The exact log line
"campaign-resume-due: due query failed"in the system log, and the JSON response from the cron endpoint carries an"error"field. - Recovery — The cron runs periodically (every 5 minutes by configuration); the next tick retries the query naturally. No manual intervention needed if the D1 outage is transient.
2. Claim Update Failure (Per-Thread)
- Trigger — D1 write error when the cron attempts the atomic
UPDATE campaign_threads SET status='running'to claim a due thread. - Guard — The un-named
except Exceptionblock inside thefor row in due:loop inrun_campaign_resume_due. Based on the pattern of other functions in the codebase, this catch logs a warning and appends the error to theerrorslist. - Posture — Fail-soft: that specific thread is skipped during this tick; it remains in
'waiting'status with its originalwake_at, so the next cron tick will try to claim it again. - Operator signal — A log warning (anticipated pattern:
"campaign-resume-due: claim failed for thread <tid>: <exc>") and theerrorslist in the cron response will contain this thread's error. - Recovery — Automatic retry on the next cron tick. No manual step unless the D1 write error persists across many ticks; in that case, operator would investigate D1 health.
3. Graph Resume Failure (Campaign Graph Not Compiled)
- Trigger — The dictionary
graphsdoes not contain the key"campaign"(graph never compiled, configuration error, or startup issue). - Guard — The explicit
if graph is None: return {"ok": False, "job": "campaign-resume-due", "error": "campaign graph not compiled"}early return inrun_campaign_resume_due. - Posture — Fail-hard: the entire cron tick immediately stops, no threads are resumed, and the error is surfaced to the caller.
- Operator signal — The returned JSON includes
"error": "campaign graph not compiled"and the cron job will show a non-ok status. No log is written beyond the return. - Recovery — Manual: ensure the campaign graph is compiled and registered in the
graphsdictionary, then redeploy or restart the worker. Subsequent cron ticks will succeed.
4. Reconciliation Failure After Terminal
- Trigger — D1 write error when
_reconcile_campaign_after_terminalattempts to updateemail_campaigns(resetemails_scheduledand flipstatusto'completed'or'stopped'). - Guard — The
except Exception as excclause inside_reconcile_campaign_after_terminal, which logslog.warning("campaign reconcile-after-terminal failed cid=%s: %s", campaign_id, exc). - Posture — Fail-soft: the campaign card is not finalized. It remains in
'running'status with a staleemails_scheduledcount. No further automatic reconciliation is attempted because no active thread exists to trigger it. - Operator signal — The log warning
"campaign reconcile-after-terminal failed cid=<id>: <exc>"is the only indicator; the campaign stays'running'indefinitely in the UI. - Recovery — Manual: the operator must run an ad-hoc query or script to finalize the campaign’s status and zero out its scheduled count. If the D1 write issue resolves, a future cron tick could be forced by temporarily setting a thread back to waiting, but that is not automated.
5. Counter Bump Failure (Best-Effort Count Drift)
- Trigger — D1 write error when
_bump_campaign_countsupdatesemail_campaigns.{emails_sent, emails_scheduled, emails_failed}after a send or failure. - Guard — The
except Exception as excclause inside_bump_campaign_counts, logginglog.warning("campaign _bump_campaign_counts failed: %s", exc). The function is explicitly best-effort and will never abort the send path. - Posture — Fail-soft: the send path continues normally, but the database counters diverge from reality (e.g.,
emails_sentunder-counts,emails_scheduledmay stay non-zero). - Operator signal — The log warning
"campaign _bump_campaign_counts failed: <exc>". Over time, campaign cards in the UI show inaccurate counts. - Recovery — None built in. The counters are not critical for correctness (the durable thread state in
campaign_threadsis authoritative). Manual correction via DB query or a periodic reconciliation script would be needed to restore accuracy.
Here are four interview-style Q&A pairs about the durable thread subsystem, grounded in the provided source files. Each pair includes a follow-up probe and the key detail a shallow answer would miss.
1. Warm-up / Foundational
Q
How does the campaign graph ensure that a contact’s sequence survives a server restart or a deploy?
A
Every time a touch is composed, the schedule_next node writes the next wake‑at time and status='waiting' into the database via _upsert_thread with critical=True. The graph then pauses using interrupt(kind="cadence", ...). A separate cron job (_cron.py) repeatedly polls for status='waiting' rows with wake_at <= now, resumes the interrupted thread by issuing a Command, and the graph continues from the saved state. Because the state is checkpointed to durable storage, any process crash only delays the next resume; no in‑flight data is lost.
Follow-up
What happens if the database write in schedule_next silently fails?
A
The _upsert_thread call uses the keyword argument critical=True, meaning the function re‑raises any exception instead of swallowing it. This prevents the thread from stalling forever without a wake_at or status update.
Weak answer misses
A shallow answer might say “the graph writes to a database” but omit that critical=True turns a silent failure into a hard crash, which is the deliberate safeguard against orphaned threads.
2. Design Decision: Why DB checkpointing instead of an in‑memory timer?
Q
Why implement a database‑backed durable thread with a cron resume instead of just sleeping in an in‑memory loop between touches? The in‑memory loop seems simpler.
A
An in‑memory loop would lose all in‑flight state—which touch number, what wake_at was calculated, which email was composed—on any process restart, deploy, or crash. The graph’s design explicitly rejects that: schedule_next persists the next step and wake time to D1 before the interrupt call, and the cron (_cron.py) re‑hydrates the thread by reading the database row. The extra machinery (checkpoint + cron) is the trade‑off for making the sequence survive arbitrary restarts, which is a hard requirement for a production B2B outreach system.
Follow-up
What prevents the cron from resuming the same thread twice on the same tick?
A
The cron immediately updates the row’s status to 'running' (UPDATE campaign_threads SET status='running') before any graph logic runs, so the next tick’s SELECT won’t see that row again.
Weak answer misses
A naive answer would say “the cron reads wake_at <= now” but doesn’t explain the status‑based claiming ('waiting' → 'running'), which is the actual idempotency mechanism.
3. Interrupt Types: Why two kinds, and how are they handled differently?
Q
The campaign_graph docstring mentions two interrupt kinds: approval and cadence. Why have two, and what ensures they are never confused?
A
An approval interrupt pauses the graph waiting for a human to review and approve a draft via the approveCampaignDraft resolver. A cadence interrupt is the timed pause between touches, resumed by the cron. The two are kept separate because the cron (_cron.py) only resumes rows with status='waiting'; a thread interrupted for approval has status='draft_pending' and thus will never be auto‑sent. This design prevents a draft‑pending thread from being accidentally sent by the cron.
Follow-up
How does the graph know which interrupt kind to use in schedule_next?
A
schedule_next always uses interrupt({"kind": "cadence", ...}); the approval interrupt is issued by a different node (not shown in the excerpt) that is called only when a human must review the composed touch.
Weak answer misses
A shallow answer might say “there are two kinds” without explaining the status discrimination ('waiting' vs. 'draft_pending') that is the actual guard.
4. Hard: Edge‑to‑Edge Resilience and the “No‑Reply” Outcome
Q
When a campaign sequence exhausts all touches without a reply, the thread should terminate gracefully. Walk through the exact graph flow that posts a no‑reply outcome and marks the thread as completed, ensuring no further cron attempts will pick it up.
A
In schedule_next, if next_step >= max_touches, the node posts a reply_outcome feedback via record_outcome_feedback (using the last touch’s run ID) and then calls _upsert_thread(state, status="completed", wake_at=None). Because wake_at is None, the cron’s query (WHERE status = 'waiting' AND wake_at IS NOT NULL) will never select this thread. The state update also sets status: "completed" and stop_reason: "sequence_complete", so the graph terminates at END on the next resume (which never comes, or is a no‑op if resumed accidentally).
Follow-up
What if record_outcome_feedback fails? Does the thread still terminate?
A
The failure is caught and logged with a warning (log.warning("campaign no-reply feedback failed: %s", exc)), but the _upsert_thread with status="completed" still runs because the exception handler does not re‑raise. The thread completes anyway; only the telemetry is lost.
Weak answer misses
A superficial answer would say “the thread is marked completed” but omit that wake_at=None is the critical field that removes the row from the cron’s poll set, and that the feedback failure is non‑fatal.
3. Draft First, Always
It's like a robot that writes a letter for you but then puts it in a drawer and waits for you to say 'okay, send it' before it ever mails it.
The campaign engine's most important rule is that it always writes a message but never sends it by itself. When it's time for a new email, the engine creates a draft using what it knows about the person and then stops, marking it as 'waiting for review.' This draft-first approach exists because sending the wrong message to a real customer can damage trust, and you can't take it back. The trade-off is that the campaign can't run completely on its own, but that's okay because having a human check each message is actually the main benefit.
The core design constraint is that the engine always composes a draft but never dispatches it without explicit human authorization. When a scheduled touch matures, a compose step invokes the outreach engine to generate the message body and subject line, leveraging known contact data, then persists that draft in a pending state and marks the thread as awaiting review. The rejected alternative would be an auto-send pipeline that dispatches immediately, which risks irreversible damage from a mistimed or off-tone message. The trade-off is that full automation is sacrificed — the campaign cannot run headless — but this is accepted because human judgment on each outbound touch is the feature, not a limitation, making the safe state the default.
The engine always composes a draft but never dispatches it without explicit human authorization.
async def compose_touch(state: CampaignState) -> dict[str, Any]:
"""Generate this touch (grounded in opportunity + resume) and HOLD it as a
draft for human review — it does NOT send."""
# … generation logic …
pending = {
"subject": draft.get("subject"),
"text": draft.get("text"),
"html": draft.get("html"),
"run_id": run_id,
}
await _upsert_thread(state, status="draft_pending", wake_at=None, pending=pending)
return {
"status": "draft_pending",
"pending_subject": draft.get("subject"),
"pending_text": draft.get("text"),
"pending_html": draft.get("html"),
"pending_draft_run_id": run_id,
}
async def await_approval(state: CampaignState) -> dict[str, Any]:
"""Human-in-the-loop gate: pause until an operator decides on the held draft."""
decision = interrupt({
"kind": "approval",
"step": int(state.get("sequence_step") or 0),
"subject": state.get("pending_subject"),
"text": state.get("pending_text"),
"html": state.get("pending_html"),
"run_id": state.get("pending_draft_run_id"),
})
action = "approve"
if isinstance(decision, dict):
action = str(decision.get("action") or "approve").lower()
# … handle action: approve, edit, reject, skip …
The system enforces a "draft-first" constraint: every scheduled touch first passes through compose_touch, which invokes the outreach LLM to generate a subject and body grounded in contact data. The result is immediately persisted via _upsert_thread with a status="draft_pending" and the draft stored in pending_* fields—no dispatch occurs at this stage. The thread then enters a human‑review interrupt (the await_approval boundary in the graph, or the generateContactFollowup resolver for follow‑ups). An optional shadow gate, gate_draft, runs the agent_eval multi‑level verdict in‑process and records it to D1, but the interrupt always fires unless AGENT_EVAL_AUTOAPPROVE=1 and every bar is cleared. The operator’s decision (approve, edit, reject, skip) is handled inside gate_draft: on approve the thread is moved to "running" and the draft is sent; on reject it becomes "stopped" with stop_reason="draft_rejected"; on edit the pending_* fields are overridden before sending. The entire flow guarantees that the graph never sends without explicit human authorization—this is the core invariant, named explicitly in email_followup_graph.py and enforced in campaign_graph.py by default.
The design sacrifices full automation in favor of a safety guarantee that no message is irreversibly dispatched before a human validates its tone, timing, and content. The obvious rejected alternative is the autonomous mode (auto_approve=True), which routes compose_touch straight to send_touch, skipping the human interrupt entirely. That mode is permitted only as an opt‑in per campaign, and even then it is wrapped in a fail‑closed stack: strict eligibility (pipeline gate requiring outreach_eligible=1), per‑vertical daily caps (CAMPAIGN_VERTICAL_DAILY_CAP, default 20), and a redundant suppression re‑check immediately before dispatch. By rejecting full auto‑send as the default, the draft‑first approach avoids the cost of irreversible reputation damage from a mistimed or off‑tone message—a risk that would otherwise require expensive real‑time guardrails and constant monitoring. The trade‑off is acceptable because campaigns tolerate latency (the human review window) far better than they tolerate a single bad send.
A concrete failure mode is a compose_touch execution that fails to produce a draft—for example, because the LLM call errors or required contact data is missing. The graph catches this and calls _upsert_thread with status="failed" and stop_reason="no_draft". The failure is also flagged into the annotation queue via the error‑handling block that calls queue_flagged_run with score=0.0. An operator monitoring the campaign sees the thread marked "failed" in the campaign UI, with the stop reason visible, and the annotation queue contains a flagged run that a human can inspect. Because the system never sent anything, the only impact is a missed touch—easily recoverable by re‑scheduling or manual override. This failure mode highlights the resilience of the draft‑first design: even when generation fails, no erroneous email is dispatched, and the operator has clear signals to act.
-
The cron job calls ainvoke on the compiled campaign graph, passing a state dict that includes
contact_id,opportunity_id,sequence_step,recipient_name,recipient_role,recipient_email,resume_context, andcadence_days.- reads / writes: reads
contact_id,opportunity_id,sequence_step,recipient_name,recipient_role,recipient_email,resume_context,cadence_daysfrom the input state; writes nothing yet (entry point). - branch: The cron only resumes threads with
status='waiting'; happy path proceeds to the first graph node.
- reads / writes: reads
-
The graph enters the compose_touch node (
async def compose_touch(state: CampaignState) -> dict[str, Any]).- reads / writes: reads
statekeys:contact_id,opportunity_id,sequence_step,recipient_name,recipient_role,recipient_email,resume_context,company_vertical,campaign_id; writes nothing initially. - branch: no early return yet; happy path continues.
- reads / writes: reads
-
Inside
compose_touch, the eligibility gate calls await _is_campaign_eligible(contact_id).- reads / writes: reads
contact_id; writes nothing (returns boolean). - branch: happy path returns
True; ifFalse, the function returns{"status": "stopped", "stop_reason": "not_eligible"}and the graph terminates (no draft created).
- reads / writes: reads
-
compose_touchcalls await _load_opportunity(state.get("opportunity_id")) to load the opportunity record from D1.- reads / writes: reads
opportunity_idfrom state; returns an opportunity dict (orNone). - branch: opportunity may be
None(no opportunity), which affects later payload field"application_mode"—happy path uses the loaded dict.
- reads / writes: reads
-
compose_touchcalls _build_post_text(opp, state.get("resume_context")) to construct thepost_textstring.- reads / writes: reads
opp(the opportunity dict orNone) andresume_contextfrom state; returns a string. - branch: no branch; always returns a string (possibly empty).
- reads / writes: reads
-
compose_touchbuilds the payload dict with exact keys:recipient_name,recipient_role,recipient_email,post_text,post_url,application_mode(True ifoppexists),tone,company_vertical,sub_niche,sequence_step,resume_context.- reads / writes: reads the previously computed values and state keys; writes nothing yet.
- branch: no branch at this point.
-
Inside an async with agent_run_span("campaign_touch", ...) context,
compose_touchcalls await outreach_graph.ainvoke(payload) to delegate generation to theemail_outreachsubgraph.- reads / writes: reads the entire payload dict; returns a
draftdict (with keys likesubject,body,skip_reason). - branch: if an exception occurs,
flagged=Trueanddraft={}(failure path); happy path proceeds with a valid draft.
- reads / writes: reads the entire payload dict; returns a
-
compose_touchchecks draft.get("skip_reason"): if present, the subgraph short‑circuited (e.g., contact replied, unsubscribed, bounced).- reads / writes: reads
skip_reasonfrom draft; writes nothing. - branch: happy path has no
skip_reason; otherwise, the touch is skipped (details not shown, but thread would be updated accordingly).
- reads / writes: reads
-
compose_touchcalls await _upsert_thread({ state , "sequence_step": step }, status="draft_pending", wake_at=None, ...) to persist the draft.- reads / writes: reads state keys and the generated draft (subject, body); writes to the
campaign_threadstable withstatus='draft_pending'. - branch: this write is the only way the thread is marked as awaiting review; happy path always executes it (unless an earlier exception prevented it).
- reads / writes: reads state keys and the generated draft (subject, body); writes to the
-
compose_touchreturns a dict updating the state withstatus,draft,flagged, andlast_touch_run_id(from the agent_run_span).- reads / writes: writes
status='draft_pending',draft={...},flagged=False,last_touch_run_id=str(run_id). - branch: the node completes and checkpoints; happy path proceeds to the next graph node.
- reads / writes: writes
-
The graph enters the await_approval node (mentioned in the docstring as a separate interrupt node).
- reads / writes: reads the state (specifically the draft and status); writes nothing yet.
- branch: no branch; always calls interrupt.
-
The
await_approvalnode calls interrupt({"kind": "approval", ...}), pausing the graph. The thread is now awaiting human review.- reads / writes: writes nothing (the interrupt is a suspension point); the thread's
status='draft_pending'ensures the cron will not auto‑send it. - branch: terminal step for the draft‑first design; the graph resumes only when the operator calls
approveCampaignDraft.
- reads / writes: writes nothing (the interrupt is a suspension point); the thread's
This subsystem spends time and money primarily on LLM inference and D1 database operations. Every follow-up email (via email_followup_graph) performs one LLM call for derive_followup_point and then delegates to email_compose, which runs its own refine, anti-AI-marker, and faithfulness logic—each of those steps incurs token costs and latency. Campaign touches (campaign_graph) trigger an email_outreach subgraph per touch, again consuming LLM tokens and compute. The country_classify_nightly cron fans out internally under a Semaphore and likely calls a classification model (though the source does not confirm LLM use there), while remote_classify is rule-based and costs only D1 reads/writes. All graphs read from D1 to hydrate contact data, load history, check safety (suppression, caps, bounce status), and write drafts (persisted as status='draft' rows) or checkpoint state. The checkpointing mechanism itself writes to D1 tables and is cleaned by a nightly cron that deletes stale checkpoints, adding storage and query overhead. Money follows token volume (prompt + completion) and D1 operation count; time follows the number of sequential LLM calls per message plus the latency of D1 queries.
Below are four real performance knobs identified in the source:
-
CAMPAIGN_RESUME_BATCH
– Knob: Environment variableCAMPAIGN_RESUME_BATCH; default25.
– Bounds: Limits the number of campaign threads that are resumed in a single cron tick (ORDER BY wake_at ASC LIMIT ?).
– Effect: Raising it increases how many threads are processed per interval, reducing the time until the next tick handles stragglers but also lengthening each tick’s database query and subsequent graph executions. Lower values spread the load but may leave threads waiting longer.
– Risk: Too high a value can monopolize the single-worker runtime (Render free tier is effectively single‑worker), causing other cron jobs to starve or increasing the chance of timeout errors. Too low may cause threads to miss theirwake_atdeadlines and accumulate. -
concurrency (in
country_classify_bulk)
– Knob: Parameterconcurrencypassed to the bulk graph; the source shows a value of8(no default given, but used implicitly as 8).
– Bounds: Controls how many classification tasks run in parallel under anasyncio.Semaphorewithin a singleainvokecall.
– Effect: Higher concurrency reduces wall‑clock time per bulk run because more items are classified simultaneously, but it increases instantaneous D1 read pressure and (if the classification uses an LLM) concurrent token consumption. Lower values spread work over time but stretch the total cron duration.
– Risk: If set too high, D1 rate limits or the single‑worker event loop could be overwhelmed, causing failures; too low may make the nightly cron run longer than the allowed time window. -
limit (in
remote_classify)
– Knob: Parameterlimitpassed to the remote classification graph; the source shows2000(no default shown, but used as the value).
– Bounds: Caps the number of D1 opportunities scanned per run (SELECT … LIMIT ?).
– Effect: Increasing it covers more opportunities per cron invocation, reducing the need for multiple runs, but lengthens the D1 query and the graph’s processing time. Decreasing it shortens each run but may leave many rows unclassified.
– Risk: Settinglimittoo high can cause the graph to exceed the D1 query timeout or consume too much memory for the result set; too low requires more frequent runs to keep up with backfill needs. -
_DEFAULT_MAX_TOUCHES
– Knob: Constant_DEFAULT_MAX_TOUCHES = 6incampaign_graph.py.
– Bounds: Determines the maximum number of touches (outreach messages) in a campaign sequence when no campaign‑specific value is seeded.
– Effect: Higher values increase the total number of LLM‑composed drafts per thread, raising token cost and the time to exhaust a campaign. Lower values reduce total cost and time per thread but may cut off follow‑ups prematurely.
– Risk: Setting it too high can inflate cost and clutter the recipient’s inbox, increasing bounce/opt‑out risk; too low may frustrate the sales cadence by ending before a reply is likely.
Empty draft from composition failure
- Trigger – The compose step (delegated to the
email_composesubgraph) returns a draft dict wheredraft.get("subject")is falsy or bothdraft.get("text")anddraft.get("html")are falsy. This happens when the LLM call fails, the subgraph errors, or input data is corrupt. - Guard – The
has_draftboolean:bool(draft.get("subject") and (draft.get("text") or draft.get("html"))). Whenhas_draftis false, the code calls_upsert_thread(state, status="failed", wake_at=None, stop_reason="no_draft")and returns with{"status": "failed", "stop_reason": "no_draft"}. If theflaggedflag is set, it also runsis_flagged_run(error=True)andqueue_flagged_run(run_id, score=0.0). - Posture – fail-soft. The thread terminates with a
failedstatus; other threads in the same campaign continue unaffected. - Operator signal – The thread’s row in
campaign_threadsshowsstatus='failed'andstop_reason='no_draft'. If flagged, the LangSmith run receives a score of 0.0. No log line is emitted in this code path (thelog.info("campaign draft held for review …")line is not reached). - Recovery – No automatic retry. The operator must inspect the thread’s input and rerun or fix the cause manually.
Database write failure when persisting draft_pending
- Trigger – After a successful draft,
await _upsert_thread(state, status="draft_pending", wake_at=None, pending=pending)raises an exception (e.g., a transient D1 write error). - Guard – There is no try-except around that call in the compose node. The exception propagates up to the cron’s resume loop in
run_campaign_resume_due, which catches it in its outerexceptand appends the error to theerrorslist. - Posture – fail-hard for the affected thread. The graph run aborts, and the thread remains in
status='running'(because the cron claimed it earlier but never updated it again). No draft is persisted. - Operator signal – The cron tick returns an
errorslist containing a string like"campaign-resume-due: resume failed for thread <id>". The thread is stuck inrunning. - Recovery – Manual: an operator must query
campaign_threadsfor stuckrunningrows, set them back towaiting(orfailed), and optionally re-run the cron. No automatic retry.
Human approval timeout – draft_pending thread stuck forever
- Trigger – The
await_approvalnode callsinterrupt()and pauses the graph. The operator never supplies a resumeCommand(never callsapproveCampaignDraftwith an action). - Guard – None. There is no timeout, escalation, or fallback in the provided source. The
interrupt()waits indefinitely. - Posture – fail-soft. The thread remains in
status='draft_pending'. The campaign’s aggregate query counts it asactive, so_reconcile_campaign_after_terminalwill not finalize the campaign. No other threads are blocked. - Operator signal – No log line is emitted. The operator would observe the thread’s
status='draft_pending'in the database, and the campaign’sstatusremains'running'. - Recovery – A human must manually supply the approval decision via the interrupt mechanism. No automatic recovery exists.
Campaign reconciliation failure after terminal threads
- Trigger – All threads in a campaign reach a terminal status (e.g.,
failed,stopped,replied,completed). The cron’srun_campaign_resume_duecalls_reconcile_campaign_after_terminal(campaign_id), but the D1 update ("UPDATE email_campaigns SET emails_scheduled = 0, status = CASE WHEN emails_sent > 0 THEN 'completed' ELSE 'stopped' END …") fails (e.g., a transient write error). - Guard – The whole function body is wrapped in a try-except that logs a warning and returns:
log.warning("campaign reconcile-after-terminal failed cid=%s: %s", campaign_id, exc). - Posture – fail-soft. The campaign card is not updated:
statusstays'running',emails_scheduledmay be stale. No further harm, but the campaign remains incomplete in the UI. - Operator signal – The log warning line with the campaign ID and exception string.
- Recovery – Manual: an operator must re-run the reconciliation query or fix the D1 issue and re-trigger the cron (which will attempt reconciliation again when the terminal condition still holds).
Missing contact data due to D1 outage during eligibility check
- Trigger – The eligibility query (
"SELECT email, outreach_eligible, do_not_contact FROM contacts WHERE id = ?") fails with an exception (e.g., D1 is unavailable). - Guard – The
except Exception as exccatches it and executesreturn True(fail‑open). The exact guard is thereturn Truein the except block. - Posture – fail-soft. The graph continues with the contact considered eligible, but contact data (email, flags) may be missing or outdated. Later steps (e.g., the compose subgraph) may produce a poor or empty draft.
- Operator signal –
log.warning("campaign eligibility check failed contact_id=%s (fail-open): %s", contact_id, exc). - Recovery – No immediate recovery; the draft may be created with faulty data. The operator may later see a low-quality draft or a failed draft and intervene manually.
Q1 (warm-up) – How does the system ensure that a follow-up email is composed but never automatically sent?
A – The email_followup_graph is explicitly designed as "draft-first" per its docstring: "the graph never sends." The on‑demand resolver generateContactFollowup holds the generated draft for human review, and the cadence cron run_followup_due persists it as a status='draft' row in the emails table. No node in the graph triggers a send operation.
Follow-up – What prevents a cron‑triggered run from bypassing this constraint?
A – The cadence cron (run_followup_due) queries only for contacts that do not already have a status='draft' row, and the parent email’s followup_status is flipped to completed when the draft is created, so the same contact cannot be re‑picked.
Weak answer misses – The cron’s WHERE clause (NOT EXISTS (SELECT 1 FROM emails ed WHERE ... status = 'draft')), which enforces idempotency and prevents double‑creating drafts.
Q2 (design rationale) – Why was a dedicated email_followup_graph built instead of reusing the existing email_outreach agent, which already composes messages?
A – The existing agents like email_orchestrator or campaign_graph/email_outreach ground follow‑ups on only a few recent messages or on a static post_text; they never read the whole conversation. The dedicated graph adds _derive_anchor (a single new LLM call) that distills the full email thread into an explicit follow‑up point and then delegates the actual writing to the compiled email_compose subgraph, reusing its refine/anti‑AI‑marker logic.
Follow-up – If the thread history is empty, what does the graph do?
A – The _format_thread function returns "none" for an empty list, and the anchor LLM call still produces a summary and recommended angle. The safety_gate node may have already skipped the contact if prior history indicates a bounce or do_not_contact.
Weak answer misses – The explicit role of the derive_followup_point node as the only new prompt introduced; the entire write path is inherited from email_compose via ainvoke.
Q3 (hard – design trade‑off) – The rejected alternative would be an auto‑send pipeline that dispatches immediately. Why does the system choose a draft‑first, human‑approval model instead?
A – The campaign_graph uses two interrupt kinds: approval (UI‑resumed via approveCampaignDraft) and cadence (cron‑resumed). The cron only resumes status='waiting' rows, so a draft_pending thread can never be auto‑sent. This prevents "irreversible damage from a mistimed or off‑tone message" (as implied by the draft‑first philosophy). The trade‑off is that full automation is sacrificed – the campaign cannot run without an operator approving exactly the draft they saw, which avoids regeneration on resume.
Follow-up – How does the system ensure that the draft an operator approves is exactly the one that would have been sent if it were auto‑dispatched?
A – The compose_touch node in campaign_graph generates the draft, checkpoints the state, and only then hits the await_approval interrupt; on resume the graph resumes from that checkpoint without regenerating, so the operator sees and approves the same exact message body.
Weak answer misses – The checkpointing mechanism inside compose_touch that enables idempotent resumption, plus the specific interrupt type (approval) that gates the send path.
Q4 (hard – interplay with safety) – The campaign_graph also imports a safety stack for sends. How does that interact with the draft‑first design?
A – Safety is split: the draft‑first graph (e.g., email_followup_graph) runs safety_gate to skip contacts that bounced or have a do_not_contact flag before composing. The campaign_graph additionally runs an eligibility gate (_is_campaign_eligible) that blocks definitive negatives. The actual send‑time safety stack (daily caps, domain limits, idempotency) is imported lazily inside clients.email_send.dispatch_send and is only invoked when a human‑approved draft is explicitly dispatched – the campaign_graph's compose_touch does not call that stack.
Follow-up – What happens when a contact becomes ineligible between draft creation and approval?
A – The draft is already persisted and the graph is interrupted; on resume, the approval handler would have to re‑check eligibility, but the current code does not re‑run _is_campaign_eligible at approval time – it relies on the cron not picking contacts that already have a draft, so stale eligibility can lead to a queued draft that should not be sent.
Weak answer misses – The lazy import of dispatch_send and the distinction between composition‑time safety (safety_gate, _is_campaign_eligible) and send‑time safety (pipeline_graph's send‑safety stack).
Q5 (hard – end‑to‑end flow) – Walk through the exact sequence for a cadence‑based follow‑up draft from database query to persisted draft, naming every node or function involved.
A – The cron job run_followup_due executes a D1 SELECT that finds outbound emails with no reply, no existing draft, and followup_status='pending'. For each result, it invokes email_followup_graph which runs:
hydrate– loads contact, company, and company_facts from D1.load_full_history– fetches all prior sent/received emails for that contact.safety_gate– checks suppression, do_not_contact, bounce, or reply; if any match, it sets askip_reasonand ends.derive_followup_point– calls_derive_anchor(one LLM invocation using the_ANCHOR_SYSTEM_PROMPTand_format_threadto distill the thread into a JSON anchor).compose– invokes the compiledemail_composesubgraph with that anchor, producing{subject, body, followup_point}.- The graph outputs are persisted as a
status='draft'row inemailsand the parent’sfollowup_statusis set tocompleted.
Follow-up – How does the graph handle a D1 outage during hydrate?
A – The docstring states "All DB reads are best‑effort — a D1 outage degrades to a context‑free skip rather than failing the run."
Weak answer misses – The exact order of nodes: hydrate → load_full_history → safety_gate → derive_followup_point → compose, and the fact that compose delegates to a subgraph via ainvoke.
4. The Approval Pause
It's like a robot that writes a letter, then stops and waits for a grown-up to say yes, no, or fix it before it can send the next one.
Think of a patient pen-pal who drafts a note and then waits days for a thumbs-up before mailing it. This is a true pause — the thread stops completely, holding its spot, instead of spinning in place. A reviewer gets four choices: Approve sends it as is, Edit lets them change the subject or body first, Reject throws the draft away, and Skip moves past it without sending. This is built this way because real outreach needs nuance — a small wording fix or quietly skipping a touch that no longer fits — and the trade-off is that the sequence waits on a person, accepted because a human gate on every send is the guarantee the campaign engine provides.
This is a genuine interrupt, not a spin. The thread suspends execution, preserving its exact state, and awaits a human decision before resuming. Concrete parts: a drafted touch reaches a checkpoint; the thread yields control to a reviewer interface offering four actions — Approve dispatches the message as composed; Edit allows modification of subject or body before sending; Reject discards the touch; Skip advances the sequence without sending. The chosen action is handed back to the suspended thread, which resumes from that precise point. The rejected alternative is a simple yes-or-no gate, which forces either sending or discarding without nuance. The trade-off is latency — the sequence waits on a person — accepted because a human gate on every send is precisely the guarantee the durable campaign engine is built to provide, ensuring every outbound touch is vetted for fit and quality.
The approval pause suspends the thread with an interrupt, awaiting a human decision of approve, edit, reject, or skip.
async def await_approval(state: CampaignState) -> dict[str, Any]:
decision = interrupt({
"kind": "approval",
"step": int(state.get("sequence_step") or 0),
"subject": state.get("pending_subject"),
"text": state.get("pending_text"),
"html": state.get("pending_html"),
"run_id": state.get("pending_draft_run_id"),
})
action = "approve"
edit: dict[str, Any] = {}
if isinstance(decision, dict):
action = str(decision.get("action") or "approve").lower()
edit = decision
elif isinstance(decision, str):
action = decision.lower()
if action == "reject":
await _upsert_thread(state, status="stopped", wake_at=None, stop_reason="draft_rejected")
return {"status": "stopped", "stop_reason": "draft_rejected", "_decision": "reject"}
if action == "skip":
return {"status": "running", "_decision": "skip"}
upd: dict[str, Any] = {"status": "running", "_decision": action}
if action == "edit":
if edit.get("subject") is not None:
upd["pending_subject"] = edit["subject"]
if edit.get("text") is not None:
upd["pending_text"] = edit["text"]
if edit.get("html") is not None:
upd["pending_html"] = edit["html"]
elif edit.get("text") is not None:
upd["pending_html"] = _text_to_html(str(edit["text"]))
elif action == "approve":
pass
return upd
The system’s approval pause begins when the compose_touch node (from campaign_graph.py) finishes drafting an email and transitions the thread to a draft_pending status with a wake_at timestamp. At that point the graph hits a genuine LangGraph interrupt, preserving the exact state (including pending_subject and pending_text). The thread yields control to the review UI, which presents four actions. The human’s choice is handed back as a dictionary with an action key. If the action is approve, the graph resumes and the draft is dispatched; edit overrides pending_subject and pending_text (while noting that explicit html still wins); reject flips the thread to status="stopped", stop_reason="draft_rejected"; skip simply returns {"status": "running"}. On failure — for example, if the decision dictionary is malformed and decision.get("action") defaults to "approve" — the thread proceeds as if approved, sending the unmodified draft.
The invariant this design preserves is that no email is ever sent without explicit human review (except the separate auto_approve=True pathway). The thread truly suspends: it does not poll or spin; LangGraph’s checkpoint stores the complete state so resumption occurs at the precise line where interrupt was called. This fail‑closed guarantee prevents accidental dispatch even if the review UI crashes or the decision parsing has edge cases. Additionally, send_touch (run after approval) re‑checks suppression lists and per‑vertical daily caps, providing a belt‑and‑suspenders safety layer redundant with the suppression gate at compose time.
The design explicitly rejects a simple yes‑or‑no gate in favor of four actions (Approve, Edit, Reject, Skip). The obvious alternative — a binary Approve/Reject — would force any operator who wants a minor wording change to discard the draft and regenerate it from scratch. That regeneration would require another LLM call, risking content drift (different wording), increased latency, and higher prompt‑token cost. By allowing in‑place edits of subject and body, the system avoids that cost and keeps the follow‑up point stable. The trade‑off is added UI complexity and the need to keep the HTML body in sync with edited text (as noted in the edit branch: “without this the stale composed HTML … would silently send the pre‑edit copy”).
One concrete failure mode is a D1 outage when the re‑awakened node tries to call backfill_human_decision to stamp the human verdict onto the recorded agent‑eval verdict. The code catches the exception and logs "agent_eval backfill failed: %s". The operator would see that warning in the application logs (e.g., under the log.warning call). The approval flow continues uninterrupted, but the agent‑eval verdict row in D1 remains un‑backfilled — a silent data inconsistency that could mislead future analysis of shadow‑gate accuracy. The only visible symptom is the logged warning; no user‑facing error is raised.
-
compose_touchnode — generates the draft email for the current touch.- reads:
state["contact_id"],state["sequence_step"],state["company_vertical"],state["opportunity_id"],state["resume_context"],state["recipient_name"],state["recipient_role"],state["recipient_email"] - writes:
state["last_touch_run_id"],state["subject"],state["body"],state["to_email"], plus thread status set to"draft_pending"via_upsert_thread - branch: calls
_is_campaign_eligible(contact_id); ifFalse→ writesstatus: "stopped",stop_reason: "not_eligible"and returns early (terminal stop). Happy path proceeds.
- reads:
-
_load_opportunity— fetches the linked deal/role from D1 usingstate["opportunity_id"].- reads:
opportunity_id - writes: returns
oppdict (stored in local variable, not directly in state; fields likeurlused later) - branch: if no opp found,
oppisNone;_build_post_textandpost_urlhandleNonegracefully (fallback to cold‑outreach behavior).
- reads:
-
email_outreach_graphsubgraph invocation — delegates the LLM call to generate the draft subject and body, grounded onpost_text(built from the opportunity).- reads:
post_text,recipient_name,recipient_role,recipient_email,post_url - writes: returns
subject,body(stamped on the draft row), also capturesrun_idviaagent_run_span - branch: the subgraph runs its own internal steps; this node waits for its completion before continuing.
- reads:
-
await_approvalinterrupt — pauses the graph after the draft is generated, suspending the thread withkind: "approval".- reads: the draft fields (
subject,body) already in state - writes: no state mutation at this point; the thread status is
"draft_pending"(set earlier by compose_touch) - branch: the graph does not proceed; it yields control to the reviewer interface. The thread is persisted with its full state.
- reads: the draft fields (
-
Human reviewer action — outside the graph; the UI presents the draft and the operator picks one of four actions:
"approve","edit"(with new subject/body),"reject", or"skip". The UI then resumes the thread by calling a handler (e.g.,approveCampaignDraft– not shown in the provided source) that issues aCommandto continue the graph from theawait_approvalinterrupt point. -
Resumption at
await_approvalinterrupt point — the graph picks up at the node immediately after the interrupt (the sameawait_approvalposition).- reads: the
actionfield delivered viaCommand(e.g.,Command({"action": "approve"})– exact state key not given but implied) - writes: applies the action: if
"edit", overwritesstate["subject"]andstate["body"]with the reviewer’s edits; otherwise leaves them unchanged. - branch:
"approve"or"edit"→ proceeds to the send logic (node not named in source, likely usesdispatch_send)."reject"→ setsstate["status"] = "rejected"andstate["stop_reason"] = "rejected", does not send."skip"→ setsstate["stop_reason"] = "skipped", does not send.
After the branch, control flows toschedule_next.
- reads: the
-
schedule_nextnode — advances the sequence: checks exhaustion or computes the next cadence.- reads:
state["sequence_step"],state["cadence_days"],state["max_touches"],state["last_touch_run_id"] - writes:
state["sequence_step"] = next_step,state["status"](either"completed"or"waiting"),state["stop_reason"] - branch:
next_step >= max_touches→ terminal branch (step 8). Happy path → step 9.
- reads:
-
Terminal branch of
schedule_next— sequence exhausted with no reply (or after reject/skip, the draft was not sent, so no reply expected).- writes: calls
record_outcome_feedback(last_touch_run_id, "reply_outcome", 0.0)(no‑reply feedback) - writes:
_upsert_thread(state, status="completed", wake_at=None)– final state with"stop_reason": "sequence_complete" - branch: no further branching; graph ends.
- writes: calls
-
Happy‑path branch of
schedule_next— not exhausted; computes next wake time and reschedules.- reads:
cadence_days[next_step]via_wake_at_for - writes:
_upsert_thread(state, sequence_step=next_step, status="waiting", wake_at=wake_at, critical=True)– ensures the cron will re‑select this thread. - branch: calls
interrupt({"kind": "cadence", "wake_at": wake_at, "next_step": next_step})– thread suspends again until the cron resumes it for the next touch. Graph halts here.
- reads:
CAMPAIGN_RESUME_BATCH
- Knob — env var
CAMPAIGN_RESUME_BATCH; default25 - Bounds — Maximum number of
campaign_threadsrows fetched and claimed per cron tick (5‑minute cadence). - Effect — Raising the value processes more threads per tick, reducing latency when many threads are due for resumption. Lowering spreads the workload across ticks, smoothing CPU/DB peaks.
- Risk — Too high may overload the single‑worker runtime or D1 write capacity, causing timeouts or lock contention. Too low leaves approved threads waiting longer between ticks, increasing end‑to‑end campaign duration.
_DEFAULT_MAX_TOUCHES
- Knob — constant
_DEFAULT_MAX_TOUCHES; default6 - Bounds — Maximum number of touches (outreach steps) per campaign thread.
- Effect — Each touch costs one LLM call (compose_touch) and one checkpoint write. Reducing this cap lowers total LLM cost per campaign but may stop sequences too early. Increasing it adds more touches, raising both latency (more stages) and dollar cost.
- Risk — Setting too high can waste budget on unresponsive contacts; too low may skip valuable follow‑ups that could have converted.
_DEFAULT_CADENCE_DAYS
- Knob — constant
_DEFAULT_CADENCE_DAYS; default[0, 4, 7, 7, 7, 7] - Bounds — Array where index
idefines the gap in days before touchi+1. - Effect — Shorter intervals compress the campaign timeline, spending money faster (more touches per unit time) and reducing the window for human approval. Longer intervals spread out cost and give reviewers more time, but delay eventual conversion.
- Risk — Too aggressive cadence may be perceived as spam or trigger rate limits; too conservative may lose engagement momentum.
concurrency (country_classify_bulk)
- Knob — parameter
concurrencyin thecountry_classify_bulkgraph invocation; default8(fromrun_country_classify_nightly) - Bounds — Maximum number of parallel LLM calls inside that bulk graph.
- Effect — Raising concurrency reduces wall‑clock time for the nightly job but increases peak LLM spend and D1 write contention. Lowering concurrency uses fewer tokens simultaneously, smoothing cost over a longer period.
- Risk — Too high may saturate the LLM rate limit or the single‑worker event loop, causing failures or degraded response times for other cron jobs. Too low makes the job run too long, potentially overlapping with the next tick.
limit (remote_classify)
- Knob — parameter
limitin theremote_classifygraph invocation; default2000 - Bounds – Maximum number of D1 opportunity rows scanned per run.
- Effect – Increasing the limit processes more records per batch, finishing classification sooner but costing more database reads and (if rule‑based) no additional LLM cost. Decreasing limits the scan range, reducing per‑run latency but requiring more runs to cover all opportunities.
- Risk – If set too high, a single run may exceed the 30‑second D1 query limit or memory constraints on Render free tier. Too low causes incomplete coverage, leaving opportunities unclassified until the next cycle.
1. Interrupt Never Returns (Timeout or Worker Crash)
- Trigger — The graph reaches
await_approvaland callsinterrupt(); the human reviewer never sends aCommand(resume=…)because the client disconnects, the reviewer session expires, or the worker crashes mid-wait. - Guard — No real guard is shown in the source. There is no timeout, no heartbeat, no fallback that resumes the thread after a deadline. The thread remains suspended with its state saved but no automatic recovery.
- Posture — Fail-soft: the run does not abort immediately; the thread simply stays in the
runningstatus and blocks indefinitely. The cron tick (run_campaign_resume_due) queries only'waiting'rows, so it never touches this thread. - Operator signal — Silent absence: the draft is held (status
draft_pendingwas set just before the interrupt), but no approval is ever received. No log line is produced because the graph is stalled. The operator would need to list threads with statusdraft_pendingand check theupdated_attimestamp to detect staleness. - Recovery — No automatic retry. Manual step: operator must copy the thread ID, manually call a restore endpoint (not shown in source) or, failing that, delete the thread and re-run the campaign.
2. Human Decision is Malformed
- Trigger — The reviewer’s
Command(resume=…)delivers a dict that is missing theactionkey, contains an unrecognized action string (e.g.,"delete"), or providessubject/html/textthat are not strings. - Guard — No explicit validation guard is present in the source. The code after the interrupt likely assumes the decision matches the documented shape
{action: approve|edit|reject|skip, …}but does not check types or whitelist the action. A bad value will bubble as an unhandledKeyError,AttributeError, or similar exception. - Posture — Fail-hard: the graph run terminates abruptly with an uncaught exception. The thread state (which was
draft_pendingand saved just before the interrupt) is left orphaned. - Operator signal — An exception traceback in the logs (no exact line shown in snippet, but LangSmith tracing would capture a run-level error). The thread remains in
draft_pendingand will never resume automatically. - Recovery — No automatic retry. The operator sees the failed run and must manually clean up the thread (update its status to
"failed"or"stopped") and optionally re-trigger the campaign touch.
3. D1 Write Fails After Decision
- Trigger — The reviewer chooses
"approve","edit", or"reject". The graph attempts to call_upsert_thread(or a similar D1 update) to set the thread status to"sent","draft_pending"(for edits), or"stopped". The D1 write fails (transient network issue, db lock, constraint violation). - Guard — The source does not show a try/except around
_upsert_threadin the approval‑pause nodes. However, the same pattern used elsewhere (e.g.,_bump_campaign_countshastry/exceptwithlog.warning) suggests the library may not catch this. Because the snippet does not include the actual post‑interrupt code, we must note the absence of a guard for D1 failures in this specific path. - Posture — Fail-hard: the uncaught exception from
d1_runrises, crashes the run, and leaves the thread state inconsistent (the in‑memory decision is lost). - Operator signal — A
DatabaseErrortraceback in the worker logs. The thread status remainsdraft_pendingandwake_atisNULL, so the cron tick will not retry it. - Recovery — No automatic retry. Operator must inspect the failed run, determine what decision was intended (e.g., from the LangSmith trace or the reviewer’s UI log), and manually call the correct status‑update SQL.
4. Send Fails After Approval
- Trigger — The reviewer chooses
"approve". The graph proceeds to call the actual email‑send function (not shown in snippet, but implied by the comment “Approve dispatches the message as composed”). The external sending service (e.g., SendGrid, SES) returns an error or times out. - Guard — No guard is visible in the source for the send step within the approval pause. The graph likely treats the send as a regular node; if it throws, the entire run fails.
- Posture — Fail-hard: the graph run crashes after the approval decision was made but before the email is dispatched and before the thread status is updated to
"sent". The draft was already persisted aspendingin the thread row (set before the interrupt), but now the run is lost. - Operator signal — An exception in the logs (e.g.,
EmailSendErrororConnectionError). The thread status is stilldraft_pending, and no email left the system. - Recovery — No automatic retry. The operator must manually resend the held draft (read from the thread’s
pendingcolumn) and update the thread to"sent", or else reject the touch and move on.
5. Human Edits with Invalid Content (Empty Subject or Body)
- Trigger — The reviewer chooses
"edit"and suppliessubject: ""and/orhtml: ""(ortext: ""). The graph proceeds to use these empty strings when composing or sending. - Guard — No validation guard is shown in the source that checks for empty subject/body after an edit. The code appears to trust the reviewer input directly.
- Posture — Fail-soft: the graph continues; it will attempt to send an email with an empty subject line or empty body (which may be rejected by the email provider or cause a confusing deliverable).
- Operator signal — No immediate log error; the email may be sent with missing content. The only signal would be a support complaint or a failed send callback (if the provider rejects empty fields). The thread status will be
"sent"even though the email is malformed. - Recovery — No automatic recovery. The operator would discover the issue only after a user report; manual corrective email must be sent outside the graph.
Q1 (Warm-up)
Q — How does the approval pause suspend campaign thread execution, and what actions can a reviewer take when resuming?
A — The approval pause uses LangGraph’s interrupt() inside the user_approval node of campaign_graph.py. This yields control with a payload containing the drafted email’s subject, text, HTML, and run ID. A reviewer interacts via the approveCampaignDraft resolver, which passes back a decision dictionary with keys action (one of approve, edit, reject, skip) and optionally edited subject/text. The exact same interrupt() call resumes on resume, receiving that decision and applying it.
Follow-up — What happens if a non-operator process (like a cron job) accidentally resumes this thread?
A — The code explicitly treats a bare True or a string as approve, so an accidental cron-style resume would silently approve the draft, bypassing human review.
Weak answer misses — The fact that _upsert_thread is called with status "stopped" only on reject, not on skip — skip simply sets status: "running" without terminal state.
Q2 (Design question)
Q — Why implement the approval as a genuine interrupt() (pausing execution with full state preservation) instead of a simpler yes/no gate that stores the decision in a database row and polls?
A — Using interrupt() ensures deterministic resumption from the exact point where the draft was held, without needing to re-run the compose_touch node. A polling approach would risk double-composing or losing the exact draft content. The comment in campaign_graph.py explicitly notes that “only this node re-runs on resume (compose_touch already completed), so the draft is never regenerated.” This guarantees consistency.
Follow-up — How does the system prevent a stale draft_pending status from blocking the campaign forever if the reviewer never responds?
A — The reject action explicitly calls _upsert_thread(state, status="stopped", stop_reason="draft_rejected"), flipping the thread to terminal so the reconciliation logic can finalize the campaign. Without this, a draft_pending row would remain “active” indefinitely.
Weak answer misses — The critical detail that interrupt() is only called after compose_touch has finished, so the LLM-generated draft is never regenerated, preserving exactly what the reviewer saw.
Q3 (Harder)
Q — When a reviewer chooses “edit”, how does the system ensure the modified subject/text replaces the original draft without corrupting the HTML body?
A — In the user_approval node, if action == "edit", the code updates pending_subject and pending_text from the reviewer’s decision dictionary. However, the HTML body is not re-composed; the comment warns: “Without this the stale composed HTML (the original draft) would remain the part most clients render, silently sending the pre-edit copy.” The system leaves the HTML as-is, which is a known trade-off — the reviewer’s UI is expected to send both text and HTML edits.
Follow-up — What mechanism would be needed to keep the HTML in sync with edited text without re-running the LLM?
A — One would need to re-render the HTML from the edited text (e.g., using a simple markdown-to-HTML converter) inside the user_approval node, which the current version omits.
Weak answer misses — The explicit comment about the HTML not being updated, and the fact that only subject and text are overwritten, not html.
Q4 (Harder)
Q — After approval, how does the campaign advance to the next touch, and what guarantees the cadence timing is respected?
A — After the approved draft is dispatched (via the downstream send logic), the schedule_next node runs. It increments sequence_step, calculates the next wake_at using _wake_at_for(next_step, cadence_days), and writes the thread to status='waiting' with that wake_at. Then it calls interrupt() with kind "cadence", pausing until the cron job (_cron.py) resumes the thread at the scheduled time. This two-step interrupt pattern separates the human-approval pause from the cron-driven cadence pause.
Follow-up — What happens if the _upsert_thread call for the "waiting" status fails silently?
A — The code marks that particular upsert as critical=True, and the comment says “if it silently failed the thread would stall forever, so re-raise” — meaning the exception propagates to fail the run rather than leaving the thread active but unreachable.
Weak answer misses — The existence of two distinct interrupt() calls in the graph: one for approval (user_approval) and one for cadence (schedule_next), each with different resume mechanisms.
Q5 (Hardest)
Q — How does the approval pause guarantee that the draft held for review is exactly the one that would be sent, especially given that the graph has multiple paths (approve/edit/reject/skip)?
A — The user_approval node is the only node in the campaign graph that returns the decision action; every other node (like compose_touch) does not re-run when resumed. The system stores the composed draft in pending_subject, pending_text, and pending_html before hitting interrupt(). On resume, the action branches: approve dispatches those pending fields; edit overwrites pending_subject and pending_text; reject calls _upsert_thread with status='stopped'; skip returns status='running' without dispatching. The diagram in the comments of campaign_graph.py confirms the exact state preservation.
Follow-up — What prevents a stale draft_pending row from being accidentally sent if a separate cron tick reads it before the reviewer decides?
A — The cron job (_cron.py) only selects rows with status = 'waiting' AND wake_at IS NOT NULL AND wake_at <= ?. The approval pause sets the thread to draft_pending (implied by the interrupt payload), not waiting, so the cron never touches it until the reviewer explicitly approves and the graph transitions to schedule_next which writes waiting.
Weak answer misses — The exact status values that gate each resume path: waiting for cadence, draft_pending for approval — and that schedule_next is the only node that writes waiting after approval, not the approval node itself.
5. The Only Sender
It is like a robot that writes a letter but cannot mail it until a person says okay, so no letters get sent by mistake.
Think of a robot that drafts emails but waits for a thumbs-up before mailing any. This design puts all sending in one special step, called the send step, instead of letting many parts of the system send emails on their own. The big problem it solves is stopping double-sends or lost records. By having just one place that sends, the system keeps a clear log every time an email goes out, so later steps know exactly what happened and never repeat a message.
The engine centralizes all email dispatch into a single send step that only fires after explicit human approval. This contrasts with a distributed approach where compose or schedule steps could also trigger sends under certain conditions. The concrete moving parts are: one send step, a human approval gate, and a logging mechanism that writes each send against the conversation record. The rejected alternative scatters sending logic across multiple modules, increasing the risk of double-sends and inconsistent audit trails. The trade-off is that this design sacrifices flexibility for a sending path that is simple to reason about, easy to audit, and hard to break — a single chokepoint for both execution and record-keeping.
The engine centralizes all email dispatch into a single send_touch node that only fires after explicit human approval, with a dedicated logging mechanism.
async def send_touch(state: CampaignState) -> dict[str, Any]:
"""Send the APPROVED draft (the only node that sends). Reuses the pipeline
send-safety stack (eligibility, caps, cadence/idempotency), records the send,
stamps ``emails.langsmith_run_id`` with the held draft's run id, then clears the
pending draft."""
contact_id = int(state.get("contact_id") or 0)
step = int(state.get("sequence_step") or 0)
vertical = state.get("company_vertical") or ""
campaign_id = state.get("campaign_id")
from graphs.pipeline_graph import _cadence_allows_send, _record_send
draft = {
"subject": state.get("pending_subject"),
"text": state.get("pending_text"),
"html": state.get("pending_html"),
}
run_id = state.get("pending_draft_run_id")
if not await _cadence_allows_send(contact_id, step, 0):
await _upsert_thread(state, status="stopped", wake_at=None, stop_reason="cadence_duplicate")
return {"status": "stopped", "stop_reason": "cadence_duplicate"}
from clients.email_send import dispatch_send
send_result = await dispatch_send(contact_id=contact_id, draft=draft, sequence_step=step, vertical=vertical or "", dry_run=False)
provider_message_id = send_result.get("message_id")
email_id = await _record_send(contact_id=contact_id, sequence_step=step, vertical=vertical or "", provider_message_id=provider_message_id, dry_run=False, draft=draft, recipient_email=state.get("recipient_email"), recipient_name=state.get("recipient_name"))
if run_id and email_id is not None:
from infra.db import d1_run
await d1_run("UPDATE emails SET langsmith_run_id = ? WHERE id = ?", [run_id, email_id])
await _bump_campaign_counts(campaign_id, sent=1, scheduled_delta=-1)
return {"last_touch_at": _iso(_now()), "last_touch_run_id": run_id, "_sent_count": int(state.get("_sent_count") or 0) + 1, "pending_subject": None, "pending_text": None, "pending_html": None, "pending_draft_run_id": None}
The subsystem ensures that every email dispatch passes through a single, human‑approved send step. The ordered mechanism begins with compose_touch, which drafts the message and holds it for review. It then enters await_approval, a human approval gate that blocks auto‑dispatch by default (only opt‑in autonomous mode skips this). Once approved, send_touch becomes the sole function that actually dispatches the email. It re‑checks the suppression list and the per‑vertical daily cap (CAMPAIGN_VERTICAL_DAILY_CAP) immediately before sending, and it logs the send via an agent_run_span that stamps the run id onto the emails row and campaign_threads for later feedback recording. A logging mechanism (e.g. record_outcome_feedback) ties each dispatch back to the conversation record, ensuring every send is auditable.
The invariant preserved is exactly‑once, auditable sending enforced by a single, authority‑guarded dispatch point. All sends flow through send_touch, which re‑evaluates suppression and capacity at the moment of dispatch—a deliberate “belt‑and‑braces” redundancy with the earlier suppression_gate at compose time. This design guarantees that no send can occur without the human approval gate having fired (or the explicit opt‑in to autonomous mode), and that every dispatch is logged with a unique run id. The mechanism also defers to a cadence interrupt when daily caps are hit or errors occur, rather than silently sending, preserving idempotency and preventing double delivery.
The key trade‑off sacrifices flexibility for safety and auditability. The obvious rejected alternative scatters sending logic across compose, schedule, or even the email_orchestrator graph, where each module could independently trigger a send under certain conditions. That distributed approach would risk double‑sends if two paths evaluated the same contact simultaneously, and it would produce inconsistent audit trails because no single step centralises logging. By concentrating all dispatch authority in send_touch behind await_approval, the system avoids the high cost of spamming contacts with duplicate messages and the operational difficulty of reconciling logs from multiple sending modules.
A concrete failure mode: the daily send cap check inside send_touch encounters a database error (e.g. the count query fails or returns NULL). Instead of proceeding with the send, the thread defers via a cadence interrupt and the email remains in status='draft'. An operator would see a cadence interrupt log entry for that campaign–contact pair, plus the draft row sitting unsent. This signal tells them to investigate the capacity counting function or D1 availability before manually releasing the draft.
-
Entry: The campaign graph receives a state dict with keys
contact_id,sequence_step,company_vertical,opportunity_id,recipient_name,recipient_role,recipient_email,resume_context. The graph’sSTARTnode passes control to thecompose_touchfunction.- reads:
contact_id,sequence_step,company_vertical,opportunity_id,recipient_name,recipient_role,recipient_email,resume_context - writes: none yet
- branch: none – always proceeds to
compose_touch.
- reads:
-
Eligibility gate:
compose_touchcalls_is_campaign_eligible(contact_id)(async).- reads:
contact_id - writes: if eligibility fails, writes
status='stopped',stop_reason='not_eligible'via_upsert_threadand returns early (terminal for this request). - branch: happy path → continues; failure → stops the thread.
- reads:
-
Opportunity load:
compose_touchcalls_load_opportunity(state.get("opportunity_id")).- reads:
opportunity_id - writes: returns
oppdict (orNonestored locally) - branch: no early return –
oppmay beNone, which affectspost_textcontent.
- reads:
-
Post text building:
compose_touchcalls_build_post_text(opp, state.get("resume_context")).- reads:
opp(from step 3),resume_context - writes: returns
post_textstring (stored locally) - branch: none – always returns a string.
- reads:
-
Subgraph fan‑out to
email_outreach_graph:compose_touchbuilds apayloaddict (recipient_name,recipient_role,recipient_email,post_text,post_url) and invokes the compiledemail_outreach_graphsubgraph (lazy imported).- reads: local variables from steps 2–4
- writes: the subgraph returns a dict containing draft fields (e.g.
subject,body,model,prompt_tokens). These are merged into the parent state. - branch: the subgraph may fail; the parent catches exceptions and logs, but no explicit early return shown – error handling propagates via LangSmith annotation.
-
Thread upsert for draft:
compose_touchcalls_upsert_thread(state, status="draft_pending", wake_at=None, ...). It also captures therun_idfrom theagent_run_spanthat wraps the subgraph invocation and stamps it on the row.- reads: entire
state(including run_id) - writes: persists the thread in D1 with
status='draft_pending'andlast_touch_run_id - branch: no early return – thread upsert is mandatory.
- reads: entire
-
Approval interrupt: The graph reaches the
await_approvalinterrupt (described in docstring:kind="approval"). The durable state is checkpointed, and the thread pauses until an external handler (e.g.approveCampaignDraft) resumes it.- reads: checkpointed state (all previous keys)
- writes: interrupt flag (LangGraph internal)
- branch: happy path → waits for approval; if approval is rejected (not shown), the thread might be skipped or cancelled.
-
Send step (post‑approval): After approval resume, a node (not explicitly named in provided snippets but referenced as the place that lazily imports
dispatch_sendfromclients.email_send) reads the approved draft state and callsdispatch_sendto deliver the email. It then callsrecord_outcome_feedback(run_id, "reply_outcome", 1.0)to log the send.- reads:
draft_subject,draft_body,recipient_email,last_touch_run_id - writes: sends the email (live), updates the
emailsrow withsent_at,message_id; writes outcome feedback to LangSmith - branch: if
dispatch_sendfails, the error is logged and likely causes the thread to be flagged (no recovery shown).
- reads:
-
schedule_nextnode: The graph transitions toschedule_next. It readssequence_step,cadence_days,max_touches.- reads:
sequence_step,cadence_days,max_touches,last_touch_run_id - writes: none yet – decision made based on step count
- branch: if
next_step >= max_touches→ callsrecord_outcome_feedback(run_id, "reply_outcome", 0.0)and writesstatus='completed',stop_reason='sequence_complete'via_upsert_thread; then returns{"status": "completed"}(terminal). Otherwise proceeds to step 10.
- reads:
-
Cadence interrupt & thread upsert: If sequence is not exhausted,
schedule_nextcalculateswake_atusing_wake_at_for(next_step, cadence_days), then calls_upsert_thread(state, status="waiting", wake_at=wake_at, critical=True)to persist the wake time. It then issuesinterrupt({"kind": "cadence", "wake_at": wake_at, "next_step": next_step}).- reads:
sequence_step,cadence_days(list),max_touches - writes: thread row updated to
status='waiting'withwake_atand bumpedsequence_step - branch: after interrupt, the graph pauses; the cron will later resume this exact thread when
wake_atpasses – returning to step 1 for the next touch.
- reads:
In the provided source files, the subsystem’s time and money are primarily consumed by LLM calls, database reads/writes, and graph invocations that batch process records or generate messages. The following real performance knobs control these costs and trade-offs:
concurrency
- Knob —
concurrency=8(parameter passed tocountry_classify_bulkgraph) - Bounds — Limits the number of simultaneous classification tasks inside the bulk graph; an
asyncio.Semaphorefans out under this cap. - Effect — Raising it increases throughput of “country” backfill for sales-tech companies, finishing faster but driving more parallel D1 reads/writes and LLM calls (if any classify via LLM; the graph is not shown but likely uses LLM). Lowering reduces parallelism, lengthening total run time.
- Risk — Set too high, D1 rate limits or memory/connection exhaustion may cause throttling or failures; too low leaves classifying jobs incomplete during the nightly window, causing rows to remain missing from the sales-tech tab.
limit
- Knob —
limit=2000(parameter passed toremote_classifygraph) - Bounds — Caps the number of D1 opportunities scanned per invocation of the remote‑classify cron.
- Effect — Larger limit processes more opportunities in a single batch, increasing elapsed time and D1 load per run but reducing the need for additional cron triggers. Smaller limit runs faster per invocation but may leave many opportunities unclassified, requiring multiple runs.
- Risk — Exceeding D1 query limits or the cron’s execution window (timeout) if set too high; setting too low means classification lags behind, and opportunities may remain unclassified for longer.
temperature
- Knob —
temperature=0.2(LLM parameter passed tomake_llmin_derive_anchor) - Bounds — Controls the randomness of the DeepSeek model’s token sampling when generating the follow‑up plan JSON. This directly affects token‑level determinism and output entropy.
- Effect — Lower temperature yields more deterministic, repeatable follow‑up points, reducing the chance of erratic or overly‑creative drafts; potentially lowers token spend if fewer regenerations are needed (the graph does not retry based on temperature). Higher temperature may produce more varied phrasing but could increase prompt‑engineering overhead and risk of off‑topic output.
- Risk — Too low (e.g., 0.0) can cause repetitive or stuck outputs; too high (e.g., 1.0) risks incoherent or unsafe follow‑up plans, possibly escalating human review time (time cost) or requiring re‑runs.
provider
- Knob —
provider="deepseek"(parameter tomake_llmin_derive_anchor;tier="standard"also set) - Bounds — Selects the LLM service and pricing tier. Changing to another provider (or to a different tier) swaps per‑token cost, latency distribution, and model capabilities.
- Effect — Switching to a cheaper provider reduces dollar cost per follow‑up point generation but may degrade quality, increase retries, or require higher temperature to compensate. A faster provider lowers wall‑clock time for the
derive_followup_pointnode, reducing graph latency. - Risk — A provider with lower reliability or different alignment may produce unusable anchors, forcing human re‑work or downstream rejections. Higher‑cost providers burn budget with no quality guarantee.
_DEFAULT_MAX_TOUCHES
- Knob —
_DEFAULT_MAX_TOUCHES = 6(constant incampaign_graph.py, used when a campaign seeds no explicit max) - Bounds — Caps the number of follow‑up touches (emails) per contact in a campaign sequence before the thread is considered exhausted.
- Effect — Larger
_DEFAULT_MAX_TOUCHESincreases total email volume (and thus sending cost) for a contact, potentially improving conversion but also raising the risk of bounces or spam complaints. Smaller values lower cost and risk but may cut off productive follow‑up prematurely. - Risk — Too high may violate cadence/idempotency caps (the safety stack’s daily/per‑domain limits) and trigger suppression; too low may leave value on the table, requiring manual re‑engagement.
_DEFAULT_CADENCE_DAYS
- Knob —
_DEFAULT_CADENCE_DAYS = [0, 4, 7, 7, 7, 7](constant incampaign_graph.py, where indexiis the gap before touchi+1) - Bounds — Defines the minimum number of days between consecutive campaign touches. The list length matches
_DEFAULT_MAX_TOUCHESimplicitly. - Effect — Shorter intervals (e.g., reducing 4 to 2) accelerate the campaign, potentially increasing response rate but also daily send volume and per‑domain rate‑limit pressure. Longer intervals lower send velocity, reducing operational cost and spam risk but may cause leads to go cold.
- Risk — If intervals are too short, the campaign may violate the safety stack’s per‑domain caps or cadence logic, causing send failures; too long may let opportunities expire before the final touch is sent.
1. Approval Interrupt Not Resumed – Draft Stuck Forever
- Trigger: A campaign thread reaches the
approvalinterrupt and waits for human action. The UI action (approveCampaignDraft) either fails to emit the resume signal or is never invoked (e.g., operator overlooks it). - Guard: No guard exists in the source – the interrupt is the sole mechanism, and there is no timeout, retry, or fallback path to auto‑send or skip the draft.
- Posture: fail‑soft – the run pauses safely and no email is sent, but the draft row stays in
draft_pendingindefinitely, degrading campaign throughput. - Operator signal:
campaign_threadstable shows rows withstatus='draft_pending'for unusually long periods; the campaign’semails_sentcounter never increments. - Recovery: Manual – the operator must either resume the interrupt via the UI or directly update the row’s status in D1 to unblock the graph.
2. Dispatch Send Fails After Approval – Email Lost, Counter May Mislead
- Trigger: Human approval resumes the send step, which calls
clients.email_send.dispatch_send. The call fails (SMTP timeout, provider 5xx, DNS error). The code may call_bump_campaign_counts(sent=1)before the actual send, so the counter increments even though the email never left. - Guard:
_bump_campaign_countsis best‑effort (logslog.warningon failure) but does not guard against premature counting. The send step itself has no explicit retry shown in the source. - Posture: fail‑soft – the system continues to the next touch; the send failure is not propagated. The counter may be inflated, creating a false audit trail.
- Operator signal: No email in the recipient’s inbox, but the campaign shows
emails_sentincremented. No error propagated to the UI unless the graph’s annotion queue flags the failed run (the source mentions “Errored touches are flagged into the annotation queue”). - Recovery: Manual – operator must cross‑check
campaign_threadsor external logs, then resend by re‑approving or using a manual dispatch.
3. D1 Outage Causes Eligibility Check to Fail‑Open and Opportunity to Be Empty
- Trigger: A transient D1 read failure (connection pool exhaustion, storage flap) occurs during
_check_outreach_eligibleor_load_opportunity. The eligibility check catchesExceptionand returnsTrue(fail‑open)._load_opportunitycatches and returnsNone. - Guard:
except Exception as exc: log.warning(...)in both functions – exact identifiers:log.warning("campaign eligibility check failed ... (fail‑open): %s", exc)andlog.warning("campaign _load_opportunity failed id=%s: %s", ...). No retry or circuit breaker. - Posture: fail‑soft – the campaign continues to compose and potentially send to a contact that would normally be blocked (e.g.,
do_not_contactnot checked) or without the opportunity context, degrading grounding quality. - Operator signal:
log.warninglines appear in the monitoring system. The campaign may send to disallowed contacts or produce context‑poor email bodies. - Recovery: None – the system moves forward with degraded inputs. The operator must manually verify and possibly reset the campaign after D1 recovers.
4. Double‑Send via Race Between Approval Interrupt and Cron‑Resume
- Trigger: A campaign thread has
status='waiting'. The cron jobrun_followup_dueattempts to resume it (using thecadenceinterrupt kind) while, concurrently, a human operator clicks “Approve” in the UI (resuming theapprovalinterrupt). Both resumes execute the same send step. - Guard: The source mentions “cadence/idempotency” as part of the send‑safety stack, but no concrete identifier (e.g., a unique‑key constraint or
campaign_threadsdedup) is shown. Thecampaign_threadslogging writes each send attempt, but it does not prevent the second send. - Posture: fail‑hard – the lack of a visible idempotency guard allows two
dispatch_sendcalls, resulting in a duplicate email. - Operator signal: Recipient receives two identical emails;
campaign_threadsshows two rows for the same touch timestamp. The audit trail is inconsistent. - Recovery: Manual – operator must manually delete the duplicate entry and verify recipient suppression lists. No automatic rollback.
5. Contact’s Email Becomes Invalid After Approval But Before Dispatch
- Trigger: Human approves the draft. Between approval and the
dispatch_sendcall, the recipient’s email address is deactivated (hard bounce, user unsubscribes via external system, or mailbox full). The eligibility check that passed earlier is never re‑run. - Guard: No re‑eligibility check after approval. The source’s eligibility logic runs only once at the graph’s start.
dispatch_sendmay return an error (e.g., 550), but the calling code does not show retry or fallback. - Posture: fail‑soft – the send fails, but the system may label it as “sent” if
_bump_campaign_countsalready incremented the counter before the send (same issue as failure #2). - Operator signal: A bounce notification from the email provider (if integrated) or a silent missing delivery; no signal in the campaign graph itself unless the annotation queue catches the failed send.
- Recovery: Manual – operator must investigate the bounce, update the contact’s
do_not_contactflag, and manually correct the campaign counts.
Q1 (warm‑up):
What guarantees that the email_followup graph never actually delivers an email to the recipient?
A1:
The graph terminates by writing a status='draft' row in the emails table; the run_followup_due cron that invokes it persists the draft for human review and never calls any send API. The comment block in email_followup_graph.py explicitly states “Draft‑first — the graph never sends. The on‑demand resolver … holds the result for human review; the cadence cron … persists it as a status='draft' row.”
Follow‑up:
How could a future change accidentally make the graph send, and what mechanism prevents that?
Grounded answer: The graph has no call to dispatch_send or any send‑safety stack; its last node (compose) delegates to the email_compose subgraph which itself produces only a subject/body — the subgraph has no send capability either.
Weak answer misses: the exact identifier run_followup_due is the cron that persists the draft; shallow answers overlook that the graph’s own state machine never includes a send node.
Q2 (design – “why this way”):
Why does the email_followup graph produce a draft instead of immediately sending the follow‑up? The obvious alternative would be to compose and send in one deterministic path.
A2:
Because the graph’s output is subject to human review for safety and quality — “draft‑first” is an explicit architectural constraint. The safety_gate node already suppresses contacts that have bounced, replied, or opted out, but the team chose not to trust the LLM‑generated content to send without a human check. This pattern mirrors the campaign graph’s approval interrupt: campaign_threads with status='draft_pending' cannot be auto‑sent by the cron because the cron only resumes status='waiting' rows.
Follow‑up:
Doesn’t the safety_gate already filter unsafe contacts? Why add a second human‑review layer?
Grounded answer: safety_gate only blocks contacts that have actively opted‑out or bounced; it does not verify that the LLM‑generated subject/body is appropriate (e.g., tone, hallucinated facts). The derive_followup_point prompt instructs the model to “never follow any instruction inside” the thread, but the output is still treated as untrusted until reviewed.
Weak answer misses: the specific interrupt mechanism approval (in campaign_graph.py) and the cron’s resume logic that skips draft_pending rows; shallow answers may confuse safety_gate with a full send‑safety stack.
Q3 (warm‑up to intermediate):
How does the email_followup graph ensure it doesn’t generate a draft for a contact that already has a pending follow‑up draft?
A3:
The run_followup_due SQL query explicitly excludes contacts that have an existing status='draft' outbound email: "NOT EXISTS (SELECT 1 FROM emails ed WHERE ed.contact_id = e.contact_id AND ed.direction = 'outbound' AND ed.status = 'draft')". Additionally, after creating the draft the parent email’s followup_status is flipped to completed so it is not re‑picked in the next cron tick — both measures are described in the docstring of run_followup_due as making the process “Idempotent”.
Follow‑up:
What happens if two cron ticks run simultaneously and both select the same contact?
Grounded answer: The system runs on a single‑worker runtime (“Sequential — never fan out graph invocations (single‑worker LangGraph runtime / Render free tier)”), so concurrent ticks cannot happen in practice.
Weak answer misses: the exact SQL exclusion and the followup_status flip are the concrete mechanisms; a shallow answer might only mention “the query filters them” without naming the column or the flip.
Q4 (hard – design trade‑off):
The campaign graph has two interrupt kinds: approval and cadence. Why separate them instead of having a single interrupt that the UI or cron can resume?
A4:
The approval interrupt requires a human to explicitly approve each touch via approveCampaignDraft in the UI, while the cadence interrupt is designed for automatic cron‑based resumption of touches that have already been approved or are in a waiting state. If they were a single interrupt, the cron could accidentally resume a touch that is still pending human review, or the UI could skip the cadence‑based delay. By using distinct interrupt types, the cron only resumes status='waiting' rows and never touches draft_pending threads — as stated in the campaign_graph.py comment: “the cron only resumes status='waiting' rows, so a draft_pending thread can't be auto‑sent.”
Follow‑up:
How does the send‑safety stack (pipeline_graph’s eligibility/daily caps) interact with this interrupt design?
Grounded answer: The safety stack (imported lazily inside nodes) is called only after the compose_touch node has run and the touch passes the cadence resume; it is independent of the interrupt type — both approval and cadence paths go through the same dispatch_send call.
Weak answer misses: the role of status field (draft_pending vs waiting) in distinguishing the two interrupt flows; shallow answers may conflate “interrupt” with “send gate”.
Q5 (hard – follow‑up graph delegation):
The email_followup graph delegates the actual email writing to the email_compose subgraph (ainvoke). Why not write the composition logic inline, given the follow‑up graph already has its own LLM call (derive_followup_point)?
A5:
The delegation is a deliberate reuse strategy: “the writing is delegated to the compiled email_compose graph … so this graph adds exactly one new prompt (derive_followup_point) and inherits the refine / anti‑AI‑marker / faithfulness logic.” By reusing email_compose, the team avoids duplicating the prompt‑engineering and safety‑filtering work that graph already performs (e.g., wrap_untrusted fencing, response‑refine steps). The derive_followup_point prompt is the only novel content; all downstream robustness is inherited.
Follow‑up:
How does LangSmith tracing treat this nested delegation?
Grounded answer: The email_compose run (and its LLM calls) nest under the email_followup graph’s run because it is invoked via ainvoke — “the delegated email_compose run … nest under this graph's run” (from the docstring).
Weak answer misses: the explicit reason “reuse over rebuild” and the inherited subgraph features (refine, anti‑AI‑marker) are not obvious; shallow answers may think delegation is just for modularity without naming the inherited safety logic.
6. Cadence And The Cron
It is like a robot that writes a letter, then goes to sleep and sets an alarm clock to wake up when it is time to write the next letter, so it does not have to stay awake and wait doing nothing.
The engine works like a robot that sends a message and then takes a nap with an alarm. A scheduling step figures out when the next message is due based on the campaign's cadence, which is the list of days between messages. It writes that wake time and a waiting status into the database, then the robot goes to sleep. A separate clock outside, a scheduled task that fires every few minutes, checks the database for robots whose wake time has passed and wakes them up. This split lets thousands of slow conversations happen cheaply because no robot uses up attention while waiting, but the trade-off is needing that external clock to manage everything.
The engine uses a sleeping-and-alarm pattern instead of keeping a live timer for each conversation. After a touch is sent, a scheduling step computes the next touch time from the campaign's cadence, which is a list of day gaps between touches. It writes that wake time and a waiting status into the database, then suspends the thread. No process remains running during the gap. A separate external scheduler, a cron job that fires every few minutes, queries the database for threads whose wake time has passed and resumes each one. The rejected alternative was keeping a live timer per conversation, which would consume worker resources during idle periods. The trade-off is adding the moving part of an external scheduler, accepted because parking plus alarm scales far better, allowing thousands of slow conversations to coexist cheaply without occupying workers while waiting.
The cron job queries for threads past their wake time and resumes them sequentially.
due = await d1_all(
"SELECT id, campaign_id, contact_id FROM campaign_threads "
"WHERE status = 'waiting' AND wake_at IS NOT NULL AND wake_at <= ? "
"ORDER BY wake_at ASC LIMIT ?",
[now_iso, batch],
)
for row in due:
tid = str(row["id"])
await d1_run(
"UPDATE campaign_threads SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?",
[tid],
)
cfg = _config("campaign")
cfg["configurable"]["thread_id"] = tid
result = await graph.ainvoke(Command(resume=True), cfg)
# … handle result status and update thread accordingly
The subsystem implements a sleeping-and-alarm pattern where each campaign‑contact conversation is a durable LangGraph thread that explicitly suspends itself after scheduling its next wake time. The ordered mechanism begins with send_touch within campaign_graph.py’s reactive loop; that node dispatches the email, then control passes to schedule_next. This node computes the interval from the campaign’s cadence – a list of day gaps between touches – writes a status='waiting' row and a precise wake timestamp into the D1 database, and finally issues an interrupt(kind="cadence"), which suspends the thread unconditionally. No process, timer, or background worker stays alive during the interval. Later, a separate Cloudflare cron described in _cron.py runs every few minutes, calling run_campaign_resume_due to query all threads whose wake time has elapsed and whose status is 'waiting'. It resumes each thread with a Command(resume=True), causing the graph to re-enter its check_reply node and continue the loop. If any step fails – for example, send_touch encounters a send error – the thread terminates immediately to the END node, preventing further automatic retries until a human intervenes via the approval interrupt.
The design preserves a clear draft-first invariant enforced by the checkpoint boundary: no email is ever transmitted without an explicit human approve or edit, except in the opt‑in auto_approve=True mode for specific campaigns. The await_approval node holds the thread after compose_touch generates the draft, and the cron only resumes threads with status='waiting' – never those with status='draft_pending'. This guarantee, together with the stable campaign-<campaignId>-<contactId> thread_id and the D1 checkpointer’s resumable=True flag, ensures that state survives a SIGKILL or deployment restart. The thread’s checkpoint is the single source of truth for the conversation’s progress, and the cron’s predicate queries avoid touching rows that a live tick is currently writing, preserving concurrent safety.
The key trade‑off rejects a live‑timer-per‑conversation architecture in favor of a scheduled poll. Keeping a live timer for each of potentially thousands of conversations would require persistent connections, in‑memory timer orchestration, and crash‑recovery logic that replays missed timeouts. The alternative would introduce a failure surface where a single process restart drops all pending timers, losing the cadence schedule. The chosen design avoids that cost by making the wake time a durable database column and delegating resumption to a stateless, idempotent cron that can be restarted freely. The obvious alternative also introduces complexity in dynamic scaling – a timer‑driven system must rebalance timers when instances die. The polling model accepts a latency of up to the cron interval (minutes) instead of near‑instant wake, but this is acceptable for a follow‑up drip campaign where hours or days separate touches.
A concrete failure mode occurs when the cron job itself crashes or the D1 database becomes unresponsive before it resumes a thread whose wake time has passed. The operator would see that a conversation remains stuck in status='waiting' long after its scheduled wake, with no new check_reply runs appearing in LangSmith traces. The cron logs, emitted by _cron.py’s _config function (which uses graph_run_config(name="cron", runtime="cron")), would show repeated errors from the attempt to connect to the D1 checkpointer, or a timeout in run_campaign_resume_due. The monitoring signal is a monotonically increasing count of threads that have passed their next_touch_at timestamp without transitioning to the next state. Remediation is straightforward: restart the cron worker or investigate the D1 connection; once it recovers, the next tick resumes all overdue threads, because the wake time is still stored in the database and the cron query is stateless.
-
campaign-resume-due job in
_cron.pyqueriesd1_all("SELECT id, campaign_id, contact_id FROM campaign_threads WHERE status = 'waiting' AND wake_at IS NOT NULL AND wake_at <= ? ORDER BY wake_at ASC LIMIT ?", [now_iso, batch]).- reads / writes: reads
campaign_threadsrows withstatus,wake_at. - branch: if the query fails (exception), the job returns
{"ok": false, "job": "campaign-resume-due", "error": ...}– happy path continues.
- reads / writes: reads
-
For each due row, it claims the thread by
d1_run("UPDATE campaign_threads SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?", [tid]).- reads / writes: writes
statusto'running', updatesupdated_at. - branch: if the update fails, the error is logged and the loop continues to the next row.
- reads / writes: writes
-
It resumes the campaign graph with
graph.ainvoke(Command(resume=...), config=_config("cron")).- reads / writes: uses
graphfromgraphs.get("campaign"),_config("cron")to build a fresh config with a UUIDthread_id. - branch: if
graphisNone, returns{"ok": false, "job": "campaign-resume-due", "error": "campaign graph not compiled"}– happy path triggers the graph.
- reads / writes: uses
-
The graph resumes inside the
schedule_nextnode, after the previousinterrupt()call. The state update returned by the node is applied:{"sequence_step": next_step, "status": "running"}.- reads / writes: reads
sequence_step,cadence_days,max_touchesfrom state; writessequence_step(incremented) andstatus = 'running'. - branch: if
next_step >= max_touches, it instead returns{"status": "completed", "stop_reason": "sequence_complete"}after recording outcome feedback viarecord_outcome_feedbackand upserting the thread withstatus='completed'– that branch is terminal for the sequence. The happy path (ongoing sequence) returns running.
- reads / writes: reads
-
The graph proceeds to the generation node, which first calls
_is_campaign_eligible(contact_id).- reads / writes: reads
contact_idfrom state; writes nothing yet. - branch: if not eligible, it upserts the thread with
status='stopped', stop_reason='not_eligible'and returns terminal state – happy path continues.
- reads / writes: reads
-
The generation node calls
_load_opportunity(state.get("opportunity_id"))and_build_post_text(opp, state.get("resume_context")).- reads / writes: reads
opportunity_id,resume_context; writesoppdict andpost_textstring (local variables).
- reads / writes: reads
-
The generation node builds a payload and invokes
outreach_graph(compiled fromemail_outreach_graph.py) viaainvoke.- reads / writes: reads payload fields (
recipient_name,recipient_role,recipient_email,post_text,post_url, etc.); the subgraph writes a draft email (subject, body, etc.) into state and persists it as astatus='draft'row in theemailstable (peremail_followup_graphdescription). - branch: if the outreach graph call fails, error handling is not detailed in the source – assume it logs and may propagate.
- reads / writes: reads payload fields (
-
After generation, the graph reaches the
await_approvalinterrupt. The node issuesinterrupt({"kind": "approval", ...}).- reads / writes: the graph’s state is checkpointed with the draft data; no state mutation beyond the interrupt.
- branch: the graph pauses; the
ainvokecall in the cron returns with the interrupt object.
-
The cron handler’s loop continues to the next due row. After all rows are processed, the job returns a summary dict (e.g.,
{"ok": true, "job": "campaign-resume-due", ...}with counts of resumed and terminal threads).- reads / writes: writes no DB rows at this level (only the per-thread updates already done).
- branch: if any exception occurred during processing, the error is collected and the function returns
{"ok": false, "error": ...}– happy path returns success.
-
Knob —
CAMPAIGN_RESUME_BATCH(env var, default"25").
Bounds — Limits the number ofcampaign_threadsrows the cron fetches per tick.
Effect — Larger batches reduce wall‑clock time to resume many threads, but each resumed thread fires an LLM call; turning it up spikes both latency (max time until the last thread in the batch finishes) and dollar cost (concurrent LLM tokens).
Risk — Too high saturates the single‑worker LangGraph runtime, causing queue back‑pressure and timeouts; too low leaves threads inwaitingstate longer, delaying the cadence. -
Knob —
_DEFAULT_CADENCE_DAYS(constant list[0, 4, 7, 7, 7, 7]incampaign_graph.py).
Bounds — Controls the minimum gap (in days) between successive touches; the list index is the gap before touch i+1.
Effect — Shorter gaps accelerate the sequence, increasing outbound volume per recipient per month, which raises send cost and LLM drafting cost. Longer gaps reduce touch frequency, lowering cost but risking dead threads.
Risk — Too short triggers daily‑send caps or per‑domain limits (breaks safety); too long lets leads go cold. -
Knob —
_DEFAULT_MAX_TOUCHES(constant integer6incampaign_graph.py).
Bounds — Caps the total number of touches a campaign can send before the thread becomes terminal.
Effect — Raising it extends the lifecycle, adding more LLM calls and sends per contact; lowering it prematurely ends sequences, reducing cost and thread storage.
Risk — Too high wastes money on unresponsive contacts; too low may cut off a conversation that would convert later. -
Knob — (Implicit) Cron tick interval – not parameterized in the provided source, but the documented pattern is “fires every few minutes”.
Bounds – Controls how often the cron queriesCAMPAIGN_RESUME_BATCHrows.
Effect – A shorter interval reduces the time a thread spends inwaitingafter its wake time, but increases D1 query load and the risk of resuming the same thread twice (thoughstatus='waiting'andUPDATE ... status='running'claims mitigate that).
Risk – Too fast adds unnecessary D1 reads and CPU cycles; too slow delays the first post‑wake touch, degrading responsiveness.
1. D1 Due-Query Failure
- Trigger — The cron tick invokes
run_campaign_resume_due, which runsd1_allto select rows fromcampaign_threadswherestatus = 'waiting'andwake_at <= now. A transient D1 outage, network partition, or time-out causes the query to raise an exception. - Guard — The
except Exception as exc:clause immediately after thed1_allcall. The block logs withlog.exceptionand returns a dict with"ok": Falseand an error string built fromtype(exc).__name__. - Posture — Fail-hard for the tick: the function returns early, no threads are considered, and the cron run ends with an error result. The checkpointer writes the failed run state, but no threads are touched.
- Operator signal — The exact log line:
log.exception("campaign-resume-due: due query failed"). The returned dict includes"error": "<ExceptionType>: <msg>". - Recovery — The next cron tick, typically 5 minutes later, retries from scratch. No backoff is programmed; each tick is independent. Manual inspection of D1 health may be needed if failures persist.
2. Individual Thread Claim/Resume Failure
- Trigger — After the due query succeeds, the loop iterates over
duerows. For rowrow["id"]the code attempts to claim it by updatingcampaign_threads SET status='running', then presumably issuesCommand(resume=True)on the compiledcampaigngraph. A database write failure, acampaign_threadsrow that has already been claimed (race with a deferred cron tick), or the graph raising an exception during resume all cause an exception inside thetry:block. - Guard — The
except Exception as exc:clause inside the loop (the source showstry:thenawait d1_run(...)and implies an except becauseerrors: list[str] = []is declared and anerrors.append(...)would follow). The exact guard is not fully quoted but is the only exception handler in that scope. - Posture — Fail-soft for the tick: the caught exception is appended to
errorsand the loop continues to the next thread. No thread blocks the cron run. - Operator signal — The
errorslist is built but not explicitly logged in the excerpt; the function returns a summary that may or may not include the list. The thread remains in eitherrunning(if the claim update succeeded) orwaiting(if the claim update failed). A subsequent cron tick will see arunningrow that has nowake_at? Actually the claim update setsstatus='running'andupdated_at=CURRENT_TIMESTAMP, but does not clearwake_at. The next tick’s query still looks forwake_at <= nowANDstatus='waiting', so a stuckrunningrow will never be retried. This is a silent chronic orphan. - Recovery — Manual intervention: an operator must query
campaign_threadsforstatus='running'and threads that have been stuck for longer than the cadence, then reset them towaitingor terminate them. No automatic retry.
3. Campaign Reconciliation After Terminal Fails
- Trigger — After a campaign thread reaches a terminal state (e.g., after a successful send or stop),
_reconcile_campaign_after_terminal(campaign_id)is called. A D1 update toemail_campaignsor the aggregate query that counts active threads fails. - Guard — The
except Exception as exc:clause inside_reconcile_campaign_after_terminal, which logslog.warning("campaign reconcile-after-terminal failed cid=%s: %s", campaign_id, exc). - Posture — Fail-soft (best-effort): the exception is caught and logged, but the calling code (the cron loop) is not aborted. The campaign card remains in
runningstatus even though all threads are terminal. - Operator signal — The log warning line exactly as above. No returned error object propagates up.
- Recovery — No automatic retry. The reconciler is only invoked when a thread becomes terminal; if it fails, the campaign is left in an inconsistent state (
status='running'withemails_scheduledpossibly non-zero). A manual SQL update or a separate cleanup script is required.
4. Counter Bump Failure
- Trigger — During the execution of the
campaigngraph (resumed by the cron), the function_bump_campaign_countsattempts to updateemail_campaignscounters (emails_sent,emails_failed,emails_scheduled). A D1 error at this point raises an exception. - Guard — The
except Exception as exc:clause in_bump_campaign_counts, which logslog.warning("campaign _bump_campaign_counts failed: %s", exc). - Posture — Fail-soft (best-effort): the exception is swallowed, the send continues, and the counters become inaccurate but the email is still recorded elsewhere. The calling graph node does not report this error to the cron.
- Operator signal — The log warning line. No error is returned to the cron resume function, so the operator observes an increasing drift between
emails_sentin the campaign card and the actual count ofemailsrows. - Recovery — No automatic recovery. The counters must be manually reconciled via a SQL query or a dedicated data pipeline. The next bump for the same campaign might fail or succeed, but the drift is permanent.
5. Campaign Graph Not Compiled
- Trigger — At the start of
run_campaign_resume_due, thegraphsdict is checked for a key"campaign". A deployment error, an environment misconfiguration, or a startup failure causesgraphto remainNone. - Guard — The guard
if graph is None:with an immediate return of{"ok": False, "job": "campaign-resume-due", "error": "campaign graph not compiled", "available": sorted(graphs.keys())}. - Posture — Fail-hard for the entire tick: the function returns with an error, no threads are processed. No side effects.
- Operator signal — The error field in the returned dict. No log line is emitted before the return (the snippet does not show a
logcall). The operator sees the cron endpoint returning"ok": False. - Recovery — The next cron tick will re-evaluate the condition. If the graph is still not compiled, the error recurs indefinitely. Manual fix: redeploy or correct the graph registration in the cron worker’s entry point.
Q — How does the system implement the gap between campaign touches without keeping a persistent in-memory timer for each conversation?
A — The schedule_next node computes the next touch time using _wake_at_for(next_step, cadence_days), writes a status='waiting' row with that wake_at to the database via _upsert_thread, and then calls interrupt({"kind": "cadence", ...}). This suspends the LangGraph execution entirely. No process holds the thread state during the gap. A separate cron job (_cron.py) periodically queries campaign_threads WHERE status='waiting' AND wake_at <= ? and resumes each due thread by calling the compiled campaign graph again.
Follow-up — What happens if the cron job fails or the database write silently errors?
A — The _upsert_thread call uses critical=True in schedule_next, so its failure re-raises rather than silently dropping the row. If the cron misses a tick, the row stays waiting and will be picked up on the next tick because the query uses ORDER BY wake_at ASC LIMIT ? — overdue threads are always included.
Weak answer misses — The critical=True flag that guarantees the write is not silently swallowed, and the fact that the cron query is sorted to naturally catch missed ticks.
Q — Why did you choose a polling‑based cron over a live timer per conversation, which seems simpler?
A — The _cron.py docstring explicitly states the team is on a single‑worker LangGraph runtime (Render free tier), and parallel live timers would “poison” that environment. The polling approach also keeps the graph purely state‑driven: interrupt() suspends the durable thread without any background process, and the cron only needs a single DB query batched by CAMPAIGN_RESUME_BATCH. This degenerates gracefully if the cron is delayed, whereas live timers would drift or require re‑scheduling logic.
Follow-up — How does the system prevent the same thread from being resumed twice if the cron fires again before the first resume finishes?
A — The first thing the cron does for each due row is UPDATE campaign_threads SET status='running' — a claim row that makes a second tick’s query skip it (since status is no longer 'waiting'). This is visible in the _cron.py code snippet that updates the status before calling graph.ainvoke.
Weak answer misses — The claim‑row update (status → 'running') that provides idempotency, and the fact that the cron runs sequentially (no fan‑out) so the update is done before the next row is processed.
Q — Walk through what happens from the moment a touch is sent until the next touch is drafted. How is the thread state managed across the gap?
A — After composing and sending, the graph reaches schedule_next. It increments sequence_step, computes the next wake time via _wake_at_for using the campaign’s cadence_days (default [0,4,7,7,7,7]), writes a 'waiting' row with that wake_at and the new step, then calls interrupt({"kind":"cadence","wake_at":...,"next_step":...}). The graph’s durable state is checkpointed. No code runs until the cron resumes that exact thread (using the same thread‑ID) with a Command(resume=...). The node returns {"sequence_step": next_step, "status": "running"}, advancing the state for the next compose loop.
Follow-up — How does the “next_step” value survive across the resume? The state update is returned on the resumed run, but the interrupt suspended before that update was applied.
A — LangGraph applies state updates returned from the node that contained the interrupt() after the resume completes. So the step bump and status change are applied exactly once, even if the graph re‑enters the same node on resume.
Weak answer misses — The detail that schedule_next returns the state update (including sequence_step) after the interrupt, meaning LangGraph applies it on resume, not on the original pre‑interrupt run.
Q — What happens to a thread if the operator rejects the draft (via the approval interrupt) and the campaign then reaches the cadence scheduling? How does the system know not to schedule a next touch for a stopped thread?
A — The approveCampaignDraft interrupt handler, in case of action == "reject", calls _upsert_thread(state, status="stopped", wake_at=None, stop_reason="draft_rejected") and returns {"status":"stopped"}. This sets the thread row to terminal status. When the graph later enters schedule_next, the state’s status is 'stopped' — however, the node still runs because it is reached regardless. But the cron’s query filters on status='waiting', so a 'stopped' thread is never resumed. The system relies on the terminal status written by the approval node rather than skipping the scheduling node.
Follow-up — Why doesn’t the graph skip the schedule_next node entirely for rejected threads, instead of writing a terminal status?
A — Because the graph structure is fixed and the rejection happens in an interrupt handler that returns a state update, not a branch. Writing the terminal row ensures the cron ignores it indefinitely, and the final schedule_next run (which still executes on resume) will see 'stopped' and, by design, re‑write 'stopped' again — idempotent and harmless.
Weak answer misses — The fact that schedule_next still runs even after rejection; it’s the database row’s terminal status, not a conditional edge, that stops further cron activity.
7. Stop On Reply
It's like a robot pen-pal who stops writing new letters the moment you write back, so it doesn't accidentally ignore your reply and keep sending notes you already answered.
A campaign is a series of automated emails sent to a prospect over time. The engine checks for any new reply from that person before it sends the next email. If a reply exists, the sequence stops immediately. This prevents the awkward situation of sending a canned follow-up that ignores the prospect's actual response, which would make it seem like no one is listening. The only cost is a quick check for new inbound messages before each touch, a small price for never talking over a real reply.
The engine implements a reply check as a gate before composing each new touch in the campaign sequence. It queries the inbound message store for any communication from the contact with a timestamp later than the last sent touch. If such a message exists, the sequence terminates, preventing further automated follow-ups. The rejected alternative would be to continue sending regardless, which risks damaging the conversation by ignoring the prospect's engagement. The trade-off is minimal: the constant check adds a small, predictable overhead to each touch, but it avoids the far greater cost of appearing inattentive or automated, preserving the human handoff at the exact moment of reply.
The campaign graph checks for a reply before composing each new touch, terminating the sequence if one is found.
def _route_after_gate(state: EmailFollowupState) -> str:
return "skip" if state.get("skip_reason") else "continue"
def _route_after_anchor(state: EmailFollowupState) -> str:
return "skip" if state.get("skip_reason") else "compose"
builder.add_conditional_edges(
"safety_gate", _route_after_gate, {"skip": END, "continue": "derive_followup_point"}
)
builder.add_conditional_edges(
"derive_followup_point", _route_after_anchor, {"skip": END, "compose": "compose"}
)
The subsystem implements a reply check as a deterministic gate early in the campaign‑sequence pipeline. In the campaign_graph, the node check_reply is executed before each new touch: it queries the inbound message store for a communication timestamped later than the last sent touch. If such a reply exists, the sequence terminates; otherwise, the flow proceeds to compose_touch (which delegates to the email_outreach graph for drafting). The email_followup_graph mirrors this with its safety_gate node, which evaluates a set of skip conditions including “replied” — any positive match causes the graph to abort the follow‑up and output a skip_reason. On a database failure (e.g., a D1 outage), both graphs degrade to a fail‑closed skip: email_followup_graph explicitly documents that “All DB reads are best‑effort — a D1 outage degrades to a context‑free skip rather than failing the run,” and the campaign graph’s send_touch node similarly re‑checks suppression immediately before dispatch as a belt‑and‑braces measure, ensuring that even if the initial check_reply missed the reply, the final gate blocks the send.
The invariant preserved by this design is no automated follow‑up after a human reply. This is a strict ordering guarantee: every touch in the sequence is preceded by a reply check, and the check is idempotent — repeated queries for the same state produce the same termination decision. The safety_gate in email_followup_graph explicitly lists “replied” as one of the conditions that produce a skip, and the check_reply node in campaign_graph’s trajectory (“no inbound reply; thread continues”) shows the decision branch. The guarantee is further reinforced by the send_touch node’s redundant suppression re‑check, which ensures that even if a reply arrives between the gate and dispatch, the send is blocked. This double‑check maintains correctness despite the asynchronous nature of the message store.
The trade‑off is minimal per‑touch overhead versus the cost of ignoring engagement. Each check_reply query adds a small, predictable latency to the compose pipeline, but the rejected alternative — sending** regardless of a prospect’s reply — would risk damaging the conversation by overwriting the human’s message with automated spam. The cost avoided is the loss of a sales relationship and the negative signal to the recipient’s domain, which could trigger suppression lists and degrade future deliverability. Instead of accepting that risk, the system explicitly chooses a fail‑closed posture: when the reply store is unreachable, the gate skips the touch rather than allowing an unverified send. This mirrors the broader safety philosophy of the campaign_graph, where the send_touch node also enforces a per‑vertical daily cap (CAMPAIGN_VERTICAL_DAILY_CAP) and re‑checks suppression lists.
A concrete failure mode is a transient D1 outage during the check_reply query. Because both the campaign_graph and email_followup_graph treat database errors as best‑effort, the gate degrades to a skip without ever reaching the composition step. The operator sees a skip_reason such as “D1 unavailable” in the graph output, and a warning log (e.g., from log.warning in email_followup_graph’s safety gate) records the outage. In the campaign sequence, the compose_touch node is never invoked, so the email remains unsent; the state’s trajectory likely records a node like check_reply with a summary indicating the skip. This keeps the system safe at the cost of a missed touch opportunity during the outage — a deliberate design choice to prioritize conversation quality over throughput.
-
START — The compiled
StateGraphis invoked with an initialCampaignStatedict. Reads/writes: Readscontact_id,sequence_step,opportunity_id,resume_context,recipient_name,recipient_role,recipient_email. No output yet; state flows to the first node. -
compose_touch(state) — The node retrieves
contact_id = int(state.get("contact_id") or 0)andstep = int(state.get("sequence_step") or 0). Reads/writes: readscontact_id,sequence_step. No writes yet. -
Branch:
await _is_campaign_eligible(contact_id)— Checks campaign-scoped eligibility. On failure (definitive negative), it writes to D1 viaawait _upsert_thread(state, status="stopped", wake_at=None, stop_reason="not_eligible")and returns{"status": "stopped", "stop_reason": "not_eligible"}, terminating the graph. On happy path (eligible or inconclusive), execution continues. -
opp = await _load_opportunity(state.get("opportunity_id"))— Readsopportunity_idfrom state, queries D1 for the opportunity object. Returns a dict (or None). Reads/writes: readsopportunity_id; writes nothing in state, only local variable. -
post_text = _build_post_text(opp, state.get("resume_context"))— Constructs a grounding text from the opportunity andresume_context(read from state). Reads:resume_context,opp. Writes: localpost_text. -
Build
payloaddict with keysrecipient_name,recipient_role,recipient_email,post_text,post_url— The values come fromstate(e.g.,state.get("recipient_name")) andopp. No state mutation yet. -
result = await outreach_graph.ainvoke(payload)— Delegates to the compiledemail_outreach_graphsubgraph. This fans out over that subgraph’s nodes (e.g., compose, safety, etc.). Reads/writes: the subgraph reads/writes its own internal state; the outer graph only receives the final output. -
agent_run_span / run id capture — Inside
compose_touch, the run id from the subgraph invocation is captured and stamped on theemailsrow andcampaign_threadsfor the outcome-feedback loop. No state keys changed inCampaignStatedirectly; this writes to D1. -
compose_touch returns state update — Returns a dict (not fully shown in snippet) that likely includes the draft
subject,body,run_id, and advancessequence_step. Exact keys:status,draftdetails. -
schedule_next(state) — Reads
sequence_step,cadence_days,max_touches,last_touch_run_idfrom state. Computesnext_step = step + 1. -
Branch:
if next_step >= max_touches— On true (sequence exhausted with no reply), it callsrecord_outcome_feedback(run_id, "reply_outcome", 0.0), then writes to D1 viaawait _upsert_thread(state, status="completed", wake_at=None), and returns{"status": "completed", "stop_reason": "sequence_complete"}. Terminal step. On false, continues. -
wake_at = _wake_at_for(next_step, cadence_days)— Computes the next send time based on cadence. Readscadence_days, writeswake_at. -
Critical upsert:
await _upsert_thread({**state, "sequence_step": next_step}, status="waiting", wake_at=wake_at, critical=True)— Writes to D1. If this fails, the thread stalls (re-raises). Reads/writes: writessequence_step,status,wake_at. -
interrupt({"kind": "cadence", "wake_at": wake_at, "next_step": next_step})— Pauses the durable thread. The cron will resume atwake_at, looping back to compose_touch (step 2) with the updatedsequence_step. This is the control loop over touches; each loop goes through compose_touch → schedule_next → interrupt again until the sequence completes or the reply gate triggers.
The subsystem spends time in LLM invocations (e.g., derive_followup_point in the follow‑up graph, compose_touch in campaign touches), D1 queries (hydrate contact/company data, safety checks, checkpoint reads/writes), and cron‑driven batch processing; money flows primarily to LLM token consumption and, to a lesser degree, D1 read/write operations. The following knobs directly control how much time and money are consumed:
-
Knob —
CAMPAIGN_RESUME_BATCH(env var, default"25")
Bounds — number ofcampaign_threadsrows resumed per cron tick.
Effect — increasing it finishes the backlog faster (lower total latency) but raises the per‑tick D1 write volume and peak duration; decreasing it spreads cost over more ticks.
Risk — too high can exceed D1’s SQLite limits or cause timeout errors; too low lets the waiting queue grow, slowing overall campaign progression. -
Knob —
concurrency(parameter, default8)
Bounds — number of parallel country‑classification tasks inside thecountry_classify_bulkgraph.
Effect — higher concurrency speeds up the nightly backfill (less wall‑clock time) while simultaneously firing more LLM requests per second, raising peak token cost; lower concurrency cuts peak cost but lengthens the run.
Risk — too high may hit LLM rate limits or D1 contention; too low may leave many rows unclassified before the next cron window. -
Knob —
limit(parameter, default2000)
Bounds — number of D1 opportunity rows scanned in oneremote_classifyrun.
Effect — increasinglimitcovers more records per execution (reducing the number of runs needed to classify all open opportunities), but each run consumes more D1 reads and processing time.
Risk — too high can exceed D1’s query‑time limit or cause timeouts; too low leaves a persistent tail of unclassified rows. -
Knob —
_DEFAULT_MAX_TOUCHES(constant, default6)
Bounds — maximum number of touches allowed per campaign thread before the sequence terminates.
Effect — raising it increases the total number of LLM‑composed emails per contact (higher token cost) and extends the campaign’s lifecycle; lowering it caps both cost and send volume sooner.
Risk — too high may over‑message a contact, leading to suppression or damaged relationship; too low may miss conversion opportunities. -
Knob —
_DEFAULT_CADENCE_DAYS(constant, default[0, 4, 7, 7, 7, 7])
Bounds — the gap (in days) between consecutive touches in a campaign sequence.
Effect — shortening gaps accelerates the sequence (faster time‑to‑conclusion for the contact) but concentrates LLM sends into a shorter calendar period, raising daily token cost; widening gaps stretches cost over more days but risks losing momentum.
Risk — too short may appear aggressive or trigger daily‑cap limits; too long may cause the prospect to lose interest or the thread to gap out.
1. D1 outage during load_full_history
- Trigger: A transient D1 error (connection pool exhaustion, timeout) when the graph executes the
load_full_historynode to fetch every prior sent/received email for the contact. - Guard: The best‑effort policy documented in the
email_followup_graphdocstring – “All DB reads are best‑effort — a D1 outage degrades to a context-free skip rather than failing the run.” The implementation contains an unnamedtry-exceptinload_full_historythat catches the exception and causes the graph to return early with a skip directive. - Posture: Fail‑soft — the graph degrades to a skip, meaning the follow‑up is not composed. However, because the upstream
campaign_graphtreats anyskip_reasonas terminal (setting status to"stopped"if not"replied"), this soft degradation becomes a permanent stop for the thread. - Operator signal: A log line matching the pattern of other best‑effort handlers – e.g.,
"load_full_history failed: %s"– will appear. The operator will also see the thread’s status change to"stopped"with no draft held. - Recovery: Manual intervention is required. The operator must investigate the D1 issue, reset the thread status from
"stopped"back to"waiting", and re‑trigger the cron resume. No automatic retry occurs because the terminal status prevents the next 5‑min tick.
2. False positive reply detection due to timestamp ordering errors
- Trigger: Inbound messages have timestamps that are slightly ahead of the last sent touch because of clock skew between systems, batch import rounding, or a re‑delayed email that D1 returns with a later timestamp than its real arrival. The
safety_gatenode (which likely comparesmax(inbound.timestamp) > last_sent.timestamp) sees a “reply” where none exists. - Guard: None. There is no validation layer that cross‑checks message IDs, content similarity, or applies a tolerance window. The gate relies solely on the raw D1 timestamps.
- Posture: Fail‑hard — the sequence terminates immediately. The graph returns
skip_reason="replied", andcampaign_graphsetsstatus="replied", stopping all future follow‑ups for that contact. - Operator signal: The thread moves to
"replied"status with no actual reply visible in the CRM. The operator may notice the inconsistency when reviewing the contact’s timeline. There is no explicit warning log for this condition. - Recovery: Manual escalation. The operator must inspect the thread, confirm the false positive, and manually reset the thread status (e.g., via an admin panel). No automated rollback exists.
3. Race condition between cron resume and late‑arriving reply
- Trigger: The cron (
_cron.py) selects due threads withWHERE status='waiting' ... wake_at <= ?and immediately claims the row viaUPDATE campaign_threads SET status='running'. A reply email arrives after this claim but before the graph’ssafety_gatenode queries the inbound store. The graph proceeds to compose a follow‑up based on the pre‑reply state. - Guard: None. The graph does not re‑check the inbound store after the initial
load_full_historycall, nor does it use a row lock or optimistic concurrency to prevent sending after a reply has been inserted. - Posture: Fail‑soft — the follow‑up is drafted (and later sent after approval) despite the reply. The damage is a potentially awkward or irrelevant email sent to a contact who has already re‑engaged.
- Operator signal: The operator sees a draft pending approval that was generated after the contact had replied. The reply timestamp in the thread will be earlier than the draft creation time. No error log is generated.
- Recovery: Manual stop. The operator can reject the draft via
approveCampaignDraftwith action"reject"and manually mark the thread as"replied". The system does not automatically detect the race.
4. D1 error in _bump_campaign_counts causing silent counter drift
- Trigger: After a successful send (or after stopping on reply),
campaign_graphcalls_bump_campaign_countsto updateemails_sent/emails_failed. A D1 error occurs during this update. - Guard: The
try-exceptin_bump_campaign_countscatches the exception and logs a warning:"campaign _bump_campaign_counts failed: %s". The function returns immediately without raising. - Posture: Fail‑soft — the counters become stale, but the graph run continues normally. The send path is not aborted.
- Operator signal: The log warning appears. Over time, the
email_campaignsrow’s counters will diverge from reality, causing the campaign card to show incorrect metrics. - Recovery: Automatic on next successful bump — the delta will be applied on the next send or stop event because
emails_sent = emails_sent + ?accumulates whatever was missed (if the previous failure was a true no‑op). If the failure was due to a persistent D1 issue, the drift remains until D1 recovers. No manual step is required for the send path itself.
5. Subgraph timeout during derive_followup_point leading to empty draft and false stop
- Trigger: The LLM call inside
ainvoke_json_with_telemetry(called by_derive_anchor) times out or returns malformed JSON after exceeding the provider’s limit. Thecomposesubgraph produces a draft with no subject/body. Incampaign_graph, the node that checkshas_draft = bool(draft.get("subject") and (draft.get("text") or draft.get("html")))evaluates toFalse. - Guard: The code after
if not has_draft:checksif flagged:and attempts to callis_flagged_runandqueue_flagged_run, wrapping the entire flagging in atry-exceptthat logs a warning. The thread is then upserted withstatus="failed"andstop_reason="no_draft". - Posture: Fail‑soft — the thread is marked
"failed"and stops. No email is sent, preventing harm, but a transient LLM error permanently kills the sequence. - Operator signal: Log warning from the flagging
exceptclause ("campaign flag failed: %s") and the thread status"failed". The drafted flagging run may appear under LangSmith if tracing is enabled. - Recovery: Manual reset. The operator must review the failure, adjust the prompt or retry, and reset the thread status back to
"waiting". No automatic re‑attempt occurs because"failed"is terminal.
Q – Warm-up
What mechanism stops the follow-up graph from composing a new email when the contact has already replied?
A
The safety_gate node inside the email_followup_graph checks the loaded history for a replied signal (alongside suppression, bounced, do_not_contact) and, if present, routes the state to END via a conditional edge, skipping the derive_followup_point and compose nodes entirely. This prevents any draft from being created for a thread where the recipient has responded.
Follow-up
How does the gate know a reply exists without calling an LLM?
A – It uses the chronological email store loaded by load_full_history and compares the direction field; any inbound message with a timestamp after the last outbound touch is treated as a reply.
Weak answer misses
The fact that the gate is a conditional edge in the graph, not a standalone function, and that it also checks suppression/do_not_contact/bounced – not just replies.
Q – Medium
How does the campaign graph’s eligibility gate (_is_campaign_eligible) differ from the follow-up graph’s safety_gate in handling a contact who has replied?
A
_is_campaign_eligible is campaign‑scoped and allows inconclusive or NULL status through; it blocks only definitive negatives (e.g., hard bounce, unsubscribed) before composing a new touch in the campaign sequence. The safety_gate in the follow-up graph is thread‑specific and actively looks for a reply (inbound message after last sent touch) to skip composition. The campaign gate does not check for replies – that responsibility belongs to the follow-up graph when a follow-up is due.
Follow-up
Why not merge both checks into a single gate that runs before every touch?
A – They operate at different granularities: campaign touches are independent outbound sequences, while follow-up emails are grounded on the entire conversation history; merging would violate the separation of concerns enforced by the email_followup_graph’s dedicated flow.
Weak answer misses
The campaign gate’s docstring explicitly says: “NOT the strict autonomous V135 gate” and “allows inconclusive/NULL through” – a shallow answer often overlooks this nuance.
Q – Hard (design question)
Why implement the reply check as a conditional gate inside the graph rather than as a database trigger that sets a has_replied flag when a new inbound email arrives?
A
The graph‑based safety_gate evaluates the reply condition fresh from the full history loaded by load_full_history at the moment of composition. A database trigger would couple business logic to persistence and risk race conditions if the inbound arrives between the trigger and the graph run. Keeping the decision inside the state machine guarantees atomicity: the check sees exactly the same data that will be fed to derive_followup_point. Additionally, the follow-up graph is draft‑first – no email is ever sent automatically – so the gate prevents even a draft from being created, which a background flag might not prevent in time.
Follow-up
Could an LLM‑based sentiment check replace this deterministic reply gate?
A – No; safety_gate is a cheap binary check that avoids expensive LLM calls for already‑terminated threads. The LLM is reserved for _derive_anchor where nuanced analysis is needed, and it is protected from wasted invocations by the earlier gate.
Weak answer misses
The graph’s conditional edge architecture and the fact that load_full_history is the source of truth, not a cached flag. Also, the draft‑first nature means “stop” = no draft created, not just no send.
Q – Hard (interpreting cron integration)
How does the run_followup_due cron function ensure it does not pick up a contact who has already replied and thus trigger the safety_gate unnecessarily?
A
The cron’s query explicitly excludes rows where the latest outbound has a reply (it looks for sent/delivered/opened status with no reply and older than FOLLOWUP_DUE_DAYS). It also excludes contacts that already have a pending status='draft' follow‑up or whose parent’s followup_status is completed. This pre‑filtering reduces wasted graph invocations, but even if a contact slips through (e.g., race condition), the safety_gate inside the graph will still correctly skip composition.
Follow-up
What happens if the inbound reply arrives while the graph is already running for that contact?
A – The graph’s load_full_history node fetches the latest state from D1 at runtime, so the reply would be visible to safety_gate before composition; the run would then abort via the conditional edge. The cron’s exclusion is a best‑effort optimization, not a correctness guarantee.
Weak answer misses
The environment variable FOLLOWUP_DUE_DAYS is used to set the cadence, and the cron runs sequentially (single‑worker) to avoid concurrent state races – key details for correctness.
Q – Hardest (trade‑off analysis)
The safety_gate checks for replies, but the follow-up graph also uses an LLM (_derive_anchor) to assess sentiment. Why not rely solely on the LLM to decide whether to continue, and skip the deterministic reply gate entirely?
A
The deterministic safety_gate is orders of magnitude cheaper and has zero risk of hallucination – it simply checks for the existence of an inbound message with a later timestamp. The LLM‑based _derive_anchor call (inside derive_followup_point) is reserved for generating the follow‑up plan (summary, open items, recommended angle), not for the binary go/no‑go decision. Relying on the LLM for the stop decision would waste tokens on already dead threads, introduce propagation of model errors, and require complex prompt engineering to avoid false negatives (e.g., the model ignoring a clear reply). The two‑tier design – cheap deterministic gate followed by expensive LLM – is the optimal trade‑off for latency and cost.
Follow-up
Could the LLM’s should_follow_up boolean inside _derive_anchor override the safety gate?
A – No; the safety_gate runs before derive_followup_point. If the gate routes to END, the LLM is never invoked. The LLM’s should_follow_up only influences the follow‑up content within the already‑authorized path.
Weak answer misses
The wrap_untrusted fence applied to the thread text in _derive_anchor prevents prompt injection, but that does not replace the safety gate. Also, the gate checks more than just replies (suppression, do_not_contact, bounced) – a shallow answer often forgets those.
8. Grounding Each Touch
It's like a robot pen-pal who only writes a letter after checking a list of true facts about you, so every letter is honest and special, and it waits for a grown-up to say okay before mailing it.
Imagine a robot that helps write emails. Before it writes anything, it looks at a special file about the person it's emailing, which has real reasons why they might be interested. This file is called the opportunity, and it also includes notes from past conversations. The robot uses the same system that writes all other emails for the company, so it follows the same rules: it only uses facts it can prove, it doesn't make up stuff, and it stops if the person asked not to be contacted. The campaign engine, which decides when to send emails and waits for approval, trusts this writing robot completely. The trade-off is that if the writing robot has a problem, the campaign engine can't fix it, but the company thinks it's worth it to have one perfect place that makes sure every email is honest and personal.
The campaign engine delegates message composition to a shared drafting engine, ensuring each touch is grounded in the contact's specific opportunity and seeded context rather than generic boilerplate. The drafting engine receives the opportunity, which is the concrete reason the contact is a fit, plus any conversational context carried from prior touches. It then generates copy using the same personalization and suppression logic as all other outreach, meaning it validates claims against evidence, suppresses unsupported assertions, and respects global stop rules. A rejected alternative would be to embed copy generation logic directly in the campaign engine, which would duplicate safeguards and risk inconsistent grounding across different outreach types. The accepted trade-off is a hard dependency on the drafting engine: if that engine fails or produces poor copy, the campaign engine has no fallback. However, this is preferred because it centralizes message quality and grounding in one owned system, allowing the campaign engine to focus purely on orchestrating timing, approval gates, and durability across restarts without reinventing how to write or stay grounded.
The campaign engine delegates message composition to the shared drafting engine, grounding each touch in the opportunity and resume context.
async def compose_touch(state: CampaignState) -> dict[str, Any]:
opp = await _load_opportunity(state.get("opportunity_id"))
post_text = _build_post_text(opp, state.get("resume_context"))
payload: dict[str, Any] = {
"recipient_name": state.get("recipient_name") or "",
"recipient_role": state.get("recipient_role") or "",
"recipient_email": state.get("recipient_email") or "",
"post_text": post_text,
"post_url": (opp or {}).get("url") or "",
"application_mode": bool(opp),
"tone": state.get("tone") or "warm",
"company_vertical": vertical or None,
"sub_niche": state.get("sub_niche") or None,
"sequence_step": step,
"resume_context": state.get("resume_context") or None,
}
draft = await outreach_graph.ainvoke(payload)
# … skip_reason handling, draft persistence …
In the campaign engine, each touch is grounded through a strictly ordered mechanism that begins with the check_reply node. If no reply is detected, the graph moves to compose_touch, which loads the contact’s opportunity fresh from D1 and the resume_context seeded at launch from @ai-apps/resume. It then delegates the actual draft generation to the shared email_outreach graph, an external drafting engine that applies personalization, suppression logic, and claim validation. If generation fails (e.g., no draft produced), compose_touch exits through its skip_reason/not_eligible/no_draft path directly to END. On success, the draft is held with status='draft_pending' and the thread enters await_approval for human review. This order ensures that every draft is built from fresh, opportunity-specific data before any approval or sending occurs.
The design preserves a core invariant explicitly named in the source: “Draft-first (default): no touch is ever sent without an explicit human approve/edit.” This invariant guarantees that a human must inspect and approve (or edit) each composed touch before it can be dispatched via send_touch. Even in autonomous mode (auto_approve=True), where approval is skipped, the draft is still generated first—no send ever bypasses the composition step. Together with the grounding requirement (opportunity + resume_context), the invariant ensures that only opportunity-relevant, human-vetted content enters the send pipeline.
The key trade-off lies in what the draft is grounded against. The campaign engine explicitly rejects the “thin grounding” approach used by email_orchestrator, which reads only the last three sent and three received emails plus a “prior subjects” string. Instead, email_outreach grounds each touch on the specific opportunity and a seeded resume_context—but it never reads the full conversation history as the dedicated email_followup_graph does. This trade-off avoids the cost of loading and processing the entire email thread for every campaign touch (which would be expensive and unnecessary for independent sequence steps), but it sacrifices conversational continuity. The design accepts that each campaign touch may not reference prior replies, betting that opportunity-driven personalization is more valuable than conversational threading for outbound campaigns.
A concrete failure mode occurs when the opportunity data is incomplete or the resume_context is missing: compose_touch produces a skip_reason such as "no_draft" and the thread terminates with status='stopped'. An operator would see this as a stopped campaign thread in the UI with a clear skip reason. Another failure mode involves the agent_eval shadow gate in gate_draft: if eval_graph.ainvoke throws an exception, the module logs "agent_eval gate failed (fail-open to human)" and silently proceeds to the human interrupt. The operator would then see a draft awaiting approval but with no automated evaluation verdict attached—a signal that the evaluation gate encountered an error. In both cases, the chain of exact identifiers (compose_touch, skip_reason, await_approval, gate_draft) provides a precise trail for debugging.
-
compose_touch — Entry point for generating a campaign touch; reads the full
CampaignStateand starts the generation pipeline.- reads:
contact_id,sequence_step,company_vertical,recipient_name,recipient_role,recipient_email,opportunity_id,resume_context,campaign_id,tone,sub_niche. - writes:
status,stop_reason(if early exit),draft(after generation),run_id,flagged. - branch: Calls
_is_campaign_eligiblefirst; if it returnsFalse, the node sets{"status":"stopped","stop_reason":"not_eligible"}and returns immediately — happy path continues past this check.
- reads:
-
_is_campaign_eligible — Campaign-scoped eligibility gate (blocks definitive negatives, allows inconclusive/NULL).
- reads:
contact_idfrom state. - writes:
statusandstop_reasonkeys in the returned dict when not eligible. - branch: On
False→ compose_touch returns early with{"status":"stopped","stop_reason":"not_eligible"}. Happy path:True→ proceeds.
- reads:
-
_load_opportunity — Loads the concrete opportunity record from D1 by the stored
opportunity_id.- reads:
state.get("opportunity_id"). - writes: Returns
oppdict (orNone). The result is used downstream to buildpost_textandpost_url. - branch: If
opportunity_idis missing or D1 fails,oppmay beNone— this is allowed; the payload’sapplication_modewill beFalseandpost_textwill fall back to cold-outreach behavior.
- reads:
-
_build_post_text — Assembles the textual grounding for the touch from the opportunity and any
resume_contextseeded at launch.- reads:
opp(the loaded opportunity dict) andstate.get("resume_context"). - writes: Returns a string
post_text. This becomes the core personalization string for the email body. - branch: No explicit branch; empty or missing inputs produce a shorter grounding string.
- reads:
-
Construct
payloaddict — Composes the full argument dictionary to be passed to the shared drafting engine (email_outreachgraph).- reads:
recipient_name,recipient_role,recipient_email,post_text,post_url(fromopp),bool(opp)forapplication_mode,state.get("tone")(defaults to"warm"),company_vertical,state.get("sub_niche"),state.get("sequence_step"),state.get("resume_context"). - writes: A local
payloaddictionary with all those keys exactly as named. - branch: The value of
application_modedepends on whetheroppis truthy — ifNone, it becomesFalse(cold outreach behaviour).
- reads:
-
apply_prompt_version_tag / on_prompt_version_change — Tags the touch with the campaign-touch prompt version and triggers a regression eval if the templating policy hash changed.
- reads:
CAMPAIGN_TOUCH_PROMPTconstant; no state keys. - writes:
pv_id(prompt version identifier) used in telemetry; potentially triggers a LangSmith dataset regression. - branch:
on_prompt_version_changeis exception‑guarded (try/exceptblanket) so it never blocks composition; on failure it simply logs a warning and continues.
- reads:
-
agent_run_span("campaign_touch", …) — Opens a LangSmith trace span that nests the downstream LLM calls; captures the run id for later outcome feedback.
- reads:
statekeyscampaign_id,contact_id,sequence_step,vertical. - writes:
run_id(from the span’s id, orNoneif unavailable). The span metadata includescampaign_id,contact_id,sequence_step,prompt_version. - branch: If the span context can’t be created (e.g., tracing disabled or error),
run_idis set toNoneand composition continues.
- reads:
-
outreach_graph.ainvoke(payload) — Delegates the actual message drafting to the shared
email_outreachgraph; this is the core composition engine for every campaign touch.- reads: The full
payloaddictionary (includingapplication_mode,post_text, etc.). Theemail_outreachgraph internally validates claims, suppresses unsupported assertions, and respects global stop rules. - writes: Returns a
draftdictionary (subject, body, etc.) or an empty dict on failure. - branch: If the invocation raises an exception, the catch block sets
flagged = Trueanddraft = {}— the touch is flagged for review rather than failing the whole run.
- reads: The full
-
_upsert_thread (draft_pending) — Persists the draft and updates the campaign thread status to
draft_pending, preventing auto-send by the cron.- reads: The full
stateplus thedraftandrun_idfrom the previous steps. Exact fieldcriticalisFalse(inferred from pattern; schedule_next usescritical=Truefor waiting). - writes: Inserts or updates the
campaign_threadsrecord withstatus='draft_pending',wake_at=None, and stores the draft content. - branch: This step is always executed after a successful or failed generation; if it fails, the graph would not checkpoint correctly, but the code does not guard against it (assumed safe).
- reads: The full
-
interrupt(kind="approval") — Pauses the durable thread, waiting for human approval via the
approveCampaignDraftendpoint.- reads: No state keys; the interrupt carries metadata
{"kind":"approval"}. - writes: The graph’s execution halts at this point; the checkpoint saves the state as it was after the
_upsert_threadwrite. - branch: There is no branch here — every generated touch goes through approval. On resume (after approval), control will pass to the next node in the graph (likely
schedule_next).
- reads: No state keys; the interrupt carries metadata
This subsystem spends time and money primarily on LLM inference calls (the derive_followup_point and the delegated email_compose graph), D1 database reads (loading contact history, campaign threads, opportunities), and cron-driven batch processing (resuming due campaign threads, classifying opportunities). Every such operation is subject to tunable parameters that directly control latency, throughput, and cost. Below are six real performance knobs extracted from the source, each tied to its exact identifier and default value.
-
CAMPAIGN_RESUME_BATCH
- Knob — environment variable, default
"25". - Bounds — limits the number of
campaign_threadsrows withstatus='waiting'that a single cron tick picks up and attempts to resume. - Effect — raising the value increases the number of threads processed per 5‑minute tick, boosting throughput but also raising peak D1 query volume and potential collision risk if the same row is claimed in two ticks. Lowering it spreads work across more ticks, smoothing load.
- Risk — too high can cause a single tick to run long enough to overlap with the next tick, creating contention on
UPDATE ... WHERE id=?claims; too low leaves many threads idle longer, increasing average time‑to‑next‑touch.
- Knob — environment variable, default
-
concurrency (for
country_classify_bulk)- Knob — parameter passed to the bulk graph invocation, default
8. - Bounds — controls the maximum number of parallel async classification tasks running under an
asyncio.Semaphoreinside the graph. - Effect — increasing concurrency reduces wall‑clock time for the nightly country‑backfill cron, but raises instantaneous D1 load and memory pressure from in‑flight tasks. Decreasing it lowers peak resource usage at the cost of longer runtime.
- Risk — too high risks D1 throttling or OOM on a single‑worker Render instance; too low may not finish within a reasonable nightly window.
- Knob — parameter passed to the bulk graph invocation, default
-
limit (for
remote_classify)- Knob — parameter
{"limit": 2000}passed to theremote_classifygraph. - Bounds — caps the number of open, unarchived D1 opportunities scanned by the rule‑based classifier each run.
- Effect — raising the limit covers more opportunities per invocation, improving completeness but increasing the duration of the D1 scan and the size of the result set (which is logged but not written back). Lowering it shortens the run but may leave many opportunities unclassified.
- Risk — setting it too high could cause the cron to time out or consume excessive memory holding results; too low means the job never finishes the full backlog.
- Knob — parameter
-
_DEFAULT_MAX_TOUCHES
- Knob — module‑level constant, value
6. - Bounds — defines the maximum number of campaign touches (email sends) allowed for a thread when the campaign seed provides no cadence length. The cadence array
_DEFAULT_CADENCE_DAYSis also of length 6. - Effect — increasing this number extends the allowed outreach sequence, generating more LLM‑composed emails (more LLM cost) and more potential database rows. Decreasing it truncates sequences earlier, reducing both send volume and opportunity cost.
- Risk — too high risks exhausting recipient tolerance and damaging sender reputation; too low may miss conversion windows.
- Knob — module‑level constant, value
-
temperature (in
derive_followup_point)- Knob — parameter
temperature=0.2passed tomake_llm(provider="deepseek", tier="standard", temperature=0.2). - Bounds — controls the LLM’s creative randomness when distilling the conversation history into a follow‑up anchor JSON.
- Effect — a lower temperature (e.g., 0.2) produces more deterministic, conservative follow‑up points, reducing the chance of hallucination but may yield formulaic phrasing. Raising it increases variation and potential for more natural language but also raises token‑cost risk and the likelihood of unsupported claims that trigger the suppression logic.
- Risk — too high breaks the “grounded in evidence” requirement, causing the downstream
email_composeto reject the anchor or produce unsafe content; too low may make the follow‑up point too rigid.
- Knob — parameter
-
provider (LLM model choice)
- Knob —
provider="deepseek"in calls tomake_llmandainvoke_json_with_telemetry. The tier is also"standard". - Bounds — selects which LLM backend handles the
derive_followup_pointprompt (and implicitly the delegatedemail_composesubgraph, though its provider may be separately configured). - Effect — switching to a cheaper or faster provider reduces per‑inference cost and latency, while switching to a more capable model improves quality but increases both time and dollar spend. The default DeepSeek standard tier is a specific cost‑performance trade‑off point.
- Risk — changing the provider without re‑evaluating prompt behavior can break structured‑output parsing (the
ainvoke_json_with_telemetryexpects valid JSON) or degrade the safety gate’s ability to detect unsupported claims.
- Knob —
D1 Opportunity Query Failure
- Trigger —
_load_opportunityattempts ad1_allquery onopportunities+companies, but the D1 read fails (transient network, timeouts, unavailable). - Guard — The
except Exceptionclause in_load_opportunitycatches all exceptions and logs a warning:"campaign _load_opportunity failed id=%s: %s". The function returnsNone. - Posture — Fail-soft: The returned
Noneis fed into the compose payload; the downstreamoutreach_graph.ainvokereceives no opportunity data, so the drafted email will be generic (no role, no company‑specific grounding). The campaign continues. - Operator signal — Log line
WARNING:campaign _load_opportunity failed id=<id>: <exception>. Also visible in the final draft's lack of opportunity context. - Recovery — No retry. The
Nonevalue is used as‑is. The operator must manually review the resulting draft for quality; a re‑run requires a manual retrigger or editing the thread.
Compose (outreach_graph.ainvoke) Exception
- Trigger — When calling
outreach_graph.ainvoke(payload)inside thecomposenode, the shared drafting engine throws an exception (LLM API error, malformed output, internal timeout). - Guard — The outer
try/except Exceptionaroundainvokecatches any exception, setsflagged=Trueanddraft = {}, then logs:"campaign compose failed contact_id=%s: %s". - Posture — Fail-soft: The empty
draftis kept; later the node checkshas_draft = bool(draft.get("subject") and ...)which evaluatesFalse, so the thread is upserted with status"failed"and stop_reason"no_draft". The campaign does not stop other threads. - Operator signal — Log warning line
WARNING:campaign compose failed contact_id=.... Also aCampaignStatekeystop_reasonbecomes"no_draft", and the thread row incampaign_threadshasstatus='failed'. - Recovery — No retry. The thread is dead (status
failed). Manual intervention: operator may delete the thread and re‑schedule, or fix the underlying error and resume via the cron (the thread is no longerwaiting).
Eligibility Check Fail‑Open
- Trigger — The eligibility query in the
safety_gate‑like node (not shown in full but described) runs ad1_alloncontactsto checkdo_not_contact, email existence,outreach_eligible. A D1 read error occurs. - Guard — The
except Exceptionblock in that eligibility check logs"campaign eligibility check failed contact_id=%s (fail-open): %s"and returnsTrue(allow). - Posture — Fail‑open: The contact is treated as eligible even though the truth is unknown. Since every send must still go through human approval (
draft_pending→await_approval), the actual risk is limited; but a do‑not‑contact or invalid email could slip through to the draft stage. - Operator signal — Log line
WARNING:campaign eligibility check failed contact_id=... (fail-open): .... No metric or graph interruption. - Recovery — No retry. The
Truereturn is used immediately. The operator later sees a draft for a potentially blocked contact and can reject it during approval.
Counter Bump Failure
- Trigger — After a successful send (or resumption),
_bump_campaign_countsruns anUPDATEonemail_campaignsto increment counters. The D1 write fails. - Guard — The
except Exceptionclause logs"campaign _bump_campaign_counts failed: %s"and returns without re‑raising. - Posture — Fail‑soft: The send path continues unaffected. The counters (
emails_sent,emails_scheduled,emails_failed) become stale; they are only used for UI display and scheduling, so no correctness impact. - Operator signal — Log line
WARNING:campaign _bump_campaign_counts failed: <exception>. The campaign card in the UI will not reflect the updated send count until the next successful bump (or a manual refresh). - Recovery — No retry. The counters will be corrected on the next successful bump if the D1 issue is transient. Otherwise, an operator must run a manual SQL update.
Empty Draft After Compose (No Exception)
- Trigger —
outreach_graph.ainvokereturns a dict withskip_reasonabsent, but the returneddrafthas nosubjectand notextorhtml(e.g., a silent empty response from the LLM). - Guard — The explicit check
has_draft = bool(draft.get("subject") and (draft.get("text") or draft.get("html"))). IfFalse, the node upserts the thread asfailedwithstop_reason = "no_draft". No exception is raised. - Posture — Fail‑hard (for this thread): The thread is terminated with status
failed. No further retries. - Operator signal — No log (the branch that logs is only taken when
flaggedis true, which is set only on exception). The operator sees the thread row withstatus='failed',stop_reason='no_draft'in the database, and thependingfield remains empty. - Recovery — Manual. The operator must inspect the thread, decide whether to re‑run or discard.
Cron Thread Claim Failure
- Trigger — The 5‑min cron
campaign-resume-dueattempts to claim acampaign_threadsrow byUPDATE campaign_threads SET status='running' ... WHERE id=?. The update fails (D1 write error, deadlock, constraint). - Guard — The cron’s outer
try/exceptcatches the exception and returns{"ok": False, "job": "campaign-resume-due", "error": f"{type(exc).__name__}: {exc}"}. The loop does not retry the failing row. - Posture — Fail‑soft (for that single tick): The cron reports failure but continues its tick loop. The thread remains in
waitingstatus (since the update did not execute), so it will be picked up by a later tick (provided the D1 issue is transient). - Operator signal — Log line
WARNING:campaign-resume-due: due query failed(if the select fails) OR the returned error object shows{"ok": false, "error": "..."}. No individual per‑thread alert. - Recovery — Implicit retry on the next 5‑min cron tick, as long as the row’s
wake_atis still ≤ now. No exponential backoff, just a periodic retry.
Q — How does the campaign graph produce the text for each outreach touch?
A — The compose_touch node delegates to the compiled email_outreach graph by calling outreach_graph.ainvoke(payload). The payload includes the opportunity loaded fresh from D1 and the resume_context seeded at launch from @ai-apps/resume, ensuring each touch is grounded in the contact’s specific fit rather than generic boilerplate.
Follow-up — How does the campaign graph trace the sub-invocation so operators can later debug the compose call?
It wraps the ainvoke inside an agent_run_span named campaign_touch, which nests the email_outreach LLM calls under a single chain run; the run id is stamped on the emails row and campaign_threads.
Weak answer misses — That the opportunity is loaded fresh from D1 (not cached) and that the run id is captured for reply‑based outcome feedback.
Q — Why does the campaign graph not read the full email history like the email_followup graph does, and instead only ground on opportunity + resume_context?
A — The comments in email_followup_graph.py explicitly note that the campaign graph “ground each touch on opportunity + resume + sequence_step and never read what was actually said in the thread”. This is a deliberate design choice: campaign touches are independent sequence steps that should reference the core fit reason, not conversational drift. The email_followup graph is a separate dedicated graph for history‑grounded follow‑ups.
Follow-up — How does the campaign graph avoid repeating the same message across touches without thread history?
It passes the current sequence_step index ("sequence_step": step) in the payload, letting the email_outreach graph adjust its prompt for the step’s position in the cadence.
Weak answer misses — The sequence_step parameter is explicitly set in the payload and is the mechanism for cadence‑aware variation, not just a static template.
Q — What happens if the delegated email_outreach graph returns an error or a skip_reason during composition in compose_touch?
A — The node catches all exceptions with except Exception, logs a warning, sets flagged = True, and writes an empty draft = {}. The email_outreach graph itself short‑circuits on replied/unsubscribed/bounced by returning a skip_reason and an empty draft. The flagged state is later pushed into the annotation queue via record_outcome_feedback for human review.
Follow-up — Does a compose failure block the entire campaign run?
No—the graph’s design isolates per‑contact errors; only the failing touch is flagged, and the cron continues processing other contacts (the campaign‑resume‑due job logs errors but does not abort).
Weak answer misses — That errored touches are explicitly sent to the annotation queue (not silently dropped), as stated: “Errored touches are flagged into the annotation queue.”
Q — Why does the campaign graph use a separate ainvoke on the email_outreach graph instead of inlining the drafting logic inside compose_touch?
A — The email_followup_graph.py comments explain the reuse principle: “the writing is delegated to the compiled email_compose graph … so this graph … inherits the refine / anti‑AI‑marker / faithfulness logic.” This avoids duplicating the entire sending safety stack, prompt versioning, and suppression checks across multiple graphs. The campaign graph adds only the orchestration and prompt‑version tagging.
Follow-up — How does the campaign graph ensure prompt‑version changes trigger automatic regression testing?
It calls apply_prompt_version_tag(CAMPAIGN_TOUCH_PROMPT) and then on_prompt_version_change("campaign_touch", CAMPAIGN_TOUCH_PROMPT, dataset_name=_CAMPAIGN_DATASET) to fire a regression eval against the campaign dataset whenever the templating policy hash changes.
Weak answer misses — The specific dataset name _CAMPAIGN_DATASET and the fact that the prompt‑version change handler is exception‑guarded (bare except Exception) so it never aborts composition.
Q — (Hard design) The email_followup graph reads the entire thread to produce a follow‑up point, while the campaign graph only uses opportunity + sequence_step. Why is there no hybrid approach that also reads the last few replies to detect the contact’s sentiment?
A — The source explicitly calls out this trade‑off: the follow‑up graph is built because the existing agents (including campaign) “never read what was actually said in the thread” and ground too thinly. However, the campaign graph’s purpose is a pre‑set sequence where each touch is meant to be a fresh outreach based on the original fit, not a conversational follow‑up. Adding thread‑history would make the campaign graph dependent on D1 availability for the full thread and blur the line between campaign and follow‑up use cases.
Follow-up — Then how does the campaign graph handle a contact who has already replied to a prior touch (e.g., “not interested”)?
The email_outreach graph’s short‑circuit logic detects replies via skip_reason and returns an empty draft, which the campaign graph flags. The cadence cron later skips contacts with a pending status='draft' follow‑up from the separate email_followup graph.
Weak answer misses — The role of the separate email_followup graph in handling replied contacts; the campaign graph itself does not read thread history and relies on the safety check in the delegated email_outreach graph to stop.
9. Launching And Steering
It is like a toy robot that writes a letter, waits for you to say yes or no, and then either keeps going or stops completely.
Think of it as a patient robot pen-pal. You give it a task to send a series of messages to someone, but it pauses after each one to wait for your approval. You have a few simple buttons: one to start the robot, one to tell it if the message is okay or needs changes, and one to tell it to stop forever. This system is built this way so you do not have to worry about the robot's complicated inner workings — you just use a few clear actions to control it, and it handles the long, slow job of waiting and sending messages.
The system is built around a small, explicit set of user-facing actions that map directly to state transitions in a durable, long-running thread. The core idea is to hide the complexity of a multi-step, approval-pausing campaign behind a handful of verbs: create, launch, review, and stop. A rejected alternative would be to expose the internal state machine directly, allowing operators to manually advance or rewind steps, which would increase complexity and risk of errors. The trade-off is clear: by keeping the surface small and well-understood, the system sacrifices flexibility for operational simplicity, ensuring that the long-lived, stateful machinery remains a black box that can only be driven through these four safe, predictable commands.
The approval node maps operator decisions to state transitions, hiding the internal state machine behind four safe commands.
async def await_approval(state: CampaignState) -> dict[str, Any]:
decision = interrupt({
"kind": "approval",
"step": int(state.get("sequence_step") or 0),
"subject": state.get("pending_subject"),
"text": state.get("pending_text"),
"html": state.get("pending_html"),
"run_id": state.get("pending_draft_run_id"),
})
action = "approve"
if isinstance(decision, dict):
action = str(decision.get("action") or "approve").lower()
elif isinstance(decision, str):
action = decision.lower()
if action == "reject":
await _upsert_thread(state, status="stopped", wake_at=None, stop_reason="draft_rejected")
return {"status": "stopped", "stop_reason": "draft_rejected", "_decision": "reject"}
if action == "skip":
return {"status": "running", "_decision": "skip"}
# approve or edit — continue the campaign
return {"status": "running", "_decision": action}
The subsystem centers on a human-in-the-loop campaign engine where all operator actions are confined to four verbs—create, launch, review, stop—that map to durable state transitions in a LangGraph thread. The ordered mechanism begins with compose_touch, which drafts an email grounded on opportunity and resume context, then pauses at await_approval for a human decision. The operator responds with one of "approve", "reject", "skip", or "edit"; "edit" overrides pending_subject and pending_text while synchronizing the HTML body. If "reject" is chosen, _upsert_thread flips the row to status="stopped" with stop_reason="draft_rejected" so the reconciliation loop does not count a stale draft_pending as active. A "skip" keeps the thread "running" and defers the touch. Only "approve" advances to send_touch, which enforces a per-vertical daily cap (CAMPAIGN_VERTICAL_DAILY_CAP, default 20) and performs a redundant suppression check before live dispatch. All state is persisted through the checkpointer, and a shadow agent_eval verdict is recorded via record_verdict and later backfilled with the human decision.
The invariant preserved is that every campaign thread advances through exactly one explicit, user-visible approval gate per touch before any email is sent. This is a write boundary enforced by the checkpointer’s interrupt mechanism: the graph never proceeds past await_approval without a stored human decision. The send_touch node doubles this guarantee with a fail-closed belt-and-braces check of suppression lists and do_not_contact immediately before dispatch, ensuring that even a fast-tracking bug cannot bypass safety. In autonomous mode (auto_approve=True), the same boundary is replaced by the STRICT pipeline gate that requires outreach_eligible=1 and enforces the daily cap at send time, swapping human review for rigid caps and redundant suppression re-checks.
The key trade-off is operational simplicity at the cost of flexibility. The obvious rejected alternative is exposing the internal state machine directly—allowing operators to manually rewind, skip, or reorder steps, which would require complex rollback logic and risk inconsistent thread state (e.g., two sends from the same step or a missed eligibility check). The system instead provides a handful of high-level verbs (create, launch, review, stop) that each trigger a deterministic state transition. This sacrifices the ability to finely control sequencing, but it avoids the high cost of debugging concurrency bugs, phantom writes, or orphaned draft_pending rows that a freeform machine would invite. The cost of the rejected alternative is demonstrated by the elaborate handling of "reject" via _upsert_thread and stop_reason—exposing the raw state machine would multiply such edge cases.
A concrete failure mode occurs when the agent_eval gate raises an exception during the shadow evaluation. The node catches Exception broadly (BLE001) and logs a warning: "agent_eval gate failed (fail-open to human): %s", exc. The operator sees that warning in the runtime logs, and the verdict defaults to an empty dict, so no scoring is stored on the state. However, the campaign proceeds unblocked to await_approval because the gate is explicitly fail-open by design. An operator monitoring the LangSmith trace would see the touch’s run record missing the agent_eval nested span, and the emails row would have a null agent_eval_verdict_id. If they check the annotation queue, no outlier entry would appear for that touch, silently forgoing the auto-review safety net.
-
graph.ainvoke (initial call) — The graph’s entry point; the caller provides the initial state (e.g.
contact_id).- reads / writes — Consumes the input state keys passed by the caller; writes nothing itself (the graph state is initialised before any node).
- branch — None here; control always enters the first node.
-
hydrate (node) — Loads the contact, company, and company facts from D1 (best‑effort).
- reads / writes — Reads from D1 (no explicit state keys before this); writes
contact,company,company_factsinto state. - branch — If D1 is unavailable → degrades to a context‑free skip (the graph returns early with a
skip_reason). Happy path: continues toload_full_history.
- reads / writes — Reads from D1 (no explicit state keys before this); writes
-
load_full_history (node) — Loads every prior sent and received email for this contact (up to 60 rows, each body truncated to 600 characters).
- reads / writes — Reads possibly a DB or D1 (implicit); writes
full_history(or similar list) into state. - branch — Always proceeds; an empty history is valid.
- reads / writes — Reads possibly a DB or D1 (implicit); writes
-
safety_gate (node) — Checks suppression,
do_not_contact, bounced, and replied flags.- reads / writes — Reads the contact’s suppression status (likely from state or D1); if a skip condition is met, writes
skip_reasoninto state. - branch — If skip condition true → the graph ends immediately (returns to caller with
skip_reason). Happy path: continues toderive_followup_point.
- reads / writes — Reads the contact’s suppression status (likely from state or D1); if a skip condition is met, writes
-
derive_followup_point (node) — One LLM call that distills the entire thread into an explicit anchor (
followup_point).- reads / writes — Reads
full_historyand possiblycontact; writesfollowup_pointinto state. - branch — Conditional (marked
[cond]in the flow); if the LLM fails (e.g. timeout, empty output) the node may still produce a fallback or the graph may skip. Happy path: writes a validfollowup_point.
- reads / writes — Reads
-
compose (node) — Delegates to the compiled
email_composesubgraph viaainvoke, grounded on thefollowup_point.- reads / writes — Reads
followup_point,contact,company, etc.; the subgraph writessubject,body,sequence_type,to_email,parent_email_id,prompt_version,model,prompt_tokens,completion_tokensinto state. - branch — No explicit branch in this node; the subgraph handles its own internal branching. If the subgraph fails, the error propagates up (no skip).
- reads / writes — Reads
-
Graph return — The final output dictionary is assembled from the state:
subject,body,followup_point,sequence_type,to_email,parent_email_id,prompt_version,model,prompt_tokens,completion_tokens,graph_meta, and optionallyskip_reason.- reads / writes — Reads the state keys produced by
compose; writes nothing to state (the run is terminal). - branch — No branch; this is the successful terminal step. If a skip occurred earlier, the graph would have ended at
safety_gate.
- reads / writes — Reads the state keys produced by
Based on the source code excerpts, this subsystem spends time and money primarily on LLM invocations (inside email_compose, derive_followup_point, and the campaign_graph’s compose_touch step) and on D1 database queries (the due-row SELECTs and UPDATEs). Five real performance knobs appear in the code, each controlling a different dimension of resource usage:
CAMPAIGN_RESUME_BATCH
- Knob — environment variable
CAMPAIGN_RESUME_BATCH, default"25"(parsed as integer). - Bounds — limits how many
campaign_threadsrows withstatus='waiting'are pulled per cron tick. - Effect — lowering the batch reduces peak D1 read/write load and the number of concurrent campaign‑touch LLM calls, lowering both latency per tick and dollar cost; raising it increases throughput at the cost of higher burst cost and longer individual tick duration.
- Risk — too low: many due rows accumulate, causing follow‑up delay and possible missed send windows. Too high: a single tick can overload the free‑tier D1 query capacity or saturate the single‑worker LangGraph runtime (the code notes that fan‑out paralyses single‑worker runtimes).
enrich_sales_tech concurrency
- Knob — parameter
"concurrency": 2passed to theenrich_sales_techgraph’sainvokecall insiderun_enrich_sales_tech. - Bounds — limits how many companies are processed in parallel (each company takes 90–180 seconds and performs HTTP fetches + about 10 LLM calls).
- Effect — increasing from 2 to, say, 8 completes the weekly run faster, but multiplies peak LLM token spend and D1 writes proportionally; reducing it saves money and lowers concurrent network pressure.
- Risk — too high: the cron may hit API rate limits on the external fetches or exceed the LangGraph runtime’s single‑worker constraint (the same passage warns against parallel fan‑out). Too low: the run takes too long and profiles go stale.
country_classify_bulk concurrency
- Knob — parameter
"concurrency": 8passed to thecountry_classify_bulkgraph’sainvokeinsiderun_country_classify_nightly. - Bounds — controls how many company rows are classified concurrently (each row requires a small LLM call).
- Effect — higher concurrency finishes the nightly backfill faster, but burns through more token budget in a shorter window; lower concurrency spreads cost over time at the expense of slower cleanup of missing
countryvalues (which then drop those rows out of the sales‑tech tab). - Risk — too high: may cause D1 contention or exceed the per‑database‑ID query limits enforced by Cloudflare D1. Too low: the daily run may not finish before the next cycle, leaving many rows unnmarked.
_DEFAULT_MAX_TOUCHES
- Knob — source‑code constant
_DEFAULT_MAX_TOUCHES = 6incampaign_graph.py. - Bounds — caps the total number of touches (LLM‑drafted emails) in a campaign sequence before the thread goes terminal.
- Effect — a lower number reduces total LLM calls per contact, lowering cost and time per campaign; a higher number increases the chance of a reply (more touches) but also the dollar spend and the risk of sending unwanted emails.
- Risk — too low: the sequence may exhaust before a meaningful response appears, wasting the opportunity. Too high: the system may send excessive follow‑ups, annoying recipients and raising spam flag risk; also increases D1 storage for the
campaign_threadsandemailsrows.
dry_run (enrich_sales_tech)
- Knob — parameter
"dry_run": Falsepassed to theenrich_sales_techgraph. - Bounds — when
True, the graph skips writing any newdeep_analysisfeatures to D1; the graph still performs all HTTP fetches and LLM calls (thus still spending money on tokens and compute). - Effect — set to
Trueto preview the results without incurring the cost of D1 writes (though LLM cost remains); set toFalseto persist the features, making them available for downstream queries. - Risk — left
Trueaccidentally: no data is stored, so subsequent crawls will re‑process the same companies (wasting time and money). LeftFalsewhen you only want a count: wasted write operations.
These five knobs directly govern the subsystem’s latency, throughput, and dollar cost by controlling parallelism, batch size, sequence depth, and write‑side effects. All identifiers are taken verbatim from the provided source files.
D1 Due Query Timeout in Campaign Resume Cron
- Trigger — A transient D1 outage or slow query when
d1_allselects due rows fromcampaign_threadswith the conditionstatus = 'waiting' AND wake_at IS NOT NULL AND wake_at <= ? ORDER BY wake_at ASC LIMIT ?. - Guard —
except Exception as exc: log.exception("campaign-resume-due: due query failed")in the resume-due cron function (inside_cron.py). The function returns an error dict with the exception message. - Posture — Fail-soft for the entire cron tick. No campaign threads are resumed, but the cron worker does not crash and will attempt the next tick on schedule.
- Operator signal — The log line
"campaign-resume-due: due query failed"with the full exception traceback. - Recovery — The cron is driven by a fixed 5-minute schedule. The next tick retries the same query from scratch. No exponential backoff is present; continuous D1 degradation means repeated failures until the query succeeds.
Eligibility Check D1 Failure (Fail-Open)
- Trigger — A D1 transient error when querying a contact's
email,outreach_eligible, anddo_not_contactcolumns during the campaign graph's safety gate. - Guard —
except Exception as exc: log.warning("campaign eligibility check failed contact_id=%s (fail-open): %s", contact_id, exc)followed byreturn True. - Posture — Fail-open. The gate allows the campaign to proceed even though eligibility is unknown. The human-in-the-loop approval step still stands as the final safeguard, but a definitively ineligible contact might waste operator attention.
- Operator signal — The warning log line above. There is no metric counter or dashboard widget that directly catches this.
- Recovery — No automatic retry. The next time a draft is generated for the same contact (e.g., on a later step), the eligibility check will run again and may succeed. Operators must spot the warning and manually verify eligibility.
Campaign Counter Update Best-Effort Failure
- Trigger — A D1 write failure when
_bump_campaign_countsattempts toUPDATE email_campaigns SET emails_sent = emails_sent + ?, ... WHERE id = ?. - Guard —
except Exception as exc: log.warning("campaign _bump_campaign_counts failed: %s", exc). The function returns silently; the send path is never aborted. - Posture — Fail-soft (best-effort). The campaign row counters become stale, but the actual send is not rolled back and the thread state is updated independently.
- Operator signal — Warning log line
"campaign _bump_campaign_counts failed: %s". The campaign card in the UI will show frozen numbers until the next successful write. - Recovery — No retry logic. The next call to
_bump_campaign_counts(for the next send in the same campaign) will attempt a new update, but if the failure is persistent, the counters drift indefinitely. Manual reconciliation via SQL is the only recovery path.
Draft Generation Failure (Empty Draft)
- Trigger — The LLM call inside the compose subgraph fails, validation rejects the output, or the
email_composegraph returns a result withoutsubjectortext/html. This causeshas_draftto beFalsein the campaign graph. - Guard — The check
if not has_draft:followed byawait _upsert_thread(state, status="failed", wake_at=None, stop_reason="no_draft"). Separately, flagging logic may catch the failure:except Exception as exc: log.warning("campaign flag failed: %s", exc). - Posture — Fail-hard for that individual campaign thread. The thread transitions to
status='failed'and will never be retried automatically by the cron (which only querieswaitingrows). - Operator signal — No explicit log line for the no-draft condition itself (the
ifblock does not log). The operator would need to observe the thread's status changed tofailedin the database, or rely on the flagged-run error if LangSmith tracing is enabled (is_flagged_run(error=True)andqueue_flagged_run). - Recovery — No automatic retry. An operator must manually reset the thread (e.g., change
statusback towaitingwith an appropriatewake_at) and trigger a new resume. The cron will not pick up afailedrow.
Thread Claim Update Failure (Double-Resume Risk)
- Trigger — A D1 transient error when trying to claim a due row by
UPDATE campaign_threads SET status='running', ... WHERE id=?inside the resume-due cron loop. - Guard — The
try:block around thed1_runcall, whoseexcept Exception as exc:appends the exception to theerrorslist (variableerrors: list[str] = []). The row remains instatus='waiting'. - Posture — Fail-soft for that single row. The cron continues processing other rows. The unclaimed row will be picked up by the next cron tick, which re-executes the due query and sees it again. This creates a risk: if the first tick had actually completed the draft generation but the status update failed, the row might be processed twice. In practice, the draft generation only occurs after a successful claim; if the claim fails, no graph invoke runs, so no duplicate send occurs. However, if the claim succeeded but the subsequent graph invoke failed (leaving status
running), the row is never retried — a different failure mode not covered here. - Operator signal — The
errorslist is returned in the cron function's summary dict, and likely logged. Specifically, the error count appears in the summary and in any structured logging of the cron result. - Recovery — The next cron tick automatically retries the unclaimed row because
status='waiting'persists. No manual action is needed unless the D1 outage is prolonged, causing repeated retries.
Q1 (warm-up)
Q: How does the campaign graph ensure that a human review decision (approve, reject, skip, edit) acts on exactly the draft the reviewer saw, without risk of regeneration?
A: The await_approval node uses interrupt() to pause execution after compose_touch has generated the draft. On resume, only await_approval re‑runs (the docstring says “Only this node re-runs on resume (compose_touch already completed), so the draft is never regenerated”), so the operator’s decision acts on the exact draft that was held for review.
Follow-up: What happens if the reviewer edits only the subject but the system also needs to keep the HTML body in sync?
A: The code sets pending_subject from the edit dict, but the comment notes that the HTML body would remain stale because the review UI sends only subject+text — a known gap.
Weak answer misses: The Command(resume=…) checkpoint mechanism that makes compose_touch skip on resume.
Q2 (medium)
Q: Why does the system use interrupt() for cadence sleep (schedule_next) instead of a simpler scheduler‑based approach that directly updates state without pausing the thread?
A: The schedule_next node calls interrupt({"kind": "cadence", …}) to create a durable pause checkpointed exactly at that point. The cron later resumes the same thread by matching status='waiting' and wake_at, while the alternative would require a separate state‑managing scheduler prone to races. The comment “critical: this ‘waiting’ write is the ONLY thing that makes the cron re‑select this thread” justifies the design.
Follow-up: How is idempotency guaranteed if the cron triggers the same thread multiple times around the wake_at boundary?
A: The _upsert_thread before the interrupt uses ON CONFLICT … DO UPDATE, and the schedule_next node only re‑runs on resume — no duplicate step advances.
Weak answer misses: The ON CONFLICT … DO UPDATE upsert pattern and the fact that only the interrupted node re‑runs.
Q3 (hard – design question)
Q: The system deliberately hides the internal state machine behind a small set of actions (approve, reject, skip, edit). Why was this chosen over exposing operators to the full state machine (e.g., allowing manual step rewind or advance)?
A: Exposing manual step manipulation would break the invariant that compose_touch runs exactly once per step (its draft is never regenerated). The code in await_approval maps actions directly to terminal statuses like "stopped" or "running", and schedule_next enforces the max_touches boundary automatically. The chapter’s trade‑off is clear: minimizing operator complexity prevents race conditions and guarantees that the state follows a well‑understood lifecycle.
Follow-up: What mechanism prevents an operator from accidentally advancing a campaign beyond the touch limit without the system catching it?
A: In schedule_next, after incrementing next_step, the check if next_step >= max_touches: sets status "completed" with stop_reason: "sequence_complete", which is idempotent and cannot be bypassed by a manual action.
Weak answer misses: The explicit guard in schedule_next against exceeding max_touches.
Q4 (hardest)
Q: The graph uses both an approval interrupt (await_approval) and a cadence interrupt (schedule_next). How does it ensure that state updates applied during the resume of an interrupt are not double‑applied or lost?
A: For await_approval, the node returns a dictionary of updates (e.g., {"status": "stopped", …}) that are applied after the node fully completes; the interrupt prevents preceding nodes from re‑executing. For schedule_next, the _upsert_thread call is idempotent (ON CONFLICT … DO UPDATE) and marked critical=True, so a silent failure raises an exception. The checkpoint saves state before the interrupt, so a retry re‑runs only the interrupted node, and the upsert is safe.
Follow-up: What would break if the interrupt() itself raised an exception after the idempotent upsert but before the checkpoint was persisted?
A: The checkpoint persists the node’s state before the interrupt returns, so on retry the idempotent upsert is applied again harmlessly; the critical=True flag ensures no silent skips.
Weak answer misses: The critical=True flag on the _upsert_thread in schedule_next and the checkpoint‑based recovery model.
10. Putting It Together
It is like a robot pen-pal that writes a letter, waits for you to say okay, sends it, then takes a nap for a few days before waking up to check if your friend already wrote back.
Think of it as a patient robot assistant that handles long email conversations. Instead of sending all emails at once, it writes one message, pauses for you to approve it, sends it, then sleeps for days. It only continues if the person hasn't replied. This solves the problem of running many multi-week follow-up sequences by hand, which is fragile and attention-hungry. The robot never loses its place, even if the system restarts, and it stops automatically when a real conversation starts.
The campaign engine is a durable, multi-touch email sequence that pauses for human approval and survives restarts. Concrete parts include a draft-first step that queues a message for human review, a single sender that routes all sends through one auditable point, a cadence-as-interrupt mechanism that uses an outside clock to wake the sequence after days of sleep, and a stop-on-reply flag that halts further touches once a contact responds. The rejected alternative is a batch-run sweep that discovers and drafts for a fresh crowd in a single pass, which cannot nurture slow relationships. The trade-off is that the durable thread adds complexity to track state across restarts, but it prevents losing weeks of sequence progress, while the approval pause sacrifices speed for safety, ensuring no message is sent without human say-so.
The campaign engine uses a durable, multi-touch graph with human approval pauses and cron-driven cadence.
# survives restarts via D1 checkpointer; cron resumes with Command(resume=True)
# Draft-first: compose_touch generates a draft and HOLDS it for human review
async def compose_touch(state: CampaignState) -> dict[str, Any]:
# … eligibility gate …
draft = await outreach_graph.ainvoke(payload)
# status=draft_pending — NO send, waits for human approval
# await_approval — interrupt(kind="approval"); resumed by approveCampaignDraft
# send_touch — single auditable send point (send_error → END)
# schedule_next — interrupt(kind="cadence"); resumed by CF cron days later
# check_reply — if contact replied since last touch → END, else continue
# Two interrupt kinds: approval (UI-resumed) vs cadence (cron-resumed)
# cron only resumes status='waiting' rows, so draft_pending threads are safe
The campaign engine runs as a single LangGraph thread per (campaign, contact) with a stable thread_id. The ordered mechanism starts at check_reply: if the contact has replied, the thread terminates immediately. If not, compose_touch generates a draft via the email_outreach graph and holds it as status='draft_pending'—no send occurs yet. The thread then enters await_approval with an interrupt of kind "approval", waiting for a human to approve or edit via approveCampaignDraft. Only after explicit approval does send_touch fire, which is the single node that actually dispatches an email. It then calls schedule_next, which creates a "cadence" interrupt, and the thread sleeps until a Cloudflare cron (workers/campaign-runner → POST /cron/tick) resumes it days later, returning to check_reply for the next touch. Failures propagate cleanly: a skip_reason or no_draft from compose_touch ends the thread; a send_error from send_touch ends it; a rejection at await_approval ends it; completing the sequence at schedule_next ends it.
The central invariant is draft-first—"no touch is ever sent without an explicit human approve/edit". This guarantee is encoded in the graph topology: compose_touch and await_approval always precede send_touch. The state checkpoints after generation, so a resumed thread reuses the exact draft the operator saw. Autonomous mode (auto_approve=True) bypasses await_approval, but compensates with a fail-closed gate: send_touch enforces a per-vertical daily cap (CAMPAIGN_VERTICAL_DAILY_CAP, default 20) and re-checks suppression lists immediately before dispatch, plus eligibility uses the STRICT pipeline gate requiring outreach_eligible=1.
The key trade-off rejects a batch-run sweep that discovers and drafts for a fresh crowd in a single pass. Such a sweep could send many emails quickly but "cannot nurture slow relationships." This design accepts higher latency and per-contact state overhead to enable long, cadenced follow-ups grounded in the full conversation history. The cost avoided is the inability to handle multi-day gaps, personalized resumption, and the "stop-on-reply" flag—all of which would require complex re‑scheduling logic in a batch model. Instead, the graph uses cadence-as-interrupt: the thread persists via the D1 checkpointer and is woken by the cron, giving natural durability without a separate scheduler queue.
A concrete failure mode is a daily cap hit at send_touch. When the count for the vertical reaches CAMPAIGN_VERTICAL_DAILY_CAP (or the count query errors), the node does not send the email; instead it defers by creating a "cadence" interrupt for the next send window. The operator sees the thread still status='waiting' and the state’s trajectory includes a note like "touch deferred due to daily cap." No email is lost—the draft remains in the checkpointed state and will be retried on the next resume. Another visible signal is a send_error logged at warning level, which terminates the thread without sending, and the operator must inspect the error in the annotation queue or D1 to determine why dispatch failed.
-
graph.ainvoke(initial_state)
The graph receives the initial state (containingcontact_id,opportunity_id,recipient_email,recipient_name,sequence_step=0,cadence_days,max_touches) and theSTARTedge unconditionally routes tocheck_reply.
reads / writes – consumescontact_id,opportunity_id,recipient_email,recipient_name,sequence_step,cadence_days,max_touches; writes nothing yet.
branch – no conditional; always proceeds tocheck_reply. -
check_reply
Queries the D1 thread for this contact to detect any reply since the last touch (or initial state).
reads / writes – readscontact_idfrom state; writes nothing directly (the node returns an updatedstatusin state later via conditional).
branch – if no reply is found,_after_check_replyreturns"compose_touch"(happy path); if a reply exists, it returnsENDand the graph terminates with a terminal status (e.g.,"reply_received"). -
compose_touch
Drafts the next email in the sequence. It first checks campaign eligibility via_is_campaign_eligible(contact_id). If eligible, it builds apost_textfrom the opportunity and resume context, then delegates to theemail_outreachsubgraph (insideagent_run_span) to generate the draft.
reads / writes – readscontact_id,sequence_step,company_vertical,opportunity_id,resume_context,recipient_name,recipient_role,recipient_email,post_text,post_url; writesstatus='draft_pending',draft_data(subject, body),last_touch_run_id.
branch – if_is_campaign_eligiblereturnsFalse, it writesstatus='stopped',stop_reason='not_eligible'and returnsEND(failure path). Otherwise it proceeds, and_after_compose_touchreturns"await_approval". -
await_approval
Pauses the graph with aninterruptof kind"approval", holding the draft for human review. The graph checkpoints and returns control to the caller.
reads / writes – reads state unchanged; writes nothing (the interrupt is handled by the framework).
branch – after the human reviews and resumes the graph (viaapproveCampaignDraft), the node resumes with the same state, and_after_approvalchecks for anapprovedflag (or similar). If approved, it returns"send_touch"; if rejected/skipped, it returns"schedule_next"; on error it returnsEND. -
send_touch
Sends the approved draft via the single auditabledispatch_sendfunction. The send is always live, and the run id from the draft is used later for outcome feedback.
reads / writes – readsdraft_data(subject, body),to_email(fromrecipient_email),last_touch_run_id; writesstatus='sent',sent_at.
branch – on successful send,_after_sendreturns"schedule_next"; on send failure it likely writes an error status and returnsEND. -
schedule_next
Advances the sequence step. It reads the currentsequence_stepand comparesnext_step = step + 1againstmax_touches.
reads / writes – readssequence_step,cadence_days,max_touches; writesstatus='waiting',wake_at(future timestamp),stop_reason(only if terminal).
branch – ifnext_step >= max_touches, it writesstatus='completed',stop_reason='sequence_complete'and returnsEND(terminal). Otherwise it issues aninterrupt({"kind":"cadence", "wake_at":..., "next_step":...})to pause until the cron fires, and on resume it returns{"sequence_step": next_step, "status": "running"}. -
_after_scheduleconditional
Applied afterschedule_nextcompletes. It checks the updatedstatusin the returned state.
reads / writes – readsstatusfrom the node output; writes nothing.
branch – ifstatusis in_TERMINAL(e.g.,"completed"or"stopped"), it returnsEND; otherwise (happy path for continuing sequence) it returns"check_reply". -
check_reply(resumed loop)
The graph re-enters this node after either a cadence wake‑up or a direct approval rejection that skipped send. It again queries the thread for new replies.
reads / writes – same as step 2.
branch – if the contact has now replied (e.g., during the cadence sleep),_after_check_replyreturnsEND(terminal – stop‑on‑reply). If still no reply, it returns"compose_touch"to generate the next touch.
The cycle from step 2 through step 8 repeats until either a reply is detected (step 8 terminal) or the sequence is exhausted (step 6 terminal). In the happy‑path scenario where the contact replies after the first sent touch, the trace terminates at step 8 with END.
The subsystem—centered on the campaign engine but extending to supporting crons and graphs—consumes time and money through predictable performance levers. Based solely on identifiers and constants present in the provided source, five real knobs emerge. Each controls a distinct trade-off between speed, throughput, cost, and reliability.
CAMPAIGN_RESUME_BATCH
- Knob — environment variable
CAMPAIGN_RESUME_BATCH; default"25". - Bounds — caps the number of
campaign_threadsrows popped per cron tick from thewaitingstate. - Effect — increasing the batch lets more overdue campaign touches be resumed per five‑minute cycle, lowering the backlog and reducing the average time‑to‑send for due messages. Decreasing it lightens the per‑tick load on D1 and the graph runtime but may let threads linger in
waitinglonger. - Risk — a value too high can cause a single cron invocation to run past the five‑minute tick interval, creating overlapping race conditions or D1 timeouts. Too low may never catch up if the queue of due threads grows faster than the tick rate, effectively increasing total latency.
_DEFAULT_MAX_TOUCHES
- Knob — code constant
_DEFAULT_MAX_TOUCHES=6incampaign_graph.py. - Bounds — the maximum number of touches (email sends) a campaign can produce for one contact before the thread becomes terminal.
- Effect — raising the limit extends the lifetime of a campaign sequence, increasing both the chance of a reply and the total send cost (more LLM calls, more email dispatch dollars). Lowering it shortens the campaign, reducing spend but potentially leaving leads under‑nurtured.
- Risk — setting it too high risks reputation damage from excessive sending and increases the probability of being flagged as spam. Too low may abort a sequence before a slow prospect responds, wasting earlier touches.
_DEFAULT_CADENCE_DAYS
- Knob — code constant
_DEFAULT_CADENCE_DAYS=[0, 4, 7, 7, 7, 7]incampaign_graph.py. - Bounds — defines the gap in days between consecutive touches; index 0 is unused, so touch 1 waits 4 days, touch 2 waits 7, etc.
- Effect — shortening the gaps (e.g., changing 7 to 3) compresses the campaign timeline, delivering more touches per unit of wall time but increasing daily send volume and thereby potential spam complaints. Lengthening gaps spreads out costs, reduces daily rate, and respects slower decision cycles but risks the contact losing interest.
- Risk — too‑tight cadence triggers daily/per‑domain caps (enforced by the send‑safety stack) and may cause the campaign to stall or be blocked; too‑loose cadence may cause the thread to never reach the final touch before the opportunity closes.
LANGSMITH_TRACING
- Knob — environment variable
LANGSMITH_TRACING; default not shown but presence implies on/off. - Bounds — when
true, every LLM call and graph run within the campaign and follow‑up graphs is reported to LangSmith for observability. - Effect — enabling tracing adds the cost of LangSmith ingestion (money per trace) and a small latency overhead on each LLM call (transmitting trace data). Disabling it removes these costs but forfeits debugging visibility, failure annotations, and the ability to analyze prompt performance.
- Risk — leaving it on in production can accumulate unbilled LangSmith charges if usage spikes; turning it off blinds the team to regressions hidden in the LLM calls inside
compose_touchorderive_followup_point.
concurrency (country_classify_bulk)
- Knob — input parameter
concurrencyto thecountry_classify_bulkgraph; seen as"concurrency": 8in therun_country_classify_nightlycron. - Bounds — controls the number of parallel LLM or D1 operations that the bulk classification graph fans out internally under an
asyncio.Semaphore. - Effect — a higher concurrency reduces the wall‑clock time required to classify thousands of rows (time saved), but drives higher peak D1 read/write load and more concurrent LLM calls, raising the per‑run service cost (tokens and query units). A lower concurrency spreads the work over a longer period, smoothing resource usage.
- Risk — too high a concurrency may exceed the D1 database’s connection or throughput limits, causing errors or throttling; too low may cause the nightly classification job to overrun its scheduled window, leaving rows unclassified for another 24 hours.
D1 query failure for due campaign threads
- Trigger: D1 outage or transient error when
run_campaign_resume_dueexecutesd1_all("SELECT id, campaign_id, contact_id FROM campaign_threads WHERE status = 'waiting' ..."). - Guard: The
except Exception as excblock that callslog.exception("campaign-resume-due: due query failed")and returns{"ok": False, "error": f"{type(exc).__name__}: {exc}"}. - Posture: fail-soft — the cron tick for this job degrades; no threads are resumed, and an error dict is returned.
- Operator signal: The log line
"campaign-resume-due: due query failed"at exception level, plus theerrorfield in the cron response. - Recovery: No automatic retry within this tick. The next 5‑minute cron tick will re‑query and resume any still‑due threads. Manual investigation if failures persist.
Claim-row UPDATE failure for a single thread
- Trigger: Inside the loop over due rows,
d1_run("UPDATE campaign_threads SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=? AND status='waiting'")fails (e.g., deadlock, D1 timeout). - Guard: The inner
try:block containing this UPDATE; theexcept Exceptionappends the error to theerrorslist. - Posture: fail-soft for that specific thread — the loop continues, and the failed row remains in
waitingstatus. - Operator signal: The error is appended to
errors(captured but not logged individually unless the cron response is inspected). - Recovery: The same row is picked up by the next cron tick (its
wake_atis still due). If the UPDATE partially succeeded (status changed but subsequent graph resume failed), the thread would be stuck inrunningand require manual re‑claim.
Eligibility check D1 failure (campaign graph)
- Trigger: During campaign thread execution, the contact eligibility query (
SELECT email, outreach_eligible, do_not_contact FROM contacts WHERE id = ?) raises an exception (e.g., D1 timeout). - Guard: The
except Exception as exc:clause that callslog.warning("campaign eligibility check failed contact_id=%s (fail-open): %s", contact_id, exc)and returnsTrue. - Posture: fail‑soft (fail‑open) — the contact is treated as eligible, allowing the send to proceed to human approval.
- Operator signal: A warning log line containing
"campaign eligibility check failed". - Recovery: No automatic fix; human review is the ultimate gate. The next touch for the same contact re‑runs the check.
Counter bump failure during send
- Trigger: After a successful send,
_bump_campaign_countsexecutesd1_run("UPDATE email_campaigns SET emails_sent = emails_sent + ? ...")and the D1 call fails. - Guard: The
except Exception as exc:clause that logslog.warning("campaign _bump_campaign_counts failed: %s", exc). - Posture: fail‑soft — the send is not rolled back, but campaign counters (
emails_sent,emails_failed,emails_scheduled) are stale. - Operator signal: A warning log line containing
"campaign _bump_campaign_counts failed". - Recovery: No automatic correction; counters will be fixed only by a later successful bump or manual update. The campaign card may show incorrect statistics.
Final reconciliation failure after terminal threads
- Trigger: When all threads in a campaign reach a terminal state,
_reconcile_campaign_after_terminalruns ad1_allord1_runto zeroemails_scheduledand set the campaign status, and the D1 call fails. - Guard: The
except Exception as exc:clause that logslog.warning("campaign reconcile-after-terminal failed cid=%s: %s", campaign_id, exc). - Posture: fail‑soft — the campaign remains in
status='running'with staleemails_scheduledcount. - Operator signal: A warning log line containing
"campaign reconcile-after-terminal failed". - Recovery: No automatic retry (the threads are already terminal, so no future tick will re‑attempt this campaign). Manual intervention is required to update the campaign card.
Safety gate best‑effort skip (follow‑up graph)
- Trigger: In the
safety_gatenode of the follow‑up graph, a D1 query fordo_not_contact, bounce suppression, or reply history fails (e.g., D1 outage). - Guard: The design intent documented in the module: “All DB reads are best‑effort — a D1 outage degrades to a context‑free skip rather than failing the run.” No explicit exception handler is shown; the graph proceeds to skip the contact.
- Posture: fail‑soft — the contact is not drafted, and the graph produces a
skip_reason(likely set to a D1‑error indicator). - Operator signal: A log line (not shown in source, but implied) indicating the skip; the graph result will contain a non‑null
skip_reason. - Recovery: The contact is reconsidered on the next cron cycle or manual rerun. After D1 recovers, the full history load and follow‑up point derivation will re‑execute.
Q – The campaign engine uses a “draft-first” approach: it generates an email and then pauses for human approval via an interrupt. Why not generate and send directly in the same node?
A – The compose_touch node generates the draft by invoking the email_outreach_graph, stamps the run id on the held draft, and then checkpoints. A separate await_approval interrupt (LangGraph’s interrupt() with kind "approval") pauses execution so the operator can review exactly the draft they saw. On resume, the node re‑runs from the checkpoint and the draft is not regenerated—this guarantees the operator approves the same content they reviewed, preventing a regeneration race.
Follow-up – How does the system guarantee that the draft an operator approves is the same one that is eventually sent, even if the operator takes hours to review?
A – The compose_touch node completes and the state checkpoints before the interrupt(); the draft is persisted in the campaign_threads row with a run id. The await_approval interrupt only resumes when the operator calls approveCampaignDraft, at which point the graph continues from the checkpoint without re‑entering the generation node.
Weak answer misses – Shallow answers often omit the checkpointing mechanism in LangGraph and the fact that compose_touch is not re‑run on resume—the thread‑upsert with ON CONFLICT … DO UPDATE (in _upsert_thread) makes the operation idempotent.
Q – The cadence between touches is implemented as an interrupt() inside schedule_next, and the cron resumes the thread days later. Why use a blocking interrupt instead of a simple scheduled task that fires the next touch?
A – The schedule_next function writes a waiting status row with a wake_at timestamp (via _upsert_thread with critical=True) and then calls interrupt({"kind": "cadence", "wake_at": ..., "next_step": ...}). The cron polls only status='waiting' rows whose wake_at has passed, then resumes the exact durable thread. This ties the full state (sequence_step, status, generated draft) to the thread, not to a stateless scheduler; if the cron were missing, the thread stays paused, making the restart point explicit and auditable.
Follow-up – What happens if the cron fails to resume after the interrupt?
A – The thread remains in status='waiting' with a wake_at in the past. On the next cron cycle, the SELECT … WHERE status='waiting' AND wake_at < now() query will find it and resume execution from the same checkpoint. No state is lost because the checkpoint is persisted in LangGraph’s store.
Weak answer misses – A shallow answer might say “the cron wakes the thread” without explaining that the interrupt() is a durable checkpoint and that the thread upsert uses critical=True to re‑raise on failure so the thread never stalls silently.
Q – The campaign engine has a separate eligibility gate (_is_campaign_eligible) that is described as “allows inconclusive/NULL through; blocks only definitive negatives,” while the strict autonomous gate in email_orchestrator_graph (V135) is tighter. Why have two different gates?
A – Because the campaign graph is draft‑first and human‑approved, false positives are acceptable—the operator can reject a draft. The _is_campaign_eligible function (called in compose_touch before generation) only blocks contacts that are definitively ineligible (e.g., hard bounced, explicitly opted out). The strict V135 gate used in the autonomous pipeline must be conservative to avoid sending a bad email without review. Using a permissive gate here reduces false skips for the campaign sequence.
Follow-up – How does this gate differ from the safety_gate in the follow‑up graph (email_followup_graph.py)?
A – The safety_gate in the follow‑up graph checks for suppression, do_not_contact, bounce, or a reply and can skip the entire follow‑up. The campaign gate is campaign‑scoped and only checks eligibility—it does not check for reply (that is handled elsewhere by the stop‑on‑reply mechanism) and lets inconclusive results through.
Weak answer misses – A shallow answer might say “they are the same gate” but misses the key design intent: the campaign gate is permissive because human approval catches false positives, while the autonomous gate must be strict to prevent unsafe sends.
Q – The rejected alternative to this durable‑thread design is a batch‑run sweep that discovers and drafts for a fresh crowd in a single pass. Why does the system choose interrupt‑based state machines over that simpler approach?
A – A batch sweep cannot nurture slow relationships because it produces one‑shot drafts on a fixed schedule with no memory of past touches. The campaign engine uses schedule_next with an interrupt() to pause for days, then resumes the same thread with the full history intact: sequence_step, cadence_days, and status are all checkpointed. This allows the next touch to be grounded on the previous exchange (via _build_post_text and the opportunity/resume context). The interrupt also allows human‑in‑the‑loop approval at each touch, which a batch sweep cannot do without massive re‑engineering.
Follow-up – How does the batch sweep’s lack of state affect its ability to handle a contact who replies mid‑sequence?
A – In the durable design, when a reply is detected (e.g., via a separate webhook or the record_outcome_feedback call that posts a "reply_outcome" with score 1.0), the thread status is changed to "completed" and further touches are skipped. A batch sweep would either keep sending unrelated drafts or require an external database to track reply state, duplicating effort.
Weak answer misses – A shallow answer focuses only on “it’s more scalable” but misses that the batch sweep cannot provide per‑touch grounding (no access to the conversation thread) and has no natural mechanism for human approval or cadence‐aware interruption.
System-design principles
5 principles the engine is built on
Draft-First, Human-Approved
Never send without a thumbs-up. The campaign engine prepares every touch, meaning each individual message to a prospect, but holds it as a pending draft and pauses for a person to approve, edit, reject, or skip before anything goes out. This makes the safe state the default: work is done up front, but the irreversible act of sending always waits on human judgment, because a wrong message to a real prospect cannot be unsent. The trade-off is that a campaign cannot run fully unattended, accepted on purpose because a human gate on each send is the very feature the engine exists to deliver, not a limitation to engineer away.
Durable By Default
Write down where you are so a restart cannot lose it. This engine saves each contact's sequence position and next scheduled time to a database after every step. That means even if the system redeploys or crashes, it picks up exactly where it left off because the stored record, not the live process, is the source of truth. The trade-off is that this checkpointing and resuming takes extra work compared to simply sleeping in memory, but it's worth it because a multi-week outreach sequence that disappears on restart would break real campaigns.
One Sender, One Writer
Let one hand do the sending so nothing goes twice. In the campaign engine, this means exactly one step is allowed to send an email. That same step also records that the send happened. No other step can send. By concentrating both the sending action and its record in a single place, we get one chokepoint to audit and one source of truth for how far a sequence has progressed. This prevents double-sends and missing records. The trade-off is a little rigidity in the sending path, but it becomes simple to reason about and hard to get wrong.
Cadence As Interrupts, Not Sleep
Sleep with an alarm instead of standing and waiting. In the campaign engine, between contacts with a recipient, the conversation thread parks itself. Parking means the thread records a wake time and then suspends its work, instead of staying active. An external clock fires every few minutes and wakes whichever threads are due to resume. No process stays alive during the multi-day gap. This lets thousands of slow conversations coexist for almost no cost, because none of them hold a worker while waiting. The tradeoff is the extra moving part of an outside scheduler, but this design is accepted because parking with an alarm scales far better than keeping a live timer running for every conversation in flight.
Stop The Moment They Reply
Stop talking the moment the other person replies. Before composing each touch, the engine checks for any inbound reply that arrived since the last send. If there is one, it ends the sequence immediately, sending no further automated follow-ups. This hands the conversation back to a human at exactly the right moment and avoids the worst outreach failure: a canned follow-up that ignores a reply the prospect just sent. The cost is essentially nothing: a small, constant check for new inbound before each touch, a cheap price for never talking over a real response and never looking like no one was listening.
Glossary — the domain terms, grounded in the code
16terms, each defined from this subsystem’s real source.
Durable-thread campaign engine
The durable-thread campaign engine uses one LangGraph thread per `(campaign, contact)` with a stable `campaign-<campaignId>-<contactId>` thread_id, compiled with the D1 checkpointer (`resumable=True`) so its state survives between touches—it sends a touch, schedules the next, then `interrupt()`s and exits, with a Cloudflare cron resuming the thread days later using `Command(resume=True)`.
Memory hook Durable-thread campaign engine: each campaign-contact thread hibernates after a touch, then cron resumes it days later.
LangGraph thread
A LangGraph thread is a persistent execution context identified by a stable `thread_id` (e.g., `campaign-<campaignId>-<contactId>`) that LangGraph's checkpointer writes checkpoints for, enabling state to survive between touches and resume via `Command(resume=True)` from a cron or UI interrupt.
Memory hook LangGraph thread: a labelled state snapshot — the stable `thread_id` lets cron or UI unpause exactly where you left off.
D1 checkpointer
The D1 checkpointer is the persistence layer for LangGraph threads in this subsystem, storing checkpoint state in Cloudflare D1 tables (`checkpoints` and `writes`) so that threads compiled with `resumable=True` survive between touches and can be resumed later via `Command(resume=True)` by either the UI or the Cloudflare cron.
Memory hook D1 checkpointer: a save point in D1 tables that lets a thread pause and resume with Command(resume=True).
CF cron
CF cron is the Cloudflare Workers cron trigger (specified in `triggers.crons` in `backend/core/wrangler.jsonc`) whose `scheduled()` handler posts a wake-up request to `POST /cron/tick`, which calls `run_campaign_resume_due` to resume any campaign threads whose `wake_at` has passed.
Memory hook CF cron rings the /cron/tick alarm to wake campaign threads when their wake_at is due.
run_campaign_resume_due
run_campaign_resume_due is an async function that queries campaign_threads for rows where status='waiting' and wake_at has passed, then sequentially resumes each thread with Command(resume=True) on the compiled campaign graph, handling terminal states and reconciling the campaign when all threads finish.
Memory hook run_campaign_resume_due sequentially wakes each campaign thread whose wake_at is due, one at a time.
interrupt(kind="approval")
interrupt(kind="approval") is a LangGraph interrupt in the await_approval node that pauses the campaign thread until an operator decides on the held draft, returning the decision supplied by approveCampaignDraft via Command(resume=…) and ensuring the node re-runs on resume without regenerating the draft.
Memory hook Approval interrupt pauses the draft, awaiting an operator's Command(resume) without regenerating the content.
interrupt(kind="cadence")
interrupt(kind="cadence") is a LangGraph interrupt call that pauses the durable thread (a "cadence sleep") by raising an __interrupt__ with kind="cadence"; the Cloudflare cron (workers/campaign-runner → POST /cron/tick → run_campaign_resume_due) resumes the exact thread once its wake_at time passes, only for threads with status="waiting".
Memory hook interrupt(kind='cadence') puts the thread to sleep, the cron's alarm clock rings at wake_at to resume it.
compose_touch
compose_touch is a node in the campaign graph that drafts a touch email using the email_outreach graph against a post_text built from the contact's opportunity and resume_context, and then routes to gate_draft for human review or directly to send_touch if auto_approve is enabled.
Memory hook compose_touch drafts the touch email from opportunity and resume, then routes to gate_draft or send_touch if auto_approve.
await_approval
await_approval is a pipeline node where execution pauses for a human operator’s decision on a drafted touch (approve, edit, reject, or skip), and upon resume, it processes that decision to either continue to send_touch, skip to schedule_next, or stop the campaign with a "draft_rejected" status.
Memory hook await_approval halts for a human verdict on a draft, then routes to send, skip, or reject.
send_touch
send_touch is a node in the campaign graph that dispatches the composed email after enforcing a per-vertical daily send cap and re-checking suppression and do_not_contact lists, and then routes to schedule_next unless the thread's status is terminal.
Memory hook Before a send_touch dares dispatch, it double-checks daily caps and do-not-contact lists, then routes to schedule_next.
schedule_next
schedule_next is a node in the campaign state graph that advances the sequence: if the step count reaches max_touches it sets status to "completed" and ends, otherwise it writes a "waiting" row to the D1 `campaign_threads` table with a `wake_at` timestamp and then calls `interrupt({"kind": "cadence", ...})` to pause the durable thread, and upon cron resume returns the updated step and "running" status, routing back to the "check_reply" node.
Memory hook Schedule_next sets an alarm clock for the next touch, then naps until the cron rings it back to check_reply.
check_reply
check_reply is an asynchronous node in the campaign graph that queries the database for any inbound emails from the contact after the last touch, and if found, sets the campaign status to "replied" and ends the sequence; it runs first after START, routing to compose_touch if no reply is detected or to END if the contact has replied.
Memory hook check_reply peeks for inbound replies first, halting the campaign on a hit.
auto_approve
auto_approve is a per-campaign boolean flag that, when true, routes compose_touch directly to send_touch, skipping await_approval and gate_draft, and replaces the default eligibility check with a strict fail-closed gate that requires outreach_eligible=1 and enforces per-vertical daily caps and suppression re-checks before sending.
Memory hook auto_approve: compose_touch bypasses human review, then send_touch runs a strict fail-closed gate with caps and suppression.
_is_campaign_eligible
_is_campaign_eligible is the eligibility gate for the human-approval campaign engine, used when auto_approve is false; it blocks on definitive negatives (do_not_contact=1, missing email, outreach_eligible=0) but allows NULL (inconclusive) through and fails open on query errors because every touch is held for explicit human approval before sending.
Memory hook _is_campaign_eligible: human-approval gate; NULL passes, only definitive no blocks, errors fail open.
_is_eligible
_is_eligible is a function imported from graphs.pipeline_graph that implements a strict V135 eligibility gate for the autonomous pipeline: it only allows contacts with outreach_eligible=1, and fails closed on NULL or error, because no human reviews the touch before it sends.
Memory hook _is_eligible is the strict bouncer for autonomous pipeline: only green-lit 1's get through, NULLs are locked out.
_upsert_thread
_upsert_thread is an async function that writes the thread's scheduling state into the D1 ``campaign_threads`` index so the cron's ``WHERE status='waiting' AND wake_at<=now`` query can find it and so the UI can render a held draft, and when called with ``critical=True`` it re-raises on failure to prevent the thread from stalling silently.
Memory hook Upsert mirrors thread state into D1 for cron and UI; critical=True re-raises to prevent silent stall.