
LangSmith Python SDK — Deep Dive


What LangSmith Is and the Problems It Solves

LangSmith is a unified observability, evaluation, and testing platform for LLM applications. It addresses three fundamental challenges that emerge when moving LLM-powered systems from prototype to production:

Observability: LLM applications are non-deterministic, stateful, and composed of multiple interacting components (prompts, retrievers, LLM calls, tool executions). Traditional logging and APM tools lack the semantic understanding needed to trace through these systems. LangSmith provides structured tracing via runs (the atomic unit of observability, analogous to spans in distributed tracing) organized into traces (trees of runs representing a single invocation).

Evaluation: Standard software testing patterns break down when outputs are free-form text. LangSmith provides a framework for dataset-driven evaluation, where you define test cases (inputs + expected outputs or reference criteria), run your application against them, and compute metrics using evaluators—functions that score outputs programmatically or via LLM-as-judge.

Iteration: LangSmith connects observability and evaluation into a feedback loop. You can inspect failed traces, identify problematic runs, convert them into dataset examples, and re-evaluate after making changes. The platform also supports prompt management via push_prompt/pull_prompt, enabling version-controlled prompt deployment.

The SDK provides both synchronous (langsmith.client.Client) and asynchronous (langsmith.async_client.AsyncClient) interfaces, with the async variant using AsyncIterator and async with patterns throughout.

Client and Authentication/Configuration

Client Instantiation

The primary entry point is langsmith.client.Client. It can be instantiated with explicit credentials or configured via environment variables:

python
from langsmith import Client

# Environment variables: LANGCHAIN_API_KEY, LANGCHAIN_ENDPOINT (default: https://api.smith.langchain.com)
client = Client()

# Explicit configuration
client = Client(
    api_key="ls_...",
    api_url="https://api.smith.langchain.com",
)

The async counterpart is langsmith.async_client.AsyncClient, which shares the same constructor signature and authentication mechanisms:

python
from langsmith import AsyncClient

async_client = AsyncClient()

Key Configuration Parameters

  • api_key: LangSmith API key. Can also be set via LANGCHAIN_API_KEY environment variable.
  • api_url: Base URL for the LangSmith API. Defaults to https://api.smith.langchain.com. For self-hosted instances, point this to your deployment.
  • tenant_id: Optional tenant identifier for multi-tenant setups.

Core Client Methods

The Client class provides methods for every major LangSmith operation:

  • create_run: Persist a single run to the API
  • update_run: Update an existing run (e.g., add outputs, end time)
  • batch_ingest_runs: Batch create/update multiple runs efficiently
  • multipart_ingest: Batch ingest with separate create/update lists
  • list_runs: Query runs with filters (project, run type, time range, etc.)
  • create_dataset: Create a new dataset
  • create_example: Add a single example to a dataset
  • create_examples: Batch create examples
  • list_examples: Retrieve examples from a dataset
  • evaluate: Run evaluation against a dataset
  • create_feedback: Attach feedback/scores to a run
  • push_prompt: Version and store a prompt
  • pull_prompt: Retrieve a prompt as a LangChain PromptTemplate

Tracing: @traceable, Run Trees, and How Traces Reach LangSmith

The Run Model

A run is the fundamental unit of tracing. Each run has:

  • name: Human-readable identifier (e.g., "ChatOpenAI", "retrieve_docs")
  • run_type: One of "llm", "chain", "tool", "retriever", "embedding", "prompt", "parser"
  • inputs/outputs: The data flowing through the component
  • start_time/end_time: Timing information
  • parent_run_id: Links child runs to their parent, forming a tree
  • trace_id: Identifies the entire trace (all runs sharing the same trace_id)
  • error: Captured exception information if the run failed
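
To make the relationship between parent_run_id and the trace tree concrete, here is a minimal sketch that rebuilds a tree from a flat list of runs. ToyRun and build_tree are hypothetical names used only for illustration; they are not the SDK's schema classes, and the IDs are made up.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyRun:
    """Illustrative stand-in for a run; not the SDK's schema class."""
    id: str
    name: str
    run_type: str
    trace_id: str
    parent_run_id: Optional[str] = None
    children: list["ToyRun"] = field(default_factory=list)


def build_tree(runs: list[ToyRun]) -> ToyRun:
    """Link runs through parent_run_id and return the root run."""
    by_id = {run.id: run for run in runs}
    root = None
    for run in runs:
        if run.parent_run_id is None:
            root = run  # a run without a parent is the root of the trace
        else:
            by_id[run.parent_run_id].children.append(run)
    if root is None:
        raise ValueError("no root run found")
    return root


runs = [
    ToyRun("r1", "rag_pipeline", "chain", trace_id="t1"),
    ToyRun("r2", "retrieve_docs", "retriever", trace_id="t1", parent_run_id="r1"),
    ToyRun("r3", "generate_answer", "llm", trace_id="t1", parent_run_id="r1"),
]
root = build_tree(runs)
print(root.name, [child.name for child in root.children])
```

All three runs share one trace_id, while parent_run_id alone determines the shape of the tree.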

The @traceable Decorator

The simplest way to add tracing to any function is the @traceable decorator:

python
from langsmith import traceable

@traceable(run_type="chain", name="my_chain")
def my_function(inputs: dict) -> dict:
    # Function logic here
    return {"result": "processed"}

The decorator automatically:

  1. Creates a run with the function's inputs
  2. Wraps the function execution
  3. Captures outputs (or exceptions) on completion
  4. Manages parent-child relationships when nested
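
The four steps above can be sketched with a toy decorator built on the standard library. This is a simplified model under stated assumptions, not the SDK's implementation: toy_traceable, _current_run, and recorded_runs are hypothetical names, and a plain list stands in for the upload queue.

```python
import contextvars
import functools
import uuid

# Holds the run that is currently executing, if any.
_current_run = contextvars.ContextVar("current_run", default=None)
recorded_runs = []  # stands in for the upload queue


def toy_traceable(func):
    """Toy decorator mirroring the four steps; not the SDK implementation."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        parent = _current_run.get()
        run = {
            "id": str(uuid.uuid4()),
            "name": func.__name__,
            "inputs": {"args": args, "kwargs": kwargs},         # step 1
            "parent_run_id": parent["id"] if parent else None,  # step 4
            "outputs": None,
            "error": None,
        }
        token = _current_run.set(run)
        try:
            run["outputs"] = func(*args, **kwargs)  # steps 2 and 3
            return run["outputs"]
        except Exception as exc:
            run["error"] = repr(exc)  # step 3, failure path
            raise
        finally:
            _current_run.reset(token)  # restore the previous parent
            recorded_runs.append(run)
    return wrapper


@toy_traceable
def inner(x):
    return x + 1


@toy_traceable
def outer(x):
    return inner(x) * 2


result = outer(1)
```

Because inner finishes before outer, its run is recorded first, and its parent_run_id points at the run created for outer.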

Run Trees and Nesting

When @traceable-decorated functions call other @traceable-decorated functions, LangSmith automatically builds a run tree:

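python
from langsmith import traceable

# vector_store and llm below are placeholders for your own retriever and model objects

@traceable(run_type="chain")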
def retrieve_docs(query: str) -> list[str]:
    # This becomes a child run of the parent
    return vector_store.similarity_search(query)

@traceable(run_type="chain")
def generate_answer(query: str, context: list[str]) -> str:
    # Another child run
    return llm.invoke(f"Context: {context}\nQuery: {query}")

@traceable(run_type="chain")
def rag_pipeline(query: str) -> str:
    docs = retrieve_docs(query)  # Child run 1
    answer = generate_answer(query, docs)  # Child run 2
    return answer

The resulting trace is a tree:

rag_pipeline (root)
├── retrieve_docs
└── generate_answer

How Traces Reach LangSmith

Traces are sent to LangSmith via one of two mechanisms:

  1. Synchronous ingestion: client.create_run() and client.update_run() make individual HTTP calls. Suitable for low-throughput scenarios.

  2. Batch ingestion: client.batch_ingest_runs() and client.multipart_ingest() buffer runs and send them in batches. The SDK's internal tracing infrastructure (used by @traceable and framework integrations) automatically batches runs for efficiency.

The multipart_ingest method accepts separate create and update sequences:

python
client.multipart_ingest(
    create=[run_dict_1, run_dict_2],
    update=[existing_run_update]
)
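
The batching idea behind the second mechanism can be illustrated with a small buffer that flushes once it reaches a size threshold. This is a sketch only: ToyBatcher, its batch_size default, and the send callback are assumptions made for illustration and say nothing about how the SDK sizes or schedules its batches.

```python
from typing import Callable


class ToyBatcher:
    """Buffers run payloads and flushes them in groups (illustrative only)."""

    def __init__(self, send: Callable[[list[dict]], None], batch_size: int = 3):
        self._send = send
        self._batch_size = batch_size
        self._buffer: list[dict] = []

    def add(self, run: dict) -> None:
        self._buffer.append(run)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._send(self._buffer)
            self._buffer = []  # start a fresh list so sent batches are not mutated


sent_batches: list[list[dict]] = []
batcher = ToyBatcher(send=sent_batches.append, batch_size=3)
for i in range(7):
    batcher.add({"id": i})
batcher.flush()  # send whatever is left over
batch_sizes = [len(batch) for batch in sent_batches]
```

Seven runs end up in three requests instead of seven, which is the trade that batch ingestion makes: fewer HTTP calls in exchange for a short delay before a run is sent.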

Querying Traces

The list_runs method provides powerful filtering:

python
import datetime

# Get all root runs in a project
runs = client.list_runs(
    project_name="my-project",
    is_root=True,
    run_type="chain",
    start_time=datetime.datetime.now() - datetime.timedelta(hours=1),
    limit=100
)

# Filter by error status
error_runs = client.list_runs(
    project_name="my-project",
    error=True,
    limit=50
)

# Filter using query language
runs = client.list_runs(
    project_name="my-project",
    filter='and(eq(run_type, "llm"), gt(total_tokens, 1000))'
)

The async variant returns AsyncIterator[ls_schemas.Run]:

python
async for run in async_client.list_runs(project_name="my-project"):
    print(run.id, run.name)
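
Filter strings like the one passed to list_runs above are easy to mistype by hand. A minimal sketch of composing them from small helpers follows; eq, gt, and and_ are hypothetical helper names defined here, not SDK functions, and they cover only the operators that appear in this section.

```python
import json


def _fmt(value) -> str:
    # Strings are double-quoted; numbers are emitted as-is.
    return json.dumps(value) if isinstance(value, str) else str(value)


def eq(field: str, value) -> str:
    return f"eq({field}, {_fmt(value)})"


def gt(field: str, value) -> str:
    return f"gt({field}, {_fmt(value)})"


def and_(*clauses: str) -> str:
    return f"and({', '.join(clauses)})"


filter_string = and_(eq("run_type", "llm"), gt("total_tokens", 1000))
print(filter_string)
```

The resulting string can be passed as the filter argument of list_runs in place of a hand-written literal.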

Datasets and Examples

Creating Datasets

Datasets are collections of examples (input/output pairs or input-only test cases). Create them programmatically:

python
from langsmith import schemas as ls_schemas

# Create a dataset
dataset = client.create_dataset(
    dataset_name="qa-eval-set",
    description="Question answering evaluation dataset",
    data_type=ls_schemas.DataType.kv  # key-value pairs
)

# Add examples individually
client.create_example(
    inputs={"question": "What is the capital of France?"},
    outputs={"answer": "Paris"},
    dataset_id=dataset.id
)

# Batch create examples
client.create_examples(
    dataset_id=dataset.id,
    examples=[
        {"inputs": {"question": "Q1"}, "outputs": {"answer": "A1"}},
        {"inputs": {"question": "Q2"}, "outputs": {"answer": "A2"}},
    ]
)

Uploading from DataFrames and CSVs

python
import pandas as pd

df = pd.DataFrame({
    "question": ["Q1", "Q2"],
    "answer": ["A1", "A2"]
})

client.upload_dataframe(
    df=df,
    name="my-dataset",
    input_keys=["question"],
    output_keys=["answer"]
)

# Or from CSV
client.upload_csv(
    csv_file="path/to/data.csv",
    input_keys=["question"],
    output_keys=["answer"],
    name="csv-dataset"
)

Retrieving and Updating Examples

python
# List examples (list_examples returns a lazy iterator, so materialize it)
examples = list(client.list_examples(
    dataset_name="qa-eval-set",
    limit=100
))
ex1, ex2 = examples[0], examples[1]

# Update an example
client.update_example(
    example_id=ex1.id,
    outputs={"answer": "Updated answer"}
)

# Batch update examples
client.update_examples(
    dataset_name="qa-eval-set",
    updates=[
        {"id": ex1.id, "outputs": {"answer": "New A1"}},
        {"id": ex2.id, "outputs": {"answer": "New A2"}},
    ]
)

Dataset Versioning

LangSmith supports dataset versioning via read_dataset_version and diff_dataset_versions:

python
import datetime

# Get a specific version
version = client.read_dataset_version(
    dataset_name="qa-eval-set",
    as_of=datetime.datetime.now() - datetime.timedelta(days=1)
)

# Diff two versions
diff = client.diff_dataset_versions(
    dataset_name="qa-eval-set",
    from_version="2024-01-01T00:00:00Z",
    to_version="2024-01-02T00:00:00Z"
)

Evaluation: Evaluators, evaluate(), and Feedback

The evaluate() Function

The core evaluation workflow uses client.evaluate():

python
results = client.evaluate(
    target=my_rag_function,  # Function that takes dict, returns dict
    data="qa-eval-set",      # Dataset name or ID
    evaluators=[
        exact_match_evaluator,
        llm_as_judge_evaluator,
    ],
    experiment_prefix="rag-v1",
    max_concurrency=5,
    num_repetitions=1,
)

Parameters:

  • target: The system under evaluation. Can be a function (dict) -> dict, a LangChain Runnable, an experiment ID (to re-evaluate), or a tuple of experiment IDs for comparative evaluation.
  • data: Dataset name, ID, or iterable of examples.
  • evaluators: Sequence of evaluator functions.
  • summary_evaluators: Evaluators that run once over all results (e.g., for aggregate metrics).
  • experiment_prefix: Prefix for the experiment name in LangSmith.
  • max_concurrency: Maximum number of concurrent evaluations. None means no limit; 0 (the default) disables concurrency, so examples run one at a time.
  • num_repetitions: Number of times to run each example (for stochastic systems).
  • blocking: If True, wait for results; if False, return immediately.
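  • error_handling: "log" (default) or "ignore". Controls what happens when the target raises an error on an example: "log" records the failed run, with its error, as part of the experiment, while "ignore" leaves that run out of the experiment.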

The async variant client.aevaluate() accepts async targets and returns AsyncExperimentResults.
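
To show how target, data, evaluators, and num_repetitions fit together, here is a toy evaluation loop in plain Python. It is a conceptual sketch, not the SDK's evaluate(): toy_evaluate, echo_target, and exact_match are hypothetical names, examples are plain dicts, and there is no tracing, concurrency, or upload.

```python
def toy_evaluate(target, examples, evaluators, num_repetitions=1):
    """Run `target` on each example and score every output (illustrative only)."""
    rows = []
    for example in examples:
        for _ in range(num_repetitions):
            outputs = target(example["inputs"])
            scores = {
                name: score_fn(outputs, example["outputs"])
                for name, score_fn in evaluators.items()
            }
            rows.append({"outputs": outputs, "scores": scores})
    return rows


def echo_target(inputs: dict) -> dict:
    # Trivial system under test: upper-cases the question.
    return {"answer": inputs["question"].upper()}


def exact_match(outputs: dict, reference: dict) -> float:
    return 1.0 if outputs["answer"] == reference["answer"] else 0.0


examples = [
    {"inputs": {"question": "paris"}, "outputs": {"answer": "PARIS"}},
    {"inputs": {"question": "rome"}, "outputs": {"answer": "Roma"}},
]
rows = toy_evaluate(echo_target, examples, {"exact_match": exact_match}, num_repetitions=2)
mean_score = sum(row["scores"]["exact_match"] for row in rows) / len(rows)
```

With two examples and two repetitions the loop produces four scored rows, which is why num_repetitions multiplies both the cost of an experiment and the number of data points available for analysis.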

Writing Evaluators

An evaluator is a function that takes a Run (the target's output run) and an Example (the dataset example) and returns a dictionary with a key and score:

python
from langsmith.schemas import Example, Run

def exact_match_evaluator(run: Run, example: Example) -> dict:
    # Compare the target's answer with the reference answer verbatim
    predicted = (run.outputs or {}).get("answer", "")
    expected = (example.outputs or {}).get("answer", "")
    return {
        "key": "exact_match",
        "score": 1.0 if predicted == expected else 0.0
    }

def llm_as_judge_evaluator(run: Run, example: Example) -> dict:
    # judge_llm is a placeholder for a chat model used as the judge
    response = judge_llm.invoke(
        f"Question: {example.inputs['question']}\n"
        f"Answer: {run.outputs['answer']}\n"
        f"Expected: {example.outputs['answer']}\n"
        "Reply with a single integer score from 0 to 5:"
    )
    return {
        "key": "llm_judge",
        # int() raises ValueError if the judge replies with anything but an integer
        "score": int(response.content.strip())
    }
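
LLM judges do not always reply with a bare integer, so an LLM-as-judge evaluator usually benefits from defensive parsing. The helper below is a minimal sketch of one approach; parse_judge_score is a hypothetical name, and taking the first integer in the reply and scaling it to 0-1 are assumptions rather than anything the SDK prescribes.

```python
import re
from typing import Optional


def parse_judge_score(text: str, low: int = 0, high: int = 5) -> Optional[float]:
    """Return the first integer in `text` scaled to 0-1, or None if absent or out of range."""
    match = re.search(r"-?\d+", text)
    if match is None:
        return None
    value = int(match.group())
    if not low <= value <= high:
        return None
    return (value - low) / (high - low)
```

An evaluator can call this on the judge's reply and decide explicitly what to do with None (skip the score, retry, or record a comment) instead of failing on unexpected text.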

Creating Feedback Manually

Beyond evaluation, you can attach feedback to any run:

python
client.create_feedback(
    run_id=run.id,
    key="user_satisfaction",
    score=4.5,
    comment="User rated this response highly",
    feedback_source_type=ls_schemas.FeedbackSourceType.API
)

For latency-sensitive environments, LangSmith recommends specifying trace_id to enable batch uploading of feedback in the background.

Pre-signed Feedback Tokens

For browser-based clients that shouldn't have access to API keys:

python
token = client.create_presigned_feedback_token(
    run_id=run.id,
    feedback_key="user_rating",
    expiration=datetime.timedelta(hours=1)
)
# Give this token to the frontend

Wrappers and Framework Integrations

LangChain Integration

LangSmith is deeply integrated with LangChain. When LANGCHAIN_TRACING_V2=true is set, all LangChain runs are automatically traced:

python
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# All invocations are automatically traced
my_prompt = ChatPromptTemplate.from_template("{text}")
chain = my_prompt | ChatOpenAI()
chain.invoke({"text": "Hello"})

Runnable Integration

LangChain's Runnable interface is directly supported by evaluate():

python
from langchain_core.runnables import RunnableLambda

runnable = RunnableLambda(my_function)
results = client.evaluate(
    target=runnable,  # Runnable is accepted directly
    data="my-dataset",
    evaluators=[my_evaluator]
)

LangGraph Integration

LangGraph applications are automatically traced when LangSmith tracing is enabled. Each node execution becomes a run, and the graph structure is preserved in the trace tree.

Prompt Management

The push_prompt and pull_prompt methods enable version-controlled prompt management:

python
# Push a prompt (the return value is the URL of the pushed prompt)
prompt_url = client.push_prompt(
    "my-prompt",
    object=my_prompt_template,  # LangChain PromptTemplate
    description="Initial version",
    is_public=False
)

# Pull a prompt
prompt = client.pull_prompt(
    "my-prompt:latest",  # Can specify version
    include_model=True   # Include model configuration
)

Production Patterns and Pitfalls

Batch Ingestion for High Throughput

In production, always use batch ingestion rather than individual create_run calls:

python
# Good for production
client.multipart_ingest(
    create=[run1, run2, run3],
    update=[run4_update]
)

# Avoid in production - too many HTTP calls
client.create_run(...)
client.create_run(...)

Feedback with Trace ID for Background Upload

python
# Enable background batch upload by specifying trace_id
client.create_feedback(
    run_id=run.id,
    trace_id=run.trace_id,  # Critical for performance
    key="quality",
    score=0.95
)

Error Handling in Evaluation

Use error_handling="ignore" to leave runs where the target raised an error out of the experiment, instead of logging them with their error (the default, "log"):

python
results = client.evaluate(
    target=potentially_unstable_function,
    data="my-dataset",
    evaluators=[my_evaluator],
    error_handling="ignore"  # Drop failed target runs from the experiment
)

Concurrency Management

Set max_concurrency appropriately to avoid overwhelming your target system or the LangSmith API:

python
# Limit to 10 concurrent evaluations
results = client.evaluate(
    target=my_function,
    data="my-dataset",
    evaluators=[evaluator],
    max_concurrency=10
)

Pitfalls

  1. Missing trace_id in feedback: Without trace_id, feedback is sent synchronously, increasing latency. Always provide trace_id in production.

  2. Over-filtering in list_runs: Complex filter strings can impact performance. Use specific parameters (project_name, run_type, is_root) before resorting to the filter string.

  3. Dataset mutation during evaluation: Don't modify a dataset while an evaluation is running against it. Use dataset versioning (read_dataset_version) to pin a specific version.

  4. Unbounded list_runs: Always specify limit when calling list_runs to avoid fetching millions of runs.

  5. Synchronous evaluation of async targets: Use aevaluate() for async targets, not evaluate().

  6. Ignoring dangerously_allow_filesystem: Methods like update_examples and multipart_ingest have this parameter for a reason—enabling it without understanding the security implications can lead to path traversal vulnerabilities.

Interview Q&A

Q1: How does LangSmith handle trace context propagation across asynchronous boundaries, and what happens if a trace is started in a thread pool?

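LangSmith uses Python's contextvars to propagate trace context. The @traceable decorator reads the current run context each time the decorated function is called (not when it is decorated), registers the new run as a child of that context, and restores the previous context when the call returns. Within a single thread, and across asyncio tasks (which copy the context when they are created), this works automatically. For thread pools, you must carry the context across explicitly, for example by submitting work through contextvars.copy_context().run or by copying the context before calling run_in_executor. Without this, child runs created in a worker thread are orphaned (no parent) and appear as separate traces. The SDK provides langsmith.run_helpers.get_current_run_tree() to inspect the current context programmatically.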
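
The thread-pool behavior can be reproduced with the standard library alone. The sketch below uses a hypothetical current_run variable in place of the SDK's internal context and shows why a plain submit loses the parent while copy_context().run keeps it; it assumes a standard CPython build, where new threads start with an empty context.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

current_run = contextvars.ContextVar("current_run", default=None)


def child_task() -> str:
    # Report which parent run this worker thread can see.
    return str(current_run.get())


current_run.set("parent-run")

with ThreadPoolExecutor(max_workers=1) as pool:
    # Plain submit: the worker thread does not see the caller's context.
    orphaned = pool.submit(child_task).result()
    # copy_context().run carries the caller's context into the worker.
    ctx = contextvars.copy_context()
    linked = pool.submit(ctx.run, child_task).result()

print(orphaned, linked)
```

The same pattern applies to any context-variable-based tracer: the copy must be taken in the thread that holds the parent run, before the work is handed off.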

Q2: What's the difference between batch_ingest_runs and multipart_ingest, and when would you use each?

batch_ingest_runs is a simpler interface that accepts create and update lists but treats them as separate operations. multipart_ingest is the newer, preferred method that accepts the same parameters but uses a more efficient multipart upload protocol internally. Use multipart_ingest for all new code. The key advantage is that multipart_ingest can handle larger payloads more efficiently and supports file attachments via the dangerously_allow_filesystem parameter.

Q3: How would you implement a custom evaluator that compares two experimental runs (A/B testing) using evaluate()?

Pass a tuple of experiment IDs as the target parameter:

python
results = client.evaluate(
    target=(experiment_a_id, experiment_b_id),  # two existing experiments on the same dataset
    evaluators=[comparative_evaluator]
)

The evaluator receives both runs:

python
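def comparative_evaluator(runs: list[Run], example: Example) -> dict:
    run_a, run_b = runs
    a_wins = run_a.outputs["score"] > run_b.outputs["score"]
    # Comparative evaluators report one score per run, keyed by run ID
    return {
        "key": "a_beats_b",
        "scores": {run_a.id: 1.0 if a_wins else 0.0, run_b.id: 0.0 if a_wins else 1.0},
    }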

This returns ComparativeExperimentResults instead of ExperimentResults.

Q4: What's the performance impact of enabling LANGCHAIN_TRACING_V2=true in production, and how can you mitigate it?

The primary overhead comes from (1) serializing inputs/outputs to JSON, (2) network I/O for sending runs, and (3) queue management in the background thread. Mitigations include:

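  • Set a tracing sampling rate (the LANGCHAIN_TRACING_SAMPLING_RATE environment variable, or LANGSMITH_TRACING_SAMPLING_RATE in newer SDK releases) to trace only a fraction of requests (e.g., 0.1 for 10%)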
  • Set LANGCHAIN_MAX_CONCURRENCY to limit background upload threads
  • Use batch_ingest_runs with appropriate batch sizes (100-500 runs per batch)
  • For ultra-low-latency requirements, consider sampling or using the async client with proper backpressure
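
To illustrate what a sampling rate does, here is a minimal sketch of a per-request sampling decision. should_trace is a hypothetical function, not the SDK's sampling code, and the random generator is seeded only so the sketch is reproducible.

```python
import random


def should_trace(sample_rate: float, rng: random.Random) -> bool:
    """Decide per request whether to record a trace."""
    return rng.random() < sample_rate


rng = random.Random(0)  # seeded only to make the sketch reproducible
decisions = [should_trace(0.1, rng) for _ in range(10_000)]
traced_fraction = sum(decisions) / len(decisions)
print(f"traced {traced_fraction:.1%} of requests")
```

Over many requests the traced fraction converges on the configured rate, so serialization and upload overhead shrink roughly in proportion, at the cost of not having a trace for every request.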

Q5: How does LangSmith handle PII in traces, and what controls are available?

LangSmith provides several mechanisms:

  • Input/output filtering: You can configure the SDK to redact or hash specific keys before sending, for example through the Client's hide_inputs and hide_outputs options or an anonymizer callable
  • Project-level retention policies: Configure how long traces are retained
  • Self-hosted deployment: For complete data sovereignty, run LangSmith on your own infrastructure
  • The dangerously_allow_filesystem parameter: When False (default), the SDK refuses to read files from disk, preventing accidental PII leakage through file paths
  • API-level access controls: RBAC with workspace-level isolation
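
As an illustration of input/output filtering, the sketch below masks selected keys in a payload before it would be sent. The redact function and the SENSITIVE_KEYS set are hypothetical; which keys count as sensitive depends entirely on your application.

```python
import copy

SENSITIVE_KEYS = {"email", "ssn", "phone"}  # hypothetical key names


def redact(payload: dict) -> dict:
    """Return a copy of `payload` with sensitive values masked, recursing into nested dicts."""
    cleaned = {}
    for key, value in payload.items():
        if key in SENSITIVE_KEYS:
            cleaned[key] = "[REDACTED]"
        elif isinstance(value, dict):
            cleaned[key] = redact(value)
        else:
            cleaned[key] = copy.deepcopy(value)
    return cleaned


original = {"question": "hi", "user": {"email": "a@b.c", "id": 1}}
masked = redact(original)
print(masked)
```

A function of this shape (dict in, dict out) could be supplied wherever the SDK accepts a callable for masking inputs or outputs; check the Client documentation for your installed version before relying on a specific parameter.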

Q6: You're evaluating a stochastic system (e.g., an LLM with temperature > 0). How do you design an evaluation that produces statistically meaningful results?

Use num_repetitions to run each example multiple times:

python
results = client.evaluate(
    target=stochastic_system,
    data="test-set",
    evaluators=[quality_evaluator],
    num_repetitions=5  # Run each example 5 times
)

Then analyze the distribution of scores rather than point estimates. Use summary_evaluators to compute aggregate statistics:

python
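import statistics
from langsmith.schemas import Example, Run

def mean_quality_summary(runs: list[Run], examples: list[Example]) -> dict:
    # A summary evaluator is called once with every run and its matching example.
    # score_one is a placeholder for your own per-run scoring function.
    scores = [score_one(run, example) for run, example in zip(runs, examples)]
    std = statistics.stdev(scores) if len(scores) > 1 else 0.0
    return {
        "key": "mean_quality",
        "score": sum(scores) / len(scores),
        "comment": f"std={std:.3f}"
    }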

For critical evaluations, consider using confidence intervals or bootstrap resampling to quantify uncertainty.
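
A percentile bootstrap is one simple way to put a confidence interval around a mean score. The sketch below is a minimal version under stated assumptions: bootstrap_ci is a hypothetical helper, the scores are made-up illustrative values rather than real results, and the seed is fixed only for reproducibility.

```python
import random
import statistics


def bootstrap_ci(scores, n_resamples=2000, alpha=0.05, seed=0):
    """Percentile bootstrap confidence interval for the mean of `scores`."""
    rng = random.Random(seed)
    # Resample with replacement and record the mean of each resample.
    means = sorted(
        statistics.fmean(rng.choices(scores, k=len(scores)))
        for _ in range(n_resamples)
    )
    lower = means[int((alpha / 2) * n_resamples)]
    upper = means[int((1 - alpha / 2) * n_resamples) - 1]
    return lower, upper


# Made-up per-run scores, e.g. collected from an experiment with num_repetitions > 1
scores = [0.6, 0.8, 0.7, 0.9, 0.5, 0.8, 0.7, 0.6, 0.9, 0.7]
low, high = bootstrap_ci(scores)
print(f"mean={statistics.fmean(scores):.2f}, 95% CI=({low:.2f}, {high:.2f})")
```

If the intervals of two experiments overlap heavily, the difference between their mean scores may be noise, and more examples or repetitions are needed before drawing a conclusion.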
