LangGraph — Deep Dive
What is LangGraph?
LangGraph is a framework for building stateful, multi-step agentic workflows. It solves two fundamental problems in production LLM applications:
- State management across steps: agentic systems need to maintain conversation history, intermediate computation results, and branching state. LangGraph provides a first-class state abstraction with reducers (e.g. add_messages) that dictate how updates merge.
- Durable execution with control flow: long-running or human-in-the-loop workflows must survive process crashes, support pauses and resumes, and allow time-travel debugging. LangGraph persists every step via checkpoints and exposes primitives like interrupt() and Command for explicit suspension and resumption.
The runtime, Pregel, is an actor–channel system: actors (nodes) read from and write to channels (state keys). The framework handles scheduling, concurrency, and checkpointing automatically.
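The actor-channel model can be illustrated with a toy, pure-Python superstep loop. This is not LangGraph's implementation, and run_supersteps, triggers, and reducers are hypothetical names. The sketch assumes bulk-synchronous rules: every node triggered in a step reads the same snapshot, all writes land together after the step, a channel with a reducer merges concurrent writes, and a node runs in the next step only if a channel it subscribes to changed. LangGraph raises InvalidUpdateError when two nodes write a reducer-less key in the same step, and the toy rejects that case too.

```python
import operator
from typing import Callable

Node = Callable[[dict], dict]


def run_supersteps(
    nodes: dict[str, Node],
    triggers: dict[str, set[str]],
    reducers: dict[str, Callable],
    channels: dict,
    start: set[str],
    max_steps: int = 10,
) -> tuple[dict, list[set[str]]]:
    """Toy bulk-synchronous loop; returns final channels and the nodes run per step."""
    active: set[str] = set(start)
    history: list[set[str]] = []
    for _ in range(max_steps):
        if not active:
            break
        snapshot = dict(channels)  # every node in this step reads the same values
        writes = [nodes[name](snapshot) for name in sorted(active)]
        updated: set[str] = set()
        for update in writes:  # writes are applied only after the whole step has run
            for key, value in update.items():
                reducer = reducers.get(key)
                if reducer is None and key in updated:
                    raise ValueError(f"{key!r} received several writes in one step but has no reducer")
                channels[key] = reducer(channels[key], value) if reducer and key in channels else value
                updated.add(key)
        history.append(active)
        # Next step: every node subscribed to a channel that changed in this one
        active = {name for name, chans in triggers.items() if chans & updated}
    return channels, history


final, steps = run_supersteps(
    nodes={
        "a": lambda s: {"log": ["a"]},
        "b": lambda s: {"log": ["b"]},
        "c": lambda s: {"summary": len(s["log"])},
    },
    triggers={"c": {"log"}},  # c subscribes to the "log" channel
    reducers={"log": operator.add},  # concurrent writes to "log" are concatenated
    channels={"log": []},
    start={"a", "b"},
)
print(final)  # {'log': ['a', 'b'], 'summary': 2}
```

Nodes a and b run in the same step and their log writes are merged by operator.add; c runs only in the following step, because that is when the channel it subscribes to has changed.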
Core Concepts and Primary APIs
StateGraph and MessageGraph
| Class | Purpose |
|---|---|
| langgraph.graph.state.StateGraph[StateT, ContextT, InputT, OutputT] | A graph whose nodes communicate by reading/writing to a shared state. Each node receives the full state and returns a partial state update. |
| langgraph.graph.message.MessageGraph | A shortcut StateGraph where the entire state is a single list of messages merged with add_messages (appended, or replaced when IDs match). Deprecated in recent releases in favor of a StateGraph with a messages key. |
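State reducers are declared with Annotated type hints on the keys of the state schema. The most common is add_messages:
```python
from typing import Annotated, TypedDict

from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    # add_messages is the reducer: updates to "messages" are merged, not overwritten
    messages: Annotated[list, add_messages]
```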
add_messages merges two lists, updating existing messages by ID (append‑only unless the ID matches).
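The merge rule can be modeled with a toy reducer over plain dicts that carry an "id" field, standing in for LangChain message objects. merge_by_id is a hypothetical helper, not the real add_messages, which additionally converts tuples and dicts into message objects and assigns IDs to messages that lack one.

```python
def merge_by_id(left: list[dict], right: list[dict]) -> list[dict]:
    """Toy reducer: replace messages whose id already exists, append the rest."""
    merged = list(left)  # copy so the existing list is not mutated
    index = {msg["id"]: i for i, msg in enumerate(merged)}
    for msg in right:
        if msg["id"] in index:
            merged[index[msg["id"]]] = msg  # same id: update in place
        else:
            index[msg["id"]] = len(merged)
            merged.append(msg)  # new id: append
    return merged


existing = [{"id": "1", "content": "hi"}, {"id": "2", "content": "draft"}]
update = [{"id": "2", "content": "final"}, {"id": "3", "content": "thanks"}]
print([m["content"] for m in merge_by_id(existing, update)])  # ['hi', 'final', 'thanks']
```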
Nodes
Nodes are functions (or runnables) that take the current state and return a dict of updates. Added via:
graph.add_node(node, action, *, defer=False, metadata, input_schema, retry_policy, cache_policy, error_handler, destinations, timeout)
- node: a name (str) or the function itself.
- action: the callable (if node is a string).
- defer: if True, execution is postponed until the run is about to end.
- input_schema: override the default state schema for this node.
- destinations: the nodes this node may route to via Command; used only for graph rendering.
- retry_policy, timeout, error_handler: per-node fault tolerance; cache_policy: per-node result caching.
Edges
Three types of edges:
- Direct edge: add_edge(start_key, end_key). Waits for start_key (or, when start_key is a list, all nodes in it) to finish before executing end_key.
- Conditional edge: add_conditional_edges(source, path, path_map). After source finishes, path (a callable or runnable) returns the next node name(s). path_map maps hashable return values to node names; it can also be a list of possible destinations, and if it is omitted, path must return node names directly.
- Send API: langgraph.types.Send(node, state). Used inside conditional edge paths to dynamically invoke a node with a custom state (e.g. for map-reduce). The sent state can differ from the core graph state.
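The routing rules for conditional edges and Send can be modeled in a few lines of plain Python. resolve_conditional and ToySend are hypothetical stand-ins, not LangGraph code; the sketch assumes only what the list above states: a plain return value is translated through path_map (or must already be a node name when there is no path_map), while a send names its own target and carries its own input.

```python
from collections.abc import Hashable
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToySend:
    """Stand-in for Send: a target node plus the input that invocation receives."""
    node: str
    arg: Any


def resolve_conditional(
    path: Callable[[dict], Any],
    state: dict,
    path_map: Optional[dict[Hashable, str]] = None,
) -> list:
    """Toy routing: call path, then translate each plain result through path_map."""
    result = path(state)
    results = result if isinstance(result, list) else [result]
    resolved = []
    for item in results:
        if isinstance(item, ToySend):
            resolved.append(item)  # a send already names its target node
        elif path_map is not None:
            resolved.append(path_map[item])  # map the router's value to a node name
        else:
            resolved.append(item)  # without a path_map, path must return node names
    return resolved


def router(state: dict) -> str:
    return "continue" if state.get("topic") else "exit"


def fan_out(state: dict) -> list[ToySend]:
    return [ToySend("process_item", {"item": x}) for x in state["items"]]


path_map = {"continue": "b", "exit": "__end__"}
print(resolve_conditional(router, {"topic": "llms"}, path_map))  # ['b']
print(resolve_conditional(router, {}, path_map))  # ['__end__']
print(len(resolve_conditional(fan_out, {"items": [1, 2, 3]})))  # 3
```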
Command
langgraph.types.Command bundles state updates with control flow “hops”.
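Command(graph=None, update=None, resume=None, goto=())
- graph: None for the current graph, Command.PARENT for the closest parent graph.
- update: dict of state updates.
- resume: value to continue with after an interrupt.
- goto: the next node(s) to run (a node name, a list of names, or Send objects).
Commands can be returned from nodes to apply updates (update) and choose which node runs next (goto). goto adds a dynamic edge; static edges leaving the node still run, so a node that routes this way usually has no outgoing add_edge.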
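A toy, pure-Python model of the difference between returning a dict and returning a Command. ToyCommand and run_with_commands are hypothetical and model only update and goto (graph and resume are left out). In this toy, review has no static outgoing edge, which mirrors the usual LangGraph setup, since static edges would still fire alongside goto.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ToyCommand:
    """Stand-in for Command: a state update plus the next node to run (None = stop)."""
    update: dict = field(default_factory=dict)
    goto: Optional[str] = None


def run_with_commands(
    nodes: dict[str, Callable[[dict], object]],
    edges: dict[str, str],
    state: dict,
    start: str,
) -> list[str]:
    """Toy sequential runner: dict results follow static edges, commands follow goto."""
    visited: list[str] = []
    current: Optional[str] = start
    while current is not None:
        visited.append(current)
        result = nodes[current](state)
        if isinstance(result, ToyCommand):
            state.update(result.update)
            current = result.goto  # the node itself picks the next hop
        else:
            state.update(result)
            current = edges.get(current)  # static edge, if any
    return visited


def draft(state: dict) -> dict:
    return {"drafts": state["drafts"] + 1}


def review(state: dict) -> ToyCommand:
    # Loop back to draft until two drafts exist, then stop
    if state["drafts"] < 2:
        return ToyCommand(update={"feedback": "revise"}, goto="draft")
    return ToyCommand(update={"feedback": "approved"})


state = {"drafts": 0}
path = run_with_commands({"draft": draft, "review": review}, {"draft": "review"}, state, "draft")
print(path)  # ['draft', 'review', 'draft', 'review']
print(state["feedback"])  # approved
```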
interrupt
langgraph.types.interrupt(value: Any) -> Any
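Pauses execution and surfaces value to the caller. The run saves a checkpoint and waits. To resume, call invoke / stream on the same thread_id with a Command(resume=...). On resume, the interrupted node runs again from its beginning, and this time interrupt returns the resume value instead of pausing.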
Requires a checkpointer on the compiled graph.
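The resume semantics can be sketched without LangGraph: a toy interrupt that raises to pause and, on the re-run, returns the supplied resume value. ToyInterrupt, toy_interrupt, and approval_node are hypothetical; the only behavior carried over from LangGraph is that the interrupted node restarts from its beginning when resumed.

```python
from typing import Any


class ToyInterrupt(Exception):
    """Raised to pause the run; carries the value surfaced to the caller."""

    def __init__(self, value: Any) -> None:
        super().__init__(value)
        self.value = value


def toy_interrupt(value: Any, resume_values: list) -> Any:
    # If the caller supplied a resume value, hand it back instead of pausing
    if resume_values:
        return resume_values.pop(0)
    raise ToyInterrupt(value)


runs: list[str] = []  # records every time the node body starts


def approval_node(state: dict, resume_values: list) -> dict:
    runs.append("started")  # code before the interrupt runs again on resume
    answer = toy_interrupt("Please confirm", resume_values)
    return {"user_confirmed": answer == "yes"}


try:
    approval_node({}, [])  # first run: pauses
except ToyInterrupt as pause:
    print(pause.value)  # Please confirm

result = approval_node({}, ["yes"])  # resume: the node restarts from the top
print(result, len(runs))  # {'user_confirmed': True} 2
```

Because the node body restarts on resume, side effects placed before interrupt (API calls, external writes) run twice unless they are idempotent.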
Pregel Runtime
langgraph.pregel.main.Pregel manages execution:
| Method | Description |
|---|---|
| invoke(input, config, *, context, stream_mode, ...) | Synchronous single-input run. |
| ainvoke(...) | Async variant. |
| stream(input, config, *, stream_mode, ...) | Synchronous streaming (iterator of chunks). |
| astream(...) | Async streaming. |
| stream_events(input, config, *, version, ...) | Event streaming (v1/v2 → StreamEvent dicts; v3 → GraphRunStream object with typed projections). |
Key parameters:
- stream_mode: "values" (full state), "updates" (node deltas), "messages" (token-level), "custom", "tasks", "checkpoints", "debug".
- version: for stream_events, "v3" enables the new typed-projection API.
- interrupt_before / interrupt_after: node names (or All) where execution pauses.
- durability: controls when checkpoints are saved.
- control: RunControl object for throttling, max steps, etc.
- output_keys: filter which keys appear in the output.
- subgraphs: if True, emit subgraph events.
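To contrast the two most common stream_mode values, here is a toy generator over a pre-recorded list of steps. toy_stream is hypothetical and ignores reducers and every other mode; it only shows the shape difference: "updates" yields each step's deltas keyed by node name, while "values" yields the accumulated state.

```python
from typing import Iterator


def toy_stream(steps: list[dict], mode: str) -> Iterator[dict]:
    """Toy stream; each item of steps maps node name -> the partial update it returned."""
    if mode not in ("values", "updates"):
        raise ValueError(f"unsupported mode: {mode}")
    state: dict = {}
    for step in steps:
        for update in step.values():
            state.update(update)  # plain overwrite; reducers are ignored here
        if mode == "updates":
            yield step  # just the deltas, keyed by node name
        else:
            yield dict(state)  # a copy of the full state after this step


steps = [{"a": {"topic": "llms"}}, {"b": {"answer": 42}}]
print(list(toy_stream(steps, "updates")))  # [{'a': {'topic': 'llms'}}, {'b': {'answer': 42}}]
print(list(toy_stream(steps, "values")))  # [{'topic': 'llms'}, {'topic': 'llms', 'answer': 42}]
```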
Functional API
An alternative, more Pythonic way to define workflows using @entrypoint and @task.
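```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task

@task
def fetch_data(url: str) -> str:
    return f"contents of {url}"  # placeholder for a real fetch

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(inputs: dict) -> dict:
    # Calling a task returns a future; .result() waits for its value
    data = fetch_data(inputs["url"]).result()
    return {"result": data}

my_workflow.invoke({"url": "https://example.com"}, {"configurable": {"thread_id": "1"}})
```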
- entrypoint wraps a function that runs as the workflow root. It can return entrypoint.final(value=..., save=...) to decouple the return value from what is saved to the checkpoint.
- task defines a sub-computation that can be called from within an entrypoint; calling it returns a future, so use .result() to get the value. Task results are automatically saved to the checkpoint.
- Both support retry_policy, cache_policy, timeout.
Streaming / Event Streaming (v3)
The new event streaming API (stream_events(version="v3")) returns a GraphRunStream (sync) or AsyncGraphRunStream (async). Consumers iterate over typed projections:
```python
stream = graph.stream_events(input, version="v3")
for message in stream.messages:  # AsyncChatModelStream objects
    for token in message.text:
        print(token)
for value in stream.values:  # state snapshots
    ...
```
Transformers are the building blocks of projections:
- MessagesTransformer: captures message events.
- ValuesTransformer: captures state snapshots (run.values).
- LifecycleTransformer: surfaces subgraph lifecycle events.
- SubgraphTransformer: discovers subgraph invocations.
- TasksTransformer: captures raw task events.
Non‑native transformers are registered with a custom: prefix in the main event log. StreamMux routes events through transformers in registration order.
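The routing described above can be mimicked with a toy mux: each published event visits every registered transformer in registration order, and transformers that are not built in are logged under a custom: prefix. ToyMux, register, and publish are hypothetical names for illustration only, not the v3 streaming API.

```python
from typing import Callable, Optional

Transformer = Callable[[dict], Optional[dict]]


class ToyMux:
    """Toy router: every event visits transformers in registration order."""

    def __init__(self) -> None:
        self.transformers: list[tuple[str, Transformer]] = []
        self.log: list[tuple[str, dict]] = []

    def register(self, name: str, transformer: Transformer, native: bool = False) -> None:
        # Non-native transformers get a "custom:" prefix in the log
        self.transformers.append((name if native else f"custom:{name}", transformer))

    def publish(self, event: dict) -> None:
        for name, transform in self.transformers:
            projected = transform(event)
            if projected is not None:  # None means "not relevant to this projection"
                self.log.append((name, projected))


mux = ToyMux()
mux.register("values", lambda e: e.get("state"), native=True)
mux.register("tokens", lambda e: {"n": len(e["text"])} if "text" in e else None)
mux.publish({"state": {"topic": "llms"}})
mux.publish({"text": "hello"})
print([name for name, _ in mux.log])  # ['values', 'custom:tokens']
```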
Runtime Utilities
| Utility | Description |
|---|---|
| langgraph.runtime.Runtime | Injected into nodes; provides context, store, stream_writer, previous, execution_info. |
| langgraph.config.get_store() | Get the BaseStore inside a node (for long-term memory). |
| langgraph.config.get_stream_writer() | Get a StreamWriter to emit custom streaming data. |
Main Workflows and How the Pieces Fit Together
Graph API Flow (Typical)
```python
from typing import Annotated, TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph, add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str

def node_a(state: State) -> dict:
    # Return a partial update; add_messages appends it to the existing list
    return {"messages": [("assistant", "Hello from A")]}

# Build the graph
builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
builder.add_edge("a", END)

# Compile with checkpointing (a checkpointer needs a thread_id at run time)
graph = builder.compile(checkpointer=InMemorySaver())

# Run
config = {"configurable": {"thread_id": "demo-thread"}}
for chunk in graph.stream({"messages": [("user", "hi")]}, config, stream_mode="values"):
    print(chunk)
```
Branching and Conditionals
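Use add_conditional_edges to route based on state:
```python
from langgraph.graph import END

def router(state: State) -> str:
    # .get avoids a KeyError when "topic" was never set
    return "continue" if state.get("topic") else "exit"

# Replaces the direct "a" -> END edge; assumes a node "b" has been added
builder.add_conditional_edges("a", router, {"continue": "b", "exit": END})
```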
Map‑Reduce with Send
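From the routing function of a conditional edge, return a list of Send objects:
```python
from langgraph.types import Send

def map_to_parallel(state: dict) -> list[Send]:
    # One Send per item; each process_item call receives {"item": ...} as its input
    return [Send("process_item", {"item": item}) for item in state["items"]]

builder.add_conditional_edges("split", map_to_parallel, ["process_item"])
```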
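Outside LangGraph, the same fan-out and fan-in pattern can be sketched with a thread pool: one payload per branch, each branch returns a one-element list, and a list-concatenating reducer (operator.add) folds the results together once all branches are done. toy_map_reduce and the squaring worker are hypothetical; in a real graph, LangGraph's scheduler and the reducer declared on the state key do this work.

```python
import operator
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def process_item(payload: dict) -> dict:
    # Each branch writes a one-element list so a concatenating reducer can merge it
    return {"results": [payload["item"] ** 2]}


def toy_map_reduce(items: list[int]) -> list[int]:
    payloads = [{"item": item} for item in items]  # one payload per Send
    with ThreadPoolExecutor() as pool:
        updates = list(pool.map(process_item, payloads))  # waits for every branch
    # Reduce step: fold each branch's "results" write into one list with operator.add
    return reduce(operator.add, (u["results"] for u in updates), [])


print(toy_map_reduce([1, 2, 3]))  # [1, 4, 9]
```

The output keeps input order only because pool.map returns results in submission order.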
Human‑in‑Loop
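Call interrupt inside a node to pause for human input:
```python
from langgraph.types import Command, interrupt

def node_with_interrupt(state: dict) -> dict:
    # Pauses on the first execution; returns the resume value when the node re-runs
    response = interrupt("Please confirm")
    return {"user_confirmed": response == "yes"}
```
The caller receives the interrupt value (in recent versions, under the "__interrupt__" key of the invoke result). To resume, invoke again with the same thread_id:
```python
graph.invoke(Command(resume="yes"), {"configurable": {"thread_id": "..."}})
```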
Subgraphs
A compiled StateGraph can be added as a node in another graph. The parent graph sees the subgraph's input/output schemas. Subgraphs inherit the checkpointer and store from the parent unless overridden.
Configuration and Integration
Compile Parameters
StateGraph.compile( checkpointer: Checkpointer = None, *, cache: BaseCache = None, store: BaseStore = None, interrupt_before: All | list[str] = None, interrupt_after: All | list[str] = None, debug: bool = False, name: str = None, transformers: Sequence[...] = None, ) -> CompiledStateGraph
- checkpointer: e.g. InMemorySaver (also exported as MemorySaver), SqliteSaver, or cloud checkpointers.
- cache: a BaseCache used by nodes that set a cache_policy.
- store: an instance of BaseStore for long-term memory.
- interrupt_before / interrupt_after: nodes where execution always pauses.
- transformers: custom StreamTransformer instances for event streaming.
Node Defaults
StateGraph.set_node_defaults sets retry_policy, cache_policy, error_handler, timeout for all nodes. Per‑node values override defaults.
RetryPolicy
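The retry_policy parameter accepts a single RetryPolicy or a sequence of them. A policy specifies the backoff, the maximum number of attempts, and the exception types to retry. Example:
```python
from langgraph.types import RetryPolicy

# Up to 3 attempts in total; the wait doubles after each failure
retry_policy = RetryPolicy(max_attempts=3, initial_interval=1.0, backoff_factor=2.0)
```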
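To see what those three numbers imply, here is a small calculator for the wait before each retry. It assumes the wait before retry n is min(max_interval, initial_interval * backoff_factor ** (n - 1)), that max_attempts counts the first try, and it leaves out random jitter. backoff_schedule and its max_interval default are assumptions of this sketch, not values read from RetryPolicy.

```python
def backoff_schedule(
    max_attempts: int,
    initial_interval: float,
    backoff_factor: float,
    max_interval: float = 128.0,  # assumed cap for this sketch
) -> list[float]:
    """Seconds to wait before each retry, without jitter."""
    return [
        min(max_interval, initial_interval * backoff_factor ** (retry - 1))
        for retry in range(1, max_attempts)  # max_attempts includes the first try
    ]


print(backoff_schedule(max_attempts=3, initial_interval=1.0, backoff_factor=2.0))  # [1.0, 2.0]
```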
TimeoutPolicy
A node timeout can be a float (seconds), a timedelta, or a TimeoutPolicy; set it with the timeout parameter of add_node or set_node_defaults.
Config and Context
config (a RunnableConfig) is available to every node (declare a config parameter to receive it). It can contain a configurable dict with keys such as thread_id and user_id.
context (passed to Pregel.invoke / .stream) is static for the duration of a run and is accessible inside nodes via Runtime.context.
Production Patterns and Common Pitfalls
Durable Execution
Always use a checkpointer in production. Without one, interrupt raises an error and the graph has no fault tolerance. A checkpoint is saved after every super-step (each round of nodes that run together).
Backward Compatibility
LangGraph applies the latest graph code to existing checkpoints. If nodes are renamed or edges change, stale checkpoints for in‑flight threads may cause errors. Mitigate by:
- Only adding new nodes/edges (never removing).
- Using conditional edges with default fallback routes.
- Testing version upgrades on sample threads via time‑travel.
Memory Patterns
| Memory Type | Implementation | Persistence | Lifetime |
|---|---|---|---|
| Short‑term | State (with add_messages reducer) stored in checkpoints | Checkpointer | Thread |
| Long‑term | BaseStore (e.g., InMemoryStore, PostgresStore) | store parameter | Across threads (e.g. per user) |
Access store with get_store() in nodes.
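A toy, pure-Python picture of how long-term memory is addressed: values sit under a namespace tuple plus a key, independent of any thread_id, so every thread for the same user can read them. ToyStore is hypothetical; it loosely mirrors the put/get/search shape of BaseStore but returns raw dicts, whereas BaseStore.get returns an Item wrapper.

```python
from typing import Optional

Namespace = tuple[str, ...]


class ToyStore:
    """Toy long-term store: values live under (namespace, key), not under a thread."""

    def __init__(self) -> None:
        self._data: dict[tuple[Namespace, str], dict] = {}

    def put(self, namespace: Namespace, key: str, value: dict) -> None:
        self._data[(namespace, key)] = value

    def get(self, namespace: Namespace, key: str) -> Optional[dict]:
        return self._data.get((namespace, key))

    def search(self, prefix: Namespace) -> list[dict]:
        # Every value whose namespace starts with the given prefix
        return [v for (ns, _), v in self._data.items() if ns[: len(prefix)] == prefix]


store = ToyStore()
store.put(("users", "alice", "prefs"), "language", {"value": "en"})
store.put(("users", "bob", "prefs"), "language", {"value": "fr"})
print(store.get(("users", "alice", "prefs"), "language"))  # {'value': 'en'}
print(len(store.search(("users",))))  # 2
```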
Time‑Travel (Replay / Fork)
- Replay: Resume from a checkpoint ID. Nodes before the checkpoint are skipped; nodes after re‑execute.
- Fork: Resume with modified state. Enables exploring alternative paths without losing the original.
- Use graph.get_state(config) and graph.update_state(config, values) to inspect and mutate state, and graph.get_state_history(config) to list a thread's checkpoints.
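Replay and fork can be modeled with a toy checkpoint list in which every checkpoint records the state, the next node to run, and its parent. ToyCheckpoint and run_from are hypothetical: run_from resumes after a chosen checkpoint, so earlier nodes are not re-executed, and when an override is given it first records the edited state as a new checkpoint, leaving the original branch untouched.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ToyCheckpoint:
    id: int
    state: dict
    next_node: Optional[str]
    parent: Optional[int]


def run_from(
    history: list[ToyCheckpoint],
    nodes: dict[str, Callable[[dict], dict]],
    edges: dict[str, str],
    checkpoint_id: int,
    override: Optional[dict] = None,
) -> list[str]:
    """Toy replay/fork: resume after checkpoint_id; an override forks the state first."""
    base = history[checkpoint_id]
    state = {**base.state, **(override or {})}
    parent, node, executed = base.id, base.next_node, []
    if override:  # fork: save the edited state as a new branch off base
        history.append(ToyCheckpoint(len(history), state, node, parent))
        parent = len(history) - 1
    while node is not None:
        state = {**state, **nodes[node](state)}
        executed.append(node)
        node = edges.get(node)
        history.append(ToyCheckpoint(len(history), state, node, parent))
        parent = len(history) - 1
    return executed


nodes = {
    "plan": lambda s: {"plan": f"plan for {s['topic']}"},
    "write": lambda s: {"draft": s["plan"].upper()},
}
edges = {"plan": "write"}
history = [ToyCheckpoint(0, {"topic": "llms"}, "plan", None)]
first = run_from(history, nodes, edges, 0)  # original run
replay = run_from(history, nodes, edges, 1)  # replay: plan is not re-run
fork = run_from(history, nodes, edges, 1, {"plan": "short plan"})  # fork with edited state
print(first, replay, fork)  # ['plan', 'write'] ['write'] ['write']
print(history[-1].state["draft"])  # SHORT PLAN
```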
Fault Tolerance
Stack retry_policy, timeout, and error_handler per node:
```python
def error_recovery(state, error):
    # e.g. log the error and return a safe partial update
    return {"error": str(error)}

builder.add_node("risky", risky_fn, timeout=30.0, retry_policy=my_retry, error_handler=error_recovery)
```
Common Pitfalls
- Reducer misunderstanding: if a state key has no reducer, writes overwrite it, and two nodes writing that key in the same step raise InvalidUpdateError. Use add_messages (or another reducer such as operator.add) for lists that should accumulate.
- Missing checkpointer: interrupt raises an error because there is nowhere to save the paused state.
- Stale subgraph schemas: if a subgraph's input/output schema changes, parent edges may break.
- Unbounded conditional edges: always supply a path_map; missing values raise ValueError.
- Async in Python < 3.11: @task with async functions requires Python 3.11+.
- Stream consumption: GraphRunStream projections are single-consumer; iterating twice raises.
Interview Q&A
Q1: How does LangGraph differ from traditional workflow engines like Apache Airflow or Temporal?
A: LangGraph is designed specifically for LLM agent workflows where each step can be a dynamic, LLM-driven decision. Unlike Airflow (DAG-based) or Temporal (long-running activities), LangGraph supports conditional branching, looping, and Send-based map-reduce that are tightly integrated with a mutable state managed by reducers. Checkpoints are taken automatically at every super-step, enabling fine-grained time-travel and human-in-the-loop workflows. Airflow and Temporal require explicit state management and do not natively understand LLM message schemas.
Q2: Explain the difference between stream_mode="values", "updates", and the new v3 event streaming API.
A: values yields the full state after each step (super-step) completes. updates yields only the partial updates returned by each node (deltas), keyed by node name. Both are flat dict streams. The v3 event streaming API returns a GraphRunStream object with typed projections (.messages, .values, .output, .subgraphs, etc.). Consumers can iterate multiple projections independently; the underlying stream pump advances the graph as needed. v3 also exposes lifecycle events and custom transformers for extension.
Q3: How would you implement a map‑reduce workflow where N items are processed in parallel and results are aggregated?
A: Use the Send API. From the routing function of a conditional edge leaving the map node, return a list of Send("process_item", {"item": item}). Each process_item invocation runs independently and in parallel; the graph waits for all of them to complete before moving on. Aggregate in a reduce node that reads the shared state (each process_item writes to a list key with a reducer like operator.add). For nested parallelism, subgraphs can be used.
Q4: What happens if a node calls interrupt without a checkpointer? How do you resume?
A: Without a checkpointer, interrupt raises an exception because there is no mechanism to save state. With a checkpointer, the graph pauses, saves a checkpoint, and surfaces the interrupt value. To resume, pass a Command(resume=<value>) to the next invoke or stream call for the same thread_id; the interrupted node then re-runs from its start, and interrupt returns <value>.
Q5: Describe the role of Command vs returning a dict from a node.
A: A node returning a dict only updates state; routing is left to the graph's edges. A Command can update state and choose the next node dynamically (Command(update=..., goto=...)), and it can target a node in the parent graph with graph=Command.PARENT. Command is also the object passed to invoke with resume=... to continue after an interrupt. This makes it essential for human-in-the-loop flows and for loops where the next step depends on external feedback.
Q6: How does LangGraph handle backward compatibility when you update the graph code for existing threads?
A: LangGraph does not pin execution to a specific version. When a thread resumes, the latest compiled graph is applied to the checkpointed state. Nodes that already completed are skipped (their outputs are cached). New nodes or edges are executed. To avoid breaking changes, never remove nodes or edges; use conditional edges with fallback defaults. Test upgrades by replaying historical threads. The set_node_defaults pattern helps maintain consistent error handling across versions.