01. What Discovery Is
Discovery is the very first stage of the lead pipeline. It scans real hiring sources for companies that are actively building teams. Those sources are applicant tracking boards, startup launch feeds, and a large web crawl. A seed query can steer the search toward a niche when you supply one. But the system never trusts a name a language model simply invented. Any candidate that came only from brainstorming is dropped before it can reach the database. That write boundary is what keeps the stored records honest.
Discovery sits at the top of the funnel, so everything later depends on it. Each candidate must show a genuine hiring signal, such as a minimum number of open artificial intelligence roles. A company that fails the check is quietly skipped. The trade-off is deliberate. We accept fewer leads in exchange for trusting every one of them. A short list of real companies beats a long list of guesses, because every downstream step rests on facts rather than fiction.
Persisting seed-query discovery candidates with deduplication, confidence, and evidence.
import json
from datetime import datetime, timezone

# d1_run, d1_one, _slugify, _profile and DiscoveryState are project helpers not shown here.
async def _persist_seed_candidates(state: DiscoveryState) -> tuple[list[int], list[int], int]:
    # Candidates that survived dedupe and pre-scoring; nothing to write if empty.
    scored = state.get("scored") or []
    if not scored:
        return [], [], 0
    vertical = state.get("vertical")
    geography = state.get("geography")
    profile_tags = _profile(state).tags
    inserted_ids: list[int] = []
    existing_ids: list[int] = []
    now = datetime.now(timezone.utc).isoformat()
    for c in scored:
        # Marker tag first, then the optional vertical/geography and the profile tags.
        tags_list = ["discovery-candidate"]
        if vertical:
            tags_list.append(vertical)
        if geography:
            tags_list.append(geography)
        # … language/market tag handling …
        tags_list.extend(profile_tags)
        # The slugified domain is the dedupe key for the companies table.
        key = _slugify(c["domain"])[:200]
        meta = await d1_run(
            """INSERT INTO companies (key,name,canonical_domain,website,category,tier,
                                      tags,score,score_reasons,created_at,updated_at)
               VALUES (?,?,?,?,?,?,?,?,?,?,?)
               ON CONFLICT (key) DO NOTHING""",
            [key, c["name"], c["domain"], f"https://{c['domain']}", "UNKNOWN", 0,
             json.dumps(tags_list), c["pre_score"],
             # score_reasons keeps the rationale plus confidence, evidence and provenance.
             json.dumps([c["why"], f"confidence:{float(c.get('confidence') or 0.0):.2f}",
                         f"evidence:{str(c.get('evidence') or '').strip()[:200]}",
                         "source:seed_query"]),
             now, now],
        )
        # A row was written: record its id and move to the next candidate.
        if int(meta.get("changes") or 0) >= 1:
            new_id = int(meta.get("last_row_id") or 0)
            if new_id:
                inserted_ids.append(new_id)
            continue
        # Key conflict: the company already exists, so look up its id instead.
        existing = await d1_one("SELECT id FROM companies WHERE key = ?", [key])
        if existing:
            existing_ids.append(int(existing["id"]))
    # The third element (seed_blocked) is always 0 in this snippet.
    return inserted_ids, existing_ids, 0
Picture a scout who visits different farmers’ markets—applicant tracking boards, startup launch feeds, and a huge web crawl—to find real farms that are actively growing crops (companies that are building teams). He carries a shopping list (a seed query) if you want to aim at a specific niche, but he never buys produce that a storyteller simply claimed existed; any candidate that came only from imagination gets thrown out before it reaches the cart. This subsystem is the very first stage of the lead pipeline, built to find real companies from honest sources and nothing else.
The scout works step by step. He first scans boards where employers post jobs (applicant tracking systems, or ATS), then browses recent startup announcements (launchfeed, the default source), and also pulls from the Common Crawl archive of the web (commoncrawl). If you give him a seed query, he runs it through a function called _brainstorm_direction that asks DeepSeek to suggest real companies. This path is marked as the sanctioned SEED_SOURCE and is allowed to persist. There is also a separate, older channel called brainstorm that fabricates candidates rather than finding them. The code keeps the two apart with a frozenset named _SYNTHETIC_SOURCES that holds only "brainstorm". Any candidate from that channel is hard-stripped in the discover step before it ever reaches the database.
The trickiest part is explaining why the seed-query path still counts as honest, even though it also asks an LLM for suggestions. The key distinction is that the seed_query source is not in _SYNTHETIC_SOURCES. It expands from a real, user-provided query and is gated differently. The legacy brainstorm channel, by contrast, is excluded at the write boundary, where the _SYNTHETIC_SOURCES set ensures that no invented company can slip through. Without this subsystem, the pipeline would eagerly store fabricated companies and fill the database with names that don't exist. Every later step (outreach, scoring, everything) would then waste time chasing ghosts. That is the concrete failure a beginner would feel: sending emails to businesses that never were.
1. plan_targets is called with the incoming DiscoveryState. It resolves the list of active directions via micro_verticals.resolve_directions(state.get("directions")). Unless the source set is limited to seed-query-only (the _ATS_PATH_SOURCES check), it then reads and advances the per-direction sub-niche cursor from a D1 cursor table. It returns a _direction_sub_niches dict mapping each direction to the chosen sub-niche string.
   - reads: state.get("directions"), state.get("sub_niche"), state.get("sources")
   - writes: {"_direction_sub_niches": dict[str, str]}
   - branch: if the active sources are not ATS-path sources (i.e., only seed_query is present), it short-circuits and returns an empty _direction_sub_niches, skipping the rotation logic.
2. discover is invoked with the state that now includes _direction_sub_niches. It first calls _resolve_sources(state) to determine the active data channels. If any synthetic source (like brainstorm) was explicitly passed, it logs a warning and strips that source from the channel list.
   - reads: state.get("sources"), state.get("directions"), state.get("concurrency"), state.get("per_host"), state.get("cc_warc"), state.get("cc_max")
   - writes: the internal set of active source channels; sets up a HostLimiter and an httpx.AsyncClient
   - branch: synthetic sources are dropped after the warning; only real-data channels move forward.
3. discover resolves the active directions again by calling micro_verticals.resolve_directions(state.get("directions")). For each direction, it prepares to run Wave 1, the LLM brainstorm for that micro-vertical. The per-direction exclude_domains from the registry (not shown in full in these snippets) is merged with the run-level state.get("exclude_domains").
   - reads: state.get("exclude_domains"), merged with the registry's per-vertical exclude list
   - writes: nothing returned yet; the merged exclude list is used per direction
4. _brainstorm_direction(mv, exclude_domains=..., sub_niche=...) is called for each MicroVertical object mv, with the merged exclude list and the sub-niche string from _direction_sub_niches (if any). It builds the brainstorm payload from these values:
   - seed_query, set to f"micro-vertical:{mv.vertical}"
   - vertical
   - keywords, taken from mv.keyword_signals
   - geography, hard-coded to "remote"
   - exclude_domains and sub_niche

   Its data flow:
   - reads: mv.vertical, mv.keyword_signals, mv.sub_niches (via the earlier cursor), the exclude_domains parameter
   - writes: returns a list of candidate dicts with keys name, domain, why, why_in_vertical, confidence, evidence, vertical
5. Inside _brainstorm_direction, the actual LLM call goes to the external brainstorm function. A failed call (exception) logs a warning and returns an empty list.
   - branch: on any exception from brainstorm, the function returns [], so the entire direction yields no candidates.
6. For each candidate returned by brainstorm, _brainstorm_direction canonicalises the domain via blocklist.canonicalize_domain and discards any candidate whose domain is empty or lacks a dot. Surviving candidates are appended to the result list, with fields renamed to the internal format (e.g., why_in_vertical is populated from either why_in_vertical or why in the raw response).
   - reads: raw candidate fields domain, name, why_in_vertical, why, confidence, evidence
   - writes: the result list of cleaned dicts
   - branch: a candidate with an invalid domain is silently dropped.
7. Back in discover, the output of all _brainstorm_direction calls for the active directions is collected into the local brainstorm list (the Wave 1 result).
   - writes: local brainstorm list (list of dicts)
8. After all Wave 1 calls complete, discover continues with the subsequent waves (Common Crawl, launchfeed; not detailed in this part). It then calls _persist_seed_candidates(state), which writes the deduplicated candidates to the companies table.
   - reads: state (the persistence code shown earlier reads its scored, vertical and geography fields plus the profile tags)
   - writes: seed_inserted_ids, _seed_existing, seed_blocked
   - branch: if _persist_seed_candidates raises a D1Error, discover returns {"_error": str(e)} immediately instead of the normal result.
9. In the normal path, the write is grounded: LLM-invented (synthetic) rows are refused. Only real-data sources (launchfeed, Common Crawl) and the sanctioned seed_query path reach the companies table. The synthetic brainstorm channel is stripped, so the brainstorm list from Wave 1 is not persisted. (This is the guard described in the “Grounding guard” comment.)
10. discover returns the final dict containing seed_inserted_ids, _seed_existing, seed_blocked, and an agent_timings map. Returning that dict to the graph runner is the terminal step.
   - reads: internal results from persistence
   - writes: the final return dict, with _error (on failure) or the normal keys.
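Step 4's payload assembly can be sketched as a pure function. The key names follow the wording of step 4. The exact payload shape, the MicroVertical fields shown, and build_brainstorm_payload itself are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MicroVertical:
    # Only the fields step 4 reads; the real dataclass may hold more.
    vertical: str
    keyword_signals: tuple[str, ...]
    sub_niches: tuple[str, ...] = ()


def build_brainstorm_payload(mv: MicroVertical,
                             exclude_domains: Optional[list[str]] = None,
                             sub_niche: Optional[str] = None) -> dict:
    """Hypothetical helper assembling the payload described in step 4."""
    return {
        "seed_query": f"micro-vertical:{mv.vertical}",
        "vertical": mv.vertical,
        "keywords": list(mv.keyword_signals),
        "geography": "remote",  # hard-coded, as step 4 states
        "exclude_domains": list(exclude_domains or []),  # None behaves like []
        "sub_niche": sub_niche,
    }


mv = MicroVertical("legal-ai", ("contract review", "e-discovery"))
payload = build_brainstorm_payload(mv, exclude_domains=["acme.com"], sub_niche="contract review")
```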
Control flow summary: The request fans out over the list of directions in step 4, and _brainstorm_direction processes each direction independently. On the happy path, every direction that yields valid candidates contributes to the result. Deduplication happens twice. The first pass runs per direction, through exclude_domains in the brainstorm payload. The second runs at persistence, where ON CONFLICT (key) DO NOTHING skips companies that already exist. Synthetic brainstorm results are stripped before anything is written to the companies table.
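You can reproduce the second deduplication pass locally with Python's built-in sqlite3 module, because Cloudflare D1 is built on SQLite. This sketch mirrors the insert-or-lookup loop in _persist_seed_candidates, using a trimmed-down companies table. slugify is a hypothetical stand-in for _slugify. cursor.rowcount and cursor.lastrowid stand in for D1's changes and last_row_id metadata. The ON CONFLICT clause needs SQLite 3.24 or newer.

```python
import json
import re
import sqlite3


def slugify(domain: str) -> str:
    """Hypothetical stand-in for _slugify: lower-case, runs of non-alphanumerics become '-'."""
    return re.sub(r"[^a-z0-9]+", "-", domain.lower()).strip("-")


def persist(conn: sqlite3.Connection, candidates: list[dict]) -> tuple[list[int], list[int]]:
    """Insert new companies; for key conflicts, report the existing row id instead."""
    inserted: list[int] = []
    existing: list[int] = []
    for c in candidates:
        key = slugify(c["domain"])[:200]
        cur = conn.execute(
            "INSERT INTO companies (key, name, canonical_domain, tags) VALUES (?, ?, ?, ?) "
            "ON CONFLICT (key) DO NOTHING",
            (key, c["name"], c["domain"], json.dumps(["discovery-candidate"])),
        )
        if cur.rowcount >= 1:
            # A row was written; lastrowid plays the role of D1's last_row_id.
            inserted.append(cur.lastrowid)
            continue
        # Nothing was written: the key already exists, so look up its id.
        row = conn.execute("SELECT id FROM companies WHERE key = ?", (key,)).fetchone()
        if row:
            existing.append(row[0])
    return inserted, existing


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE companies (id INTEGER PRIMARY KEY, key TEXT UNIQUE, "
    "name TEXT, canonical_domain TEXT, tags TEXT)"
)
first = persist(conn, [{"name": "Acme", "domain": "acme.com"}])
second = persist(conn, [{"name": "Acme Inc", "domain": "acme.com"},
                        {"name": "Beta", "domain": "beta.io"}])
print(first, second)  # ([1], []) ([2], [1])
```

The second run inserts only beta.io. It reports acme.com's existing id instead of raising, which is why a repeated key never surfaces as a D1Error.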
The discovery subsystem begins with plan_targets. It resolves the active micro-verticals and, for each one, reads and advances a per-vertical sub-niche cursor stored in the discovery_cursors table. That cursor selects a single sub-niche from the vertical's sub_niches tuple. Because the choice rotates on each tick, the LLM is pushed to explore a fresh pocket of the vertical every time. Then, for each direction, _brainstorm_direction is called with these inputs: a seed query, keyword signals, geography, the current sub_niche, and an exclude_domains list built from already-known companies. It returns a list of candidate dictionaries containing name, domain, why_in_vertical, confidence, and evidence. The scored candidates are then handed to _persist_seed_candidates. If that D1 operation raises a D1Error, the node immediately returns a dict with an "_error" key holding the exception's string form, along with agent_timings. Otherwise it returns the normal state and the pipeline continues.
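Here is a minimal sketch of the sub-niche rotation. It assumes the cursor is a per-direction integer that wraps around the sub_niches tuple, and it uses an in-memory dict in place of the discovery_cursors table. read_and_advance_cursor and plan_sub_niches are hypothetical names modelled on _read_and_advance_cursor and plan_targets. A pinned sub_niche skips the cursor, and a direction with no sub-niches gets none.

```python
from typing import Optional


def read_and_advance_cursor(cursors: dict[str, int], direction: str,
                            niches: tuple[str, ...]) -> Optional[str]:
    """Hypothetical in-memory stand-in for _read_and_advance_cursor (the real one uses D1)."""
    if not niches:
        return None  # no sub_niches: the brainstorm keeps its plain framing
    pos = cursors.get(direction, 0)
    cursors[direction] = pos + 1  # advance so the next tick frames a different pocket
    return niches[pos % len(niches)]  # wrap around once every sub-niche has been used


def plan_sub_niches(cursors: dict[str, int], directions: dict[str, tuple[str, ...]],
                    pinned: Optional[str] = None) -> dict[str, str]:
    """Map each direction to this tick's sub-niche; a pinned value skips the cursor."""
    out: dict[str, str] = {}
    for direction, niches in directions.items():
        if pinned:
            out[direction] = pinned
            continue
        chosen = read_and_advance_cursor(cursors, direction, niches)
        if chosen is not None:
            out[direction] = chosen
    return out


cursors: dict[str, int] = {}
directions = {"legal-ai": ("contract review", "e-discovery"), "bio-ai": ()}
ticks = [plan_sub_niches(cursors, directions) for _ in range(3)]
```

Three ticks cycle "contract review", "e-discovery", then "contract review" again. bio-ai never appears, because it has nothing to rotate through.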
The central invariant is the “Grounding guard”, a design rule that the write boundary (the companies table) must refuse LLM-invented (synthetic) rows. Only real-data sources such as launchfeed and Common Crawl pass through untouched. The brainstorm channel is the sole synthetic path, and its output must never enter the table directly. It could do so only after additional verification or through a separate staging layer. This guarantees that scoring and outreach always operate on validated, non-hallucinated company records, even though the brainstorm is used to generate candidate suggestions.
The key trade-off is the choice of one parameterised graph over several separate workers. The obvious alternative would be a dedicated worker per micro-vertical, each with its own logic. Instead, a frozen configuration table of MicroVertical dataclasses drives the entire discovery flow, and its entries differ only in seed query, keyword signals, and sub-niche list. This eliminates per-vertical code duplication and the maintenance cost of keeping several near-identical workers in sync. The rejected alternative would also add deployment overhead (more workers to monitor, more DDL schemas). It would also multiply the places a bug must be fixed, whereas a single parameterised graph can be fixed in one place.
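A minimal sketch of such a configuration table, assuming each entry is a frozen dataclass keyed by its vertical name. The field set, the sample entries, and this resolve_directions body are assumptions. Only the names MicroVertical, keyword_signals, sub_niches, exclude_domains and resolve_directions come from this document.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MicroVertical:
    """One row of the configuration table; fields beyond these are not shown in the text."""
    vertical: str
    keyword_signals: tuple[str, ...]
    sub_niches: tuple[str, ...] = ()
    exclude_domains: tuple[str, ...] = ()


# Hypothetical entries: the real table's contents are not given in this document.
_REGISTRY = (
    MicroVertical("legal-ai", ("contract review",), ("e-discovery", "compliance")),
    MicroVertical("health-ai", ("clinical notes",), exclude_domains=("example-health.com",)),
)
REGISTRY: dict[str, MicroVertical] = {mv.vertical: mv for mv in _REGISTRY}


def resolve_directions(requested: Optional[list[str]] = None) -> list[MicroVertical]:
    """Every configured vertical when nothing is requested; unknown names are skipped."""
    if not requested:
        return list(REGISTRY.values())
    return [REGISTRY[name] for name in requested if name in REGISTRY]
```

Under this design, adding a vertical means adding one MicroVertical entry, and the graph code stays unchanged.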
A concrete failure mode occurs when _persist_seed_candidates hits a database error and raises a D1Error. An example is a constraint violation not covered by ON CONFLICT (key) DO NOTHING; a repeated key is silently skipped rather than raised. In that case the node returns {"_error": str(exc), "agent_timings": ...}. An operator monitoring the discovery logs would see a warning line prefixed with the direction name, followed by the textual error message from the D1 runtime. No further candidate insertion is attempted for that tick, and the state proceeds without the seed candidates. The pipeline does not retry. The node ends the tick there and relies on the next scheduled tick to attempt those candidates again (if they survive deduplication).
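The fail-fast branch can be sketched as a small wrapper around the persist call. D1Error here is a hypothetical stand-in class, and the "persist" timing key is an assumption. The return keys match the ones this document lists for discover.

```python
import asyncio
import time


class D1Error(Exception):
    """Hypothetical stand-in for the project's D1Error."""


async def persist_step(persist, state: dict) -> dict:
    """Sketch of discover's persistence step: fail fast on D1Error, never retry."""
    timings: dict[str, float] = {}
    started = time.monotonic()
    try:
        inserted, existing, blocked = await persist(state)
    except D1Error as e:
        timings["persist"] = time.monotonic() - started
        # Stop here; the next scheduled tick is the only retry.
        return {"_error": str(e), "agent_timings": timings}
    timings["persist"] = time.monotonic() - started
    return {
        "seed_inserted_ids": inserted,
        "_seed_existing": existing,
        "seed_blocked": blocked,
        "agent_timings": timings,
    }


async def failing_persist(state: dict):
    raise D1Error("simulated D1 failure")


async def ok_persist(state: dict):
    return [7], [3], 0


bad = asyncio.run(persist_step(failing_persist, {}))
good = asyncio.run(persist_step(ok_persist, {}))
```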
Concurrency
- Knob: concurrency input (overrides DEFAULT_CONCURRENCY = 6).
- Bounds: Global parallelism cap, shared across all hosts and channels.
- Effect: Higher values lower wall-clock time by running more fetches in parallel, but increase load on external sources and the local connection pool.
- Risk: Too high triggers rate-limiting, timeouts, or connection exhaustion; too low serializes work, raising latency.
Per-host limits
- Knob: per_host input (overrides DEFAULT_PER_HOST = {"commoncrawl": 6, "llm": 4, "d1": 6, "launchfeed": 2}).
- Bounds: Per-source fan-out; prevents one throttled host from starving others or causing retry storms.
- Effect: Raising a host’s cap lets it absorb more concurrent requests, shrinking that source’s share of wall-clock time; lowering the cap conserves its capacity but delays its completion.
- Risk: Too high on a rate-limited host (e.g. Common Crawl) causes retry storms and wasted time; too low serializes that channel, increasing overall latency.
Common Crawl max domains
- Knob: cc_max input (overrides the constant CC_MAX_DOMAINS = 40).
- Bounds: Number of CDX lookups and page extractions per run.
- Effect: Higher values harvest more candidate domains from Common Crawl, increasing both discovery breadth and runtime; lower values reduce bandwidth and processing cost.
- Risk: Too high may hit Common Crawl quotas or slow the run; too low misses potential leads.
Sources
- Knob: sources input (overrides DEFAULT_SOURCES = ("launchfeed",); brainstorm is always stripped by _resolve_sources).
- Bounds: The set of enabled discovery channels, namely launchfeed, commoncrawl, and optionally seed_query (never brainstorm).
- Effect: Adding channels grows the candidate pool and the runtime; removing channels reduces data volume and cost.
- Risk: Requesting brainstorm is futile, because _resolve_sources strips it (logging a warning) and it contributes nothing. Excluding real channels (e.g. launchfeed) may starve the pipeline of recent signals.
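The global and per-host caps can be combined with two layers of asyncio.Semaphore. HostLimiter is named in this document, but its real API is not shown. The slot() method, the acquisition order, and the fallback cap for unlisted hosts are assumptions of this sketch. DEFAULT_CONCURRENCY and DEFAULT_PER_HOST use the values above.

```python
import asyncio
from contextlib import asynccontextmanager

DEFAULT_CONCURRENCY = 6
DEFAULT_PER_HOST = {"commoncrawl": 6, "llm": 4, "d1": 6, "launchfeed": 2}


class HostLimiter:
    """Sketch of a global cap plus per-host caps; the real HostLimiter's API is not shown."""

    def __init__(self, concurrency=None, per_host=None, fallback_cap=2):
        self._global = asyncio.Semaphore(concurrency or DEFAULT_CONCURRENCY)
        caps = {**DEFAULT_PER_HOST, **(per_host or {})}  # run input overrides defaults
        self._hosts = {host: asyncio.Semaphore(n) for host, n in caps.items()}
        self._fallback_cap = fallback_cap  # assumed cap for hosts with no entry

    @asynccontextmanager
    async def slot(self, host: str):
        if host not in self._hosts:
            self._hosts[host] = asyncio.Semaphore(self._fallback_cap)
        # Wait on the host's own semaphore first, so a throttled host queues
        # without occupying one of the shared global slots.
        async with self._hosts[host]:
            async with self._global:
                yield


async def demo() -> dict[str, int]:
    # Build the limiter inside the running loop; before Python 3.10, asyncio
    # primitives bind to an event loop when they are constructed.
    limiter = HostLimiter(concurrency=3, per_host={"launchfeed": 1})
    active: dict[str, int] = {}
    peak: dict[str, int] = {"global": 0}

    async def fetch(host: str) -> None:
        async with limiter.slot(host):
            active[host] = active.get(host, 0) + 1
            peak[host] = max(peak.get(host, 0), active[host])
            peak["global"] = max(peak["global"], sum(active.values()))
            await asyncio.sleep(0.01)  # stand-in for an HTTP request
            active[host] -= 1

    await asyncio.gather(*[fetch("launchfeed") for _ in range(4)],
                         *[fetch("commoncrawl") for _ in range(6)])
    return peak


peak = asyncio.run(demo())
```

With a global cap of 3 and launchfeed overridden to 1, launchfeed never runs more than one request at a time. commoncrawl takes the remaining global slots.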
LLM API Exception
- Trigger: Network timeout, rate-limit exhaustion, or service unavailability during the call to brainstorm.
- Guard: The except Exception as exc clause in _brainstorm_direction.
- Posture: Fail-soft. The function returns [] and does not propagate the exception upward.
- Operator signal: A warning log line: log.warning("brainstorm failed direction=%s: %r", mv.vertical, exc).
- Recovery: No retry or backoff is implemented. The direction produces zero candidates, and the discovery graph continues with other directions, if any.
LLM Returns Empty or Missing Candidates
- Trigger: The brainstorm call succeeds (no exception), but the returned dictionary either lacks a candidates key or holds None or an empty list under it.
- Guard: out.get("candidates") or []. No exception handler is engaged; the code silently treats the absence as an empty list.
- Posture: Fail-soft. It degrades without any log or error signal.
- Operator signal: No log line is emitted; the operator would see only that the direction yielded no candidate records.
- Recovery: The function returns []; downstream steps (dedupe, scoring) receive nothing for this direction.
Candidate Lacking a Valid Domain
- Trigger: A candidate dict has no domain field, an empty string, or a string without a "." character (e.g. "example").
- Guard: The inline check if not dom or "." not in dom: continue, applied after calling blocklist.canonicalize_domain(...).
- Posture: Fail-soft. The invalid candidate is silently skipped, and the loop proceeds to the next candidate.
- Operator signal: No log or metric; the candidate is simply absent from the returned list.
- Recovery: No retry; the remaining candidates are processed and returned.
Missing exclude_domains Input Leading to Duplicate Suggestions
- Trigger: The caller does not supply an exclude_domains argument (or supplies an empty list) while some of the potential candidates already exist in the database.
- Guard: No guard exists inside _brainstorm_direction; the exclude_domains parameter defaults to [] when None. The later persist step for the seed-query path uses ON CONFLICT (key) DO NOTHING (in _persist_seed_candidates), but the brainstorm channel itself is not persisted.
- Posture: Fail-soft. For the brainstorm channel, duplicates are returned but stripped later. For the seed-query path, duplicates are silently ignored at insert time.
- Operator signal: No immediate signal. The operator may notice that fewer new companies are inserted than expected, because duplicates land in _seed_existing rather than seed_inserted_ids.
- Recovery: No retry; the duplicate companies are not persisted again, and discovery continues normally.
Blocklist Canonicalization Exception (e.g. Malformed Input)
- Trigger: blocklist.canonicalize_domain raises an unexpected exception while processing a candidate's domain string.
- Guard: The outer except Exception as exc catches any exception raised inside the for c in cands loop, including this one.
- Posture: Fail-soft for the entire direction. The function returns [] and logs a warning.
- Operator signal: The same warning log line: log.warning("brainstorm failed direction=%s: %r", mv.vertical, exc).
- Recovery: No retry. All candidates for that direction are discarded, and the graph continues with other directions.
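All five failure modes above come down to one try/except wrapped around the candidate loop. Here is a minimal sketch of that shape, assuming the LLM response is a dict with a "candidates" list. canonicalize_domain is a hypothetical simplification of blocklist.canonicalize_domain, and call_llm stands in for the brainstorm call. The warning format copies the log line quoted above.

```python
import logging

log = logging.getLogger("discovery")


def canonicalize_domain(raw: str) -> str:
    """Hypothetical stand-in for blocklist.canonicalize_domain: strip scheme, path, www."""
    d = raw.strip().lower()
    for prefix in ("https://", "http://"):
        if d.startswith(prefix):
            d = d[len(prefix):]
    d = d.split("/", 1)[0]
    return d[4:] if d.startswith("www.") else d


def clean_direction(vertical: str, call_llm) -> list[dict]:
    """Sketch of the fail-soft handling listed above for _brainstorm_direction."""
    try:
        out = call_llm()
        cands = out.get("candidates") or []  # missing, None or []: no candidates, no log
        result: list[dict] = []
        for c in cands:
            dom = canonicalize_domain(c.get("domain") or "")
            if not dom or "." not in dom:
                continue  # invalid domain: skipped silently
            result.append({
                "name": c.get("name"),
                "domain": dom,
                "why": c.get("why"),
                "why_in_vertical": c.get("why_in_vertical") or c.get("why"),
                "confidence": c.get("confidence"),
                "evidence": c.get("evidence"),
                "vertical": vertical,
            })
        return result
    except Exception as exc:
        # Any error, including one raised while canonicalising, discards the whole direction.
        log.warning("brainstorm failed direction=%s: %r", vertical, exc)
        return []


sample = {"candidates": [
    {"name": "Acme", "domain": "https://www.Acme.com/jobs", "why": "hiring ML engineers"},
    {"name": "NoDot", "domain": "example"},
    {"name": "Empty", "domain": ""},
]}
good = clean_direction("legal-ai", lambda: sample)
```

Note the asymmetry. A bad domain costs one candidate. An exception anywhere in the loop, such as a non-string domain reaching canonicalize_domain, costs the whole direction.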
Q — Walk me through the two different discovery channels that involve an LLM. How does the system decide which one to use and what makes each one distinct?
A — The sanctioned, persist‑eligible path is the seed_query channel, identified by the constant SEED_SOURCE = "seed_query". It runs through expand_seed → brainstorm → dedupe → pre_score and is driven by the lead‑gen pipeline via {seed_query, industry}. In contrast, the legacy brainstorm channel is listed in _SYNTHETIC_SOURCES = frozenset({"brainstorm"}) and is hard‑stripped in the discover function so it cannot be re‑enabled per run — it is treated as synthetic data that invents companies.
Follow-up — How does the code guarantee that the synthetic brainstorm channel never leaks into the output?
Answer — It is explicitly stripped in the discover function and the persist logic; the module docstring states it is “hard‑stripped in discover” and the _SYNTHETIC_SOURCES frozenset is used to filter it out.
Weak answer misses — The SEED_SOURCE constant and the _SYNTHETIC_SOURCES frozenset, as well as the “hard‑stripped” comment in the source.
Q — The AI brainstorm is supposed to return up to twenty candidates, but the same vertical is called over multiple ticks. How does the system avoid mining the same well‑known companies each time?
A — The plan_targets function uses a per‑vertical sub_niche cursor (_read_and_advance_cursor) to rotate through MicroVertical.sub_niches one per tick. The chosen sub_niche is stored in direction_sub_niches[direction] and passed to _brainstorm_direction as the sub_niche parameter, which changes the framing so the LLM explores a fresh pocket. If the caller already pinned a sub_niche, the cursor logic is skipped.
Follow-up — What happens if a MicroVertical has no sub_niches defined?
Answer — The plan_targets function checks if not niches: continue, so that direction gets no sub_niche, and the brainstorm uses only the seed_query/keyword_signals framing.
Weak answer misses — The exact cursor mechanics (_read_and_advance_cursor, direction_sub_niches dict) and the sub_niches field on MicroVertical.
Q (design question) — Why does the code treat the seed_query brainstorm as a non‑synthetic, persist‑eligible source, while the legacy brainstorm channel is classified as synthetic and forbidden? They both call an LLM.
A — The seed_query channel is anchored to a real user query and passes through expand_seed (LLM facet extraction) before the brainstorm; it is explicitly called “the sanctioned, persist‑eligible company‑discovery brainstorm path” (docstring of SEED_SOURCE). The legacy brainstorm channel, however, has no seed query from a user — it is a generic synthetic micro‑vertical generator and is therefore listed in _SYNTHETIC_SOURCES and hard‑stripped so it “cannot be re‑enabled per run” (comment in the module).
Follow-up — How does the _resolve_sources function distinguish them?
Answer — _resolve_sources returns a set of active sources; the code then checks whether SEED_SOURCE is in that set to run expand_seed, while the brainstorm source is always filtered out by _SYNTHETIC_SOURCES.
Weak answer misses — The existence of _SYNTHETIC_SOURCES as a frozenset and the module‑level comment about the brainstorm channel being “hard‑stripped”.
Q — The user‑facing description says the AI returns a confidence score. Where in the source is that score actually captured and passed forward?
A — In the _brainstorm_direction function, after calling the brainstorm LLM, each candidate dict is built with a "confidence" key taken directly from the LLM output field c.get("confidence"). That dict is appended to the result list and eventually stored as part of the discovery state.
Follow-up — What happens if the LLM call raises an exception during _brainstorm_direction?
Answer — The function catches all exceptions with except Exception as exc and returns an empty list [], failing soft so the whole discovery wave isn’t blocked.
Weak answer misses — The exact "confidence" key assignment and the fail‑soft except block in _brainstorm_direction.
Q (tough) — When multiple real sources (commoncrawl, launchfeed) are active, how does the system control concurrency to avoid overwhelming hosts or starving slower channels?
A — A shared httpx.AsyncClient is injected into every fetcher, and a HostLimiter gates the fan‑out with a global cap (DEFAULT_CONCURRENCY = 6) plus per‑host caps defined in DEFAULT_PER_HOST (e.g., "commoncrawl": 6, "launchfeed": 2). Per‑host caps “stop one throttling host from causing retry storms or starving the others.” All caps are overridable via the run input.
Follow-up — Why is the launchfeed cap set lower than commoncrawl’s?
Answer — The comment says launchfeed caps are 2 (while commoncrawl is 6), likely because launchfeed endpoints are more sensitive to load, but that specific rationale is not further explained in the snippet.
Weak answer misses — The HostLimiter class (implicit), DEFAULT_CONCURRENCY, and DEFAULT_PER_HOST dictionary with per‑host values.