LangGraph — Deep Dive

What is LangGraph?

LangGraph is a framework for building stateful, multi-step agentic workflows. It solves two fundamental problems in production LLM applications:

  1. State management across steps — agentic systems need to maintain conversation history, intermediate computation results, and branching state. LangGraph provides a first-class state abstraction with reducers (e.g. add_messages) that dictate how updates merge.

  2. Durable execution with control flow — long-running or human-in-the-loop workflows must survive process crashes, support pauses/resumes, and allow time-travel debugging. LangGraph persists every step via checkpoints and exposes primitives like interrupt() and Command for explicit suspension and resumption.

The runtime, Pregel, is an actor–channel system: actors (nodes) read from and write to channels (state keys). The framework handles scheduling, concurrency, and checkpointing automatically.
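
The actor-channel idea can be pictured with a toy loop in plain Python. This is only a sketch under stated assumptions, not LangGraph's Pregel code: `run_supersteps`, the `triggers` mapping, and the snapshot list are hypothetical names. Each round, every node whose trigger channel was written in the previous round runs against the same snapshot, and all writes are applied together at the step boundary.

```python
from typing import Callable, Dict, List, Set

Node = Callable[[dict], dict]

def run_supersteps(
    nodes: Dict[str, Node],
    triggers: Dict[str, str],   # node name -> channel that wakes it (assumption)
    channels: dict,
    first_updated: Set[str],
    max_steps: int = 10,
) -> List[dict]:
    """Run nodes in lock-step rounds until no node is triggered."""
    history = []
    updated = set(first_updated)
    for _ in range(max_steps):
        # Plan: pick the nodes whose trigger channel was written last round.
        ready = [name for name, chan in triggers.items() if chan in updated]
        if not ready:
            break
        # Execute: every ready node sees the same snapshot of the channels.
        snapshot = dict(channels)
        writes = [nodes[name](snapshot) for name in ready]
        # Update: apply all writes together at the step boundary.
        updated = set()
        for write in writes:
            channels.update(write)
            updated.update(write)
        history.append(dict(channels))  # stand-in for a checkpoint
    return history

nodes = {
    "double": lambda s: {"doubled": s["value"] * 2},
    "describe": lambda s: {"text": f"result={s['doubled']}"},
}
triggers = {"double": "value", "describe": "doubled"}
history = run_supersteps(nodes, triggers, {"value": 21}, {"value"})
print(history[-1])
```

The sketch saves one snapshot per round, which is the granularity at which a real checkpointer would let a run be paused or resumed.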


Core Concepts and Primary APIs

StateGraph and MessageGraph

Class | Purpose
langgraph.graph.state.StateGraph(StateT, ContextT, InputT, OutputT) | A graph whose nodes communicate by reading/writing to a shared state. Each node receives the full state and returns a partial state update.
langgraph.graph.message.MessageGraph | A shortcut StateGraph where the entire state is a single list of messages (append‑only by default).

A reducer is attached to a state key through an Annotated type hint and decides how a node's update is merged into the existing value. The most common is add_messages:

python
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

add_messages merges the incoming list into the existing one: a message whose ID matches an existing message replaces it, and every other message is appended.
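
To make the merge rule concrete, here is a plain-Python sketch of a merge-by-ID reducer. It is an illustration only, not the library's add_messages implementation: `merge_by_id` and the dict-shaped messages are hypothetical stand-ins for real message objects.

```python
from typing import Dict, List

Message = Dict[str, str]  # hypothetical shape: {"id": ..., "content": ...}

def merge_by_id(existing: List[Message], incoming: List[Message]) -> List[Message]:
    """Append new messages; replace in place when an ID already exists."""
    merged = list(existing)
    position = {msg["id"]: i for i, msg in enumerate(merged)}
    for msg in incoming:
        if msg["id"] in position:
            merged[position[msg["id"]]] = msg   # same ID: overwrite in place
        else:
            position[msg["id"]] = len(merged)
            merged.append(msg)                  # new ID: append
    return merged

history = [{"id": "1", "content": "hi"}]
history = merge_by_id(history, [{"id": "2", "content": "hello"}])
history = merge_by_id(history, [{"id": "1", "content": "hi (edited)"}])
print(history)
```

A reducer is just a two-argument function of (current value, update), which is why any function with that shape can be placed inside Annotated.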


Nodes

Nodes are functions (or runnables) that take the current state and return a dict of updates. Add them with add_node:

python
graph.add_node(node, action, *, defer=False, metadata, input_schema, retry_policy, cache_policy, error_handler, destinations, timeout)
  • node: a name (str) or the function itself.
  • action: the callable (if node is a string).
  • defer: if True, execution is postponed until the run is about to end.
  • input_schema: override the default state schema for this node.
  • retry_policy, timeout, error_handler, cache_policy: per‑node fault‑tolerance.

Edges

Three types of edges:

  1. Direct edge: add_edge(start_key, end_key).
    Waits for start_key (or all nodes in a list) to finish before executing end_key.

  2. Conditional edge: add_conditional_edges(source, path, path_map).
    After source finishes, path (a callable or runnable) returns the next node name(s). path_map maps hashable return values to node names.

  3. Send API: langgraph.types.Send(node, state).
    Used inside conditional edge paths to dynamically invoke a node with a custom state (e.g., for map‑reduce). The sent state can differ from the core graph state.


Command

langgraph.types.Command bundles state updates with control flow “hops”.

python
Command(graph=None, update=None, resume=..., goto=..., ...)
  • graph: None for current graph, Command.PARENT for the closest parent.
  • update: dict of state updates.
  • resume: value to continue from after an interrupt.
  • goto: name of the node (or a list of names or Send objects) to run next.

Commands can be returned from nodes to apply updates and direct which node runs next.


interrupt

python
langgraph.types.interrupt(value: Any) -> Any

Pauses execution and surfaces value to the caller. The run saves a checkpoint and waits. To resume, call invoke / stream with a Command(resume=...).

Requires a checkpointer on the compiled graph.
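
The pause/resume cycle can be modelled in a few lines of plain Python. This is a toy under stated assumptions, not LangGraph's mechanism: `ToyRunner`, `Paused`, and the `"__interrupt__"` result key are names chosen for the sketch. It shows why saved state is needed: the second call has nothing to continue from unless the first call stored the thread's state.

```python
from typing import Any, Dict, Optional

class Paused(Exception):
    """Hypothetical signal carrying the value surfaced to the caller."""
    def __init__(self, value: Any) -> None:
        super().__init__(value)
        self.value = value

_NO_RESUME = object()  # sentinel: no resume value supplied

class ToyRunner:
    def __init__(self) -> None:
        self.checkpoints: Dict[str, dict] = {}  # thread_id -> saved state
        self._resume: Any = _NO_RESUME

    def interrupt(self, value: Any) -> Any:
        # Hand back the pending resume value, otherwise pause the run.
        if self._resume is not _NO_RESUME:
            answer, self._resume = self._resume, _NO_RESUME
            return answer
        raise Paused(value)

    def run(self, node, thread_id: str, state: Optional[dict] = None,
            resume: Any = _NO_RESUME) -> dict:
        if state is None:
            state = self.checkpoints[thread_id]  # load the saved state
        self._resume = resume
        try:
            state = {**state, **node(state, self)}
        except Paused as pause:
            self.checkpoints[thread_id] = state  # save before returning
            return {"__interrupt__": pause.value}
        self.checkpoints[thread_id] = state
        return state

def confirm(state: dict, runner: ToyRunner) -> dict:
    answer = runner.interrupt("Please confirm")
    return {"user_confirmed": answer == "yes"}

runner = ToyRunner()
first = runner.run(confirm, "t1", state={"topic": "demo"})
second = runner.run(confirm, "t1", resume="yes")
print(first, second)
```

In this sketch the node runs again from its first line on resume, so anything placed before the interrupt call executes twice. LangGraph's documentation describes comparable re-execution, so side effects before an interrupt are best kept idempotent.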


Pregel Runtime

langgraph.pregel.main.Pregel manages execution:

Method | Description
invoke(input, config, *, context, stream_mode, ...) | Synchronous single‑input run.
ainvoke(...) | Async variant.
stream(input, config, *, stream_mode, ...) | Synchronous streaming (iterator of dicts).
astream(...) | Async streaming.
stream_events(input, config, *, version, ...) | Event streaming (v1/v2 → StreamEvent dicts; v3 → GraphRunStream object with typed projections).

Key parameters:

  • stream_mode: "values" (full state), "updates" (node deltas), "messages" (token‑level), "custom", "tasks", "checkpoints", "debug".
  • version: for stream_events, "v3" enables the new typed‑projection API.
  • interrupt_before / interrupt_after: node names (or All) where execution pauses.
  • durability: controls checkpoint saves.
  • control: RunControl object for throttling, max steps, etc.
  • output_keys: filter which keys appear in the output.
  • subgraphs: if True, emit subgraph events.
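
The difference between "values" and "updates" is easiest to see side by side. The generator below is a plain-Python illustration, not the Pregel streaming code; `toy_stream` and its `steps` list are hypothetical, and emitting the input state as the first "values" snapshot is a choice made for this sketch.

```python
from typing import Callable, Iterator, List, Tuple

def toy_stream(
    steps: List[Tuple[str, Callable[[dict], dict]]],
    state: dict,
    stream_mode: str = "values",
) -> Iterator[dict]:
    """Yield full-state snapshots ("values") or per-node deltas ("updates")."""
    if stream_mode not in ("values", "updates"):
        raise ValueError(f"unsupported mode in this sketch: {stream_mode}")
    state = dict(state)
    if stream_mode == "values":
        yield dict(state)              # snapshot of the input state
    for name, fn in steps:
        delta = fn(state)
        state.update(delta)
        if stream_mode == "values":
            yield dict(state)          # whole state after the node ran
        else:
            yield {name: delta}        # only what this node returned

steps = [
    ("a", lambda s: {"count": s["count"] + 1}),
    ("b", lambda s: {"done": True}),
]
values = list(toy_stream(steps, {"count": 0}, "values"))
updates = list(toy_stream(steps, {"count": 0}, "updates"))
print(values)
print(updates)
```

"values" is convenient when a consumer only needs the latest state; "updates" is smaller on the wire and tells the consumer which node produced each change.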

Functional API

An alternative, more Pythonic way to define workflows using @entrypoint and @task.

python
from langgraph.func import entrypoint, task

@task
def fetch_data(url: str) -> str: ...

@entrypoint(checkpointer=...)
def my_workflow(input: dict) -> dict:
    data = fetch_data(input["url"]).result()  # calling a task returns a future; .result() waits for it
    return {"result": data}
  • entrypoint wraps a function that runs as the graph root. It can return entrypoint.final(value=..., save=...) to decouple the return value from what is saved to the checkpoint.
  • task defines a sub‑computation that can be called from within an entrypoint. Tasks are automatically tracked for persistence.
  • Both support retry_policy, cache_policy, timeout.
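
What "tracked for persistence" buys can be shown with a toy task log in plain Python. This is a sketch of the idea only, not the Functional API internals: `ToyTaskLog`, the string task keys, and the simulated crash are assumptions made for the example. A task that finished before a failure is not executed again when the workflow is re-run.

```python
from typing import Any, Callable, Dict, List

class ToyTaskLog:
    """Hypothetical stand-in for task results saved alongside a checkpoint."""
    def __init__(self) -> None:
        self.results: Dict[str, Any] = {}
        self.executed: List[str] = []   # which tasks actually ran, in order

    def run(self, key: str, fn: Callable[[], Any]) -> Any:
        if key in self.results:         # finished in an earlier attempt
            return self.results[key]
        value = fn()                    # may raise; nothing is recorded then
        self.results[key] = value
        self.executed.append(key)
        return value

def workflow(log: ToyTaskLog, fail_on_store: bool) -> str:
    data = log.run("fetch", lambda: "payload")

    def store() -> str:
        if fail_on_store:
            raise RuntimeError("simulated crash")
        return f"stored:{data}"

    return log.run("store", store)

log = ToyTaskLog()
try:
    workflow(log, fail_on_store=True)     # first attempt dies in "store"
except RuntimeError:
    pass
result = workflow(log, fail_on_store=False)  # retry reuses the "fetch" result
print(result, log.executed)
```

The practical consequence is that expensive or non-deterministic work (LLM calls, network requests) belongs inside tasks, so a resumed run sees the same results instead of recomputing them.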

Streaming / Event Streaming (v3)

The new event streaming API (stream_events(version="v3")) returns a GraphRunStream (sync) or AsyncGraphRunStream (async). Consumers iterate over typed projections:

python
stream = graph.stream_events(input, version="v3")
for message in stream.messages:         # AsyncChatModelStream objects
    for token in message.text:
        print(token)
for value in stream.values:             # state snapshots
    ...

Transformers are the building blocks of projections:

  • MessagesTransformer — captures message events.
  • ValuesTransformer — captures state snapshots (run.values).
  • LifecycleTransformer — surfaces subgraph lifecycle events.
  • SubgraphTransformer — discovers subgraph invocations.
  • TasksTransformer — captures raw task events.

Non‑native transformers are registered with a custom: prefix in the main event log. StreamMux routes events through transformers in registration order.


Runtime Utilities

Utility | Description
langgraph.runtime.Runtime | Injected into nodes, provides context, store, stream_writer, previous, execution_info.
langgraph.config.get_store() | Get BaseStore inside a node (for long‑term memory).
langgraph.config.get_stream_writer() | Get StreamWriter to emit custom streaming data.

Main Workflows and How the Pieces Fit Together

Graph API Flow (Typical)

python
from typing import Annotated, TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str

def node_a(state: State) -> dict:
    # Return a partial update; add_messages appends it to the history
    return {"messages": [("assistant", "Hello from A")]}

# Build graph
builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge("__start__", "a")
builder.add_edge("a", "__end__")

# Compile with checkpointing
graph = builder.compile(checkpointer=InMemorySaver())

# Run. A compiled graph with a checkpointer needs a thread_id so it knows
# which thread to save under ("demo-thread" is an arbitrary example ID).
config = {"configurable": {"thread_id": "demo-thread"}}
for chunk in graph.stream({"messages": [("user", "hi")]}, config, stream_mode="values"):
    print(chunk)

Branching and Conditionals

Use add_conditional_edges to route based on state:

python
def router(state: State) -> str:
    return "continue" if state["topic"] else "exit"

builder.add_conditional_edges("a", router, {"continue": "b", "exit": "__end__"})
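
How path and path_map combine can be sketched without the library. The helper below is a hypothetical illustration (`resolve_next` is not a LangGraph function), and raising ValueError for an unmapped key is this sketch's choice of error handling.

```python
from typing import Callable, Dict, Hashable

def resolve_next(
    state: dict,
    path: Callable[[dict], Hashable],
    path_map: Dict[Hashable, str],
) -> str:
    """Call the router, then translate its return value into a node name."""
    key = path(state)
    if key not in path_map:
        raise ValueError(f"router returned {key!r}, which is not in path_map")
    return path_map[key]

def router(state: dict) -> str:
    return "continue" if state.get("topic") else "exit"

path_map = {"continue": "b", "exit": "__end__"}
with_topic = resolve_next({"topic": "cats"}, router, path_map)
without_topic = resolve_next({"topic": ""}, router, path_map)
print(with_topic, without_topic)
```

Keeping the router's return values ("continue", "exit") separate from node names means nodes can be renamed by editing path_map alone.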

Map‑Reduce with Send

Inside a conditional edge, return a list of Send objects:

python
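from langgraph.types import Send

def map_to_parallel(state) -> list:
    # One Send per item; each process_item call receives only its own payload
    return [Send("process_item", {"item": item}) for item in state["items"]]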

builder.add_conditional_edges("split", map_to_parallel, ["process_item"])
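
The fan-out and the merge that follows can be imitated in plain Python to show the data flow. This is a sketch, not how LangGraph schedules Send branches: the thread pool, `map_reduce`, and the `results` key are assumptions, with operator.add standing in for a list-concatenating reducer.

```python
import operator
from concurrent.futures import ThreadPoolExecutor
from typing import List

def split(state: dict) -> List[dict]:
    # One private payload per item, like one Send per item.
    return [{"item": item} for item in state["items"]]

def process_item(payload: dict) -> dict:
    # Each branch sees only its payload and writes a one-element list.
    return {"results": [payload["item"] * payload["item"]]}

def map_reduce(state: dict) -> dict:
    payloads = split(state)
    with ThreadPoolExecutor() as pool:
        updates = list(pool.map(process_item, payloads))  # map keeps input order
    results = list(state.get("results", []))
    for update in updates:
        # The reducer (operator.add) concatenates each branch's list.
        results = operator.add(results, update["results"])
    return {**state, "results": results}

final = map_reduce({"items": [1, 2, 3]})
print(final)
```

Without a reducer on the results key, each branch's write would overwrite the previous one and only a single result would survive.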

Human‑in‑Loop

python
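from langgraph.types import interrupt

def node_with_interrupt(state) -> dict:
    # Pauses here; on resume, interrupt() returns the value passed to Command(resume=...)
    response = interrupt("Please confirm")
    return {"user_confirmed": response == "yes"}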

The caller receives the interrupt value. To resume:

python
from langgraph.types import Command

# Use the same thread_id as the run that was interrupted
graph.invoke(Command(resume="yes"), {"configurable": {"thread_id": "..."}})

Subgraphs

A StateGraph can be used as a node in another graph. The parent graph sees the subgraph’s input/output schemas. Subgraphs inherit checkpointer and store from the parent unless overridden.


Configuration and Integration

Compile Parameters

StateGraph.compile(
    checkpointer: Checkpointer = None,
    *,
    cache: BaseCache = None,
    store: BaseStore = None,
    interrupt_before: All | list[str] = None,
    interrupt_after: All | list[str] = None,
    debug: bool = False,
    name: str = None,
    transformers: Sequence[...] = None,
) -> CompiledStateGraph

  • checkpointer: e.g., MemorySaver, SqliteSaver, cloud checkpointers.
  • store: an instance of BaseStore for long‑term memory.
  • interrupt_before / interrupt_after: nodes where execution always pauses.
  • transformers: custom StreamTransformer instances for event streaming.

Node Defaults

StateGraph.set_node_defaults sets retry_policy, cache_policy, error_handler, timeout for all nodes. Per‑node values override defaults.

RetryPolicy

The retry_policy parameter accepts a single RetryPolicy or a sequence of them. Each policy specifies the backoff, the maximum number of attempts, and the exception types to retry. Example:

python
from langgraph.types import RetryPolicy
retry_policy = RetryPolicy(max_attempts=3, initial_interval=1.0, backoff_factor=2)
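
What those three numbers mean in practice can be worked through with a small plain-Python retry loop. It is a sketch of exponential backoff in general, not the library's retry code: `run_with_retry` and its injectable `sleep` argument are hypothetical, and the sketch omits jitter and any cap on the wait.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")

def run_with_retry(
    fn: Callable[[], T],
    *,
    max_attempts: int = 3,
    initial_interval: float = 1.0,
    backoff_factor: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn until it succeeds or max_attempts is used up."""
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise                    # out of attempts: surface the last error
            sleep(interval)
            interval *= backoff_factor   # wait longer before the next attempt
    raise ValueError("max_attempts must be at least 1")

calls = {"count": 0}

def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

waits: List[float] = []
result = run_with_retry(flaky, sleep=waits.append)  # record waits instead of sleeping
print(result, waits)
```

With max_attempts=3 there are at most two waits, here 1.0 s and then 2.0 s, so the worst case adds about three seconds before the error is surfaced.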

TimeoutPolicy

The timeout value can be a float (seconds), a timedelta, or a TimeoutPolicy. Set it through the timeout parameter of add_node or set_node_defaults.

Config and Context

config (a RunnableConfig) is passed to every node. It can contain a configurable dict with keys such as thread_id and user_id.
context (passed to Pregel.invoke / .stream) is static for the duration of a run and accessible via Runtime.context.


Production Patterns and Common Pitfalls

Durable Execution

Always use a checkpointer in production. Without one, interrupt will raise an error, and the graph has no fault tolerance. Checkpoints are saved at every node boundary.

Backward Compatibility

LangGraph applies the latest graph code to existing checkpoints. If nodes are renamed or edges change, stale checkpoints for in‑flight threads may cause errors. Mitigate by:

  • Only adding new nodes/edges (never removing).
  • Using conditional edges with default fallback routes.
  • Testing version upgrades on sample threads via time‑travel.

Memory Patterns

Memory Type | Implementation | Persistence | Lifetime
Short‑term | State (with add_messages reducer) stored in checkpoints | Checkpointer | Thread
Long‑term | BaseStore (e.g., InMemoryStore, PostgresStore) | store parameter | User / session

Access store with get_store() in nodes.
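
The two lifetimes in the table can be contrasted with a toy model in plain Python. This is an illustration only: `ToyStore`, its put/get methods, the `("users", user_id)` namespace layout, and the `chat` helper are assumptions for the sketch and do not reproduce the BaseStore interface.

```python
from typing import Any, Dict, Optional, Tuple

Namespace = Tuple[str, ...]

class ToyStore:
    """Hypothetical long-term store keyed by (namespace, key)."""
    def __init__(self) -> None:
        self._data: Dict[Namespace, Dict[str, Any]] = {}

    def put(self, namespace: Namespace, key: str, value: Any) -> None:
        self._data.setdefault(namespace, {})[key] = value

    def get(self, namespace: Namespace, key: str) -> Optional[Any]:
        return self._data.get(namespace, {}).get(key)

store = ToyStore()              # long-term: shared across threads
threads: Dict[str, dict] = {}   # short-term: one state per thread_id

def chat(thread_id: str, user_id: str, text: str) -> Optional[str]:
    state = threads.setdefault(thread_id, {"messages": []})
    state["messages"].append(text)           # lives only in this thread
    prefix = "my name is "
    if text.startswith(prefix):
        store.put(("users", user_id), "name", text[len(prefix):])
    return store.get(("users", user_id), "name")

chat("t1", "u1", "my name is Ada")
name_in_new_thread = chat("t2", "u1", "hello")
print(name_in_new_thread, threads["t2"]["messages"])
```

The second thread starts with an empty message history yet still knows the user's name, which is the split the table describes: checkpoints scope memory to a thread, the store scopes it to whatever namespace is chosen.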

Time‑Travel (Replay / Fork)

  • Replay: Resume from a checkpoint ID. Nodes before the checkpoint are skipped; nodes after re‑execute.
  • Fork: Resume with modified state. Enables exploring alternative paths without losing the original.
  • Use graph.get_state(config) and graph.update_state(config, values) to inspect and mutate.
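
Replay and fork differ only in whether the checkpointed state is edited before the remaining steps run. The plain-Python sketch below shows that distinction; `run_from` and the list of snapshots are hypothetical and stand in for a real checkpoint history.

```python
from typing import Callable, List

Step = Callable[[dict], dict]

def run_from(steps: List[Step], state: dict, start: int = 0) -> List[dict]:
    """Return the starting snapshot plus one snapshot per executed step."""
    snapshots = [dict(state)]
    for step in steps[start:]:
        state = {**state, **step(state)}
        snapshots.append(dict(state))
    return snapshots

steps = [
    lambda s: {"draft": f"joke about {s['topic']}"},
    lambda s: {"final": s["draft"].upper()},
]

original = run_from(steps, {"topic": "cats"})
checkpoint = original[1]                     # state saved after step 0

# Replay: step 0 is skipped, step 1 runs again on the saved state.
replayed = run_from(steps, checkpoint, start=1)

# Fork: edit a copy of the saved state, then run the remaining steps.
fork_state = {**checkpoint, "draft": "joke about dogs"}
forked = run_from(steps, fork_state, start=1)

print(replayed[-1]["final"], "|", forked[-1]["final"])
```

Because the fork works on a copy, the original history is still intact afterwards and both branches can be compared.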

Fault Tolerance

Stack retry_policy, timeout, and error_handler per node:

python
def error_recovery(state, error):
    # e.g., log error, return safe state
    return {"error": str(error)}

builder.add_node("risky", risky_fn, timeout=30.0, retry_policy=my_retry, error_handler=error_recovery)

Common Pitfalls

  1. Reducer misunderstanding: if a state key has no reducer, each write overwrites the previous value. Use add_messages (or another reducer) for lists that should accumulate.
  2. Missing checkpointer: interrupt raises an error, because there is nowhere to save the paused state.
  3. Stale subgraph schemas: if a subgraph's input/output schema changes, parent edges may break.
  4. Unbounded conditional edges: always supply a path_map; missing values raise ValueError.
  5. Async in Python < 3.11: @task with async functions requires Python 3.11+.
  6. Stream consumption: GraphRunStream projections are single‑consumer; iterating twice raises.

Interview Q&A

Q1: How does LangGraph differ from traditional workflow engines like Apache Airflow or Temporal?

A: LangGraph is designed specifically for LLM agent workflows where each step can be a dynamic, LLM‑driven decision. Unlike Airflow (DAG‑based) or Temporal (long‑running activities), LangGraph supports conditional branching, looping, and Send‑based map‑reduce that are tightly integrated with a mutable state managed by reducers. Checkpoints are taken at every node boundary automatically, enabling fine‑grained time‑travel and human‑in‑loop. Airflow and Temporal require explicit state management and do not natively understand LLM message schemas.


Q2: Explain the difference between stream_mode="values", "updates", and the new v3 event streaming API.

A: values yields the full state after each node completes. updates yields only the partial updates returned by each node (deltas). Both are flat dict streams. The v3 event streaming API returns a GraphRunStream object with typed projections (.messages, .values, .output, .subgraphs, etc.). Consumers can iterate multiple projections independently; the underlying stream pump advances the graph as needed. v3 also exposes lifecycle events and custom transformers for extension.


Q3: How would you implement a map‑reduce workflow where N items are processed in parallel and results are aggregated?

A: Use the Send API. From a conditional edge that follows the split node, return a list of Send("process_item", {"item": item}). Each process_item invocation runs independently with its own payload, and the parent graph waits for all of them to complete. Aggregate in a reduce node that reads the shared state: each process_item writes to a list key that has a reducer such as operator.add, so the results accumulate instead of overwriting one another. For nested parallelism, subgraphs can be used.


Q4: What happens if a node calls interrupt without a checkpointer? How do you resume?

A: Without a checkpointer, interrupt raises an exception because there is no mechanism to save state. With a checkpointer, the graph pauses, saves a checkpoint, and surfaces the interrupt value. To resume, pass a Command(resume=<value>) to the next invoke or stream call for the same thread_id.


Q5: Describe the role of Command vs returning a dict from a node.

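A: A node returning a dict only updates state; the next node is then chosen by the graph's edges. A node returning a Command can update state and choose the next node in one step (Command(update=..., goto=...)), and graph=Command.PARENT redirects that hop to the parent graph. Passed as input to invoke or stream, Command(resume=...) supplies the value that continues a run after an interrupt. Command is essential for human‑in‑loop and for implementing loops where the next step depends on external feedback.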


Q6: How does LangGraph handle backward compatibility when you update the graph code for existing threads?

A: LangGraph does not pin execution to a specific version. When a thread resumes, the latest compiled graph is applied to the checkpointed state. Nodes that already completed are skipped (their outputs are cached). New nodes or edges are executed. To avoid breaking changes, never remove nodes or edges; use conditional edges with fallback defaults. Test upgrades by replaying historical threads. The set_node_defaults pattern helps maintain consistent error handling across versions.
