Deploying AI systems in production requires a fundamentally different mindset from prototyping. Prompt engineering in a notebook is not software engineering. Production AI systems must handle failures gracefully, scale predictably, produce consistent results, and operate within cost budgets. This article catalogs the essential architectural patterns, workflow orchestration strategies, and operational practices that distinguish reliable production AI from fragile demos.
When processing data that exceeds a single LLM context window, map-reduce decomposes the task into parallelizable chunks:
import asyncio
from typing import List, TypeVar
T = TypeVar('T')
class MapReduceProcessor:
def __init__(self, llm_client, max_concurrent=10):
self.llm = llm_client
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process(
self,
items: List[T],
map_prompt: str,
reduce_prompt: str,
chunk_size: int = 5,
) -> str:
# MAP: Process each chunk independently
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
map_tasks = [self.map_chunk(chunk, map_prompt) for chunk in chunks]
map_results = await asyncio.gather(*map_tasks)
# REDUCE: Combine map results
# If too many results, reduce hierarchically
while len(map_results) > chunk_size:
reduce_chunks = [
map_results[i:i + chunk_size]
for i in range(0, len(map_results), chunk_size)
]
map_results = await asyncio.gather(*[
self.reduce_chunk(chunk, reduce_prompt)
for chunk in reduce_chunks
])
# Final reduction
return await self.reduce_chunk(map_results, reduce_prompt)
async def map_chunk(self, chunk, prompt):
async with self.semaphore:
return await self.llm.generate(
prompt=prompt.format(data=chunk),
)
async def reduce_chunk(self, results, prompt):
return await self.llm.generate(
prompt=prompt.format(results="\n---\n".join(results)),
)
# Example: Summarize a large document
processor = MapReduceProcessor(llm_client, max_concurrent=5)
summary = await processor.process(
items=document_chunks,
map_prompt="Summarize this section, preserving key facts and figures:\n{data}",
reduce_prompt="Combine these summaries into a single coherent summary:\n{results}",
)
When to use: Document summarization, large dataset analysis, code review across many files, aggregating information from multiple sources.
Pitfalls: Information loss during reduction (important details from individual chunks may be dropped), inconsistent interpretation across map workers, high total token cost.
Fan-out/fan-in is similar to map-reduce, but each parallel branch may perform a different task:
class FanOutFanIn:
"""Execute multiple different analyses in parallel, then synthesize"""
async def analyze_product_launch(self, product_spec):
# FAN-OUT: Different analyses in parallel
tasks = {
"market": self.analyze_market(product_spec),
"technical": self.analyze_technical_feasibility(product_spec),
"competitive": self.analyze_competition(product_spec),
"financial": self.analyze_unit_economics(product_spec),
"risk": self.assess_risks(product_spec),
}
        # Run all analyses concurrently instead of awaiting them one at a time
        values = await asyncio.gather(*tasks.values())
        results = dict(zip(tasks.keys(), values))
# FAN-IN: Synthesize all analyses
synthesis = await self.llm.generate(
system="You are a product strategy analyst.",
prompt=f"""Based on these independent analyses of a proposed product,
provide a comprehensive go/no-go recommendation:
Market Analysis: {results['market']}
Technical Feasibility: {results['technical']}
Competitive Landscape: {results['competitive']}
Unit Economics: {results['financial']}
Risk Assessment: {results['risk']}
Synthesize these into a unified recommendation with:
1. Overall recommendation (go/no-go/conditional)
2. Key opportunities
3. Critical risks
4. Recommended next steps""",
)
return synthesis
When to use: Multi-dimensional analysis, getting multiple "expert" perspectives, tasks where different aspects require different prompting strategies or even different models.
A processing chain runs steps sequentially, with each step's output feeding the next:
import time

class ProcessingChain:
def __init__(self):
self.steps = []
def add_step(self, name, processor, validator=None):
self.steps.append({
"name": name,
"processor": processor,
"validator": validator,
})
return self
async def execute(self, input_data):
current = input_data
execution_log = []
for step in self.steps:
start = time.time()
try:
result = await step["processor"](current)
# Validate output before passing to next step
                if step["validator"]:
                    validation = step["validator"](result)
                    # Accept either a bool or an object exposing .passed / .message
                    passed = getattr(validation, "passed", validation)
                    if not passed:
                        raise ValidationError(
                            f"Step '{step['name']}' validation failed: "
                            f"{getattr(validation, 'message', validation)}"
                        )
execution_log.append({
"step": step["name"],
"status": "success",
"duration_ms": (time.time() - start) * 1000,
"input_preview": str(current)[:200],
"output_preview": str(result)[:200],
})
current = result
except Exception as e:
execution_log.append({
"step": step["name"],
"status": "error",
"error": str(e),
"duration_ms": (time.time() - start) * 1000,
})
raise ChainError(step["name"], e, execution_log)
return current, execution_log
# Example: Content moderation pipeline
chain = ProcessingChain()
chain.add_step(
"classify",
lambda text: llm.classify(text, categories=["safe", "review", "block"]),
validator=lambda r: r in ["safe", "review", "block"],
)
chain.add_step(
"explain",
lambda classification: llm.explain(classification) if classification != "safe" else "Content approved",
)
chain.add_step(
"action",
lambda explanation: apply_moderation_action(explanation),
)
Route inputs to different processing paths based on classification:
class LLMRouter:
"""Route requests to specialized handlers based on intent"""
def __init__(self, routes: dict, fallback_handler=None):
self.routes = routes # {intent: handler}
self.fallback = fallback_handler
self.classifier = IntentClassifier()
async def route(self, request):
# Step 1: Classify the request
classification = await self.classifier.classify(
request,
categories=list(self.routes.keys()),
)
intent = classification.category
confidence = classification.confidence
# Step 2: Route based on classification
if confidence < 0.5 and self.fallback:
# Low confidence - use fallback (often a general-purpose LLM)
return await self.fallback(request)
handler = self.routes.get(intent, self.fallback)
if handler is None:
raise RoutingError(f"No handler for intent '{intent}'")
return await handler(request)
# Example: Customer support router
router = LLMRouter(
routes={
"billing": BillingHandler(model="gpt-4o-mini"), # Cheaper model for routine
"technical": TechnicalHandler(model="gpt-4o"), # Better model for complex
"complaint": ComplaintHandler(model="gpt-4o", # Best model + empathy prompt
escalation_enabled=True),
"general_info": FAQHandler(rag_enabled=True), # RAG for factual answers
},
fallback_handler=GeneralHandler(model="gpt-4o"),
)
Cost optimization: The router pattern lets you use expensive models only for complex tasks and cheap models for simple ones. A well-designed router can reduce costs by 50-70% compared to using the best model for everything.
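As a rough sketch of that arithmetic (the request volume, traffic split, and per-request costs below are illustrative assumptions, not real pricing):

requests_per_day = 100_000
cost_premium = 0.015  # assumed average cost per request on the premium model, in USD
cost_cheap = 0.001    # assumed average cost per request on the cheaper model

# Everything on the premium model:
baseline = requests_per_day * cost_premium                           # $1,500/day

# Router sends 70% of traffic to the cheap model, 30% to the premium model:
routed = requests_per_day * (0.7 * cost_cheap + 0.3 * cost_premium)  # $520/day

savings = 1 - routed / baseline  # ~0.65, i.e. roughly 65% lower spend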
The evaluator-optimizer pattern generates a result, evaluates its quality, and iteratively refines it until a threshold is met:
class EvaluatorOptimizer:
"""Generate, evaluate, and refine until quality threshold is met"""
def __init__(self, generator, evaluator, max_iterations=3):
self.generator = generator
self.evaluator = evaluator
self.max_iterations = max_iterations
async def produce(self, task, quality_threshold=0.8):
result = await self.generator(task)
        attempts = [result]
        evaluation_history = []
for iteration in range(self.max_iterations):
# Evaluate the current result
evaluation = await self.evaluator(task, result)
evaluation_history.append(evaluation)
if evaluation.score >= quality_threshold:
return {
"result": result,
"iterations": iteration + 1,
"final_score": evaluation.score,
"history": evaluation_history,
}
# Refine based on feedback
result = await self.generator(
task,
previous_attempt=result,
feedback=evaluation.feedback,
instruction=f"Address these issues: {evaluation.issues}",
            )
            attempts.append(result)
        # Threshold not met: return the best-scoring evaluated attempt
        best_idx = max(range(len(evaluation_history)),
                       key=lambda i: evaluation_history[i].score)
        return {
            "result": attempts[best_idx],
            "iterations": self.max_iterations,
            "final_score": evaluation_history[best_idx].score,
            "history": evaluation_history,
            "quality_warning": True,
        }
Production AI workflows need orchestration beyond simple function calls:
| Framework | Strengths | Best For |
|---|---|---|
| Temporal | Durable execution, fault tolerance | Long-running, mission-critical workflows |
| Prefect | Python-native, easy monitoring | Data-oriented AI pipelines |
| LangGraph | LLM-native, built-in agent loops | Complex agent workflows |
| Inngest | Event-driven, serverless-friendly | Event-triggered AI processing |
| Custom (Redis + workers) | Full control, minimal overhead | Simple, high-volume pipelines |
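For the last row of the table, a minimal sketch of a custom Redis-backed pipeline (the queue name, payload shape, and naive retry behavior are illustrative assumptions):

import json

import redis.asyncio as redis

QUEUE = "ai_jobs"

async def enqueue(r: redis.Redis, payload: dict) -> None:
    # Producers push JSON-encoded jobs onto a Redis list
    await r.rpush(QUEUE, json.dumps(payload))

async def worker(r: redis.Redis, handle_job) -> None:
    # Each worker blocks on the queue, handles one job at a time,
    # and pushes failed jobs back for a naive retry
    while True:
        _key, raw = await r.blpop(QUEUE)
        job = json.loads(raw)
        try:
            await handle_job(job)
        except Exception:
            await r.rpush(QUEUE, raw)

This buys simplicity at the cost of the durability and visibility that dedicated engines provide; once workflows involve multi-step retries or long waits, the other options in the table earn their overhead.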
Temporal provides durable execution - if a step fails or a worker crashes, the workflow resumes from where it left off:
from datetime import timedelta

import httpx
from temporalio import workflow, activity
from temporalio.common import RetryPolicy

# Activities are the units of work
@activity.defn
async def extract_text(document_url: str) -> str:
    """Download and extract text from a document"""
    async with httpx.AsyncClient() as client:
        response = await client.get(document_url)
    return extract_text_from_pdf(response.content)
@activity.defn
async def generate_summary(text: str) -> str:
"""Generate an AI summary with retry logic"""
return await llm_client.generate(
prompt=f"Summarize:\n{text}",
model="gpt-4o",
)
@activity.defn
async def store_result(document_id: str, summary: str) -> None:
"""Store the summary in the database"""
await db.execute(
"UPDATE documents SET summary = $1 WHERE id = $2",
summary, document_id,
)
# Workflow orchestrates the activities
@workflow.defn
class DocumentSummaryWorkflow:
@workflow.run
async def run(self, document_id: str, document_url: str) -> str:
# Step 1: Extract text (retry up to 3 times)
text = await workflow.execute_activity(
extract_text,
document_url,
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
backoff_coefficient=2.0,
),
)
# Step 2: Generate summary (retry with different strategy for LLM)
summary = await workflow.execute_activity(
generate_summary,
text,
start_to_close_timeout=timedelta(minutes=2),
retry_policy=RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=5),
non_retryable_error_types=["ContentPolicyViolation"],
),
)
# Step 3: Store result
await workflow.execute_activity(
store_result,
args=[document_id, summary],
start_to_close_timeout=timedelta(seconds=30),
)
return summary
LLM API calls fail in ways that require specific handling:
import asyncio
import json
import random

class LLMRetryHandler:
"""Retry handler with LLM-specific error classification"""
# Error categories and strategies
RETRY_STRATEGIES = {
# Rate limits: exponential backoff with jitter
"rate_limit": {
"max_retries": 5,
"base_delay": 1.0,
"max_delay": 60.0,
"backoff": "exponential_with_jitter",
},
# Transient server errors: quick retry
"server_error": {
"max_retries": 3,
"base_delay": 0.5,
"max_delay": 10.0,
"backoff": "exponential",
},
# Timeout: retry with shorter input or different model
"timeout": {
"max_retries": 2,
"base_delay": 1.0,
"fallback": "reduce_input_or_switch_model",
},
# Context length exceeded: truncate and retry
"context_length": {
"max_retries": 1,
"fallback": "truncate_input",
},
# Content filter: do not retry (deterministic failure)
"content_filter": {
"max_retries": 0,
"action": "log_and_skip",
},
# Invalid response format: retry with stricter prompt
"parse_error": {
"max_retries": 3,
"fallback": "add_format_instructions",
},
}
async def call_with_retry(self, func, *args, **kwargs):
last_error = None
strategy = None
for attempt in range(5):
try:
result = await func(*args, **kwargs)
return result
except RateLimitError as e:
strategy = self.RETRY_STRATEGIES["rate_limit"]
last_error = e
except TimeoutError as e:
strategy = self.RETRY_STRATEGIES["timeout"]
last_error = e
# Reduce input on timeout
if "messages" in kwargs:
kwargs["messages"] = self.truncate_messages(kwargs["messages"])
except ContextLengthError as e:
strategy = self.RETRY_STRATEGIES["context_length"]
kwargs["messages"] = self.truncate_messages(kwargs["messages"], aggressive=True)
last_error = e
except ContentFilterError as e:
# Don't retry - this is a policy decision
raise
except json.JSONDecodeError as e:
strategy = self.RETRY_STRATEGIES["parse_error"]
# Add explicit format instructions
kwargs["messages"][-1]["content"] += (
"\n\nIMPORTANT: Respond with valid JSON only. "
"No markdown, no explanation."
)
last_error = e
if attempt >= strategy["max_retries"]:
break
delay = self.compute_delay(attempt, strategy)
await asyncio.sleep(delay)
raise MaxRetriesExceeded(last_error)
def compute_delay(self, attempt, strategy):
base = strategy["base_delay"]
max_delay = strategy.get("max_delay", 60.0)
if strategy.get("backoff") == "exponential_with_jitter":
delay = min(base * (2 ** attempt), max_delay)
delay *= (0.5 + random.random()) # Add jitter
else:
delay = min(base * (2 ** attempt), max_delay)
return delay
When a primary model fails, fall back to alternatives:
class ModelFallbackChain:
def __init__(self, models):
"""
models: ordered list of (model_name, client, config)
First model is preferred; later models are fallbacks.
"""
self.models = models
async def generate(self, **kwargs):
errors = []
for model_name, client, config in self.models:
try:
result = await client.generate(
model=model_name,
**{**kwargs, **config},
)
if model_name != self.models[0][0]:
logger.warning(
f"Used fallback model {model_name} "
f"(primary failed: {errors[-1]})"
)
return result
except Exception as e:
errors.append(f"{model_name}: {e}")
continue
raise AllModelsFailed(errors)
# Example: GPT-4o -> Claude -> GPT-4o-mini -> local model
fallback = ModelFallbackChain([
("gpt-4o", openai_client, {"temperature": 0.7}),
("claude-sonnet-4-20250514", anthropic_client, {"temperature": 0.7}),
("gpt-4o-mini", openai_client, {"temperature": 0.5}),
("llama-3-8b", local_client, {"temperature": 0.7}),
])
LLM calls are inherently non-deterministic - the same prompt can produce different outputs. For workflows that may be retried (due to failures, timeouts, or infrastructure issues), idempotency ensures operations are not duplicated:
import hashlib
import json

class IdempotentAIOperation:
def __init__(self, cache_store):
self.cache = cache_store # Redis, DynamoDB, etc.
def idempotency_key(self, operation, inputs):
"""Generate a deterministic key for this operation + inputs"""
content = json.dumps({
"operation": operation,
"inputs": inputs,
}, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
async def execute_once(self, key, operation_fn, *args, **kwargs):
"""Execute an operation at most once for a given idempotency key"""
# Check if already executed
cached = await self.cache.get(f"idempotent:{key}")
if cached:
return json.loads(cached)
# Acquire lock to prevent concurrent execution
lock = await self.cache.acquire_lock(f"lock:{key}", ttl=300)
if not lock:
# Another worker is executing this - wait for result
return await self.wait_for_result(key, timeout=300)
try:
result = await operation_fn(*args, **kwargs)
# Cache the result
await self.cache.set(
f"idempotent:{key}",
json.dumps(result),
ex=86400, # 24-hour TTL
)
return result
finally:
await self.cache.release_lock(f"lock:{key}")
# Usage: Ensure an email is only generated and sent once
idempotent = IdempotentAIOperation(redis_client)
key = idempotent.idempotency_key("welcome_email", {"user_id": "123"})
email_content = await idempotent.execute_once(
key,
generate_personalized_email,
user_id="123",
)
Beyond idempotency, caching identical requests saves cost and reduces latency:
import hashlib
import json

class LLMCache:
    def __init__(self, store, default_ttl=3600):
        self.store = store
        self.default_ttl = default_ttl
        # Request parameters that affect the output and therefore belong in the cache key
        self.cacheable_params = {"max_tokens", "top_p", "response_format", "stop", "seed"}
def cache_key(self, model, messages, temperature, **kwargs):
"""Only cache deterministic-ish requests (low temperature)"""
if temperature > 0.1:
return None # Don't cache high-temperature requests
content = json.dumps({
"model": model,
"messages": messages,
"temperature": temperature,
**{k: v for k, v in kwargs.items() if k in self.cacheable_params},
}, sort_keys=True)
return f"llm_cache:{hashlib.sha256(content.encode()).hexdigest()}"
async def cached_generate(self, client, **kwargs):
key = self.cache_key(**kwargs)
if key:
cached = await self.store.get(key)
if cached:
return json.loads(cached)
result = await client.generate(**kwargs)
if key:
await self.store.set(key, json.dumps(result), ex=self.default_ttl)
return result
Feature flags give you fine-grained control over AI behavior without deployments:
class AIFeatureFlags:
"""Feature flag system for AI behavior control"""
def __init__(self, flag_provider):
self.flags = flag_provider # LaunchDarkly, Flagsmith, etc.
def get_model_config(self, feature, user_context=None):
"""Get the model configuration for a feature"""
return {
"model": self.flags.get_string(
f"ai.{feature}.model",
default="gpt-4o-mini",
user=user_context,
),
"temperature": self.flags.get_float(
f"ai.{feature}.temperature",
default=0.7,
),
"max_tokens": self.flags.get_int(
f"ai.{feature}.max_tokens",
default=1000,
),
"enabled": self.flags.get_bool(
f"ai.{feature}.enabled",
default=True,
),
"system_prompt_version": self.flags.get_string(
f"ai.{feature}.prompt_version",
default="v1",
),
}
def get_prompt(self, feature, version=None):
"""Load versioned prompts from flag system"""
version = version or self.get_model_config(feature)["system_prompt_version"]
return self.flags.get_string(f"ai.{feature}.prompts.{version}")
# Usage:
flags = AIFeatureFlags(launchdarkly_client)
config = flags.get_model_config("summarization", user_context=current_user)
if config["enabled"]:
summary = await llm.generate(
model=config["model"],
temperature=config["temperature"],
max_tokens=config["max_tokens"],
system=flags.get_prompt("summarization", config["system_prompt_version"]),
messages=[{"role": "user", "content": document_text}],
)
Rolling out AI features gradually reduces risk:
import hashlib

class GradualRollout:
    def __init__(self, flags, llm_client, metrics_client):
        # `flags` is assumed to expose both the raw getters (get_float) and
        # the get_model_config() helper from AIFeatureFlags above
        self.flags = flags
        self.llm = llm_client
        self.metrics = metrics_client
def should_use_new_model(self, user_id, feature):
"""Gradually roll out a new model"""
rollout_pct = self.flags.get_float(f"ai.{feature}.new_model_pct", default=0)
# Deterministic: same user always gets same treatment
user_hash = int(hashlib.md5(
f"{user_id}:{feature}".encode()
).hexdigest(), 16) % 100
return user_hash < rollout_pct
async def generate_with_rollout(self, user_id, feature, **kwargs):
if self.should_use_new_model(user_id, feature):
config = self.flags.get_model_config(f"{feature}.new")
source = "new"
else:
config = self.flags.get_model_config(f"{feature}.current")
source = "current"
result = await self.llm.generate(**{**kwargs, **config})
# Track which model served each request for analysis
self.metrics.emit("ai_model_served", {
"feature": feature,
"source": source,
"model": config["model"],
"user_id": user_id,
})
return result
A/B testing AI features is harder than testing UI changes: outputs are non-deterministic, quality is subjective and slower to measure than a click-through rate, and cost and latency vary per request. A framework with deterministic assignment and statistical analysis keeps the comparison honest:
import hashlib
import time

import numpy as np
import scipy.stats

class AIExperiment:
    def __init__(self, experiment_id, variants, llm_client):
        self.id = experiment_id
        self.variants = variants  # {"control": config_a, "treatment": config_b}
        self.llm = llm_client
def assign_user(self, user_id):
"""Deterministic assignment based on user_id"""
hash_val = int(hashlib.sha256(
f"{self.id}:{user_id}".encode()
).hexdigest(), 16)
variant_index = hash_val % len(self.variants)
return list(self.variants.keys())[variant_index]
async def run(self, user_id, input_data):
variant = self.assign_user(user_id)
config = self.variants[variant]
start = time.time()
result = await self.llm.generate(**config, messages=input_data)
latency = time.time() - start
# Log experiment data
self.log_exposure(user_id, variant, {
"latency_ms": latency * 1000,
"output_tokens": result.usage.completion_tokens,
"input_tokens": result.usage.prompt_tokens,
"cost": self.compute_cost(result.usage, config["model"]),
})
return result
def analyze(self, metric="task_completion_rate"):
"""Analyze experiment results with statistical rigor"""
control_data = self.get_metric_data("control", metric)
treatment_data = self.get_metric_data("treatment", metric)
# Check for statistical significance
stat, p_value = scipy.stats.mannwhitneyu(control_data, treatment_data)
# Effect size (Cohen's d)
effect_size = (
np.mean(treatment_data) - np.mean(control_data)
) / np.sqrt(
(np.var(control_data) + np.var(treatment_data)) / 2
)
return {
"control_mean": np.mean(control_data),
"treatment_mean": np.mean(treatment_data),
"p_value": p_value,
"effect_size": effect_size,
"significant": p_value < 0.05,
"sample_size": {
"control": len(control_data),
"treatment": len(treatment_data),
},
}
AI systems have unique architectural decisions that should be documented:
# ADR-007: LLM Provider Strategy
## Status: Accepted
## Context
We need to choose how to handle LLM provider selection for our
AI features. Options include single-provider, multi-provider with
manual switching, or automated multi-provider with routing.
## Decision
We will implement a multi-provider architecture with:
1. Primary provider: OpenAI (GPT-4o) for quality-critical paths
2. Secondary provider: Anthropic (Claude) for long-context tasks
3. Fallback provider: Local Llama for non-sensitive, high-volume tasks
4. Automated routing based on task type, cost budget, and latency SLA
## Rationale
- Avoids vendor lock-in
- Enables cost optimization per task type
- Provides resilience against single-provider outages
- Different models excel at different tasks
## Consequences
- Increased operational complexity (monitoring multiple providers)
- Need standardized prompt format that works across providers
- Must maintain provider-specific prompt optimizations
- Additional latency for routing decisions (~5ms)
## Review Date: 2025-06-01
Key AI-specific decisions to document include provider and model selection, prompt versioning strategy, fallback and caching behavior, cost budgets per feature, and the thresholds that trigger human review.
Every LLM call should emit latency, token usage, cost, and error-type metrics, tagged by model and feature. Without this telemetry, cost optimization and quality improvement are flying blind.
Tip: Tag every metric with both model and feature_name. This lets you answer "which feature is driving our cost spike?" and "which model variant is slowest for our summarization pipeline?" -- questions that aggregated metrics alone cannot answer.
import functools
import time

class AIObservability:
def __init__(self, metrics_client, trace_client):
self.metrics = metrics_client
self.traces = trace_client
def wrap_llm_call(self, func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
span = self.traces.start_span("llm_call", {
"model": kwargs.get("model"),
"feature": kwargs.get("feature_name"),
})
start = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start
# Emit metrics
self.metrics.histogram("llm.latency_ms", duration * 1000, {
"model": kwargs.get("model"),
"feature": kwargs.get("feature_name"),
})
self.metrics.counter("llm.tokens.input", result.usage.prompt_tokens)
self.metrics.counter("llm.tokens.output", result.usage.completion_tokens)
self.metrics.counter("llm.cost_cents",
self.compute_cost_cents(result.usage, kwargs.get("model")))
self.metrics.counter("llm.requests.success", 1)
span.set_attribute("tokens.total",
result.usage.total_tokens)
span.set_status("ok")
return result
except Exception as e:
self.metrics.counter("llm.requests.error", 1, {
"error_type": type(e).__name__,
})
span.set_status("error", str(e))
raise
finally:
span.end()
return wrapper
Fully autonomous AI pipelines are a design goal, not a starting point. Most production systems need human oversight at critical junctures.
The challenge is designing intervention points that improve reliability without creating bottlenecks.
Note: The goal is not to minimize human involvement -- it is to direct human attention precisely where it adds the most value. Systems that route 80% of requests automatically while escalating the critical 20% typically outperform both fully autonomous and fully manual approaches.
The simplest human-in-the-loop pattern routes requests based on model confidence. Below a threshold, the system escalates to a human rather than acting on an uncertain prediction:
class ConfidenceRouter:
"""Route to human review when model confidence is below threshold"""
def __init__(self, model, review_queue, thresholds=None):
self.model = model
self.review_queue = review_queue
self.thresholds = thresholds or {
"auto_approve": 0.95,
"auto_reject": 0.10,
# Everything between goes to human review
}
async def process(self, request):
result = await self.model.predict(request)
if result.confidence >= self.thresholds["auto_approve"]:
return Action(type="auto", decision=result.label, confidence=result.confidence)
elif result.confidence <= self.thresholds["auto_reject"]:
return Action(type="auto_reject", decision="rejected", confidence=result.confidence)
else:
# Queue for human review with model's suggestion
ticket = await self.review_queue.enqueue({
"request": request,
"model_suggestion": result.label,
"model_confidence": result.confidence,
"model_reasoning": result.explanation,
"priority": self.compute_priority(result),
})
return Action(type="pending_review", ticket_id=ticket.id)
def compute_priority(self, result):
"""Higher priority for borderline cases and high-value requests"""
confidence_urgency = 1.0 - abs(result.confidence - 0.5) * 2
return confidence_urgency * result.request_value
For operations with irreversible consequences -- sending emails to customers, executing financial transactions, publishing content -- approval workflows gate AI output behind human sign-off:
class ApprovalWorkflow:
"""Gate AI actions behind human approval for high-stakes operations"""
RISK_LEVELS = {
"low": {"auto_approve": True, "reviewers_required": 0},
"medium": {"auto_approve": False, "reviewers_required": 1},
"high": {"auto_approve": False, "reviewers_required": 2},
"critical": {"auto_approve": False, "reviewers_required": 2, "requires_senior": True},
}
async def submit(self, ai_output, operation_type, context):
risk = await self.assess_risk(ai_output, operation_type, context)
policy = self.RISK_LEVELS[risk]
if policy["auto_approve"]:
await self.execute(ai_output)
return {"status": "auto_approved", "risk": risk}
approval_request = await self.create_approval_request(
output=ai_output,
risk_level=risk,
reviewers_required=policy["reviewers_required"],
requires_senior=policy.get("requires_senior", False),
expires_at=datetime.utcnow() + timedelta(hours=4),
)
# Notify reviewers via Slack, email, or dashboard
await self.notify_reviewers(approval_request)
return {"status": "pending_approval", "request_id": approval_request.id}
async def assess_risk(self, ai_output, operation_type, context):
"""Classify risk using rules + model judgment"""
# Rule-based checks first
if operation_type in ("financial_transaction", "legal_document"):
return "critical"
if context.get("customer_tier") == "enterprise":
return "high"
# Model-based risk assessment for everything else
risk_assessment = await self.risk_model.classify(
output=ai_output,
operation=operation_type,
categories=["low", "medium", "high"],
)
return risk_assessment.label
Beyond confidence thresholds, specific content patterns or operational conditions should trigger human escalation. Design these as composable rules:
import inspect

class EscalationEngine:
def __init__(self):
self.triggers = []
def add_trigger(self, name, condition, priority="medium"):
self.triggers.append({"name": name, "condition": condition, "priority": priority})
    async def check(self, request, ai_response):
        fired = []
        for trigger in self.triggers:
            outcome = trigger["condition"](request, ai_response)
            # Conditions may be plain functions or coroutines
            if inspect.isawaitable(outcome):
                outcome = await outcome
            if outcome:
                fired.append(trigger)
        order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        return sorted(fired, key=lambda t: order[t["priority"]])
# Example triggers
engine = EscalationEngine()
engine.add_trigger(
"mentions_legal",
lambda req, resp: any(term in resp.text.lower() for term in ["lawsuit", "liability", "attorney"]),
priority="high",
)
engine.add_trigger(
"sentiment_negative",
lambda req, resp: resp.sentiment_score < -0.7,
priority="medium",
)
engine.add_trigger(
"high_token_output",
lambda req, resp: resp.usage.completion_tokens > 2000,
priority="medium",
)
The key principle is that human-in-the-loop is not a failure mode -- it is a design pattern. For how to observe and trace these routing decisions in production, see Article 40 -- Observability.
Traditional software testing asserts that f(x) == y. LLM-based systems break this contract: the same prompt can produce different outputs on every call. Testing strategies must shift from exact match verification to property-based and statistical assertions.
Tip: Run your LLM test suite with a fixed seed and temperature=0 to maximize reproducibility in CI. Then separately run statistical tests (multiple trials) on temperature > 0 configurations to catch distribution-level regressions.
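A minimal sketch of that split in pytest (the llm_client fixture and the statistical marker are assumptions about your own test setup):

import pytest

# Deterministic settings for the fast, per-commit CI suite
CI_GENERATION_KWARGS = {"temperature": 0, "seed": 42}

@pytest.fixture
def deterministic_llm(llm_client):
    """Pin every CI test call to temperature=0 and a fixed seed."""
    async def generate(**kwargs):
        return await llm_client.generate(**{**CI_GENERATION_KWARGS, **kwargs})
    return generate

# Distribution-level tests are marked and run on a schedule, not on every commit
statistical = pytest.mark.statistical

Register the custom statistical marker in your pytest configuration; the scheduled job then selects those tests with -m statistical while the per-commit run deselects them with -m "not statistical".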
Instead of testing for specific outputs, test that outputs satisfy invariant properties:
import json

import pytest
class TestLLMProperties:
"""Test invariant properties of LLM outputs, not exact values"""
async def test_json_output_always_valid(self, llm_client):
"""Property: structured output endpoint always returns valid JSON"""
prompts = [
"List three countries and their capitals",
"Describe the water cycle in structured format",
"Generate a product catalog entry for a laptop",
]
for prompt in prompts:
result = await llm_client.generate(
prompt=prompt,
response_format={"type": "json_object"},
)
parsed = json.loads(result.text) # Must not raise
assert isinstance(parsed, dict)
async def test_classification_within_expected_labels(self):
"""Property: classifier outputs only valid categories"""
VALID_LABELS = {"positive", "negative", "neutral"}
test_inputs = load_test_corpus("sentiment_samples.jsonl")
for sample in test_inputs:
result = await classifier.classify(sample.text)
assert result.label in VALID_LABELS, (
f"Unexpected label '{result.label}' for input: {sample.text[:80]}"
)
assert 0.0 <= result.confidence <= 1.0
async def test_summary_shorter_than_input(self):
"""Property: summaries should always be shorter than source text"""
documents = load_test_corpus("documents.jsonl")
for doc in documents:
summary = await summarizer.summarize(doc.text)
assert len(summary) < len(doc.text), "Summary should be shorter than input"
assert len(summary) > 50, "Summary should not be trivially short"
async def test_idempotent_at_zero_temperature(self):
"""Property: temperature=0 should produce near-identical outputs"""
prompt = "What is 2 + 2? Reply with just the number."
results = set()
for _ in range(5):
result = await llm_client.generate(prompt=prompt, temperature=0)
results.add(result.text.strip())
assert len(results) == 1, f"Expected deterministic output, got: {results}"
Exact snapshot matching does not work for LLM outputs. Instead, use semantic similarity to detect regressions -- if a prompt change causes outputs to drift beyond a threshold, the test fails:
import json
import os

import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticSnapshotTest:
    def __init__(self, snapshot_dir, similarity_threshold=0.85):
        self.snapshot_dir = snapshot_dir
        self.threshold = similarity_threshold
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    @staticmethod
    def cosine_similarity(a, b):
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def save_snapshot(self, path, output):
        with open(path, "w") as f:
            json.dump({"output": output}, f)

    def load_snapshot(self, path):
        with open(path) as f:
            return json.load(f)
async def assert_semantically_similar(self, test_name, current_output):
snapshot_path = os.path.join(self.snapshot_dir, f"{test_name}.json")
if not os.path.exists(snapshot_path):
# First run: save snapshot
self.save_snapshot(snapshot_path, current_output)
return
saved = self.load_snapshot(snapshot_path)
similarity = self.cosine_similarity(
self.embedder.encode(saved["output"]),
self.embedder.encode(current_output),
)
assert similarity >= self.threshold, (
f"Semantic drift detected for '{test_name}': "
f"similarity={similarity:.3f}, threshold={self.threshold}\n"
f"Saved: {saved['output'][:200]}\n"
f"Current: {current_output[:200]}"
)
For non-deterministic outputs, run multiple trials and assert on distributions rather than individual results:
class StatisticalAssertions:
@staticmethod
async def assert_pass_rate(test_fn, n_trials=20, min_pass_rate=0.90):
"""Run a test multiple times and assert a minimum pass rate"""
results = []
for _ in range(n_trials):
try:
await test_fn()
results.append(True)
except AssertionError:
results.append(False)
pass_rate = sum(results) / len(results)
assert pass_rate >= min_pass_rate, (
f"Pass rate {pass_rate:.0%} below minimum {min_pass_rate:.0%} "
f"over {n_trials} trials"
)
@staticmethod
async def assert_quality_distribution(eval_fn, samples, min_median=0.7, min_p10=0.4):
"""Assert that quality scores meet distribution requirements"""
scores = [await eval_fn(sample) for sample in samples]
scores.sort()
median = scores[len(scores) // 2]
p10 = scores[len(scores) // 10]
assert median >= min_median, f"Median quality {median:.2f} below threshold {min_median}"
assert p10 >= min_p10, f"P10 quality {p10:.2f} below threshold {min_p10}"
These testing patterns integrate naturally into CI/CD pipelines. For the full picture of regression testing, continuous evaluation, and eval-driven development workflows, see Article 36 -- CI/CD for AI. For the evaluation metrics and methodology that underpin these test assertions, see Article 31 -- Eval Fundamentals.
Prompts are not strings -- they are configuration artifacts with versioning, deployment, and lifecycle requirements comparable to feature flags or database migrations. Treating prompts as code checked into version control is a good start, but production systems need more: A/B testing of prompt variants, rollback on quality regression, and separation of prompt authoring from application deployment.
Note: A prompt change that ships silently inside a code deployment is an untested change to a critical system component. Prompt management systems make this visible, measurable, and reversible.
A prompt management system separates prompts from application code, enabling non-engineers (domain experts, content teams) to iterate on prompts without triggering a full deployment cycle:
class PromptRegistry:
"""Central registry for versioned, deployable prompts"""
def __init__(self, store):
self.store = store # Database, Langfuse, or config service
async def get_prompt(self, name, *, version=None, label="production"):
"""Fetch a prompt by name, optionally pinned to a version or label"""
if version:
return await self.store.get_prompt_version(name, version)
# Labels allow promoting prompts through environments:
# "draft" -> "staging" -> "production"
return await self.store.get_prompt_by_label(name, label)
async def create_version(self, name, template, metadata=None):
"""Create a new prompt version (immutable once created)"""
version = await self.store.create_prompt_version(
name=name,
template=template,
metadata={
"author": metadata.get("author"),
"change_reason": metadata.get("change_reason"),
"created_at": datetime.utcnow().isoformat(),
**(metadata or {}),
},
)
return version
async def promote(self, name, version, from_label, to_label):
"""Promote a prompt version from one environment to another"""
# Validate that it was tested in the source environment
prompt = await self.store.get_prompt_version(name, version)
if from_label not in prompt.labels:
raise ValueError(f"Prompt {name}@{version} not labeled '{from_label}'")
await self.store.add_label(name, version, to_label)
# Remove the label from any previous version
await self.store.remove_label_from_others(name, version, to_label)
Langfuse provides built-in prompt management that links prompts to traces, enabling direct correlation between prompt versions and quality metrics. This is covered in depth in Article 40 -- Observability, but the deployment pattern is worth highlighting here:
from langfuse import Langfuse
langfuse = Langfuse()
# Fetch the production prompt -- Langfuse handles versioning
prompt = langfuse.get_prompt("customer-support-router", label="production")
# The prompt object carries its version and metadata
print(f"Using prompt version: {prompt.version}, updated: {prompt.config.get('updated_at')}")
# Compile the prompt with variables
compiled = prompt.compile(
customer_name=customer.name,
issue_category=ticket.category,
account_tier=customer.tier,
)
# When used with tracing, Langfuse links this prompt version to every
# trace that uses it -- enabling quality analysis per prompt version
generation = langfuse.generation(
name="support-response",
prompt=prompt, # Links trace to prompt version
input=compiled,
)
Prompt A/B testing extends the experiment framework from earlier in this article. The critical addition is linking experiment variants to prompt versions so that quality regressions are traceable to specific prompt changes:
class PromptExperiment:
def __init__(self, prompt_registry, experiment_id):
self.registry = prompt_registry
self.experiment_id = experiment_id
async def run(self, user_id, input_data, prompt_name, variants):
"""
variants: {"control": "v12", "treatment": "v13"}
Maps variant names to prompt versions.
"""
variant = self.assign_variant(user_id)
prompt_version = variants[variant]
prompt = await self.registry.get_prompt(prompt_name, version=prompt_version)
compiled = prompt.compile(**input_data)
result = await self.llm.generate(messages=[{"role": "user", "content": compiled}])
# Log for analysis: links experiment -> prompt version -> output quality
await self.log_experiment_exposure(
experiment_id=self.experiment_id,
user_id=user_id,
variant=variant,
prompt_name=prompt_name,
prompt_version=prompt_version,
result_id=result.id,
)
return result
For how prompt A/B tests fit into a broader gateway routing architecture, see Article 42 -- AI Gateways.
Getting an LLM to return valid JSON in a demo is easy. Guaranteeing schema compliance at scale -- across thousands of requests per hour, with varying input complexity and multiple model providers -- is a production engineering problem.
The tooling has matured significantly: Instructor, Outlines, and native JSON mode each offer different tradeoff points between flexibility, reliability, and vendor coupling. For the foundational techniques (JSON mode, constrained decoding, tool-schema tricks), see Article 10 -- Structured Output. This section focuses on the production patterns that wrap those techniques.
Instructor patches LLM client libraries to return validated Pydantic models instead of raw strings. It handles retries on validation failure automatically:
import instructor
from pydantic import BaseModel, Field, field_validator
from openai import AsyncOpenAI
client = instructor.from_openai(AsyncOpenAI())
class ExtractedEntity(BaseModel):
name: str = Field(description="Entity name")
entity_type: str = Field(description="Type: person, organization, location, or product")
confidence: float = Field(ge=0.0, le=1.0, description="Extraction confidence")
@field_validator("entity_type")
@classmethod
def validate_entity_type(cls, v):
allowed = {"person", "organization", "location", "product"}
if v.lower() not in allowed:
raise ValueError(f"entity_type must be one of {allowed}, got '{v}'")
return v.lower()
class ExtractionResult(BaseModel):
entities: list[ExtractedEntity]
summary: str = Field(max_length=500)
# Instructor validates the response against the Pydantic model.
# On validation failure, it automatically retries with the error message
# injected into the prompt, giving the model a chance to self-correct.
result = await client.chat.completions.create(
model="gpt-4o",
response_model=ExtractionResult,
max_retries=3,
messages=[{"role": "user", "content": f"Extract entities from: {document_text}"}],
)
# result is a validated ExtractionResult instance -- guaranteed by Pydantic
assert isinstance(result, ExtractionResult)
for entity in result.entities:
assert 0.0 <= entity.confidence <= 1.0
Even with structured output guarantees from the model, a production system should validate outputs at the application boundary. Defense in depth:
import re

from pydantic import ValidationError

class OutputValidator:
"""Validate LLM outputs before they reach downstream consumers"""
    def __init__(self, schema_registry, llm_client):
        self.schemas = schema_registry
        self.llm = llm_client

    async def validate_and_repair(self, output, schema_name, repair_attempts=2):
        schema = self.schemas.get(schema_name)
        # Attempt 1: Direct validation
        try:
            return schema.model_validate_json(output)
        except ValidationError as e:
            first_error = e  # keep a reference: `e` is cleared when the except block exits
# Attempt 2: Common repairs (strip markdown fences, fix trailing commas)
cleaned = self.clean_common_issues(output)
try:
return schema.model_validate_json(cleaned)
except ValidationError:
pass
# Attempt 3: Ask the LLM to fix its own output
for attempt in range(repair_attempts):
repair_prompt = (
f"The following JSON is invalid according to the schema.\n"
f"Schema: {schema.model_json_schema()}\n"
f"Invalid JSON: {output}\n"
f"Validation error: {first_error}\n"
f"Return only the corrected JSON."
)
output = await self.llm.generate(prompt=repair_prompt, temperature=0)
try:
return schema.model_validate_json(output)
except ValidationError:
continue
raise StructuredOutputError(
f"Could not produce valid {schema_name} after {repair_attempts + 2} attempts"
)
def clean_common_issues(self, text):
"""Fix common LLM JSON formatting issues"""
# Strip markdown code fences
text = re.sub(r"^```(?:json)?\s*\n?", "", text.strip())
text = re.sub(r"\n?```\s*$", "", text)
# Remove trailing commas before closing brackets
text = re.sub(r",\s*([}\]])", r"\1", text)
return text.strip()
The validate-and-repair pattern is essential because no single technique guarantees 100% schema compliance across all models, all inputs, and all edge cases. Instructor handles the common case; the validation layer catches the rest.