LLM Observability: Tracing, Cost Tracking, and Quality Monitoring for Production AI

Introduction

You can’t improve what you can’t measure. LLM applications are notoriously difficult to debug—prompts are opaque, responses are non-deterministic, and failures often manifest as subtle quality degradation rather than crashes. Observability gives you visibility into every LLM call: what prompts were sent, what responses came back, how long it took, how much it cost, and whether users were satisfied. This guide covers implementing comprehensive LLM observability using tools like LangSmith, Langfuse, and custom instrumentation, with patterns for tracing, cost tracking, and quality monitoring.

[Figure: LLM Observability: Traces, Metrics, and Insights]

Basic Instrumentation

import functools
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Any
import json

@dataclass
class LLMSpan:
    """A single LLM operation span."""
    span_id: str
    trace_id: str
    name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    model: str = ""
    prompt: str = ""
    response: str = ""
    tokens_input: int = 0
    tokens_output: int = 0
    latency_ms: float = 0
    cost_usd: float = 0
    metadata: dict = field(default_factory=dict)
    error: Optional[str] = None

class LLMTracer:
    """Simple LLM tracing implementation."""
    
    # Cost per 1K tokens in USD (approximate; verify against current provider pricing)
    COSTS = {
        "gpt-4-turbo-preview": {"input": 0.01, "output": 0.03},
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
    }
    
    def __init__(self):
        self.spans: list[LLMSpan] = []
        self.current_trace_id: Optional[str] = None
    
    def start_trace(self, name: str = "default") -> str:
        """Start a new trace."""
        self.current_trace_id = str(uuid.uuid4())
        return self.current_trace_id
    
    def start_span(self, name: str, model: str = "", prompt: str = "") -> LLMSpan:
        """Start a new span within current trace."""
        span = LLMSpan(
            span_id=str(uuid.uuid4()),
            trace_id=self.current_trace_id or str(uuid.uuid4()),
            name=name,
            start_time=datetime.now(),
            model=model,
            prompt=prompt
        )
        return span
    
    def end_span(
        self,
        span: LLMSpan,
        response: str,
        tokens_input: int,
        tokens_output: int,
        error: Optional[str] = None
    ):
        """End a span and calculate metrics."""
        span.end_time = datetime.now()
        span.response = response
        span.tokens_input = tokens_input
        span.tokens_output = tokens_output
        span.error = error
        
        # Calculate latency
        span.latency_ms = (span.end_time - span.start_time).total_seconds() * 1000
        
        # Calculate cost
        if span.model in self.COSTS:
            costs = self.COSTS[span.model]
            span.cost_usd = (
                (tokens_input / 1000) * costs["input"] +
                (tokens_output / 1000) * costs["output"]
            )
        
        self.spans.append(span)
        return span
    
    def get_trace_summary(self, trace_id: str) -> dict:
        """Get summary for a trace."""
        trace_spans = [s for s in self.spans if s.trace_id == trace_id]
        
        return {
            "trace_id": trace_id,
            "span_count": len(trace_spans),
            "total_latency_ms": sum(s.latency_ms for s in trace_spans),
            "total_tokens": sum(s.tokens_input + s.tokens_output for s in trace_spans),
            "total_cost_usd": sum(s.cost_usd for s in trace_spans),
            "errors": [s.error for s in trace_spans if s.error]
        }

# Decorator for automatic tracing
def trace_llm_call(tracer: LLMTracer):
    """Decorator to automatically trace LLM calls."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            span = tracer.start_span(
                name=func.__name__,
                model=kwargs.get("model", "unknown"),
                prompt=str(kwargs.get("messages", kwargs.get("prompt", "")))[:1000]
            )
            
            try:
                result = func(*args, **kwargs)
                
                # Extract response info
                if hasattr(result, "choices"):
                    response = result.choices[0].message.content
                    tokens_in = result.usage.prompt_tokens
                    tokens_out = result.usage.completion_tokens
                else:
                    response = str(result)
                    tokens_in = tokens_out = 0
                
                tracer.end_span(span, response, tokens_in, tokens_out)
                return result
                
            except Exception as e:
                tracer.end_span(span, "", 0, 0, error=str(e))
                raise
        
        return wrapper
    return decorator

# Usage
tracer = LLMTracer()

@trace_llm_call(tracer)
def call_openai(**kwargs):
    from openai import OpenAI
    client = OpenAI()
    return client.chat.completions.create(**kwargs)

# Make traced call
tracer.start_trace("user_query")
response = call_openai(
    model="gpt-4-turbo-preview",
    messages=[{"role": "user", "content": "Hello!"}]
)
print(tracer.get_trace_summary(tracer.current_trace_id))
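
The tracer above keeps spans in memory, which is fine for development but not for production. A minimal sketch of persisting spans as JSON Lines for later analysis; the file path and field selection here are illustrative:

def export_spans_jsonl(tracer: LLMTracer, path: str = "llm_spans.jsonl") -> None:
    """Append collected spans to a JSON Lines file for offline analysis."""
    with open(path, "a") as f:
        for span in tracer.spans:
            f.write(json.dumps({
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "name": span.name,
                "model": span.model,
                "start_time": span.start_time.isoformat(),
                "latency_ms": span.latency_ms,
                "tokens_input": span.tokens_input,
                "tokens_output": span.tokens_output,
                "cost_usd": span.cost_usd,
                "error": span.error,
            }) + "\n")

export_spans_jsonl(tracer)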

LangSmith Integration

# pip install langsmith

import os
from langsmith import Client
from langsmith.wrappers import wrap_openai
from openai import OpenAI

# Set environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-llm-app"

# Wrap OpenAI client for automatic tracing
client = wrap_openai(OpenAI())

# All calls are now automatically traced
response = client.chat.completions.create(
    model="gpt-4-turbo-preview",
    messages=[{"role": "user", "content": "What is LangSmith?"}]
)

# Manual run logging
from langsmith import traceable

@traceable(name="process_query")
def process_query(query: str) -> str:
    """Process a user query with tracing."""
    
    # This will be traced as a child span
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": query}]
    )
    
    return response.choices[0].message.content

# Add feedback/evaluation
ls_client = Client()

# Log feedback on a run
ls_client.create_feedback(
    run_id="run-uuid-here",
    key="user_rating",
    score=1.0,  # 0-1 scale
    comment="Helpful response"
)

# Create dataset for evaluation
dataset = ls_client.create_dataset("qa-examples")

ls_client.create_example(
    inputs={"question": "What is Python?"},
    outputs={"answer": "Python is a programming language."},
    dataset_id=dataset.id
)

# Run evaluation
from langsmith.evaluation import evaluate, LangChainStringEvaluator

def my_app(inputs: dict) -> dict:
    response = process_query(inputs["question"])
    return {"answer": response}

results = evaluate(
    my_app,
    data="qa-examples",
    evaluators=["qa"],  # Built-in QA evaluator
)
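
The built-in "qa" evaluator pulls in langchain. If you prefer to stay dependency-light, evaluate also accepts plain Python callables; a minimal sketch, assuming your dataset examples store the reference answer under outputs["answer"]:

from langsmith.schemas import Example, Run

def answer_contains_reference(run: Run, example: Example) -> dict:
    """Crude heuristic: does the generated answer mention the reference answer?"""
    prediction = (run.outputs or {}).get("answer", "")
    reference = (example.outputs or {}).get("answer", "")
    return {"key": "contains_reference", "score": float(reference.lower() in prediction.lower())}

results = evaluate(
    my_app,
    data="qa-examples",
    evaluators=[answer_contains_reference],
)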

Langfuse Integration

# pip install langfuse

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

# Initialize Langfuse
langfuse = Langfuse(
    public_key="pk-...",
    secret_key="sk-...",
    host="https://cloud.langfuse.com"
)

# Automatic OpenAI tracing
from langfuse.openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4-turbo-preview",
    messages=[{"role": "user", "content": "Hello!"}],
    # Langfuse-specific parameters
    name="greeting",
    metadata={"user_id": "user123"},
    tags=["production"]
)

# Decorator-based tracing
@observe()
def process_document(doc: str) -> str:
    """Process document with full tracing."""
    
    # Add custom metadata
    langfuse_context.update_current_observation(
        metadata={"doc_length": len(doc)}
    )
    
    # Nested LLM call (automatically traced)
    summary = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[
            {"role": "system", "content": "Summarize the document."},
            {"role": "user", "content": doc}
        ],
        name="summarize"
    )
    
    return summary.choices[0].message.content

# Manual span creation (low-level client API)
def complex_pipeline(query: str) -> str:
    """Pipeline with manually created trace and spans."""
    trace = langfuse.trace(name="complex_pipeline", input={"query": query})

    # Child span for retrieval (retrieve_documents is a placeholder for your retriever)
    retrieval_span = trace.span(name="retrieval", input={"query": query})
    docs = retrieve_documents(query)
    retrieval_span.end(output={"doc_count": len(docs)})

    # Child span for generation (generate_response is a placeholder for your LLM call)
    generation_span = trace.span(name="generation")
    response = generate_response(query, docs)
    generation_span.end(output={"response_length": len(response)})

    trace.update(output=response)
    return response

# Score traces
langfuse.score(
    trace_id="trace-id",
    name="user_feedback",
    value=1,
    comment="User marked as helpful"
)

# Flush before exit
langfuse.flush()
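
Inside an @observe-decorated function you can also attach a score without looking up the trace ID. A sketch using the v2 decorator context; the "non-empty answer" check is purely illustrative:

@observe()
def answer_with_feedback(query: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": query}],
    )
    answer = response.choices[0].message.content or ""

    # Illustrative automatic check; attach the result as a score on the current trace
    langfuse_context.score_current_trace(
        name="non_empty_answer",
        value=1 if answer.strip() else 0,
    )
    return answer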

Cost Tracking and Budgets

from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
import threading

@dataclass
class CostBudget:
    daily_limit_usd: float
    monthly_limit_usd: float
    alert_threshold: float = 0.8  # Alert at 80%

class CostTracker:
    """Track and enforce LLM cost budgets."""
    
    COSTS_PER_1K = {
        "gpt-4-turbo-preview": {"input": 0.01, "output": 0.03},
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
        "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
    }
    
    def __init__(self, budget: CostBudget):
        self.budget = budget
        self.costs: dict[str, list[tuple[datetime, float]]] = defaultdict(list)
        self.lock = threading.Lock()
    
    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for a single call."""
        if model not in self.COSTS_PER_1K:
            return 0.0
        
        costs = self.COSTS_PER_1K[model]
        return (
            (input_tokens / 1000) * costs["input"] +
            (output_tokens / 1000) * costs["output"]
        )
    
    def record_cost(self, model: str, input_tokens: int, output_tokens: int, user_id: str = "default"):
        """Record a cost entry."""
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        
        with self.lock:
            self.costs[user_id].append((datetime.now(), cost))
        
        # Check budgets
        self._check_budgets(user_id)
        
        return cost
    
    def get_daily_cost(self, user_id: str = "default") -> float:
        """Get total cost for today."""
        today = datetime.now().date()
        
        with self.lock:
            return sum(
                cost for ts, cost in self.costs[user_id]
                if ts.date() == today
            )
    
    def get_monthly_cost(self, user_id: str = "default") -> float:
        """Get total cost for this month."""
        now = datetime.now()
        month_start = now.replace(day=1, hour=0, minute=0, second=0)
        
        with self.lock:
            return sum(
                cost for ts, cost in self.costs[user_id]
                if ts >= month_start
            )
    
    def _check_budgets(self, user_id: str):
        """Check if budgets are exceeded."""
        daily = self.get_daily_cost(user_id)
        monthly = self.get_monthly_cost(user_id)
        
        if daily >= self.budget.daily_limit_usd:
            raise BudgetExceededError(f"Daily budget exceeded: ${daily:.2f}")
        
        if monthly >= self.budget.monthly_limit_usd:
            raise BudgetExceededError(f"Monthly budget exceeded: ${monthly:.2f}")
        
        # Alert at threshold
        if daily >= self.budget.daily_limit_usd * self.budget.alert_threshold:
            self._send_alert(f"Daily budget at {daily/self.budget.daily_limit_usd:.0%}")
    
    def _send_alert(self, message: str):
        """Send budget alert."""
        print(f"ALERT: {message}")  # Replace with actual alerting
    
    def get_report(self) -> dict:
        """Generate cost report."""
        report = {
            "daily_cost": self.get_daily_cost(),
            "monthly_cost": self.get_monthly_cost(),
            "daily_budget": self.budget.daily_limit_usd,
            "monthly_budget": self.budget.monthly_limit_usd,
            "daily_utilization": self.get_daily_cost() / self.budget.daily_limit_usd,
            "monthly_utilization": self.get_monthly_cost() / self.budget.monthly_limit_usd,
        }
        return report

class BudgetExceededError(Exception):
    pass

# Usage
budget = CostBudget(daily_limit_usd=10.0, monthly_limit_usd=200.0)
tracker = CostTracker(budget)

# Wrap LLM calls
def tracked_completion(client, **kwargs):
    response = client.chat.completions.create(**kwargs)
    
    tracker.record_cost(
        model=kwargs["model"],
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens
    )
    
    return response
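
Note that tracked_completion records cost only after the API call has completed, so the BudgetExceededError from record_cost fires once the money is already spent. A small sketch of a pre-flight check that refuses new calls once today's budget is exhausted; the helper name is illustrative:

def budget_aware_completion(client, **kwargs):
    """Refuse the call up front if today's budget is already spent."""
    if tracker.get_daily_cost() >= budget.daily_limit_usd:
        raise BudgetExceededError(
            f"Daily budget of ${budget.daily_limit_usd:.2f} already spent; skipping LLM call"
        )
    return tracked_completion(client, **kwargs)

# Periodic reporting, e.g. from a cron job or an admin endpoint
print(tracker.get_report())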

Quality Monitoring

from dataclasses import dataclass
from typing import Callable, Optional
import statistics

@dataclass
class QualityMetrics:
    response_length: int
    latency_ms: float
    has_error: bool
    user_rating: Optional[float] = None
    relevance_score: Optional[float] = None

class QualityMonitor:
    """Monitor LLM response quality over time."""
    
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.metrics: list[QualityMetrics] = []
        self.alerts: list[Callable] = []
    
    def record(self, metrics: QualityMetrics):
        """Record quality metrics."""
        self.metrics.append(metrics)
        
        # Keep window size
        if len(self.metrics) > self.window_size * 2:
            self.metrics = self.metrics[-self.window_size:]
        
        # Check for anomalies
        self._check_anomalies()
    
    def _check_anomalies(self):
        """Check for quality anomalies."""
        if len(self.metrics) < 10:
            return
        
        recent = self.metrics[-10:]
        
        # Error rate spike
        error_rate = sum(1 for m in recent if m.has_error) / len(recent)
        if error_rate > 0.2:
            self._alert(f"High error rate: {error_rate:.0%}")
        
        # Latency spike
        recent_latency = [m.latency_ms for m in recent]
        historical_latency = [m.latency_ms for m in self.metrics[:-10]]
        
        if historical_latency:
            avg_historical = statistics.mean(historical_latency)
            avg_recent = statistics.mean(recent_latency)
            
            if avg_recent > avg_historical * 2:
                self._alert(f"Latency spike: {avg_recent:.0f}ms vs {avg_historical:.0f}ms")
        
        # Quality drop
        recent_ratings = [m.user_rating for m in recent if m.user_rating is not None]
        if len(recent_ratings) >= 5:
            avg_rating = statistics.mean(recent_ratings)
            if avg_rating < 0.6:
                self._alert(f"Low user ratings: {avg_rating:.2f}")
    
    def _alert(self, message: str):
        """Trigger alert."""
        for callback in self.alerts:
            callback(message)
    
    def get_dashboard_data(self) -> dict:
        """Get data for monitoring dashboard."""
        if not self.metrics:
            return {}
        
        recent = self.metrics[-self.window_size:]
        
        return {
            "total_requests": len(self.metrics),
            "error_rate": sum(1 for m in recent if m.has_error) / len(recent),
            "avg_latency_ms": statistics.mean(m.latency_ms for m in recent),
            "p95_latency_ms": statistics.quantiles([m.latency_ms for m in recent], n=20)[18],
            "avg_response_length": statistics.mean(m.response_length for m in recent),
            "avg_user_rating": statistics.mean(
                m.user_rating for m in recent if m.user_rating is not None
            ) if any(m.user_rating for m in recent) else None,
        }
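
A sketch of wiring the monitor into a call path, assuming an OpenAI-style client; the print-based alert callback stands in for whatever alerting channel you actually use:

import time

monitor = QualityMonitor(window_size=100)
monitor.alerts.append(lambda msg: print(f"QUALITY ALERT: {msg}"))  # swap for Slack, PagerDuty, etc.

def monitored_completion(client, **kwargs):
    """Record quality metrics for every completion call."""
    start = time.perf_counter()
    try:
        response = client.chat.completions.create(**kwargs)
    except Exception:
        monitor.record(QualityMetrics(
            response_length=0,
            latency_ms=(time.perf_counter() - start) * 1000,
            has_error=True,
        ))
        raise

    text = response.choices[0].message.content or ""
    monitor.record(QualityMetrics(
        response_length=len(text),
        latency_ms=(time.perf_counter() - start) * 1000,
        has_error=False,
    ))
    return response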

# Prometheus metrics export
from prometheus_client import Counter, Histogram

llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

llm_latency_seconds = Histogram(
    'llm_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
)

llm_tokens_total = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'type']
)

llm_cost_usd = Counter(
    'llm_cost_usd_total',
    'Total cost in USD',
    ['model']
)

def record_prometheus_metrics(model: str, latency: float, tokens_in: int, tokens_out: int, cost: float, error: bool = False):
    """Record metrics to Prometheus."""
    status = "error" if error else "success"
    
    llm_requests_total.labels(model=model, status=status).inc()
    llm_latency_seconds.labels(model=model).observe(latency)
    llm_tokens_total.labels(model=model, type="input").inc(tokens_in)
    llm_tokens_total.labels(model=model, type="output").inc(tokens_out)
    llm_cost_usd.labels(model=model).inc(cost)
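
To make these metrics scrapeable, prometheus_client can expose an HTTP endpoint. A minimal sketch that serves /metrics and wires the recorder into a call; the port and the reuse of the CostTracker from the previous section are illustrative:

import time
from prometheus_client import start_http_server

# Expose /metrics for Prometheus to scrape
start_http_server(9090)

def instrumented_completion(client, **kwargs):
    """Wrap an OpenAI-style call and export the metrics defined above."""
    model = kwargs.get("model", "unknown")
    start = time.perf_counter()
    try:
        response = client.chat.completions.create(**kwargs)
    except Exception:
        record_prometheus_metrics(model, time.perf_counter() - start, 0, 0, 0.0, error=True)
        raise

    tokens_in = response.usage.prompt_tokens
    tokens_out = response.usage.completion_tokens
    cost = tracker.calculate_cost(model, tokens_in, tokens_out)  # CostTracker from the cost section
    record_prometheus_metrics(model, time.perf_counter() - start, tokens_in, tokens_out, cost)
    return response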

Conclusion

LLM observability is the foundation for building reliable AI applications. Without visibility into what your models are doing, you’re flying blind—unable to debug issues, optimize costs, or improve quality. Start with basic instrumentation that captures prompts, responses, latency, and tokens for every call. Add cost tracking early to avoid surprise bills. Use platforms like LangSmith or Langfuse for rich tracing and evaluation capabilities. Implement quality monitoring to catch degradation before users complain. The investment in observability pays dividends throughout the application lifecycle—from development debugging to production incident response to continuous improvement. Remember that LLM behavior can drift over time as providers update models, so ongoing monitoring is essential even for stable applications.

