Introduction: LLM APIs have strict rate limits—requests per minute, tokens per minute, and concurrent request caps. Hit these limits and your application grinds to a halt with 429 errors. Worse, aggressive retry logic can trigger longer cooldowns. Proper rate limiting isn’t just about staying under limits; it’s about maximizing throughput while gracefully handling bursts, prioritizing important requests, and providing good user experience even under load. This guide covers implementing robust rate limiting for LLM applications: token bucket algorithms, sliding windows, request queuing, exponential backoff, and production patterns for high-throughput systems.

Token Bucket Rate Limiter
import time
import threading
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60
    tokens_per_minute: int = 90000
    max_concurrent: int = 10

class TokenBucketLimiter:
    """Token bucket rate limiter for LLM APIs."""

    def __init__(self, config: RateLimitConfig):
        self.config = config
        # Request bucket
        self.request_tokens = config.requests_per_minute
        self.request_capacity = config.requests_per_minute
        self.request_refill_rate = config.requests_per_minute / 60  # per second
        # Token bucket
        self.token_tokens = config.tokens_per_minute
        self.token_capacity = config.tokens_per_minute
        self.token_refill_rate = config.tokens_per_minute / 60  # per second
        # Concurrency
        self.concurrent = 0
        self.max_concurrent = config.max_concurrent
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        self.request_tokens = min(
            self.request_capacity,
            self.request_tokens + elapsed * self.request_refill_rate
        )
        self.token_tokens = min(
            self.token_capacity,
            self.token_tokens + elapsed * self.token_refill_rate
        )
        self.last_refill = now

    def acquire(self, estimated_tokens: int = 1000, timeout: float = 60) -> bool:
        """Try to acquire rate limit tokens."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            with self.lock:
                self._refill()
                # Check all limits
                if (self.request_tokens >= 1 and
                        self.token_tokens >= estimated_tokens and
                        self.concurrent < self.max_concurrent):
                    self.request_tokens -= 1
                    self.token_tokens -= estimated_tokens
                    self.concurrent += 1
                    return True
            # Wait and retry
            time.sleep(0.1)
        return False
    def release(self, actual_tokens: int = 0):
        """Release after request completes."""
        with self.lock:
            self.concurrent -= 1
            # The estimated tokens stay deducted (the refill recovers them over time);
            # actual_tokens is accepted so callers can report real usage for tracking.
    def wait_time(self, estimated_tokens: int = 1000) -> float:
        """Estimate wait time until request can proceed."""
        with self.lock:
            self._refill()
            wait_for_request = max(0, (1 - self.request_tokens) / self.request_refill_rate)
            wait_for_tokens = max(0, (estimated_tokens - self.token_tokens) / self.token_refill_rate)
            return max(wait_for_request, wait_for_tokens)
# Usage
limiter = TokenBucketLimiter(RateLimitConfig(
    requests_per_minute=60,
    tokens_per_minute=90000,
    max_concurrent=5
))

def rate_limited_completion(prompt: str) -> str:
    """Make a rate-limited LLM call."""
    from openai import OpenAI
    client = OpenAI()
    # Rough estimate: ~4 characters per token, plus headroom for the completion
    estimated_tokens = len(prompt) // 4 + 500
    if not limiter.acquire(estimated_tokens, timeout=30):
        raise TimeoutError("Rate limit acquisition timed out")
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        actual_tokens = response.usage.total_tokens
        limiter.release(actual_tokens)
        return response.choices[0].message.content
    except Exception:
        limiter.release()
        raise
Sliding Window Rate Limiter
from collections import deque
import time
import threading

class SlidingWindowLimiter:
    """Sliding window rate limiter with precise tracking."""

    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        # Track timestamps and token counts
        self.request_times: deque = deque()
        self.token_usage: deque = deque()  # (timestamp, tokens)
        self.lock = threading.Lock()

    def _cleanup(self):
        """Remove entries older than 1 minute."""
        cutoff = time.time() - 60
        while self.request_times and self.request_times[0] < cutoff:
            self.request_times.popleft()
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()

    def can_proceed(self, estimated_tokens: int) -> tuple[bool, float]:
        """Check if request can proceed. Returns (can_proceed, wait_time)."""
        with self.lock:
            self._cleanup()
            current_rpm = len(self.request_times)
            current_tpm = sum(tokens for _, tokens in self.token_usage)
            if current_rpm >= self.rpm_limit:
                # Wait until the oldest request leaves the window
                wait_time = self.request_times[0] + 60 - time.time()
                return False, max(0, wait_time)
            if current_tpm + estimated_tokens > self.tpm_limit:
                if not self.token_usage:
                    # The request alone exceeds the per-minute token budget; waiting won't help
                    return False, 0
                # Wait until enough tokens free up
                wait_time = self.token_usage[0][0] + 60 - time.time()
                return False, max(0, wait_time)
            return True, 0

    def record(self, tokens_used: int):
        """Record a completed request."""
        with self.lock:
            now = time.time()
            self.request_times.append(now)
            self.token_usage.append((now, tokens_used))

    def get_usage(self) -> dict:
        """Get current usage stats."""
        with self.lock:
            self._cleanup()
            return {
                "requests_used": len(self.request_times),
                "requests_limit": self.rpm_limit,
                "tokens_used": sum(t for _, t in self.token_usage),
                "tokens_limit": self.tpm_limit
            }
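Note that can_proceed only checks; it does not reserve capacity, so the caller records usage after the call completes. A brief usage sketch under that assumption (window_limiter and sliding_window_completion are illustrative names, using the same OpenAI client as above):

# Usage sketch: check, wait out the window if needed, then record actual usage
window_limiter = SlidingWindowLimiter(requests_per_minute=60, tokens_per_minute=90000)

def sliding_window_completion(prompt: str) -> str:
    from openai import OpenAI
    client = OpenAI()
    estimated = len(prompt) // 4 + 500
    while True:
        ok, wait = window_limiter.can_proceed(estimated)
        if ok:
            break
        time.sleep(wait + 0.05)  # small buffer past the window edge
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    window_limiter.record(response.usage.total_tokens)
    return response.choices[0].message.content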
# Async-friendly sliding window
import asyncio

class AsyncSlidingWindowLimiter:
    """Async sliding window rate limiter."""

    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        self.request_times: deque = deque()
        self.token_usage: deque = deque()
        self.lock = asyncio.Lock()

    async def acquire(self, estimated_tokens: int, timeout: float = 60):
        """Acquire a rate limit slot, waiting if necessary."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            async with self.lock:
                self._cleanup()
                current_rpm = len(self.request_times)
                current_tpm = sum(t for _, t in self.token_usage)
                if current_rpm < self.rpm_limit and current_tpm + estimated_tokens <= self.tpm_limit:
                    now = time.time()
                    self.request_times.append(now)
                    self.token_usage.append((now, estimated_tokens))
                    return True
            await asyncio.sleep(0.1)
        raise TimeoutError("Rate limit acquisition timeout")

    def _cleanup(self):
        cutoff = time.time() - 60
        while self.request_times and self.request_times[0] < cutoff:
            self.request_times.popleft()
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()
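A usage sketch for the async limiter, assuming the AsyncOpenAI client (limited_completion and main are illustrative names): acquire reserves the estimated tokens up front, so concurrent coroutines queue behind the same window.

async_limiter = AsyncSlidingWindowLimiter(requests_per_minute=60, tokens_per_minute=90000)

async def limited_completion(prompt: str) -> str:
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    await async_limiter.acquire(estimated_tokens=len(prompt) // 4 + 500)
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = ["Summarize doc A", "Summarize doc B", "Summarize doc C"]
    return await asyncio.gather(*(limited_completion(p) for p in prompts))

# results = asyncio.run(main())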
Request Queue with Priority
import heapq
import time
import uuid
import threading
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass(order=True)
class PrioritizedRequest:
    priority: int
    timestamp: float = field(compare=False)
    request_id: str = field(compare=False)
    callback: Callable = field(compare=False)
    args: tuple = field(compare=False)
    kwargs: dict = field(compare=False)

class PriorityRequestQueue:
    """Priority queue for rate-limited requests."""

    def __init__(self, limiter: TokenBucketLimiter, workers: int = 3):
        self.limiter = limiter
        self.queue: list = []
        self.lock = threading.Lock()
        self.workers = workers
        self.running = False
        self.results: dict = {}

    def submit(
        self,
        callback: Callable,
        *args,
        priority: int = 5,  # 1 = highest, 10 = lowest
        **kwargs
    ) -> str:
        """Submit a request to the queue."""
        request_id = str(uuid.uuid4())
        request = PrioritizedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id,
            callback=callback,
            args=args,
            kwargs=kwargs
        )
        with self.lock:
            heapq.heappush(self.queue, request)
        return request_id

    def start(self):
        """Start worker threads."""
        self.running = True
        for _ in range(self.workers):
            thread = threading.Thread(target=self._worker, daemon=True)
            thread.start()

    def stop(self):
        """Stop workers."""
        self.running = False

    def _worker(self):
        """Worker thread that processes the queue."""
        while self.running:
            request = None
            with self.lock:
                if self.queue:
                    request = heapq.heappop(self.queue)
            if request is None:
                time.sleep(0.1)
                continue
            # Wait for rate limit; pop the estimate so it isn't forwarded to the callback
            estimated_tokens = request.kwargs.pop("estimated_tokens", 1000)
            if self.limiter.acquire(estimated_tokens, timeout=60):
                try:
                    result = request.callback(*request.args, **request.kwargs)
                    self.results[request.request_id] = {"success": True, "result": result}
                except Exception as e:
                    self.results[request.request_id] = {"success": False, "error": str(e)}
                finally:
                    self.limiter.release()
            else:
                # Timeout: requeue with lower priority, preserving the token estimate
                request.priority += 1
                request.kwargs["estimated_tokens"] = estimated_tokens
                with self.lock:
                    heapq.heappush(self.queue, request)

    def get_result(self, request_id: str, timeout: float = 60) -> Any:
        """Wait for and get a result."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            if request_id in self.results:
                return self.results.pop(request_id)
            time.sleep(0.1)
        raise TimeoutError("Result not available")
# Usage
limiter = TokenBucketLimiter(RateLimitConfig())
request_queue = PriorityRequestQueue(limiter)
request_queue.start()

# Submit requests with different priorities
high_priority_id = request_queue.submit(
    rate_limited_completion,
    "Important query",
    priority=1
)
low_priority_id = request_queue.submit(
    rate_limited_completion,
    "Background task",
    priority=10
)

# Get results
result = request_queue.get_result(high_priority_id)
Exponential Backoff with Jitter
import time
import random
from functools import wraps

def exponential_backoff(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """Decorator for exponential backoff with jitter."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    error_str = str(e).lower()
                    # Check if retryable
                    retryable = any(msg in error_str for msg in [
                        "rate limit", "429", "too many requests",
                        "overloaded", "timeout", "503", "529"
                    ])
                    if not retryable:
                        raise
                    if attempt == max_retries - 1:
                        raise
                    # Calculate delay
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    # Add jitter
                    if jitter:
                        delay = delay * (0.5 + random.random())
                    print(f"Rate limited, retrying in {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
# Usage
@exponential_backoff(max_retries=5, base_delay=1.0)
def resilient_completion(prompt: str) -> str:
    """LLM call with automatic retry."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
# Async version
async def async_exponential_backoff(
    func,
    *args,
    max_retries: int = 5,
    base_delay: float = 1.0,
    **kwargs
):
    """Async exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            error_str = str(e).lower()
            # Mirror the retryable check used in the sync decorator
            retryable = any(msg in error_str for msg in [
                "rate limit", "429", "too many requests", "overloaded", "timeout"
            ])
            if not retryable or attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) * (0.5 + random.random())
            await asyncio.sleep(delay)
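A usage sketch for the async helper (fragile_completion and resilient_async_completion are illustrative names, assuming the AsyncOpenAI client):

async def fragile_completion(prompt: str) -> str:
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def resilient_async_completion(prompt: str) -> str:
    # Retries fragile_completion with exponential backoff and jitter
    return await async_exponential_backoff(fragile_completion, prompt, max_retries=5)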
Production Rate Limiting
import time
import redis

class RedisRateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "ratelimit"):
        self.redis = redis_client
        self.prefix = key_prefix

    def check_rate_limit(
        self,
        identifier: str,  # user_id, api_key, etc.
        requests_per_minute: int,
        tokens_per_minute: int,
        estimated_tokens: int
    ) -> tuple[bool, dict]:
        """Check if a request is within rate limits."""
        now = int(time.time())
        minute_key = f"{self.prefix}:{identifier}:{now // 60}"
        pipe = self.redis.pipeline()
        # Get current counts
        pipe.hget(minute_key, "requests")
        pipe.hget(minute_key, "tokens")
        results = pipe.execute()
        current_requests = int(results[0] or 0)
        current_tokens = int(results[1] or 0)
        # Check limits
        if current_requests >= requests_per_minute:
            return False, {
                "reason": "requests_per_minute",
                "current": current_requests,
                "limit": requests_per_minute,
                "retry_after": 60 - (now % 60)
            }
        if current_tokens + estimated_tokens > tokens_per_minute:
            return False, {
                "reason": "tokens_per_minute",
                "current": current_tokens,
                "limit": tokens_per_minute,
                "retry_after": 60 - (now % 60)
            }
        return True, {}

    def record_usage(self, identifier: str, tokens_used: int):
        """Record usage after a successful request."""
        now = int(time.time())
        minute_key = f"{self.prefix}:{identifier}:{now // 60}"
        pipe = self.redis.pipeline()
        pipe.hincrby(minute_key, "requests", 1)
        pipe.hincrby(minute_key, "tokens", tokens_used)
        pipe.expire(minute_key, 120)  # Expire after 2 minutes
        pipe.execute()
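One caveat: check_rate_limit and record_usage are separate round trips, so two concurrent requests can both pass the check before either records usage. A minimal sketch of one way around that (try_consume is a hypothetical helper, not part of the class above): increment first, then roll back if the increment overshot a limit.

def try_consume(
    limiter: RedisRateLimiter,
    identifier: str,
    requests_per_minute: int,
    tokens_per_minute: int,
    estimated_tokens: int
) -> bool:
    """Atomically reserve capacity; returns False if the reservation overshoots."""
    now = int(time.time())
    minute_key = f"{limiter.prefix}:{identifier}:{now // 60}"
    pipe = limiter.redis.pipeline()
    pipe.hincrby(minute_key, "requests", 1)
    pipe.hincrby(minute_key, "tokens", estimated_tokens)
    pipe.expire(minute_key, 120)
    requests_now, tokens_now, _ = pipe.execute()
    if requests_now > requests_per_minute or tokens_now > tokens_per_minute:
        # Overshot: undo the reservation so the window isn't poisoned
        rollback = limiter.redis.pipeline()
        rollback.hincrby(minute_key, "requests", -1)
        rollback.hincrby(minute_key, "tokens", -estimated_tokens)
        rollback.execute()
        return False
    return True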
# FastAPI middleware for rate limiting
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
redis_client = redis.Redis()
rate_limiter = RedisRateLimiter(redis_client)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Rate limiting middleware."""
    # Get identifier (API key, user ID, IP)
    identifier = request.headers.get("X-API-Key") or (
        request.client.host if request.client else "unknown"
    )
    # Estimate tokens from Content-Length rather than reading the body here,
    # since consuming the body in middleware can interfere with downstream
    # handlers on some Starlette versions
    content_length = int(request.headers.get("content-length", 0))
    estimated_tokens = content_length // 4 + 500
    # Check rate limit
    allowed, info = rate_limiter.check_rate_limit(
        identifier=identifier,
        requests_per_minute=60,
        tokens_per_minute=90000,
        estimated_tokens=estimated_tokens
    )
    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded", **info},
            headers={"Retry-After": str(info.get("retry_after", 60))}
        )
    response = await call_next(request)
    # Record usage (ideally with the actual token count from the response)
    rate_limiter.record_usage(identifier, estimated_tokens)
    return response
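Different user classes usually warrant different limits (the conclusion below calls these tiered limits). A minimal sketch, with illustrative tier names and numbers, of how per-tier configs could feed the checks above:

# Illustrative tier table; the real numbers depend on your upstream quotas
TIER_LIMITS = {
    "free": RateLimitConfig(requests_per_minute=10, tokens_per_minute=10_000, max_concurrent=2),
    "pro": RateLimitConfig(requests_per_minute=60, tokens_per_minute=90_000, max_concurrent=10),
    "enterprise": RateLimitConfig(requests_per_minute=300, tokens_per_minute=450_000, max_concurrent=30),
}

def limits_for(tier: str) -> RateLimitConfig:
    """Look up per-tier limits, defaulting to the most restrictive tier."""
    return TIER_LIMITS.get(tier, TIER_LIMITS["free"])

# Inside the middleware, something like (lookup_tier is hypothetical):
# config = limits_for(lookup_tier(identifier))
# allowed, info = rate_limiter.check_rate_limit(
#     identifier, config.requests_per_minute, config.tokens_per_minute, estimated_tokens
# )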
References
- OpenAI Rate Limits: https://platform.openai.com/docs/guides/rate-limits
- Anthropic Rate Limits: https://docs.anthropic.com/claude/reference/rate-limits
- Token Bucket Algorithm: https://en.wikipedia.org/wiki/Token_bucket
- Redis Rate Limiting: https://redis.io/docs/manual/patterns/rate-limiting/
Conclusion
Rate limiting is the unsung hero of production LLM applications. Without it, traffic spikes trigger cascading 429 errors, aggressive retries make things worse, and your application becomes unreliable. A well-designed rate limiter smooths traffic, prioritizes important requests, and gracefully degrades under load. Start with a simple token bucket for single-instance applications. Add sliding windows for more precise tracking. Implement request queues with priority for complex workloads. Use Redis for distributed rate limiting across multiple instances. Always implement exponential backoff with jitter for retries—it's the difference between recovering gracefully and hammering an already-stressed API. Monitor your rate limit usage, set alerts before hitting limits, and consider tiered limits for different user classes. The goal isn't just staying under limits—it's maximizing throughput while providing consistent, reliable service.