โ† all lessons/๐Ÿš€ Phase 6 ยท Ship on Cloudflare/#67
Lesson 5 of 11 in Phase 6 · Ship on Cloudflare

Production AI Patterns: Workflows, Pipelines & Architecture

🚀 Phase 6 · Ship on Cloudflare · Intermediate · ~27 min read
Recommended prerequisite: #66 LLM Serving: API Design, Batching & Streaming

Deploying AI systems in production requires a fundamentally different mindset from prototyping. Prompt engineering in a notebook is not software engineering. Production AI systems must handle failures gracefully, scale predictably, produce consistent results, and operate within cost budgets. This article catalogs the essential architectural patterns, workflow orchestration strategies, and operational practices that distinguish reliable production AI from fragile demos.

TL;DR

  • Five core patterns cover most production AI workflows: map-reduce (parallel decomposition), fan-out/fan-in (multi-perspective synthesis), chain (sequential pipeline), router (task routing), and evaluator-optimizer (iterative refinement)
  • The router pattern is the highest-leverage cost optimization: routing simple tasks to cheaper models can reduce costs by 50-70% with minimal quality impact
  • Human-in-the-loop is a design pattern, not a failure mode: confidence-based routing and approval workflows let you automate the safe majority while protecting against the risky minority
  • Prompt management needs version control, staging environments, and A/B testing -- treat prompts as deployable artifacts, not strings in your source code
  • Testing non-deterministic systems requires property-based and statistical assertions over multiple trials, not exact-match checks

Core Production AI Patterns

The Map-Reduce Pattern

When processing data that exceeds a single LLM context window, map-reduce decomposes the task into parallelizable chunks:

python
import asyncio
from typing import TypeVar, Callable, List

T = TypeVar('T')

class MapReduceProcessor:
    def __init__(self, llm_client, max_concurrent=10):
        self.llm = llm_client
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process(
        self,
        items: List[T],
        map_prompt: str,
        reduce_prompt: str,
        chunk_size: int = 5,
    ) -> str:
        # MAP: Process each chunk independently
        chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
        map_tasks = [self.map_chunk(chunk, map_prompt) for chunk in chunks]
        map_results = await asyncio.gather(*map_tasks)

        # REDUCE: Combine map results
        # If too many results, reduce hierarchically
        while len(map_results) > chunk_size:
            reduce_chunks = [
                map_results[i:i + chunk_size]
                for i in range(0, len(map_results), chunk_size)
            ]
            map_results = await asyncio.gather(*[
                self.reduce_chunk(chunk, reduce_prompt)
                for chunk in reduce_chunks
            ])

        # Final reduction
        return await self.reduce_chunk(map_results, reduce_prompt)

    async def map_chunk(self, chunk, prompt):
        async with self.semaphore:
            return await self.llm.generate(
                prompt=prompt.format(data=chunk),
            )

    async def reduce_chunk(self, results, prompt):
        # Hierarchical reductions also run concurrently, so they share
        # the same concurrency limit as the map calls
        async with self.semaphore:
            return await self.llm.generate(
                prompt=prompt.format(results="\n---\n".join(results)),
            )

# Example: Summarize a large document
processor = MapReduceProcessor(llm_client, max_concurrent=5)
summary = await processor.process(
    items=document_chunks,
    map_prompt="Summarize this section, preserving key facts and figures:\n{data}",
    reduce_prompt="Combine these summaries into a single coherent summary:\n{results}",
)

When to use: Document summarization, large dataset analysis, code review across many files, aggregating information from multiple sources.

Pitfalls: Information loss during reduction (important details from individual chunks may be dropped), inconsistent interpretation across map workers, high total token cost.

The Fan-Out/Fan-In Pattern

Similar to map-reduce but each branch may perform a different task:

python
class FanOutFanIn:
    """Execute multiple different analyses in parallel, then synthesize"""

    async def analyze_product_launch(self, product_spec):
        # FAN-OUT: Different analyses in parallel
        tasks = {
            "market": self.analyze_market(product_spec),
            "technical": self.analyze_technical_feasibility(product_spec),
            "competitive": self.analyze_competition(product_spec),
            "financial": self.analyze_unit_economics(product_spec),
            "risk": self.assess_risks(product_spec),
        }

        # Run all branches concurrently instead of awaiting them one by one
        names = list(tasks.keys())
        outputs = await asyncio.gather(*tasks.values())
        results = dict(zip(names, outputs))

        # FAN-IN: Synthesize all analyses
        synthesis = await self.llm.generate(
            system="You are a product strategy analyst.",
            prompt=f"""Based on these independent analyses of a proposed product,
provide a comprehensive go/no-go recommendation:

Market Analysis: {results['market']}
Technical Feasibility: {results['technical']}
Competitive Landscape: {results['competitive']}
Unit Economics: {results['financial']}
Risk Assessment: {results['risk']}

Synthesize these into a unified recommendation with:
1. Overall recommendation (go/no-go/conditional)
2. Key opportunities
3. Critical risks
4. Recommended next steps""",
        )

        return synthesis

When to use: Multi-dimensional analysis, getting multiple "expert" perspectives, tasks where different aspects require different prompting strategies or even different models.

The Chain Pattern

Sequential processing where each step's output feeds the next:

python
import time

class ValidationError(Exception):
    pass

class ChainError(Exception):
    def __init__(self, step_name, cause, log):
        super().__init__(f"Chain failed at step '{step_name}': {cause}")
        self.step_name, self.cause, self.log = step_name, cause, log

class ProcessingChain:
    def __init__(self):
        self.steps = []

    def add_step(self, name, processor, validator=None):
        self.steps.append({
            "name": name,
            "processor": processor,
            "validator": validator,
        })
        return self

    async def execute(self, input_data):
        current = input_data
        execution_log = []

        for step in self.steps:
            start = time.time()
            try:
                result = await step["processor"](current)

                # Validate output before passing to next step
                # (validators return True/False, matching the example below)
                if step["validator"] and not step["validator"](result):
                    raise ValidationError(
                        f"Step '{step['name']}' validation failed"
                    )

                execution_log.append({
                    "step": step["name"],
                    "status": "success",
                    "duration_ms": (time.time() - start) * 1000,
                    "input_preview": str(current)[:200],
                    "output_preview": str(result)[:200],
                })

                current = result

            except Exception as e:
                execution_log.append({
                    "step": step["name"],
                    "status": "error",
                    "error": str(e),
                    "duration_ms": (time.time() - start) * 1000,
                })
                raise ChainError(step["name"], e, execution_log)

        return current, execution_log

# Example: Content moderation pipeline
chain = ProcessingChain()
chain.add_step(
    "classify",
    lambda text: llm.classify(text, categories=["safe", "review", "block"]),
    validator=lambda r: r in ["safe", "review", "block"],
)
chain.add_step(
    "explain",
    lambda classification: llm.explain(classification) if classification != "safe" else "Content approved",
)
chain.add_step(
    "action",
    lambda explanation: apply_moderation_action(explanation),
)

The Router Pattern

Route inputs to different processing paths based on classification:

python
class LLMRouter:
    """Route requests to specialized handlers based on intent"""

    def __init__(self, routes: dict, fallback_handler=None):
        self.routes = routes  # {intent: handler}
        self.fallback = fallback_handler
        self.classifier = IntentClassifier()

    async def route(self, request):
        # Step 1: Classify the request
        classification = await self.classifier.classify(
            request,
            categories=list(self.routes.keys()),
        )

        intent = classification.category
        confidence = classification.confidence

        # Step 2: Route based on classification
        if confidence < 0.5 and self.fallback:
            # Low confidence - use fallback (often a general-purpose LLM)
            return await self.fallback(request)

        handler = self.routes.get(intent, self.fallback)
        if handler is None:
            raise RoutingError(f"No handler for intent '{intent}'")

        return await handler(request)

# Example: Customer support router
router = LLMRouter(
    routes={
        "billing": BillingHandler(model="gpt-4o-mini"),     # Cheaper model for routine
        "technical": TechnicalHandler(model="gpt-4o"),       # Better model for complex
        "complaint": ComplaintHandler(model="gpt-4o",        # Best model + empathy prompt
                                     escalation_enabled=True),
        "general_info": FAQHandler(rag_enabled=True),        # RAG for factual answers
    },
    fallback_handler=GeneralHandler(model="gpt-4o"),
)

Cost optimization: The router pattern lets you use expensive models only for complex tasks and cheap models for simple ones. A well-designed router can reduce costs by 50-70% compared to using the best model for everything.
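
As a back-of-envelope illustration of that claim (the per-token prices and the 80/20 traffic split below are hypothetical, chosen only to show the arithmetic):

python
# Hypothetical prices: premium model $10 per 1M tokens, small model $0.50
PREMIUM_PER_M = 10.00
SMALL_PER_M = 0.50

requests = 1_000_000
tokens_per_request = 2_000
total_m_tokens = requests * tokens_per_request / 1e6  # 2,000M tokens

all_premium = total_m_tokens * PREMIUM_PER_M                 # everything on the best model
routed = (0.2 * total_m_tokens * PREMIUM_PER_M               # complex 20% -> premium
          + 0.8 * total_m_tokens * SMALL_PER_M)              # simple 80% -> small

print(f"all premium: ${all_premium:,.0f}")  # $20,000
print(f"routed:      ${routed:,.0f}")       # $4,800 (~76% cheaper, before the
                                            # classifier's own cost is counted)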

The Evaluator-Optimizer Pattern

Generate a result, evaluate its quality, and iteratively improve:

python
class EvaluatorOptimizer:
    """Generate, evaluate, and refine until quality threshold is met"""

    def __init__(self, generator, evaluator, max_iterations=3):
        self.generator = generator
        self.evaluator = evaluator
        self.max_iterations = max_iterations

    async def produce(self, task, quality_threshold=0.8):
        result = await self.generator(task)
        attempts = []  # (result, evaluation) pairs

        for iteration in range(self.max_iterations):
            # Evaluate the current result
            evaluation = await self.evaluator(task, result)
            attempts.append((result, evaluation))

            if evaluation.score >= quality_threshold:
                return {
                    "result": result,
                    "iterations": iteration + 1,
                    "final_score": evaluation.score,
                    "history": [e for _, e in attempts],
                }

            # Refine based on feedback
            result = await self.generator(
                task,
                previous_attempt=result,
                feedback=evaluation.feedback,
                instruction=f"Address these issues: {evaluation.issues}",
            )

        # Threshold never met: return the best-scoring evaluated attempt
        # (the final refinement was never evaluated, so it is not considered)
        best_result, best_eval = max(attempts, key=lambda pair: pair[1].score)
        return {
            "result": best_result,
            "iterations": self.max_iterations,
            "final_score": best_eval.score,
            "history": [e for _, e in attempts],
            "quality_warning": True,
        }

Workflow Orchestration

Choosing an Orchestration Framework

Production AI workflows need orchestration beyond simple function calls:

| Framework | Strengths | Best For |
| --- | --- | --- |
| Temporal | Durable execution, fault tolerance | Long-running, mission-critical workflows |
| Prefect | Python-native, easy monitoring | Data-oriented AI pipelines |
| LangGraph | LLM-native, built-in agent loops | Complex agent workflows |
| Inngest | Event-driven, serverless-friendly | Event-triggered AI processing |
| Custom (Redis + workers) | Full control, minimal overhead | Simple, high-volume pipelines |
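
For the last row, a custom pipeline can be as small as a Redis list plus a worker loop. A minimal sketch using redis-py's asyncio client (the queue name and handler are illustrative; a real deployment would add acknowledgements and dead-letter handling):

python
import asyncio
import json

import redis.asyncio as redis

QUEUE = "ai:jobs"  # illustrative queue name

async def enqueue(r: redis.Redis, payload: dict) -> None:
    await r.rpush(QUEUE, json.dumps(payload))

async def worker(r: redis.Redis, handle) -> None:
    """Pop jobs forever; BLPOP blocks until a job arrives."""
    while True:
        _, raw = await r.blpop(QUEUE)
        job = json.loads(raw)
        try:
            await handle(job)  # e.g., call the LLM and store the result
        except Exception:
            await r.rpush(QUEUE, raw)  # naive retry: requeue on failure

# asyncio.run(worker(redis.Redis(), my_handler))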

Temporal for AI Workflows

Temporal provides durable execution - if a step fails or a worker crashes, the workflow resumes from where it left off:

python
from datetime import timedelta

import httpx
from temporalio import workflow, activity
from temporalio.common import RetryPolicy

# Activities are the units of work
@activity.defn
async def extract_text(document_url: str) -> str:
    """Download and extract text from a document"""
    # httpx.get is synchronous; use the async client inside an activity
    async with httpx.AsyncClient() as http:
        response = await http.get(document_url)
    return extract_text_from_pdf(response.content)

@activity.defn
async def generate_summary(text: str) -> str:
    """Generate an AI summary with retry logic"""
    return await llm_client.generate(
        prompt=f"Summarize:\n{text}",
        model="gpt-4o",
    )

@activity.defn
async def store_result(document_id: str, summary: str) -> None:
    """Store the summary in the database"""
    await db.execute(
        "UPDATE documents SET summary = $1 WHERE id = $2",
        summary, document_id,
    )

# Workflow orchestrates the activities
@workflow.defn
class DocumentSummaryWorkflow:
    @workflow.run
    async def run(self, document_id: str, document_url: str) -> str:
        # Step 1: Extract text (retry up to 3 times)
        text = await workflow.execute_activity(
            extract_text,
            document_url,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=30),
                backoff_coefficient=2.0,
            ),
        )

        # Step 2: Generate summary (retry with different strategy for LLM)
        summary = await workflow.execute_activity(
            generate_summary,
            text,
            start_to_close_timeout=timedelta(minutes=2),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=5),
                non_retryable_error_types=["ContentPolicyViolation"],
            ),
        )

        # Step 3: Store result
        await workflow.execute_activity(
            store_result,
            args=[document_id, summary],
            start_to_close_timeout=timedelta(seconds=30),
        )

        return summary

Error Handling and Retry Strategies

LLM-Specific Error Types

LLM API calls fail in ways that require specific handling:

python
import asyncio
import json
import random

class LLMRetryHandler:
    """Retry handler with LLM-specific error classification"""

    # Error categories and strategies
    RETRY_STRATEGIES = {
        # Rate limits: exponential backoff with jitter
        "rate_limit": {
            "max_retries": 5,
            "base_delay": 1.0,
            "max_delay": 60.0,
            "backoff": "exponential_with_jitter",
        },
        # Transient server errors: quick retry
        "server_error": {
            "max_retries": 3,
            "base_delay": 0.5,
            "max_delay": 10.0,
            "backoff": "exponential",
        },
        # Timeout: retry with shorter input or different model
        "timeout": {
            "max_retries": 2,
            "base_delay": 1.0,
            "fallback": "reduce_input_or_switch_model",
        },
        # Context length exceeded: truncate and retry
        "context_length": {
            "max_retries": 1,
            "fallback": "truncate_input",
        },
        # Content filter: do not retry (deterministic failure)
        "content_filter": {
            "max_retries": 0,
            "action": "log_and_skip",
        },
        # Invalid response format: retry with stricter prompt
        "parse_error": {
            "max_retries": 3,
            "fallback": "add_format_instructions",
        },
    }

    async def call_with_retry(self, func, *args, **kwargs):
        last_error = None
        strategy = None

        for attempt in range(5):
            try:
                result = await func(*args, **kwargs)
                return result
            except RateLimitError as e:
                strategy = self.RETRY_STRATEGIES["rate_limit"]
                last_error = e
            except TimeoutError as e:
                strategy = self.RETRY_STRATEGIES["timeout"]
                last_error = e
                # Reduce input on timeout
                if "messages" in kwargs:
                    kwargs["messages"] = self.truncate_messages(kwargs["messages"])
            except ContextLengthError as e:
                strategy = self.RETRY_STRATEGIES["context_length"]
                kwargs["messages"] = self.truncate_messages(kwargs["messages"], aggressive=True)
                last_error = e
            except ContentFilterError:
                # Don't retry - this is a policy decision
                raise
            except json.JSONDecodeError as e:
                strategy = self.RETRY_STRATEGIES["parse_error"]
                # Add explicit format instructions
                kwargs["messages"][-1]["content"] += (
                    "\n\nIMPORTANT: Respond with valid JSON only. "
                    "No markdown, no explanation."
                )
                last_error = e

            if attempt >= strategy["max_retries"]:
                break

            delay = self.compute_delay(attempt, strategy)
            await asyncio.sleep(delay)

        raise MaxRetriesExceeded(last_error)

    def compute_delay(self, attempt, strategy):
        base = strategy["base_delay"]
        max_delay = strategy.get("max_delay", 60.0)

        if strategy.get("backoff") == "exponential_with_jitter":
            delay = min(base * (2 ** attempt), max_delay)
            delay *= (0.5 + random.random())  # Add jitter
        else:
            delay = min(base * (2 ** attempt), max_delay)

        return delay

Fallback Chains

When a primary model fails, fall back to alternatives:

python
class ModelFallbackChain:
    def __init__(self, models):
        """
        models: ordered list of (model_name, client, config)
        First model is preferred; later models are fallbacks.
        """
        self.models = models

    async def generate(self, **kwargs):
        errors = []
        for model_name, client, config in self.models:
            try:
                result = await client.generate(
                    model=model_name,
                    **{**kwargs, **config},
                )
                if model_name != self.models[0][0]:
                    logger.warning(
                        f"Used fallback model {model_name} "
                        f"(primary failed: {errors[-1]})"
                    )
                return result
            except Exception as e:
                errors.append(f"{model_name}: {e}")
                continue

        raise AllModelsFailed(errors)

# Example: GPT-4o -> Claude -> GPT-4o-mini -> local model
fallback = ModelFallbackChain([
    ("gpt-4o", openai_client, {"temperature": 0.7}),
    ("claude-sonnet-4-20250514", anthropic_client, {"temperature": 0.7}),
    ("gpt-4o-mini", openai_client, {"temperature": 0.5}),
    ("llama-3-8b", local_client, {"temperature": 0.7}),
])

Idempotency

Why AI Operations Need Idempotency

LLM calls are inherently non-deterministic - the same prompt can produce different outputs. For workflows that may be retried (due to failures, timeouts, or infrastructure issues), idempotency ensures operations are not duplicated:

python
import hashlib
import json

class IdempotentAIOperation:
    def __init__(self, cache_store):
        self.cache = cache_store  # Redis, DynamoDB, etc.

    def idempotency_key(self, operation, inputs):
        """Generate a deterministic key for this operation + inputs"""
        content = json.dumps({
            "operation": operation,
            "inputs": inputs,
        }, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def execute_once(self, key, operation_fn, *args, **kwargs):
        """Execute an operation at most once for a given idempotency key"""

        # Check if already executed
        cached = await self.cache.get(f"idempotent:{key}")
        if cached:
            return json.loads(cached)

        # Acquire lock to prevent concurrent execution
        lock = await self.cache.acquire_lock(f"lock:{key}", ttl=300)
        if not lock:
            # Another worker is executing this - wait for result
            return await self.wait_for_result(key, timeout=300)

        try:
            result = await operation_fn(*args, **kwargs)

            # Cache the result
            await self.cache.set(
                f"idempotent:{key}",
                json.dumps(result),
                ex=86400,  # 24-hour TTL
            )

            return result
        finally:
            await self.cache.release_lock(f"lock:{key}")

# Usage: Ensure an email is only generated and sent once
idempotent = IdempotentAIOperation(redis_client)
key = idempotent.idempotency_key("welcome_email", {"user_id": "123"})
email_content = await idempotent.execute_once(
    key,
    generate_personalized_email,
    user_id="123",
)

Caching LLM Responses

Beyond idempotency, caching identical requests saves cost and reduces latency:

python
class LLMCache:
    def __init__(self, store, default_ttl=3600):
        self.store = store
        self.default_ttl = default_ttl
        # Request parameters that affect output and belong in the cache key
        self.cacheable_params = {"max_tokens", "top_p", "system", "response_format"}

    def cache_key(self, model, messages, temperature=1.0, **kwargs):
        """Only cache deterministic-ish requests (low temperature)"""
        if temperature > 0.1:
            return None  # Don't cache high-temperature requests

        content = json.dumps({
            "model": model,
            "messages": messages,
            "temperature": temperature,
            **{k: v for k, v in kwargs.items() if k in self.cacheable_params},
        }, sort_keys=True)
        return f"llm_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    async def cached_generate(self, client, **kwargs):
        key = self.cache_key(**kwargs)
        if key:
            cached = await self.store.get(key)
            if cached:
                return json.loads(cached)

        result = await client.generate(**kwargs)

        if key:
            await self.store.set(key, json.dumps(result), ex=self.default_ttl)

        return result

Feature Flags for AI

Controlling AI Behavior in Production

Feature flags give you fine-grained control over AI behavior without deployments:

python
class AIFeatureFlags:
    """Feature flag system for AI behavior control"""

    def __init__(self, flag_provider):
        self.flags = flag_provider  # LaunchDarkly, Flagsmith, etc.

    def get_model_config(self, feature, user_context=None):
        """Get the model configuration for a feature"""
        return {
            "model": self.flags.get_string(
                f"ai.{feature}.model",
                default="gpt-4o-mini",
                user=user_context,
            ),
            "temperature": self.flags.get_float(
                f"ai.{feature}.temperature",
                default=0.7,
            ),
            "max_tokens": self.flags.get_int(
                f"ai.{feature}.max_tokens",
                default=1000,
            ),
            "enabled": self.flags.get_bool(
                f"ai.{feature}.enabled",
                default=True,
            ),
            "system_prompt_version": self.flags.get_string(
                f"ai.{feature}.prompt_version",
                default="v1",
            ),
        }

    def get_prompt(self, feature, version=None):
        """Load versioned prompts from flag system"""
        version = version or self.get_model_config(feature)["system_prompt_version"]
        return self.flags.get_string(f"ai.{feature}.prompts.{version}")

# Usage:
flags = AIFeatureFlags(launchdarkly_client)

config = flags.get_model_config("summarization", user_context=current_user)
if config["enabled"]:
    summary = await llm.generate(
        model=config["model"],
        temperature=config["temperature"],
        max_tokens=config["max_tokens"],
        system=flags.get_prompt("summarization", config["system_prompt_version"]),
        messages=[{"role": "user", "content": document_text}],
    )

Gradual Rollout

Rolling out AI features gradually reduces risk:

python
import hashlib

class GradualRollout:
    def __init__(self, flag_provider, llm_client, metrics_client):
        self.flags = flag_provider
        self.ai_flags = AIFeatureFlags(flag_provider)
        self.llm = llm_client
        self.metrics = metrics_client

    def should_use_new_model(self, user_id, feature):
        """Gradually roll out a new model"""
        rollout_pct = self.flags.get_float(f"ai.{feature}.new_model_pct", default=0)

        # Deterministic: same user always gets same treatment
        user_hash = int(hashlib.md5(
            f"{user_id}:{feature}".encode()
        ).hexdigest(), 16) % 100

        return user_hash < rollout_pct

    async def generate_with_rollout(self, user_id, feature, **kwargs):
        if self.should_use_new_model(user_id, feature):
            config = self.ai_flags.get_model_config(f"{feature}.new")
            source = "new"
        else:
            config = self.ai_flags.get_model_config(f"{feature}.current")
            source = "current"

        # Forward only generation parameters, not flag metadata like "enabled"
        gen_config = {k: config[k] for k in ("model", "temperature", "max_tokens")}
        result = await self.llm.generate(**{**kwargs, **gen_config})

        # Track which model served each request for analysis
        self.metrics.emit("ai_model_served", {
            "feature": feature,
            "source": source,
            "model": config["model"],
            "user_id": user_id,
        })

        return result

A/B Testing AI Features

Designing AI Experiments

A/B testing AI features is harder than testing UI changes because:

  1. AI outputs are non-deterministic (same input can produce different outputs)
  2. Quality is subjective and hard to measure automatically
  3. Users may adapt their behavior to the AI's capabilities
  4. Long-term effects (trust, reliance) differ from short-term engagement

python
import hashlib
import time

import numpy as np
import scipy.stats

class AIExperiment:
    def __init__(self, experiment_id, variants):
        self.id = experiment_id
        self.variants = variants  # {"control": config_a, "treatment": config_b}

    def assign_user(self, user_id):
        """Deterministic assignment based on user_id"""
        hash_val = int(hashlib.sha256(
            f"{self.id}:{user_id}".encode()
        ).hexdigest(), 16)
        variant_index = hash_val % len(self.variants)
        return list(self.variants.keys())[variant_index]

    async def run(self, user_id, input_data):
        variant = self.assign_user(user_id)
        config = self.variants[variant]

        start = time.time()
        result = await self.llm.generate(**config, messages=input_data)
        latency = time.time() - start

        # Log experiment data
        self.log_exposure(user_id, variant, {
            "latency_ms": latency * 1000,
            "output_tokens": result.usage.completion_tokens,
            "input_tokens": result.usage.prompt_tokens,
            "cost": self.compute_cost(result.usage, config["model"]),
        })

        return result

    def analyze(self, metric="task_completion_rate"):
        """Analyze experiment results with statistical rigor"""
        control_data = self.get_metric_data("control", metric)
        treatment_data = self.get_metric_data("treatment", metric)

        # Check for statistical significance
        stat, p_value = scipy.stats.mannwhitneyu(control_data, treatment_data)

        # Effect size (Cohen's d)
        effect_size = (
            np.mean(treatment_data) - np.mean(control_data)
        ) / np.sqrt(
            (np.var(control_data) + np.var(treatment_data)) / 2
        )

        return {
            "control_mean": np.mean(control_data),
            "treatment_mean": np.mean(treatment_data),
            "p_value": p_value,
            "effect_size": effect_size,
            "significant": p_value < 0.05,
            "sample_size": {
                "control": len(control_data),
                "treatment": len(treatment_data),
            },
        }

Architecture Decision Records for AI

Documenting AI Architecture Decisions

AI systems have unique architectural decisions that should be documented:

markdown
# ADR-007: LLM Provider Strategy

## Status: Accepted

## Context
We need to choose how to handle LLM provider selection for our
AI features. Options include single-provider, multi-provider with
manual switching, or automated multi-provider with routing.

## Decision
We will implement a multi-provider architecture with:
1. Primary provider: OpenAI (GPT-4o) for quality-critical paths
2. Secondary provider: Anthropic (Claude) for long-context tasks
3. Fallback provider: Local Llama for non-sensitive, high-volume tasks
4. Automated routing based on task type, cost budget, and latency SLA

## Rationale
- Avoids vendor lock-in
- Enables cost optimization per task type
- Provides resilience against single-provider outages
- Different models excel at different tasks

## Consequences
- Increased operational complexity (monitoring multiple providers)
- Need standardized prompt format that works across providers
- Must maintain provider-specific prompt optimizations
- Additional latency for routing decisions (~5ms)

## Review Date: 2025-06-01

Key AI-specific decisions to document:

  • Model selection rationale (why this model, not that one)
  • Prompt versioning strategy
  • Context window management approach
  • Evaluation methodology and quality thresholds
  • Cost budgets and optimization strategy
  • Data handling and privacy decisions
  • Fallback and degradation strategy

Observability

Comprehensive AI Monitoring

Every LLM call should emit latency, token usage, cost, and error-type metrics, tagged by model and feature. Without this telemetry, cost optimization and quality improvement are flying blind.

Tip: Tag every metric with both model and feature_name. This lets you answer "which feature is driving our cost spike?" and "which model variant is slowest for our summarization pipeline?" -- questions that aggregated metrics alone cannot answer.

python
import functools
import time

class AIObservability:
    def __init__(self, metrics_client, trace_client):
        self.metrics = metrics_client
        self.traces = trace_client

    def wrap_llm_call(self, func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            span = self.traces.start_span("llm_call", {
                "model": kwargs.get("model"),
                "feature": kwargs.get("feature_name"),
            })

            start = time.time()
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start

                # Emit metrics
                self.metrics.histogram("llm.latency_ms", duration * 1000, {
                    "model": kwargs.get("model"),
                    "feature": kwargs.get("feature_name"),
                })
                self.metrics.counter("llm.tokens.input", result.usage.prompt_tokens)
                self.metrics.counter("llm.tokens.output", result.usage.completion_tokens)
                self.metrics.counter("llm.cost_cents",
                    self.compute_cost_cents(result.usage, kwargs.get("model")))
                self.metrics.counter("llm.requests.success", 1)

                span.set_attribute("tokens.total",
                    result.usage.total_tokens)
                span.set_status("ok")

                return result

            except Exception as e:
                self.metrics.counter("llm.requests.error", 1, {
                    "error_type": type(e).__name__,
                })
                span.set_status("error", str(e))
                raise
            finally:
                span.end()

        return wrapper

Human-in-the-Loop Patterns

Fully autonomous AI pipelines are a design goal, not a starting point. Most production systems need human oversight at critical junctures:

  • Stakes are too high for unsupervised decisions
  • Model confidence is below the threshold for autonomous action
  • Regulatory requirements mandate human review for certain decision types

The challenge is designing intervention points that improve reliability without creating bottlenecks.

Note: The goal is not to minimize human involvement -- it is to direct human attention precisely where it adds the most value. Systems that route 80% of requests automatically while escalating the critical 20% typically outperform both fully autonomous and fully manual approaches.

Confidence-Based Routing

The simplest human-in-the-loop pattern routes requests based on model confidence. Below a threshold, the system escalates to a human rather than acting on an uncertain prediction:

python
class ConfidenceRouter:
    """Route to human review when model confidence is below threshold"""

    def __init__(self, model, review_queue, thresholds=None):
        self.model = model
        self.review_queue = review_queue
        self.thresholds = thresholds or {
            "auto_approve": 0.95,
            "auto_reject": 0.10,
            # Everything between goes to human review
        }

    async def process(self, request):
        result = await self.model.predict(request)

        if result.confidence >= self.thresholds["auto_approve"]:
            return Action(type="auto", decision=result.label, confidence=result.confidence)
        elif result.confidence <= self.thresholds["auto_reject"]:
            return Action(type="auto_reject", decision="rejected", confidence=result.confidence)
        else:
            # Queue for human review with model's suggestion
            ticket = await self.review_queue.enqueue({
                "request": request,
                "model_suggestion": result.label,
                "model_confidence": result.confidence,
                "model_reasoning": result.explanation,
                "priority": self.compute_priority(result),
            })
            return Action(type="pending_review", ticket_id=ticket.id)

    def compute_priority(self, result):
        """Higher priority for borderline cases and high-value requests"""
        confidence_urgency = 1.0 - abs(result.confidence - 0.5) * 2
        return confidence_urgency * result.request_value

Approval Workflows

For operations with irreversible consequences -- sending emails to customers, executing financial transactions, publishing content -- approval workflows gate AI output behind human sign-off:

python
from datetime import datetime, timedelta

class ApprovalWorkflow:
    """Gate AI actions behind human approval for high-stakes operations"""

    RISK_LEVELS = {
        "low": {"auto_approve": True, "reviewers_required": 0},
        "medium": {"auto_approve": False, "reviewers_required": 1},
        "high": {"auto_approve": False, "reviewers_required": 2},
        "critical": {"auto_approve": False, "reviewers_required": 2, "requires_senior": True},
    }

    async def submit(self, ai_output, operation_type, context):
        risk = await self.assess_risk(ai_output, operation_type, context)
        policy = self.RISK_LEVELS[risk]

        if policy["auto_approve"]:
            await self.execute(ai_output)
            return {"status": "auto_approved", "risk": risk}

        approval_request = await self.create_approval_request(
            output=ai_output,
            risk_level=risk,
            reviewers_required=policy["reviewers_required"],
            requires_senior=policy.get("requires_senior", False),
            expires_at=datetime.utcnow() + timedelta(hours=4),
        )

        # Notify reviewers via Slack, email, or dashboard
        await self.notify_reviewers(approval_request)
        return {"status": "pending_approval", "request_id": approval_request.id}

    async def assess_risk(self, ai_output, operation_type, context):
        """Classify risk using rules + model judgment"""
        # Rule-based checks first
        if operation_type in ("financial_transaction", "legal_document"):
            return "critical"
        if context.get("customer_tier") == "enterprise":
            return "high"

        # Model-based risk assessment for everything else
        risk_assessment = await self.risk_model.classify(
            output=ai_output,
            operation=operation_type,
            categories=["low", "medium", "high"],
        )
        return risk_assessment.label

Escalation Triggers

Beyond confidence thresholds, specific content patterns or operational conditions should trigger human escalation. Design these as composable rules:

python
import inspect

class EscalationEngine:
    def __init__(self):
        self.triggers = []

    def add_trigger(self, name, condition, priority="medium"):
        self.triggers.append({"name": name, "condition": condition, "priority": priority})

    async def check(self, request, ai_response):
        fired = []
        for trigger in self.triggers:
            outcome = trigger["condition"](request, ai_response)
            if inspect.isawaitable(outcome):  # support both sync and async conditions
                outcome = await outcome
            if outcome:
                fired.append(trigger)
        return sorted(fired, key=lambda t: {"critical": 0, "high": 1, "medium": 2}[t["priority"]])

# Example triggers
engine = EscalationEngine()
engine.add_trigger(
    "mentions_legal",
    lambda req, resp: any(term in resp.text.lower() for term in ["lawsuit", "liability", "attorney"]),
    priority="high",
)
engine.add_trigger(
    "sentiment_negative",
    lambda req, resp: resp.sentiment_score < -0.7,
    priority="medium",
)
engine.add_trigger(
    "high_token_output",
    lambda req, resp: resp.usage.completion_tokens > 2000,
    priority="medium",
)

The key principle bears repeating: human-in-the-loop is not a failure mode -- it is a design pattern. For how to observe and trace these routing decisions in production, see Article 40 -- Observability.

Testing Non-Deterministic Systems

Traditional software testing asserts that f(x) == y. LLM-based systems break this contract: the same prompt can produce different outputs on every call. Testing strategies must shift from exact match verification to property-based and statistical assertions.

Tip: Run your LLM test suite with a fixed seed and temperature=0 to maximize reproducibility in CI. Then separately run statistical tests (multiple trials) on temperature > 0 configurations to catch distribution-level regressions.
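
One way to wire the CI lane is a fixture that pins the determinism knobs on every call. A minimal sketch, assuming an llm_client fixture already exists in your suite (the seed parameter is provider-dependent and best-effort):

python
import pytest

@pytest.fixture
def deterministic_llm(llm_client):
    """Wrap the client so every CI-lane test runs fully pinned."""
    async def generate(**kwargs):
        kwargs["temperature"] = 0
        kwargs.setdefault("seed", 42)  # best-effort; not all providers honor seeds
        return await llm_client.generate(**kwargs)
    return generate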

Property-Based Testing

Instead of testing for specific outputs, test that outputs satisfy invariant properties:

python
import json

import pytest  # async tests assume pytest-asyncio (or anyio) is configured

class TestLLMProperties:
    """Test invariant properties of LLM outputs, not exact values"""

    async def test_json_output_always_valid(self, llm_client):
        """Property: structured output endpoint always returns valid JSON"""
        prompts = [
            "List three countries and their capitals",
            "Describe the water cycle in structured format",
            "Generate a product catalog entry for a laptop",
        ]
        for prompt in prompts:
            result = await llm_client.generate(
                prompt=prompt,
                response_format={"type": "json_object"},
            )
            parsed = json.loads(result.text)  # Must not raise
            assert isinstance(parsed, dict)

    async def test_classification_within_expected_labels(self):
        """Property: classifier outputs only valid categories"""
        VALID_LABELS = {"positive", "negative", "neutral"}
        test_inputs = load_test_corpus("sentiment_samples.jsonl")

        for sample in test_inputs:
            result = await classifier.classify(sample.text)
            assert result.label in VALID_LABELS, (
                f"Unexpected label '{result.label}' for input: {sample.text[:80]}"
            )
            assert 0.0 <= result.confidence <= 1.0

    async def test_summary_shorter_than_input(self):
        """Property: summaries should always be shorter than source text"""
        documents = load_test_corpus("documents.jsonl")
        for doc in documents:
            summary = await summarizer.summarize(doc.text)
            assert len(summary) < len(doc.text), "Summary should be shorter than input"
            assert len(summary) > 50, "Summary should not be trivially short"

    async def test_idempotent_at_zero_temperature(self):
        """Property: temperature=0 should produce near-identical outputs"""
        prompt = "What is 2 + 2? Reply with just the number."
        results = set()
        for _ in range(5):
            result = await llm_client.generate(prompt=prompt, temperature=0)
            results.add(result.text.strip())
        assert len(results) == 1, f"Expected deterministic output, got: {results}"

Snapshot Testing with Similarity Thresholds

Exact snapshot matching does not work for LLM outputs. Instead, use semantic similarity to detect regressions -- if a prompt change causes outputs to drift beyond a threshold, the test fails:

python
import json
import os

import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticSnapshotTest:
    def __init__(self, snapshot_dir, similarity_threshold=0.85):
        self.snapshot_dir = snapshot_dir
        self.threshold = similarity_threshold
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def assert_semantically_similar(self, test_name, current_output):
        snapshot_path = os.path.join(self.snapshot_dir, f"{test_name}.json")

        if not os.path.exists(snapshot_path):
            # First run: save snapshot
            self.save_snapshot(snapshot_path, current_output)
            return

        saved = self.load_snapshot(snapshot_path)
        similarity = self.cosine_similarity(
            self.embedder.encode(saved["output"]),
            self.embedder.encode(current_output),
        )

        assert similarity >= self.threshold, (
            f"Semantic drift detected for '{test_name}': "
            f"similarity={similarity:.3f}, threshold={self.threshold}\n"
            f"Saved: {saved['output'][:200]}\n"
            f"Current: {current_output[:200]}"
        )

    @staticmethod
    def cosine_similarity(a, b):
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def save_snapshot(self, path, output):
        with open(path, "w") as f:
            json.dump({"output": output}, f)

    def load_snapshot(self, path):
        with open(path) as f:
            return json.load(f)

Statistical Assertion Strategies

For non-deterministic outputs, run multiple trials and assert on distributions rather than individual results:

python
class StatisticalAssertions:
    @staticmethod
    async def assert_pass_rate(test_fn, n_trials=20, min_pass_rate=0.90):
        """Run a test multiple times and assert a minimum pass rate"""
        results = []
        for _ in range(n_trials):
            try:
                await test_fn()
                results.append(True)
            except AssertionError:
                results.append(False)

        pass_rate = sum(results) / len(results)
        assert pass_rate >= min_pass_rate, (
            f"Pass rate {pass_rate:.0%} below minimum {min_pass_rate:.0%} "
            f"over {n_trials} trials"
        )

    @staticmethod
    async def assert_quality_distribution(eval_fn, samples, min_median=0.7, min_p10=0.4):
        """Assert that quality scores meet distribution requirements"""
        scores = [await eval_fn(sample) for sample in samples]
        scores.sort()

        median = scores[len(scores) // 2]
        p10 = scores[len(scores) // 10]

        assert median >= min_median, f"Median quality {median:.2f} below threshold {min_median}"
        assert p10 >= min_p10, f"P10 quality {p10:.2f} below threshold {min_p10}"

These testing patterns integrate naturally into CI/CD pipelines. For the full picture of regression testing, continuous evaluation, and eval-driven development workflows, see Article 36 -- CI/CD for AI. For the evaluation metrics and methodology that underpin these test assertions, see Article 31 -- Eval Fundamentals.

Prompt Management Systems

Prompts are not strings -- they are configuration artifacts with versioning, deployment, and lifecycle requirements comparable to feature flags or database migrations. Treating prompts as code checked into version control is a good start, but production systems need more: A/B testing of prompt variants, rollback on quality regression, and separation of prompt authoring from application deployment.

Note: A prompt change that ships silently inside a code deployment is an untested change to a critical system component. Prompt management systems make this visible, measurable, and reversible.

Prompts as First-Class Artifacts

A prompt management system separates prompts from application code, enabling non-engineers (domain experts, content teams) to iterate on prompts without triggering a full deployment cycle:

python
from datetime import datetime

class PromptRegistry:
    """Central registry for versioned, deployable prompts"""

    def __init__(self, store):
        self.store = store  # Database, Langfuse, or config service

    async def get_prompt(self, name, *, version=None, label="production"):
        """Fetch a prompt by name, optionally pinned to a version or label"""
        if version:
            return await self.store.get_prompt_version(name, version)

        # Labels allow promoting prompts through environments:
        # "draft" -> "staging" -> "production"
        return await self.store.get_prompt_by_label(name, label)

    async def create_version(self, name, template, metadata=None):
        """Create a new prompt version (immutable once created)"""
        metadata = metadata or {}  # avoid calling .get() on None
        version = await self.store.create_prompt_version(
            name=name,
            template=template,
            metadata={
                "author": metadata.get("author"),
                "change_reason": metadata.get("change_reason"),
                "created_at": datetime.utcnow().isoformat(),
                **metadata,
            },
        )
        return version

    async def promote(self, name, version, from_label, to_label):
        """Promote a prompt version from one environment to another"""
        # Validate that it was tested in the source environment
        prompt = await self.store.get_prompt_version(name, version)
        if from_label not in prompt.labels:
            raise ValueError(f"Prompt {name}@{version} not labeled '{from_label}'")

        await self.store.add_label(name, version, to_label)
        # Remove the label from any previous version
        await self.store.remove_label_from_others(name, version, to_label)

Langfuse Prompt Management

Langfuse provides built-in prompt management that links prompts to traces, enabling direct correlation between prompt versions and quality metrics. This is covered in depth in Article 40 -- Observability, but the deployment pattern is worth highlighting here:

python
from langfuse import Langfuse

langfuse = Langfuse()

# Fetch the production prompt -- Langfuse handles versioning
prompt = langfuse.get_prompt("customer-support-router", label="production")

# The prompt object carries its version and metadata
print(f"Using prompt version: {prompt.version}, updated: {prompt.config.get('updated_at')}")

# Compile the prompt with variables
compiled = prompt.compile(
    customer_name=customer.name,
    issue_category=ticket.category,
    account_tier=customer.tier,
)

# When used with tracing, Langfuse links this prompt version to every
# trace that uses it -- enabling quality analysis per prompt version
generation = langfuse.generation(
    name="support-response",
    prompt=prompt,  # Links trace to prompt version
    input=compiled,
)

A/B Testing Prompts

Prompt A/B testing extends the experiment framework from earlier in this article. The critical addition is linking experiment variants to prompt versions so that quality regressions are traceable to specific prompt changes:

python
class PromptExperiment:
    def __init__(self, prompt_registry, experiment_id):
        self.registry = prompt_registry
        self.experiment_id = experiment_id

    async def run(self, user_id, input_data, prompt_name, variants):
        """
        variants: {"control": "v12", "treatment": "v13"}
        Maps variant names to prompt versions.
        """
        variant = self.assign_variant(user_id)
        prompt_version = variants[variant]

        prompt = await self.registry.get_prompt(prompt_name, version=prompt_version)
        compiled = prompt.compile(**input_data)

        result = await self.llm.generate(messages=[{"role": "user", "content": compiled}])

        # Log for analysis: links experiment -> prompt version -> output quality
        await self.log_experiment_exposure(
            experiment_id=self.experiment_id,
            user_id=user_id,
            variant=variant,
            prompt_name=prompt_name,
            prompt_version=prompt_version,
            result_id=result.id,
        )
        return result

For how prompt A/B tests fit into a broader gateway routing architecture, see Article 42 -- AI Gateways.

Structured Output in Production

Getting an LLM to return valid JSON in a demo is easy. Guaranteeing schema compliance at scale -- across thousands of requests per hour, with varying input complexity and multiple model providers -- is a production engineering problem.

The tooling has matured significantly: Instructor, Outlines, and native JSON mode each offer different tradeoff points between flexibility, reliability, and vendor coupling. For the foundational techniques (JSON mode, constrained decoding, tool-schema tricks), see Article 10 -- Structured Output. This section focuses on the production patterns that wrap those techniques.
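
For contrast with Instructor (covered next), Outlines enforces the schema during decoding by masking tokens that would violate it, so a locally served model cannot emit malformed JSON at all. A minimal sketch against the pre-1.0 Outlines API (the model name is illustrative):

python
import outlines
from pydantic import BaseModel

class Ticket(BaseModel):
    category: str
    urgency: int

# Constrained decoding: token masks guarantee schema-valid output
model = outlines.models.transformers("microsoft/Phi-3-mini-4k-instruct")
generator = outlines.generate.json(model, Ticket)

ticket = generator("Classify this support message: 'My invoice is wrong!'")
assert isinstance(ticket, Ticket)  # parsing cannot fail by construction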

Instructor: Pydantic-First Structured Output

Instructor patches LLM client libraries to return validated Pydantic models instead of raw strings. It handles retries on validation failure automatically:

python
import instructor
from pydantic import BaseModel, Field, field_validator
from openai import AsyncOpenAI

client = instructor.from_openai(AsyncOpenAI())

class ExtractedEntity(BaseModel):
    name: str = Field(description="Entity name")
    entity_type: str = Field(description="Type: person, organization, location, or product")
    confidence: float = Field(ge=0.0, le=1.0, description="Extraction confidence")

    @field_validator("entity_type")
    @classmethod
    def validate_entity_type(cls, v):
        allowed = {"person", "organization", "location", "product"}
        if v.lower() not in allowed:
            raise ValueError(f"entity_type must be one of {allowed}, got '{v}'")
        return v.lower()

class ExtractionResult(BaseModel):
    entities: list[ExtractedEntity]
    summary: str = Field(max_length=500)

# Instructor validates the response against the Pydantic model.
# On validation failure, it automatically retries with the error message
# injected into the prompt, giving the model a chance to self-correct.
result = await client.chat.completions.create(
    model="gpt-4o",
    response_model=ExtractionResult,
    max_retries=3,
    messages=[{"role": "user", "content": f"Extract entities from: {document_text}"}],
)

# result is a validated ExtractionResult instance -- guaranteed by Pydantic
assert isinstance(result, ExtractionResult)
for entity in result.entities:
    assert 0.0 <= entity.confidence <= 1.0

Runtime Validation Layer

Even with structured output guarantees from the model, a production system should validate outputs at the application boundary. Defense in depth:

python
import re

from pydantic import ValidationError

class OutputValidator:
    """Validate LLM outputs before they reach downstream consumers"""

    def __init__(self, schema_registry, llm_client):
        self.schemas = schema_registry
        self.llm = llm_client

    async def validate_and_repair(self, output, schema_name, repair_attempts=2):
        schema = self.schemas.get(schema_name)

        # Attempt 1: Direct validation
        try:
            return schema.model_validate_json(output)
        except ValidationError as e:
            first_error = e  # `except ... as` unbinds the name after the block

        # Attempt 2: Common repairs (strip markdown fences, fix trailing commas)
        cleaned = self.clean_common_issues(output)
        try:
            return schema.model_validate_json(cleaned)
        except ValidationError:
            pass

        # Attempt 3: Ask the LLM to fix its own output
        for attempt in range(repair_attempts):
            repair_prompt = (
                f"The following JSON is invalid according to the schema.\n"
                f"Schema: {schema.model_json_schema()}\n"
                f"Invalid JSON: {output}\n"
                f"Validation error: {first_error}\n"
                f"Return only the corrected JSON."
            )
            output = await self.llm.generate(prompt=repair_prompt, temperature=0)
            try:
                return schema.model_validate_json(output)
            except ValidationError:
                continue

        raise StructuredOutputError(
            f"Could not produce valid {schema_name} after {repair_attempts + 2} attempts"
        )

    def clean_common_issues(self, text):
        """Fix common LLM JSON formatting issues"""
        # Strip markdown code fences
        text = re.sub(r"^```(?:json)?\s*\n?", "", text.strip())
        text = re.sub(r"\n?```\s*$", "", text)
        # Remove trailing commas before closing brackets
        text = re.sub(r",\s*([}\]])", r"\1", text)
        return text.strip()

The validate-and-repair pattern is essential because no single technique guarantees 100% schema compliance across all models, all inputs, and all edge cases. Instructor handles the common case; the validation layer catches the rest.

Key Takeaways

  • Implement the router pattern first when optimizing costs -- routing simple requests to cheaper models is the fastest, lowest-risk path to 50-70% cost reduction
  • Use exponential backoff with jitter for rate limit errors, no retry for content policy violations, and input truncation for context length errors -- each error type requires a different strategy
  • Add feature flags for every LLM call's model, temperature, and prompt version before shipping to production; without them, any change requires a full deployment
  • Version and stage prompts independently from code using a prompt registry (draft → staging → production) -- this enables non-engineers to iterate on prompts without touching application deployments
  • Write property-based tests (valid JSON, expected label set, length constraints) over statistical samples; exact-match tests for LLM outputs are brittle and defeat the purpose
  • Apply defense in depth for structured output: use Instructor/Outlines for the happy path, plus a runtime validation-and-repair layer -- no single technique guarantees schema compliance across all edge cases
Related Articles

  • Article 31 -- LLM Evaluation Fundamentals: Metrics, datasets, and methodology for measuring the quality assertions used throughout this article.
  • Article 36 -- CI/CD for AI: Regression testing, continuous evaluation, and eval-driven development workflows that operationalize the testing patterns described here.
  • Article 40 -- Observability: Tracing, logging, and LLM monitoring -- including Langfuse integration for prompt management and quality tracking.
  • Article 42 -- AI Gateways: Rate limiting, fallback chains, and multi-provider routing that complement the router and fallback patterns in this article.
โ† PreviousLLM Serving: API Design, Batching & StreamingNext โ†’CI/CD for AI: Regression Testing, Monitoring & Continuous Eval