Introduction

Context windows are the lifeblood of LLM applications—they determine how much information your model can process at once. Even with 128K+ token models, you'll hit limits when dealing with long documents, conversation histories, or multi-document RAG. Poor context management leads to truncated information, lost context, and degraded responses.

This guide covers practical strategies for maximizing context utilization: smart truncation that preserves important content, sliding window approaches for conversations, content prioritization based on relevance, and compression techniques that maintain semantic meaning. Whether you're building a document Q&A system, a long-running agent, or a multi-turn chatbot, these patterns will help you make the most of every token in your context window.

Token Counting and Limits
from dataclasses import dataclass, field
from typing import Optional
from abc import ABC, abstractmethod


@dataclass
class TokenCount:
    """Token count result."""
    total: int
    by_section: dict[str, int] = field(default_factory=dict)


@dataclass
class ContextBudget:
    """Context window budget."""
    max_tokens: int
    reserved_for_output: int = 1000
    reserved_for_system: int = 500

    @property
    def available_for_input(self) -> int:
        """Tokens available for input content."""
        return self.max_tokens - self.reserved_for_output - self.reserved_for_system


class TokenCounter(ABC):
    """Abstract token counter."""

    @abstractmethod
    def count(self, text: str) -> int:
        """Count tokens in text."""
        pass

    @abstractmethod
    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in message list."""
        pass


class TiktokenCounter(TokenCounter):
    """Token counter using tiktoken."""

    def __init__(self, model: str = "gpt-4"):
        import tiktoken
        try:
            self.encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            self.encoding = tiktoken.get_encoding("cl100k_base")
        self.model = model

    def count(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))

    def count_messages(self, messages: list[dict]) -> int:
        """Count tokens in messages with overhead."""
        # Message overhead varies by model
        tokens_per_message = 3  # <|start|>role<|end|>
        tokens_per_name = 1
        total = 0
        for message in messages:
            total += tokens_per_message
            for key, value in message.items():
                total += self.count(str(value))
                if key == "name":
                    total += tokens_per_name
        total += 3  # Reply priming
        return total


class ApproximateCounter(TokenCounter):
    """Fast approximate token counter."""

    def __init__(self, chars_per_token: float = 4.0):
        self.chars_per_token = chars_per_token

    def count(self, text: str) -> int:
        """Approximate token count."""
        return int(len(text) / self.chars_per_token)

    def count_messages(self, messages: list[dict]) -> int:
        """Approximate message token count."""
        total = 0
        for message in messages:
            # Add overhead per message
            total += 4
            for value in message.values():
                total += self.count(str(value))
        return total
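As a quick sanity check, the two counters can be compared on the same input; tiktoken gives exact counts for OpenAI models while the character heuristic is close enough for budgeting. A minimal sketch, assuming the classes above are in scope and the tiktoken package is installed:

text = "Context window management is about making every token count."

approx = ApproximateCounter()
print("approximate:", approx.count(text))

exact = TiktokenCounter("gpt-4")
print("exact:", exact.count(text))

# Message-level counting includes per-message and reply-priming overhead
messages = [{"role": "user", "content": text}]
print("with message overhead:", exact.count_messages(messages))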
class ContextManager:
    """Manage context window usage."""

    def __init__(
        self,
        budget: ContextBudget,
        counter: Optional[TokenCounter] = None
    ):
        self.budget = budget
        self.counter = counter or ApproximateCounter()

    def check_fits(self, content: str) -> tuple[bool, int]:
        """Check if content fits in budget."""
        tokens = self.counter.count(content)
        fits = tokens <= self.budget.available_for_input
        return fits, tokens

    def get_usage(self, content: str) -> dict:
        """Get detailed usage stats."""
        tokens = self.counter.count(content)
        return {
            "used": tokens,
            "available": self.budget.available_for_input,
            "remaining": self.budget.available_for_input - tokens,
            "utilization": tokens / self.budget.available_for_input,
            "fits": tokens <= self.budget.available_for_input
        }

    def allocate(
        self,
        sections: dict[str, str],
        priorities: Optional[dict[str, int]] = None
    ) -> dict[str, int]:
        """Allocate tokens to sections by priority."""
        if priorities is None:
            priorities = {name: 1 for name in sections}
        # Count tokens per section
        section_tokens = {
            name: self.counter.count(content)
            for name, content in sections.items()
        }
        total_needed = sum(section_tokens.values())
        available = self.budget.available_for_input
        if total_needed <= available:
            # Everything fits
            return section_tokens
        # Need to allocate proportionally by priority
        total_priority = sum(priorities.values())
        allocations = {}
        remaining = available
        # Sort by priority (highest first)
        sorted_sections = sorted(
            sections.keys(),
            key=lambda x: priorities.get(x, 1),
            reverse=True
        )
        for name in sorted_sections:
            priority_share = priorities.get(name, 1) / total_priority
            max_allocation = int(available * priority_share)
            actual = min(section_tokens[name], max_allocation, remaining)
            allocations[name] = actual
            remaining -= actual
        return allocations
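Here is a minimal usage sketch of the budget and allocator, assuming the classes above are in scope; the section strings and priority weights are illustrative, sized so that the total deliberately overflows the budget and triggers proportional allocation:

budget = ContextBudget(max_tokens=8000, reserved_for_output=1000, reserved_for_system=500)
manager = ContextManager(budget)  # defaults to ApproximateCounter

sections = {
    "instructions": "Answer using only the provided context.",
    "context": "Retrieved passages would go here. " * 1000,
    "history": "Earlier conversation turns would go here. " * 500,
}
# Higher-priority sections get a larger share when everything cannot fit
allocations = manager.allocate(
    sections,
    priorities={"instructions": 3, "context": 2, "history": 1}
)
print(allocations)

fits, tokens = manager.check_fits(sections["context"])
print(f"fits={fits}, tokens={tokens}")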
Smart Truncation
import re
from enum import Enum


class TruncationStrategy(Enum):
    """Truncation strategies."""
    HEAD = "head"      # Keep beginning
    TAIL = "tail"      # Keep end
    MIDDLE = "middle"  # Keep beginning and end
    SMART = "smart"    # Content-aware


class Truncator:
    """Smart text truncation."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def truncate(
        self,
        text: str,
        max_tokens: int,
        strategy: TruncationStrategy = TruncationStrategy.SMART
    ) -> str:
        """Truncate text to fit token limit."""
        current_tokens = self.counter.count(text)
        if current_tokens <= max_tokens:
            return text
        if strategy == TruncationStrategy.HEAD:
            return self._truncate_head(text, max_tokens)
        elif strategy == TruncationStrategy.TAIL:
            return self._truncate_tail(text, max_tokens)
        elif strategy == TruncationStrategy.MIDDLE:
            return self._truncate_middle(text, max_tokens)
        else:
            return self._truncate_smart(text, max_tokens)

    def _truncate_head(self, text: str, max_tokens: int) -> str:
        """Keep beginning of text."""
        # Binary search for cutoff point
        words = text.split()
        low, high = 0, len(words)
        while low < high:
            mid = (low + high + 1) // 2
            candidate = " ".join(words[:mid])
            if self.counter.count(candidate) <= max_tokens:
                low = mid
            else:
                high = mid - 1
        result = " ".join(words[:low])
        return result + "..." if low < len(words) else result

    def _truncate_tail(self, text: str, max_tokens: int) -> str:
        """Keep end of text."""
        words = text.split()
        low, high = 0, len(words)
        while low < high:
            mid = (low + high + 1) // 2
            candidate = " ".join(words[-mid:])
            if self.counter.count(candidate) <= max_tokens:
                low = mid
            else:
                high = mid - 1
        result = " ".join(words[-low:]) if low > 0 else ""
        return "..." + result if low < len(words) else result

    def _truncate_middle(self, text: str, max_tokens: int) -> str:
        """Keep beginning and end."""
        separator = "\n\n[...content truncated...]\n\n"
        # Reserve tokens for the separator itself
        available = max(0, max_tokens - self.counter.count(separator))
        head_tokens = available // 2
        tail_tokens = available - head_tokens
        head = self._truncate_head(text, head_tokens)
        tail = self._truncate_tail(text, tail_tokens)
        return f"{head}{separator}{tail}"

    def _truncate_smart(self, text: str, max_tokens: int) -> str:
        """Content-aware truncation."""
        # Split into paragraphs
        paragraphs = text.split('\n\n')
        if len(paragraphs) <= 2:
            return self._truncate_middle(text, max_tokens)
        # Score paragraphs by importance
        scored = []
        for i, para in enumerate(paragraphs):
            score = self._score_paragraph(para, i, len(paragraphs))
            scored.append((i, para, score))
        # Sort by score
        scored.sort(key=lambda x: x[2], reverse=True)
        # Greedily add paragraphs
        selected = []
        used_tokens = 0
        for idx, para, score in scored:
            para_tokens = self.counter.count(para)
            if used_tokens + para_tokens <= max_tokens:
                selected.append((idx, para))
                used_tokens += para_tokens
        # Sort by original order
        selected.sort(key=lambda x: x[0])
        return "\n\n".join(para for _, para in selected)

    def _score_paragraph(
        self,
        paragraph: str,
        position: int,
        total: int
    ) -> float:
        """Score paragraph importance."""
        score = 0.0
        # Position score (first and last are important)
        if position == 0:
            score += 2.0
        elif position == total - 1:
            score += 1.5
        elif position == 1:
            score += 1.0
        # Length score (medium length preferred)
        words = len(paragraph.split())
        if 20 <= words <= 100:
            score += 1.0
        elif words > 100:
            score += 0.5
        # Content signals
        important_markers = [
            "important", "key", "main", "summary",
            "conclusion", "result", "finding"
        ]
        para_lower = paragraph.lower()
        for marker in important_markers:
            if marker in para_lower:
                score += 0.5
        return score


class SentenceTruncator:
    """Truncate at sentence boundaries."""

    def __init__(self, counter: TokenCounter):
        self.counter = counter

    def truncate(
        self,
        text: str,
        max_tokens: int,
        keep_end: bool = False
    ) -> str:
        """Truncate at sentence boundaries."""
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        if keep_end:
            sentences = list(reversed(sentences))
        selected = []
        used_tokens = 0
        for sentence in sentences:
            sentence_tokens = self.counter.count(sentence)
            if used_tokens + sentence_tokens <= max_tokens:
                selected.append(sentence)
                used_tokens += sentence_tokens
            else:
                break
        if keep_end:
            selected = list(reversed(selected))
        result = " ".join(selected)
        if len(selected) < len(sentences):
            if keep_end:
                result = "..." + result
            else:
                result = result + "..."
        return result
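A short comparison of the four strategies on the same input, again a sketch assuming the classes above are in scope; the sample text is synthetic:

counter = ApproximateCounter()
truncator = Truncator(counter)

sample = "\n\n".join(
    f"Paragraph {i}. This is the key finding of section {i}. " * 5
    for i in range(10)
)

for strategy in TruncationStrategy:
    result = truncator.truncate(sample, max_tokens=120, strategy=strategy)
    print(f"{strategy.value}: {counter.count(result)} tokens")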
Conversation Window Management
from dataclasses import dataclass, field
from typing import Any
from datetime import datetime
from collections import deque


@dataclass
class Message:
    """A conversation message."""
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    tokens: int = 0
    metadata: dict = field(default_factory=dict)


class SlidingWindowManager:
    """Manage conversation with sliding window."""

    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter,
        system_prompt: str = ""
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.system_prompt = system_prompt
        self.system_tokens = counter.count(system_prompt)
        self.messages: deque[Message] = deque()
        self.total_tokens = self.system_tokens

    def add_message(self, role: str, content: str) -> bool:
        """Add message to conversation."""
        tokens = self.counter.count(content) + 4  # Message overhead
        message = Message(role=role, content=content, tokens=tokens)
        # Evict oldest messages until the new one fits
        while (self.total_tokens + tokens > self.max_tokens
               and len(self.messages) > 0):
            evicted = self.messages.popleft()
            self.total_tokens -= evicted.tokens
        self.messages.append(message)
        self.total_tokens += tokens
        return True

    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        result = []
        if self.system_prompt:
            result.append({
                "role": "system",
                "content": self.system_prompt
            })
        for msg in self.messages:
            result.append({
                "role": msg.role,
                "content": msg.content
            })
        return result

    def get_usage(self) -> dict:
        """Get window usage stats."""
        return {
            "total_tokens": self.total_tokens,
            "max_tokens": self.max_tokens,
            "message_count": len(self.messages),
            "utilization": self.total_tokens / self.max_tokens
        }


class SummarizingWindowManager:
    """Summarize old messages to save space."""

    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter,
        summarizer: Any,  # LLM client
        summary_threshold: float = 0.8
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.summarizer = summarizer
        self.summary_threshold = summary_threshold
        self.messages: list[Message] = []
        self.summary: str = ""
        self.summary_tokens = 0

    async def add_message(self, role: str, content: str):
        """Add message, summarizing if needed."""
        tokens = self.counter.count(content) + 4
        message = Message(role=role, content=content, tokens=tokens)
        self.messages.append(message)
        # Check if we need to summarize
        total = self._get_total_tokens()
        if total / self.max_tokens > self.summary_threshold:
            await self._summarize_old_messages()

    def _get_total_tokens(self) -> int:
        """Get total token count."""
        return self.summary_tokens + sum(m.tokens for m in self.messages)

    async def _summarize_old_messages(self):
        """Summarize older messages."""
        if len(self.messages) < 4:
            return
        # Keep last 2 messages, summarize the rest
        to_summarize = self.messages[:-2]
        self.messages = self.messages[-2:]
        # Build summary prompt
        conversation = "\n".join(
            f"{m.role}: {m.content}"
            for m in to_summarize
        )
        prompt = f"""Summarize this conversation concisely, preserving key information:

{conversation}

Summary:"""
        response = await self.summarizer.complete(prompt)
        new_summary = response.content
        # Combine with existing summary
        if self.summary:
            self.summary = f"{self.summary}\n\n{new_summary}"
        else:
            self.summary = new_summary
        self.summary_tokens = self.counter.count(self.summary)

    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        result = []
        if self.summary:
            result.append({
                "role": "system",
                "content": f"Previous conversation summary:\n{self.summary}"
            })
        for msg in self.messages:
            result.append({
                "role": msg.role,
                "content": msg.content
            })
        return result


class ImportanceBasedWindow:
    """Keep messages based on importance."""

    def __init__(self, max_tokens: int, counter: TokenCounter):
        self.max_tokens = max_tokens
        self.counter = counter
        self.messages: list[Message] = []

    def add_message(
        self,
        role: str,
        content: str,
        importance: float = 1.0
    ):
        """Add message with importance score."""
        tokens = self.counter.count(content) + 4
        message = Message(
            role=role,
            content=content,
            tokens=tokens,
            metadata={"importance": importance}
        )
        self.messages.append(message)
        self._prune_if_needed()

    def _prune_if_needed(self):
        """Remove low-importance messages if over budget."""
        total = sum(m.tokens for m in self.messages)
        while total > self.max_tokens and len(self.messages) > 2:
            # Find lowest importance message (not first or last)
            candidates = self.messages[1:-1]
            if not candidates:
                break
            lowest = min(
                candidates,
                key=lambda m: m.metadata.get("importance", 1.0)
            )
            self.messages.remove(lowest)
            total -= lowest.tokens

    def get_messages(self) -> list[dict]:
        """Get messages for API call."""
        return [
            {"role": m.role, "content": m.content}
            for m in self.messages
        ]
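A quick sketch of the sliding window evicting old turns, assuming the classes above; the turn count and token budget are arbitrary, chosen so eviction actually kicks in:

counter = ApproximateCounter()
window = SlidingWindowManager(
    max_tokens=300,
    counter=counter,
    system_prompt="You are a helpful assistant."
)

for turn in range(25):
    window.add_message("user", f"This is question number {turn}, please answer it.")
    window.add_message("assistant", f"Here is a detailed answer to question {turn}.")

# Only the most recent turns survive; the system prompt is always kept
usage = window.get_usage()
print(usage["message_count"], "messages kept,", usage["total_tokens"], "tokens")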
RAG Context Optimization
from dataclasses import dataclass


@dataclass
class RetrievedChunk:
    """A retrieved document chunk."""
    content: str
    score: float
    source: str
    tokens: int = 0


class RAGContextOptimizer:
    """Optimize context for RAG."""

    def __init__(
        self,
        max_context_tokens: int,
        counter: TokenCounter
    ):
        self.max_tokens = max_context_tokens
        self.counter = counter

    def select_chunks(
        self,
        chunks: list[RetrievedChunk],
        query: str
    ) -> list[RetrievedChunk]:
        """Select chunks that fit in context."""
        # Count tokens for each chunk
        for chunk in chunks:
            chunk.tokens = self.counter.count(chunk.content)
        # Greedy selection by score
        selected = []
        used_tokens = 0
        sorted_chunks = sorted(chunks, key=lambda c: c.score, reverse=True)
        for chunk in sorted_chunks:
            if used_tokens + chunk.tokens <= self.max_tokens:
                selected.append(chunk)
                used_tokens += chunk.tokens
        return selected

    def deduplicate_chunks(
        self,
        chunks: list[RetrievedChunk],
        similarity_threshold: float = 0.9
    ) -> list[RetrievedChunk]:
        """Remove near-duplicate chunks."""
        if len(chunks) <= 1:
            return chunks
        # Simple deduplication based on content overlap
        unique = [chunks[0]]
        for chunk in chunks[1:]:
            is_duplicate = False
            for existing in unique:
                overlap = self._compute_overlap(chunk.content, existing.content)
                if overlap > similarity_threshold:
                    is_duplicate = True
                    break
            if not is_duplicate:
                unique.append(chunk)
        return unique

    def _compute_overlap(self, text1: str, text2: str) -> float:
        """Compute Jaccard similarity of word sets."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 or not words2:
            return 0.0
        intersection = len(words1 & words2)
        union = len(words1 | words2)
        return intersection / union

    def reorder_chunks(
        self,
        chunks: list[RetrievedChunk],
        strategy: str = "score"
    ) -> list[RetrievedChunk]:
        """Reorder chunks for optimal context."""
        if strategy == "score":
            # Highest score first
            return sorted(chunks, key=lambda c: c.score, reverse=True)
        elif strategy == "lost_in_middle":
            # Important at beginning and end, weakest in the middle
            sorted_chunks = sorted(chunks, key=lambda c: c.score, reverse=True)
            if len(sorted_chunks) <= 2:
                return sorted_chunks
            # Best at start, second best at end, and so on inward
            front, back = [], []
            for i, chunk in enumerate(sorted_chunks):
                if i % 2 == 0:
                    front.append(chunk)
                else:
                    back.append(chunk)
            return front + back[::-1]
        elif strategy == "chronological":
            # By source order (if available)
            return sorted(chunks, key=lambda c: c.source)
        return chunks


class ContextBuilder:
    """Build optimized context from multiple sources."""

    def __init__(
        self,
        budget: ContextBudget,
        counter: TokenCounter
    ):
        self.budget = budget
        self.counter = counter

    def build(
        self,
        system_prompt: str,
        retrieved_context: str,
        conversation_history: list[dict],
        user_query: str
    ) -> list[dict]:
        """Build optimized message list."""
        # Calculate token budgets
        system_tokens = self.counter.count(system_prompt)
        query_tokens = self.counter.count(user_query)
        available = self.budget.available_for_input
        remaining = max(0, available - system_tokens - query_tokens - 50)  # Buffer
        # Allocate remaining between context and history
        context_budget = int(remaining * 0.6)
        history_budget = remaining - context_budget
        # Truncate context if needed
        truncator = Truncator(self.counter)
        truncated_context = truncator.truncate(
            retrieved_context,
            context_budget,
            TruncationStrategy.SMART
        )
        # Truncate history if needed
        history_tokens = sum(
            self.counter.count(m["content"]) + 4
            for m in conversation_history
        )
        if history_tokens > history_budget:
            # Keep most recent messages
            kept_history = []
            used = 0
            for msg in reversed(conversation_history):
                msg_tokens = self.counter.count(msg["content"]) + 4
                if used + msg_tokens <= history_budget:
                    kept_history.insert(0, msg)
                    used += msg_tokens
                else:
                    break
            conversation_history = kept_history
        # Build final message list
        messages = [{"role": "system", "content": system_prompt}]
        if truncated_context:
            messages.append({
                "role": "system",
                "content": f"Relevant context:\n{truncated_context}"
            })
        messages.extend(conversation_history)
        messages.append({"role": "user", "content": user_query})
        return messages
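To see the pieces work together, here is a sketch with four hand-written chunks, assuming the classes above; the contents, scores, and sources are made up, and the first two chunks are near-duplicates that should collapse to one:

counter = ApproximateCounter()
optimizer = RAGContextOptimizer(max_context_tokens=500, counter=counter)

chunks = [
    RetrievedChunk(content="Paris is the capital of France.", score=0.92, source="doc1"),
    RetrievedChunk(content="Paris is the capital city of France.", score=0.90, source="doc2"),
    RetrievedChunk(content="France shares borders with Spain and Italy.", score=0.75, source="doc3"),
    RetrievedChunk(content="The Seine river flows through Paris.", score=0.60, source="doc4"),
]

unique = optimizer.deduplicate_chunks(chunks, similarity_threshold=0.7)
selected = optimizer.select_chunks(unique, query="What is the capital of France?")
ordered = optimizer.reorder_chunks(selected, strategy="lost_in_middle")
print([c.source for c in ordered])  # strongest chunks land at the edges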
Long Document Processing
import re
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator


@dataclass
class DocumentSection:
    """A document section."""
    title: str
    content: str
    level: int  # Heading level
    tokens: int = 0


class LongDocumentProcessor:
    """Process documents longer than context window."""

    def __init__(
        self,
        max_tokens: int,
        counter: TokenCounter,
        llm_client: Any
    ):
        self.max_tokens = max_tokens
        self.counter = counter
        self.llm = llm_client

    def split_by_sections(self, document: str) -> list[DocumentSection]:
        """Split document into sections."""
        # Find markdown headers
        header_pattern = r'^(#{1,6})\s+(.+)$'
        lines = document.split('\n')
        sections = []
        current_section = DocumentSection(title="", content="", level=0)
        for line in lines:
            match = re.match(header_pattern, line)
            if match:
                # Save current section
                if current_section.content.strip():
                    current_section.tokens = self.counter.count(current_section.content)
                    sections.append(current_section)
                # Start new section
                level = len(match.group(1))
                title = match.group(2)
                current_section = DocumentSection(title=title, content="", level=level)
            else:
                current_section.content += line + "\n"
        # Don't forget the last section
        if current_section.content.strip():
            current_section.tokens = self.counter.count(current_section.content)
            sections.append(current_section)
        return sections

    async def map_reduce_summarize(
        self,
        document: str,
        query: Optional[str] = None
    ) -> str:
        """Summarize using map-reduce."""
        sections = self.split_by_sections(document)
        # Map: Summarize each section
        section_summaries = []
        for section in sections:
            if section.tokens > self.max_tokens:
                # Section too large, chunk it
                chunks = self._chunk_text(section.content, self.max_tokens // 2)
                chunk_summaries = []
                for chunk in chunks:
                    summary = await self._summarize_chunk(chunk, query)
                    chunk_summaries.append(summary)
                section_summary = "\n".join(chunk_summaries)
            else:
                section_summary = await self._summarize_chunk(section.content, query)
            section_summaries.append(f"## {section.title}\n{section_summary}")
        # Reduce: Combine summaries
        combined = "\n\n".join(section_summaries)
        if self.counter.count(combined) > self.max_tokens:
            # Need another reduction pass
            return await self._final_reduce(combined, query)
        return combined

    async def _summarize_chunk(self, chunk: str, query: Optional[str] = None) -> str:
        """Summarize a single chunk."""
        if query:
            prompt = f"""Summarize the following text, focusing on information relevant to: {query}

Text:
{chunk}

Summary:"""
        else:
            prompt = f"""Summarize the following text concisely:

Text:
{chunk}

Summary:"""
        response = await self.llm.complete(prompt)
        return response.content

    async def _final_reduce(self, summaries: str, query: Optional[str] = None) -> str:
        """Final reduction of summaries."""
        focus = f", focusing on: {query}" if query else ""
        prompt = f"""Combine these section summaries into a coherent overall summary{focus}:

{summaries}

Combined summary:"""
        response = await self.llm.complete(prompt)
        return response.content

    def _chunk_text(self, text: str, max_tokens: int) -> list[str]:
        """Chunk text at sentence boundaries to fit the token limit."""
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks: list[str] = []
        current: list[str] = []
        used = 0
        for sentence in sentences:
            tokens = self.counter.count(sentence)
            # Start a new chunk once the current one would overflow
            if current and used + tokens > max_tokens:
                chunks.append(" ".join(current))
                current, used = [], 0
            current.append(sentence)
            used += tokens
        if current:
            chunks.append(" ".join(current))
        return chunks

    async def iterative_refinement(
        self,
        document: str,
        query: str
    ) -> AsyncIterator[str]:
        """Process document iteratively, yielding partial results."""
        sections = self.split_by_sections(document)
        accumulated_context = ""
        for section in sections:
            # Process section with accumulated context
            prompt = f"""Based on the document so far:

{accumulated_context}

And this new section:

{section.content}

Answer the question: {query}

Provide your current best answer:"""
            response = await self.llm.complete(prompt)
            yield response.content
            # Update accumulated context
            summary = await self._summarize_chunk(section.content, query)
            accumulated_context += f"\n\n{section.title}: {summary}"
            # Compress the running context if it grows too large
            if self.counter.count(accumulated_context) > self.max_tokens // 2:
                accumulated_context = await self._summarize_chunk(
                    accumulated_context,
                    query
                )
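The processor only needs an LLM client exposing an async complete() method that returns an object with a .content attribute. Here is a runnable sketch with a stub client standing in for a real model; StubLLM, its output, and the sample document are placeholders:

import asyncio
from types import SimpleNamespace

class StubLLM:
    """Placeholder client; a real one would call a model API here."""
    async def complete(self, prompt: str):
        return SimpleNamespace(content=f"[summary of {len(prompt)} chars]")

async def main():
    processor = LongDocumentProcessor(
        max_tokens=2000,
        counter=ApproximateCounter(),
        llm_client=StubLLM(),
    )
    document = "# Overview\nSome introductory text.\n\n## Details\nMuch more detail here."
    print(await processor.map_reduce_summarize(document, query="What is covered?"))

asyncio.run(main())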
Production Context Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()


class TruncateRequest(BaseModel):
    text: str
    max_tokens: int
    strategy: str = "smart"


class ContextRequest(BaseModel):
    system_prompt: str
    retrieved_context: str
    conversation_history: list[dict]
    user_query: str
    max_tokens: int = 8000


class WindowRequest(BaseModel):
    messages: list[dict]
    max_tokens: int = 4000
    new_message: Optional[dict] = None


class CountRequest(BaseModel):
    text: str


# Initialize components
counter = ApproximateCounter()
truncator = Truncator(counter)


@app.post("/v1/truncate")
async def truncate_text(request: TruncateRequest) -> dict:
    """Truncate text to fit token limit."""
    try:
        strategy = TruncationStrategy[request.strategy.upper()]
    except KeyError:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown strategy: {request.strategy}"
        )
    result = truncator.truncate(request.text, request.max_tokens, strategy)
    return {
        "truncated": result,
        "original_tokens": counter.count(request.text),
        "result_tokens": counter.count(result)
    }


@app.post("/v1/context/build")
async def build_context(request: ContextRequest) -> dict:
    """Build optimized context."""
    budget = ContextBudget(
        max_tokens=request.max_tokens,
        reserved_for_output=1000
    )
    builder = ContextBuilder(budget, counter)
    messages = builder.build(
        request.system_prompt,
        request.retrieved_context,
        request.conversation_history,
        request.user_query
    )
    total_tokens = sum(
        counter.count(m["content"]) + 4
        for m in messages
    )
    return {
        "messages": messages,
        "total_tokens": total_tokens,
        "utilization": total_tokens / budget.available_for_input
    }


@app.post("/v1/window/manage")
async def manage_window(request: WindowRequest) -> dict:
    """Manage conversation window."""
    window = SlidingWindowManager(
        max_tokens=request.max_tokens,
        counter=counter
    )
    # Add existing messages
    for msg in request.messages:
        window.add_message(msg["role"], msg["content"])
    # Add new message if provided
    if request.new_message:
        window.add_message(
            request.new_message["role"],
            request.new_message["content"]
        )
    return {
        "messages": window.get_messages(),
        "usage": window.get_usage()
    }


@app.post("/v1/tokens/count")
async def count_tokens(request: CountRequest) -> dict:
    """Count tokens in text."""
    return {
        "tokens": counter.count(request.text),
        "characters": len(request.text)
    }


@app.get("/health")
async def health():
    return {"status": "healthy"}
References
- OpenAI Tokenizer: https://platform.openai.com/tokenizer
- tiktoken: https://github.com/openai/tiktoken
- Lost in the Middle: https://arxiv.org/abs/2307.03172
- LangChain Context: https://python.langchain.com/docs/modules/memory/
Conclusion
Context window management is a critical skill for building effective LLM applications. Start with accurate token counting—approximate methods work for rough estimates, but use tiktoken for precise budgeting. Implement smart truncation that preserves important content rather than blindly cutting at character limits. For conversations, use sliding windows with summarization to maintain context across long sessions.

In RAG systems, deduplicate and reorder chunks to maximize information density. Be aware of the "lost in the middle" phenomenon—models pay more attention to content at the beginning and end of context. For documents longer than your context window, use map-reduce patterns to process in chunks and combine results.

Monitor your context utilization and adjust budgets based on actual usage patterns. The key insight is that context management isn't just about fitting content—it's about maximizing the signal-to-noise ratio in your context window. Every token should earn its place by contributing to better responses.