LLM Cost Optimization: Reducing API Spend Without Sacrificing Quality

Introduction: LLM API costs can spiral quickly—a chatbot serving 10,000 conversations a day at $0.01 each runs roughly $3,000 a month. Production systems need cost optimization that doesn't sacrifice quality. This guide covers practical strategies: semantic caching to avoid redundant calls, model routing to send work to cheaper models when possible, prompt compression to cut token counts, and monitoring to catch cost anomalies early. Combined, these techniques can reduce costs by 50-80% while maintaining user experience.


Cost Tracking Foundation

from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
import json

@dataclass
class ModelPricing:
    input_per_1k: float
    output_per_1k: float
    cached_input_per_1k: float = 0.0  # For providers with prompt caching

# Approximate USD prices per 1K tokens; provider pricing changes, so verify current rates
PRICING = {
    "gpt-4o": ModelPricing(0.0025, 0.01),
    "gpt-4o-mini": ModelPricing(0.00015, 0.0006),
    "gpt-4-turbo": ModelPricing(0.01, 0.03),
    "gpt-3.5-turbo": ModelPricing(0.0005, 0.0015),
    "claude-3-5-sonnet": ModelPricing(0.003, 0.015),
    "claude-3-haiku": ModelPricing(0.00025, 0.00125),
}

@dataclass
class UsageRecord:
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    endpoint: str = ""
    user_id: str = ""

class CostTracker:
    """Track and analyze LLM costs."""
    
    def __init__(self):
        self.records: list[UsageRecord] = []
        self.daily_budget: float = 100.0
        self.alert_threshold: float = 0.8
    
    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        endpoint: str = "",
        user_id: str = ""
    ) -> float:
        """Record usage and return cost."""
        
        # Unknown models fall back to conservative (gpt-4-turbo-level) pricing
        pricing = PRICING.get(model, ModelPricing(0.01, 0.03))
        
        cost = (
            (input_tokens / 1000) * pricing.input_per_1k +
            (output_tokens / 1000) * pricing.output_per_1k
        )
        
        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            endpoint=endpoint,
            user_id=user_id
        )
        
        self.records.append(record)
        
        # Check budget
        daily_cost = self.get_daily_cost()
        if daily_cost > self.daily_budget * self.alert_threshold:
            print(f"WARNING: Daily cost ${daily_cost:.2f} approaching budget ${self.daily_budget:.2f}")
        
        return cost
    
    def get_daily_cost(self, date: datetime | None = None) -> float:
        """Get total cost for a day."""
        date = date or datetime.now()
        start = date.replace(hour=0, minute=0, second=0, microsecond=0)
        end = start + timedelta(days=1)
        
        return sum(
            r.cost for r in self.records
            if start <= r.timestamp < end
        )
    
    def get_cost_by_model(self, days: int = 7) -> dict[str, float]:
        """Get cost breakdown by model."""
        cutoff = datetime.now() - timedelta(days=days)
        
        costs = defaultdict(float)
        for r in self.records:
            if r.timestamp >= cutoff:
                costs[r.model] += r.cost
        
        return dict(costs)
    
    def get_cost_by_endpoint(self, days: int = 7) -> dict[str, float]:
        """Get cost breakdown by endpoint."""
        cutoff = datetime.now() - timedelta(days=days)
        
        costs = defaultdict(float)
        for r in self.records:
            if r.timestamp >= cutoff:
                costs[r.endpoint] += r.cost
        
        return dict(costs)
    
    def report(self) -> str:
        """Generate cost report."""
        
        today = self.get_daily_cost()
        by_model = self.get_cost_by_model(7)
        by_endpoint = self.get_cost_by_endpoint(7)
        
        return f"""
Cost Report
===========
Today: ${today:.4f}
Budget: ${self.daily_budget:.2f} ({today/self.daily_budget*100:.1f}% used)

Last 7 Days by Model:
{json.dumps(by_model, indent=2)}

Last 7 Days by Endpoint:
{json.dumps(by_endpoint, indent=2)}
"""

# Global tracker
tracker = CostTracker()
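
A quick usage sketch (the token counts and endpoint name are made up for illustration):

# Record a hypothetical gpt-4o-mini call and inspect spend
cost = tracker.record("gpt-4o-mini", input_tokens=1200, output_tokens=350, endpoint="/chat")
print(f"Call cost: ${cost:.6f}")  # (1200/1000)*0.00015 + (350/1000)*0.0006 = $0.000390
print(tracker.report())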

Semantic Caching

import hashlib
import numpy as np
from openai import OpenAI
from typing import Optional

client = OpenAI()

class SemanticCache:
    """Cache responses for semantically similar queries."""
    
    def __init__(
        self,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000,
        ttl_hours: int = 24
    ):
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.ttl = timedelta(hours=ttl_hours)
        
        self.cache: dict[str, dict] = {}  # hash -> {embedding, response, timestamp}
        self.embeddings: list[tuple[str, list[float]]] = []  # (hash, embedding)
    
    def _get_embedding(self, text: str) -> list[float]:
        """Get embedding for cache lookup."""
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        a, b = np.array(a), np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _find_similar(self, embedding: list[float]) -> Optional[str]:
        """Find similar cached query."""
        
        # Linear scan over all cached embeddings; fine for a few thousand entries,
        # but swap in a vector index (e.g. FAISS) if the cache grows large.
        for cache_hash, cache_emb in self.embeddings:
            if cache_hash in self.cache:
                # Check TTL
                if datetime.now() - self.cache[cache_hash]["timestamp"] > self.ttl:
                    continue
                
                similarity = self._cosine_similarity(embedding, cache_emb)
                if similarity >= self.threshold:
                    return cache_hash
        
        return None
    
    def get(self, query: str) -> Optional[str]:
        """Get cached response for similar query."""
        
        embedding = self._get_embedding(query)
        similar_hash = self._find_similar(embedding)
        
        if similar_hash:
            return self.cache[similar_hash]["response"]
        
        return None
    
    def set(self, query: str, response: str):
        """Cache a response."""
        
        # Note: a cache miss currently embeds the query twice (once in get(), once here);
        # reusing the embedding computed in get() would halve the embedding cost.
        embedding = self._get_embedding(query)
        query_hash = hashlib.md5(query.encode()).hexdigest()
        
        self.cache[query_hash] = {
            "response": response,
            "timestamp": datetime.now()
        }
        self.embeddings.append((query_hash, embedding))
        
        # Evict old entries if needed
        if len(self.cache) > self.max_entries:
            oldest = min(self.cache.items(), key=lambda x: x[1]["timestamp"])
            del self.cache[oldest[0]]
            self.embeddings = [(h, e) for h, e in self.embeddings if h != oldest[0]]

# Usage with caching
cache = SemanticCache(similarity_threshold=0.92)

def cached_completion(prompt: str, model: str = "gpt-4o") -> str:
    """Get completion with semantic caching."""
    
    # Check cache
    cached = cache.get(prompt)
    if cached:
        print("Cache hit!")
        return cached
    
    # Call API
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    
    result = response.choices[0].message.content
    
    # Cache result
    cache.set(prompt, result)
    
    # Track cost
    tracker.record(model, response.usage.prompt_tokens, response.usage.completion_tokens)
    
    return result
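
A short usage sketch: with the 0.92 threshold configured above, a close paraphrase should be served from the cache instead of triggering a second API call (whether it actually hits depends on the embedding similarity):

# First call goes to the API and populates the cache
answer = cached_completion("What are the main benefits of unit testing?")

# A paraphrased query is likely to land above the similarity threshold -> "Cache hit!"
answer_again = cached_completion("Why is unit testing beneficial?")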

Model Routing

from enum import Enum

class TaskComplexity(Enum):
    SIMPLE = "simple"      # Formatting, extraction, simple Q&A
    MEDIUM = "medium"      # Summarization, basic analysis
    COMPLEX = "complex"    # Reasoning, coding, creative writing

class CostOptimizedRouter:
    """Route requests to appropriate models based on complexity."""
    
    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MEDIUM: "gpt-4o-mini",
        TaskComplexity.COMPLEX: "gpt-4o"
    }
    
    def __init__(self):
        # Reserved for an optional LLM-based classification fallback; the current
        # implementation relies on the keyword heuristics below and never calls it.
        self.classifier_model = "gpt-4o-mini"
    
    def classify_complexity(self, prompt: str) -> TaskComplexity:
        """Classify task complexity."""
        
        # Quick heuristics first
        prompt_lower = prompt.lower()
        
        # Simple tasks
        simple_indicators = ["format", "extract", "list", "convert", "translate"]
        if any(ind in prompt_lower for ind in simple_indicators) and len(prompt) < 500:
            return TaskComplexity.SIMPLE
        
        # Complex tasks
        complex_indicators = ["analyze", "reason", "code", "debug", "explain why", "compare"]
        if any(ind in prompt_lower for ind in complex_indicators) or len(prompt) > 2000:
            return TaskComplexity.COMPLEX
        
        return TaskComplexity.MEDIUM
    
    def route(self, prompt: str, force_model: str | None = None) -> str:
        """Route to appropriate model."""
        
        if force_model:
            return force_model
        
        complexity = self.classify_complexity(prompt)
        return self.MODEL_MAP[complexity]
    
    def complete(self, prompt: str, force_model: str | None = None) -> tuple[str, str]:
        """Get completion with automatic routing."""
        
        model = self.route(prompt, force_model)
        
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        result = response.choices[0].message.content
        
        tracker.record(
            model,
            response.usage.prompt_tokens,
            response.usage.completion_tokens
        )
        
        return result, model

# Usage
router = CostOptimizedRouter()

# Simple task -> gpt-4o-mini
result, model = router.complete("Format this JSON: {name: 'John', age: 30}")
print(f"Used {model}")  # gpt-4o-mini

# Complex task -> gpt-4o
result, model = router.complete("Analyze this code for security vulnerabilities and explain the reasoning...")
print(f"Used {model}")  # gpt-4o

Prompt Compression

import tiktoken

class PromptCompressor:
    """Reduce prompt token count while preserving meaning."""
    
    def __init__(self, model: str = "gpt-4o"):
        self.encoding = tiktoken.encoding_for_model(model)
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))
    
    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to max tokens."""
        tokens = self.encoding.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoding.decode(tokens[:max_tokens])
    
    def compress_context(
        self,
        context: str,
        max_tokens: int,
        preserve_start: int = 500,
        preserve_end: int = 500
    ) -> str:
        """Compress context keeping start and end."""
        
        current_tokens = self.count_tokens(context)
        
        if current_tokens <= max_tokens:
            return context
        
        # Keep start and end, summarize middle
        start = self.truncate_to_tokens(context, preserve_start)
        
        # Get end portion
        tokens = self.encoding.encode(context)
        end = self.encoding.decode(tokens[-preserve_end:])
        
        # Calculate remaining budget for middle summary
        # Budget for the middle summary, reserving a small margin (~50 tokens) for the separator markers
        middle_budget = max_tokens - preserve_start - preserve_end - 50
        
        if middle_budget > 100:
            # Summarize middle
            middle_start = len(self.encoding.encode(start))
            middle_end = len(tokens) - preserve_end
            middle = self.encoding.decode(tokens[middle_start:middle_end])
            
            summary_response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{
                    "role": "user",
                    "content": f"Summarize in {middle_budget} tokens:\n{middle[:5000]}"
                }],
                max_tokens=middle_budget
            )
            middle_summary = summary_response.choices[0].message.content
            
            return f"{start}\n\n[...summarized...]\n{middle_summary}\n\n[...]\n{end}"
        
        return f"{start}\n\n[...truncated...]\n\n{end}"
    
    def remove_redundancy(self, messages: list[dict]) -> list[dict]:
        """Remove redundant content from conversation history."""
        
        compressed = []
        seen_content = set()
        
        for msg in messages:
            content = msg["content"]
            
            # Hash content to detect duplicates
            content_hash = hash(content[:200])
            
            if content_hash not in seen_content:
                compressed.append(msg)
                seen_content.add(content_hash)
            else:
                # Keep but truncate duplicate
                compressed.append({
                    "role": msg["role"],
                    "content": content[:100] + "... [duplicate content removed]"
                })
        
        return compressed

# Usage
compressor = PromptCompressor()

# Compress long context
long_document = "..." * 10000  # Very long document
compressed = compressor.compress_context(long_document, max_tokens=4000)

print(f"Original: {compressor.count_tokens(long_document)} tokens")
print(f"Compressed: {compressor.count_tokens(compressed)} tokens")

Complete Cost-Optimized Client

class OptimizedLLMClient:
    """Production LLM client with all cost optimizations."""
    
    def __init__(
        self,
        daily_budget: float = 100.0,
        cache_threshold: float = 0.92,
        enable_routing: bool = True,
        enable_caching: bool = True,
        enable_compression: bool = True
    ):
        self.client = OpenAI()
        self.tracker = CostTracker()
        self.tracker.daily_budget = daily_budget
        
        self.enable_routing = enable_routing
        self.enable_caching = enable_caching
        self.enable_compression = enable_compression
        
        if enable_caching:
            self.cache = SemanticCache(similarity_threshold=cache_threshold)
        
        if enable_routing:
            self.router = CostOptimizedRouter()
        
        if enable_compression:
            self.compressor = PromptCompressor()
    
    def complete(
        self,
        prompt: str,
        model: str | None = None,
        max_tokens: int = 1000,
        skip_cache: bool = False
    ) -> dict:
        """Get completion with all optimizations."""
        
        stats = {
            "cache_hit": False,
            "model_used": model,
            "original_tokens": 0,
            "compressed_tokens": 0,
            "cost": 0.0
        }
        
        # Check cache
        if self.enable_caching and not skip_cache:
            cached = self.cache.get(prompt)
            if cached:
                stats["cache_hit"] = True
                return {"content": cached, "stats": stats}
        
        # Route to model
        if self.enable_routing and not model:
            model = self.router.route(prompt)
        else:
            model = model or "gpt-4o"
        
        stats["model_used"] = model
        
        # Compress if needed, keeping the original prompt around for caching
        original_prompt = prompt
        if self.enable_compression:
            stats["original_tokens"] = self.compressor.count_tokens(prompt)
            
            if stats["original_tokens"] > 3000:
                prompt = self.compressor.compress_context(prompt, max_tokens=3000)
                stats["compressed_tokens"] = self.compressor.count_tokens(prompt)
        
        # Call API
        response = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens
        )
        
        result = response.choices[0].message.content
        
        # Track cost
        stats["cost"] = self.tracker.record(
            model,
            response.usage.prompt_tokens,
            response.usage.completion_tokens
        )
        
        # Cache the result under the original (uncompressed) prompt so future
        # identical queries still match even when compression rewrote what was sent
        if self.enable_caching:
            self.cache.set(original_prompt, result)
        
        return {"content": result, "stats": stats}
    
    def get_cost_report(self) -> str:
        return self.tracker.report()

# Usage
llm = OptimizedLLMClient(
    daily_budget=50.0,
    enable_routing=True,
    enable_caching=True
)

# First call - cache miss, routed to appropriate model
result = llm.complete("What is machine learning?")
print(f"Cost: ${result['stats']['cost']:.6f}")

# Similar query - cache hit
result = llm.complete("Explain machine learning")
print(f"Cache hit: {result['stats']['cache_hit']}")

# Check costs
print(llm.get_cost_report())

Conclusion

LLM cost optimization is essential for sustainable production systems. Start with tracking—you can't optimize what you don't measure. Implement semantic caching for repetitive queries; even a 20% cache hit rate significantly reduces costs. Use model routing to send simple tasks to cheaper models; GPT-4o-mini handles most formatting and extraction tasks well at roughly 1/15th the cost of GPT-4o. Compress long contexts to reduce token counts. Combine these techniques for multiplicative savings. Monitor daily costs and set alerts before hitting budget limits. The goal isn't minimum cost—it's optimal cost-to-quality ratio. Some tasks genuinely need expensive models, and that's fine. The savings from optimizing routine tasks fund the premium calls that matter.
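
As a back-of-the-envelope illustration of how the savings can compound (the cache hit rate, routing discount, and compression figure below are assumptions, not measurements):

baseline = 3000.0                              # hypothetical monthly spend with no optimization
after_cache = baseline * (1 - 0.30)            # 30% of requests served from cache -> $2,100
after_routing = after_cache * (1 - 0.60)       # routing cuts the average per-call price ~60% -> $840
after_compression = after_routing * (1 - 0.10) # compression trims ~10% of the remaining bill -> $756
print(f"~{(1 - after_compression / baseline) * 100:.0f}% total reduction")  # ~75%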

