LangSmith Python SDK — Deep Dive
What LangSmith Is and the Problems It Solves
LangSmith is a unified observability, evaluation, and testing platform for LLM applications. It addresses three fundamental challenges that emerge when moving LLM-powered systems from prototype to production:
Observability: LLM applications are non-deterministic, stateful, and composed of multiple interacting components (prompts, retrievers, LLM calls, tool executions). Traditional logging and APM tools lack the semantic understanding needed to trace through these systems. LangSmith provides structured tracing via runs (the atomic unit of observability, analogous to spans in distributed tracing) organized into traces (trees of runs representing a single invocation).
Evaluation: Standard software testing patterns break down when outputs are free-form text. LangSmith provides a framework for dataset-driven evaluation, where you define test cases (inputs + expected outputs or reference criteria), run your application against them, and compute metrics using evaluators—functions that score outputs programmatically or via LLM-as-judge.
Iteration: LangSmith connects observability and evaluation into a feedback loop. You can inspect failed traces, identify problematic runs, convert them into dataset examples, and re-evaluate after making changes. The platform also supports prompt management via push_prompt/pull_prompt, enabling version-controlled prompt deployment.
The SDK provides both synchronous (langsmith.client.Client) and asynchronous (langsmith.async_client.AsyncClient) interfaces, with the async variant using AsyncIterator and async with patterns throughout.
Client and Authentication/Configuration
Client Instantiation
The primary entry point is langsmith.client.Client. It can be instantiated with explicit credentials or configured via environment variables:
from langsmith import Client
# Environment variables: LANGCHAIN_API_KEY, LANGCHAIN_ENDPOINT (default: https://api.smith.langchain.com)
client = Client()
# Explicit configuration
client = Client(
api_key="ls_...",
api_url="https://api.smith.langchain.com",
)
The async counterpart is langsmith.async_client.AsyncClient, which shares the same constructor signature and authentication mechanisms:
from langsmith import AsyncClient
async_client = AsyncClient()
Key Configuration Parameters
- api_key: LangSmith API key. Can also be set via the LANGCHAIN_API_KEY environment variable.
- api_url: Base URL for the LangSmith API. Defaults to https://api.smith.langchain.com. For self-hosted instances, point this to your deployment.
- tenant_id: Optional tenant identifier for multi-tenant setups.
Core Client Methods
The Client class provides methods for every major LangSmith operation:
| Method | Purpose |
|---|---|
| create_run | Persist a single run to the API |
| update_run | Update an existing run (e.g., add outputs, end time) |
| batch_ingest_runs | Batch create/update multiple runs efficiently |
| multipart_ingest | Batch ingest with separate create/update lists |
| list_runs | Query runs with filters (project, run type, time range, etc.) |
| create_dataset | Create a new dataset |
| create_example | Add a single example to a dataset |
| create_examples | Batch create examples |
| list_examples | Retrieve examples from a dataset |
| evaluate | Run evaluation against a dataset |
| create_feedback | Attach feedback/scores to a run |
| push_prompt | Version and store a prompt |
| pull_prompt | Retrieve a prompt as a LangChain PromptTemplate |
Tracing: @traceable, Run Trees, and How Traces Reach LangSmith
The Run Model
A run is the fundamental unit of tracing. Each run has:
- name: Human-readable identifier (e.g., "ChatOpenAI", "retrieve_docs")
- run_type: One of "llm", "chain", "tool", "retriever", "embedding", "prompt", "parser"
- inputs / outputs: The data flowing through the component
- start_time / end_time: Timing information
- parent_run_id: Links child runs to their parent, forming a tree
- trace_id: Identifies the entire trace (all runs sharing the same trace_id)
- error: Captured exception information if the run failed
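To make the relationship between parent_run_id and trace_id concrete, here is a minimal sketch that models runs as plain records and rebuilds the tree from their parent links. ToyRun, children_by_parent, and render are hypothetical names used only for illustration; they are not the SDK's schema classes.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyRun:
    """Illustrative stand-in for a run record; not the SDK's schema class."""
    name: str
    run_type: str
    trace_id: str
    parent_run_id: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


def children_by_parent(runs):
    """Group runs by parent_run_id so the tree can be walked from the root."""
    index = {}
    for run in runs:
        index.setdefault(run.parent_run_id, []).append(run)
    return index


def render(runs):
    """Return the tree as indented lines, starting from the parentless root."""
    index = children_by_parent(runs)
    lines = []

    def walk(parent_id, depth):
        for run in index.get(parent_id, []):
            lines.append("  " * depth + f"{run.name} ({run.run_type})")
            walk(run.id, depth + 1)

    walk(None, 0)
    return lines


trace_id = str(uuid.uuid4())
root = ToyRun("rag_pipeline", "chain", trace_id)
runs = [
    root,
    ToyRun("retrieve_docs", "retriever", trace_id, parent_run_id=root.id),
    ToyRun("generate_answer", "llm", trace_id, parent_run_id=root.id),
]
print("\n".join(render(runs)))
```

Every run in the list shares one trace_id, while parent_run_id alone determines the shape of the tree.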
The @traceable Decorator
The simplest way to add tracing to any function is the @traceable decorator:
from langsmith import traceable

@traceable(run_type="chain", name="my_chain")
def my_function(inputs: dict) -> dict:
    # Function logic here
    return {"result": "processed"}
The decorator automatically:
- Creates a run with the function's inputs
- Wraps the function execution
- Captures outputs (or exceptions) on completion
- Manages parent-child relationships when nested
Run Trees and Nesting
When @traceable-decorated functions call other @traceable-decorated functions, LangSmith automatically builds a run tree:
@traceable(run_type="chain")
def retrieve_docs(query: str) -> list[str]:
    # This becomes a child run of the parent
    return vector_store.similarity_search(query)

@traceable(run_type="chain")
def generate_answer(query: str, context: list[str]) -> str:
    # Another child run
    return llm.invoke(f"Context: {context}\nQuery: {query}")

@traceable(run_type="chain")
def rag_pipeline(query: str) -> str:
    docs = retrieve_docs(query)            # Child run 1
    answer = generate_answer(query, docs)  # Child run 2
    return answer
The resulting trace is a tree:
rag_pipeline (root)
├── retrieve_docs
└── generate_answer
How Traces Reach LangSmith
Traces are sent to LangSmith via one of two mechanisms:
- Synchronous ingestion: client.create_run() and client.update_run() make individual HTTP calls. Suitable for low-throughput scenarios.
- Batch ingestion: client.batch_ingest_runs() and client.multipart_ingest() buffer runs and send them in batches. The SDK's internal tracing infrastructure (used by @traceable and framework integrations) automatically batches runs for efficiency.
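The general idea behind background batching can be sketched with a queue and a worker thread. This is an illustration of the pattern only, not the SDK's internals: ToyBatcher, its send callback, and the batch size are all hypothetical.

```python
import queue
import threading


class ToyBatcher:
    """Buffer items on a queue and flush them in batches from a worker thread."""

    def __init__(self, send, batch_size=3):
        self._send = send              # callable that receives a list of items
        self._batch_size = batch_size
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, item):
        """Called on the hot path: enqueue and return immediately."""
        self._queue.put(item)

    def close(self):
        """Signal the worker to flush what is left, then wait for it."""
        self._queue.put(None)
        self._worker.join()

    def _drain(self):
        batch = []
        while True:
            item = self._queue.get()
            if item is None:           # sentinel: stop after the final flush
                break
            batch.append(item)
            if len(batch) >= self._batch_size:
                self._send(batch)
                batch = []
        if batch:
            self._send(batch)


sent = []
batcher = ToyBatcher(send=sent.append, batch_size=3)
for i in range(7):
    batcher.submit({"id": i})
batcher.close()
print([len(batch) for batch in sent])
```

The caller pays only for a queue insert, while the network cost is amortised across each batch. A production batcher would typically also flush on a timer so that a partially filled batch does not wait indefinitely.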
The multipart_ingest method accepts separate create and update sequences:
client.multipart_ingest(
create=[run_dict_1, run_dict_2],
update=[existing_run_update]
)
Querying Traces
The list_runs method provides powerful filtering:
import datetime

# Get all root runs in a project
runs = client.list_runs(
    project_name="my-project",
    is_root=True,
    run_type="chain",
    start_time=datetime.datetime.now() - datetime.timedelta(hours=1),
    limit=100,
)
# Filter by error status
error_runs = client.list_runs(
project_name="my-project",
error=True,
limit=50
)
# Filter using query language
runs = client.list_runs(
project_name="my-project",
filter='and(eq(run_type, "llm"), gt(total_tokens, 1000))'
)
The async variant returns AsyncIterator[ls_schemas.Run]:
async for run in async_client.list_runs(project_name="my-project"):
print(run.id, run.name)
Datasets and Examples
Creating Datasets
Datasets are collections of examples (input/output pairs or input-only test cases). Create them programmatically:
from langsmith import schemas as ls_schemas

# Create a dataset
dataset = client.create_dataset(
    dataset_name="qa-eval-set",
    description="Question answering evaluation dataset",
    data_type=ls_schemas.DataType.kv,  # key-value pairs
)
# Add examples individually
client.create_example(
inputs={"question": "What is the capital of France?"},
outputs={"answer": "Paris"},
dataset_id=dataset.id
)
# Batch create examples
client.create_examples(
dataset_id=dataset.id,
examples=[
{"inputs": {"question": "Q1"}, "outputs": {"answer": "A1"}},
{"inputs": {"question": "Q2"}, "outputs": {"answer": "A2"}},
]
)
Uploading from DataFrames and CSVs
import pandas as pd
df = pd.DataFrame({
"question": ["Q1", "Q2"],
"answer": ["A1", "A2"]
})
client.upload_dataframe(
df=df,
name="my-dataset",
input_keys=["question"],
output_keys=["answer"]
)
# Or from CSV
client.upload_csv(
csv_file="path/to/data.csv",
input_keys=["question"],
output_keys=["answer"],
name="csv-dataset"
)
Retrieving and Updating Examples
# List examples
examples = client.list_examples(
dataset_name="qa-eval-set",
limit=100
)
# Update an example
client.update_example(
example_id=example.id,
outputs={"answer": "Updated answer"}
)
# Batch update examples
client.update_examples(
dataset_name="qa-eval-set",
updates=[
{"id": ex1.id, "outputs": {"answer": "New A1"}},
{"id": ex2.id, "outputs": {"answer": "New A2"}},
]
)
Dataset Versioning
LangSmith supports dataset versioning via read_dataset_version and diff_dataset_versions:
# Get a specific version
version = client.read_dataset_version(
dataset_name="qa-eval-set",
as_of=datetime.datetime.now() - datetime.timedelta(days=1)
)
# Diff two versions
diff = client.diff_dataset_versions(
dataset_name="qa-eval-set",
from_version="2024-01-01T00:00:00Z",
to_version="2024-01-02T00:00:00Z"
)
Evaluation: Evaluators, evaluate(), and Feedback
The evaluate() Function
The core evaluation workflow uses client.evaluate():
results = client.evaluate(
target=my_rag_function, # Function that takes dict, returns dict
data="qa-eval-set", # Dataset name or ID
evaluators=[
exact_match_evaluator,
llm_as_judge_evaluator,
],
experiment_prefix="rag-v1",
max_concurrency=5,
num_repetitions=1,
)
Parameters:
- target: The system under evaluation. Can be a function (dict) -> dict, a LangChain Runnable, an experiment ID (to re-evaluate), or a tuple of experiment IDs for comparative evaluation.
- data: Dataset name, ID, or iterable of examples.
- evaluators: Sequence of evaluator functions.
- summary_evaluators: Evaluators that run once over all results (e.g., for aggregate metrics).
- experiment_prefix: Prefix for the experiment name in LangSmith.
- max_concurrency: Maximum number of concurrent evaluations. None means no limit; 0 disables concurrency, so examples run one at a time.
- num_repetitions: Number of times to run each example (for stochastic systems).
- blocking: If True, wait for results; if False, return immediately.
- error_handling: "log" (default) or "ignore". Controls how errors in individual target runs are handled: "log" records the failed run in the experiment with its error message, while "ignore" leaves it out of the experiment.
The async variant client.aevaluate() accepts async targets and returns AsyncExperimentResults.
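Conceptually, an evaluation run is a loop: call the target on each example, score each output with each evaluator, and collect the rows. The sketch below illustrates that loop with the standard library only. toy_evaluate is hypothetical and much simpler than the real evaluate(); in particular, its evaluators take (outputs, reference) dictionaries rather than run and example objects, and nothing is uploaded anywhere.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_evaluate(target, examples, evaluators, max_workers=2, num_repetitions=1):
    """Run target on every example, then score each output with every evaluator."""
    # Repeat each example so stochastic targets are sampled more than once.
    jobs = [example for example in examples for _ in range(num_repetitions)]

    def run_one(example):
        outputs = target(example["inputs"])
        scores = {}
        for evaluator in evaluators:
            result = evaluator(outputs, example["outputs"])
            scores[result["key"]] = result["score"]
        return {"inputs": example["inputs"], "outputs": outputs, "scores": scores}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, jobs))  # map preserves job order


def echo_target(inputs):
    return {"answer": inputs["question"].upper()}


def exact_match(outputs, reference):
    return {"key": "exact_match", "score": float(outputs["answer"] == reference["answer"])}


examples = [
    {"inputs": {"question": "paris"}, "outputs": {"answer": "PARIS"}},
    {"inputs": {"question": "rome"}, "outputs": {"answer": "Roma"}},
]
rows = toy_evaluate(echo_target, examples, [exact_match], num_repetitions=2)
print([row["scores"]["exact_match"] for row in rows])
```

The worker count plays the role that max_concurrency plays in the real API: it bounds how many target calls are in flight at once.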
Writing Evaluators
An evaluator is a function that takes a Run (the target's output run) and an Example (the dataset example) and returns a dictionary with a key and score:
from langsmith.schemas import Example, Run

def exact_match_evaluator(run: Run, example: Example) -> dict:
    # outputs can be None (e.g., when the target raised), so fall back to {}
    predicted = (run.outputs or {}).get("answer", "")
    expected = (example.outputs or {}).get("answer", "")
    return {
        "key": "exact_match",
        "score": 1.0 if predicted == expected else 0.0,
    }

def llm_as_judge_evaluator(run: Run, example: Example) -> dict:
    # judge_llm is a placeholder for a chat model whose response exposes .content
    response = judge_llm.invoke(
        f"Question: {example.inputs['question']}\n"
        f"Answer: {run.outputs['answer']}\n"
        f"Expected: {example.outputs['answer']}\n"
        f"Score 0-5:"
    )
    return {
        "key": "llm_judge",
        # int() raises ValueError if the judge replies with anything but a bare integer
        "score": int(response.content.strip()),
    }
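LLM judges do not always reply with a bare number, so converting the reply directly with int() can fail. A more defensive parser might look like the sketch below; parse_judge_score is a hypothetical helper, and the 0 to 5 range simply mirrors the prompt used above.

```python
import re
from typing import Optional


def parse_judge_score(text: str, low: int = 0, high: int = 5) -> Optional[int]:
    """Return the first standalone integer in text that falls within [low, high].

    Integers that are part of a decimal such as "4.5" are skipped, because the
    judge was asked for a whole number and a decimal reply is ambiguous.
    """
    for match in re.finditer(r"(?<![\d.])\d+(?!\.?\d)", text):
        value = int(match.group())
        if low <= value <= high:
            return value
    return None


for reply in ["4", "Score: 3/5", "The answer deserves a 4.", "4.5", "not sure"]:
    print(repr(reply), "->", parse_judge_score(reply))
```

When the parser returns None, the evaluator can decide what to do: skip the score, retry the judge, or record the failure in a comment, rather than crashing the evaluation.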
Creating Feedback Manually
Beyond evaluation, you can attach feedback to any run:
client.create_feedback(
run_id=run.id,
key="user_satisfaction",
score=4.5,
comment="User rated this response highly",
feedback_source_type=ls_schemas.FeedbackSourceType.API
)
For latency-sensitive environments, LangSmith recommends specifying trace_id to enable batch uploading of feedback in the background.
Pre-signed Feedback Tokens
For browser-based clients that shouldn't have access to API keys:
token = client.create_presigned_feedback_token(
run_id=run.id,
feedback_key="user_rating",
expiration=datetime.timedelta(hours=1)
)
# Give this token to the frontend
Wrappers and Framework Integrations
LangChain Integration
LangSmith is deeply integrated with LangChain. When LANGCHAIN_TRACING_V2=true is set, all LangChain runs are automatically traced:
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
from langchain_openai import ChatOpenAI

# All invocations are automatically traced.
# my_prompt is a placeholder for a prompt template with a single "input" variable.
chain = my_prompt | ChatOpenAI()
chain.invoke({"input": "Hello"})
Runnable Integration
LangChain's Runnable interface is directly supported by evaluate():
from langchain_core.runnables import RunnableLambda
runnable = RunnableLambda(my_function)
results = client.evaluate(
target=runnable, # Runnable is accepted directly
data="my-dataset",
evaluators=[my_evaluator]
)
LangGraph Integration
LangGraph applications are automatically traced when LangSmith tracing is enabled. Each node execution becomes a run, and the graph structure is preserved in the trace tree.
Prompt Management
The push_prompt and pull_prompt methods enable version-controlled prompt management:
# Push a prompt (push_prompt returns the URL of the pushed prompt)
prompt_url = client.push_prompt(
"my-prompt",
object=my_prompt_template, # LangChain PromptTemplate
description="Initial version",
is_public=False
)
# Pull a prompt
prompt = client.pull_prompt(
"my-prompt:latest", # Can specify version
include_model=True # Include model configuration
)
Production Patterns and Pitfalls
Batch Ingestion for High Throughput
In production, always use batch ingestion rather than individual create_run calls:
# Good for production
client.multipart_ingest(
create=[run1, run2, run3],
update=[run4_update]
)
# Avoid in production - too many HTTP calls
client.create_run(...)
client.create_run(...)
Feedback with Trace ID for Background Upload
# Enable background batch upload by specifying trace_id
client.create_feedback(
run_id=run.id,
trace_id=run.trace_id, # Critical for performance
key="quality",
score=0.95
)
Error Handling in Evaluation
Use error_handling="ignore" to leave runs whose target raised an error out of the experiment instead of recording them; the default, "log", keeps them in the experiment along with the error message:
results = client.evaluate(
    target=my_function,
    data="my-dataset",
    evaluators=[potentially_unstable_evaluator],
    error_handling="ignore",  # Exclude errored runs from the experiment
)
Concurrency Management
Set max_concurrency appropriately to avoid overwhelming your target system or the LangSmith API:
# Limit to 10 concurrent evaluations
results = client.evaluate(
target=my_function,
data="my-dataset",
evaluators=[evaluator],
max_concurrency=10
)
Pitfalls
- Missing trace_id in feedback: Without trace_id, feedback is sent synchronously, increasing latency. Always provide trace_id in production.
- Over-filtering in list_runs: Complex filter strings can impact performance. Use specific parameters (project_name, run_type, is_root) before resorting to the filter string.
- Dataset mutation during evaluation: Don't modify a dataset while an evaluation is running against it. Use dataset versioning (read_dataset_version) to pin a specific version.
- Unbounded list_runs: Always specify limit when calling list_runs to avoid fetching millions of runs.
- Synchronous evaluation of async targets: Use aevaluate() for async targets, not evaluate().
- Ignoring dangerously_allow_filesystem: Methods like update_examples and multipart_ingest have this parameter for a reason: enabling it without understanding the security implications can lead to path traversal vulnerabilities.
Interview Q&A
Q1: How does LangSmith handle trace context propagation across asynchronous boundaries, and what happens if a trace is started in a thread pool?
LangSmith uses Python's contextvars to propagate trace context. The @traceable decorator reads the current run from the context each time the decorated function is called, makes the new run current for the duration of the call, and restores the previous value afterwards. For thread pools, you must explicitly pass the context using contextvars.copy_context() or use run_in_executor with context propagation. Without this, child runs created in a thread pool will be orphaned (no parent) and appear as separate traces. The SDK provides langsmith.run_helpers.get_current_run_tree() to inspect the current context programmatically.
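The thread-pool behaviour can be reproduced with the standard library alone. In this sketch, current_run is a hypothetical context variable standing in for whatever the SDK tracks internally; the point is only to show how copy_context() carries a value into a worker thread on a standard CPython build.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the context variable that tracks the active run.
current_run = contextvars.ContextVar("current_run", default=None)


def child_step():
    """Report which parent run this step can see."""
    return current_run.get()


current_run.set("parent-run")

with ThreadPoolExecutor(max_workers=1) as pool:
    # The worker thread has its own context, so the parent is not visible.
    without_copy = pool.submit(child_step).result()

    # Running the step inside a copy of the caller's context carries it across.
    ctx = contextvars.copy_context()
    with_copy = pool.submit(ctx.run, child_step).result()

print(without_copy, with_copy)
```

A child run created in the first style would have no parent to attach to, which is how orphaned traces arise.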
Q2: What's the difference between batch_ingest_runs and multipart_ingest, and when would you use each?
batch_ingest_runs is a simpler interface that accepts create and update lists but treats them as separate operations. multipart_ingest is the newer, preferred method that accepts the same parameters but uses a more efficient multipart upload protocol internally. Use multipart_ingest for all new code. The key advantage is that multipart_ingest can handle larger payloads more efficiently and supports file attachments via the dangerously_allow_filesystem parameter.
Q3: How would you implement a custom evaluator that compares two experimental runs (A/B testing) using evaluate()?
Pass a tuple of experiment IDs as the target parameter:
results = client.evaluate(
target=(experiment_a_id, experiment_b_id),
data="shared-dataset",
evaluators=[comparative_evaluator]
)
The evaluator receives both runs:
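def comparative_evaluator(runs: list[Run], example: Example) -> dict:
    run_a, run_b = runs
    a_wins = run_a.outputs["score"] > run_b.outputs["score"]
    # Comparative evaluators report one score per run, keyed by run ID
    return {
        "key": "a_beats_b",
        "scores": {run_a.id: 1.0 if a_wins else 0.0, run_b.id: 0.0 if a_wins else 1.0},
    }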
This returns ComparativeExperimentResults instead of ExperimentResults.
Q4: What's the performance impact of enabling LANGCHAIN_TRACING_V2=true in production, and how can you mitigate it?
The primary overhead comes from (1) serializing inputs/outputs to JSON, (2) network I/O for sending runs, and (3) queue management in the background thread. Mitigations include:
- Use LANGCHAIN_TRACING_SAMPLING_RATE to trace only a fraction of requests (e.g., 0.1 for 10%)
- Set LANGCHAIN_MAX_CONCURRENCY to limit background upload threads
- Use batch_ingest_runs with appropriate batch sizes (100-500 runs per batch)
- For ultra-low-latency requirements, consider sampling or using the async client with proper backpressure
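Sampling itself is a simple idea: decide once, at the root of a trace, whether to record it, and apply that decision to every child run. The sketch below illustrates the decision only; make_sampler is a hypothetical helper, and the seed exists purely to make the example reproducible.

```python
import random


def make_sampler(rate: float, seed=None):
    """Return a function that decides, once per trace, whether to record it."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError("rate must be between 0 and 1")
    rng = random.Random(seed)

    def should_trace() -> bool:
        # random() is in [0, 1), so rate=0 never traces and rate=1 always does.
        return rng.random() < rate

    return should_trace


should_trace = make_sampler(rate=0.1, seed=42)
decisions = [should_trace() for _ in range(10_000)]
print(f"traced fraction: {sum(decisions) / len(decisions):.3f}")
```

Making the decision per trace rather than per run matters: sampling individual runs would leave trees with missing nodes.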
Q5: How does LangSmith handle PII in traces, and what controls are available?
LangSmith provides several mechanisms:
- Input/output filtering: You can configure the SDK to redact or hash specific keys before sending
- Project-level retention policies: Configure how long traces are retained
- Self-hosted deployment: For complete data sovereignty, run LangSmith on your own infrastructure
- The dangerously_allow_filesystem parameter: When False (default), the SDK refuses to read files from disk, preventing accidental PII leakage through file paths
- API-level access controls: RBAC with workspace-level isolation
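As an illustration of the first item, a redaction function can walk the inputs and replace sensitive values with a short hash before anything leaves the process. The sketch below is an assumption about how such a filter might be written: redact, _mask, and the key names are hypothetical, and how the function is wired into the client depends on the SDK version you use.

```python
import hashlib

SENSITIVE_KEYS = {"email", "ssn", "phone"}  # hypothetical key names


def _mask(item):
    """Replace a value with a short, stable hash so equal values stay comparable."""
    digest = hashlib.sha256(str(item).encode("utf-8")).hexdigest()
    return f"<redacted:{digest[:8]}>"


def redact(value, sensitive=SENSITIVE_KEYS):
    """Return a copy of value with sensitive dict keys masked, at any depth."""
    if isinstance(value, dict):
        return {
            key: _mask(item) if str(key).lower() in sensitive else redact(item, sensitive)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item, sensitive) for item in value]
    return value


inputs = {
    "question": "Where is my order?",
    "user": {"Email": "a@example.com", "history": [{"phone": "555-0100"}]},
}
print(redact(inputs))
```

Hashing rather than deleting keeps the ability to tell whether two traces involve the same value without exposing it. Note that a short unsalted hash of a low-entropy value such as a phone number can be brute-forced, so treat this as obfuscation rather than strong anonymisation.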
Q6: You're evaluating a stochastic system (e.g., an LLM with temperature > 0). How do you design an evaluation that produces statistically meaningful results?
Use num_repetitions to run each example multiple times:
results = client.evaluate(
target=stochastic_system,
data="test-set",
evaluators=[quality_evaluator],
num_repetitions=5 # Run each example 5 times
)
Then analyze the distribution of scores rather than point estimates. Use summary_evaluators to compute aggregate statistics:
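import statistics

def mean_score_summary(runs: list[Run], examples: list[Example]) -> dict:
    # Summary evaluators receive every run and example at once; exact match stands in for quality here
    scores = [
        float((run.outputs or {}).get("answer") == (example.outputs or {}).get("answer"))
        for run, example in zip(runs, examples)
    ]
    spread = statistics.stdev(scores) if len(scores) > 1 else 0.0
    return {
        "key": "mean_quality",
        "score": sum(scores) / len(scores) if scores else 0.0,
        "comment": f"std={spread:.3f}",
    }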
For critical evaluations, consider using confidence intervals or bootstrap resampling to quantify uncertainty.
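A percentile bootstrap is one straightforward way to put an interval around a mean score. The sketch below uses only the standard library; bootstrap_ci is a hypothetical helper, the scores are made-up illustration data, and the resample count and seed are arbitrary choices.

```python
import random
import statistics


def bootstrap_ci(scores, n_resamples=2000, confidence=0.95, seed=0):
    """Percentile bootstrap confidence interval for the mean of scores."""
    if not scores:
        raise ValueError("scores must be non-empty")
    rng = random.Random(seed)
    # Resample with replacement and record the mean of each resample.
    means = sorted(
        statistics.fmean(rng.choices(scores, k=len(scores)))
        for _ in range(n_resamples)
    )
    tail = (1.0 - confidence) / 2.0
    lower = means[int(tail * n_resamples)]
    upper = means[min(n_resamples - 1, int((1.0 - tail) * n_resamples))]
    return statistics.fmean(scores), lower, upper


# Made-up per-run scores, e.g. collected from repeated evaluation runs.
scores = [1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0]
mean, lower, upper = bootstrap_ci(scores)
print(f"mean={mean:.2f}, 95% CI=({lower:.2f}, {upper:.2f})")
```

With only ten scores the interval is wide, which is the useful signal: two experiments whose intervals overlap heavily should not be ranked on their means alone.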