Introduction
RAG systems live or die by their chunking strategy. Chunk too large and you waste context window space with irrelevant content. Chunk too small and you lose semantic coherence, making it hard for the LLM to understand context. The right chunking strategy depends on your document types, query patterns, and retrieval approach. This guide covers practical chunking techniques: fixed-size chunking for simplicity, recursive splitting that respects document structure, semantic chunking that groups related content, and specialized strategies for code, tables, and structured documents. Whether you’re building a knowledge base, a document Q&A system, or code search, choosing the right chunking strategy can dramatically improve retrieval quality.

Fixed-Size Chunking
from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod
@dataclass
class Chunk:
"""A document chunk."""
content: str
metadata: dict = field(default_factory=dict)
start_index: int = 0
end_index: int = 0
chunk_index: int = 0
@property
def token_count(self) -> int:
"""Estimate token count."""
return len(self.content) // 4
class Chunker(ABC):
"""Abstract document chunker."""
@abstractmethod
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split text into chunks."""
pass
class FixedSizeChunker(Chunker):
"""Fixed-size character chunking."""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split into fixed-size chunks."""
chunks = []
start = 0
chunk_index = 0
while start < len(text):
end = start + self.chunk_size
# Don't exceed text length
if end > len(text):
end = len(text)
chunk_content = text[start:end]
chunks.append(Chunk(
content=chunk_content,
metadata=metadata or {},
start_index=start,
end_index=end,
chunk_index=chunk_index
))
# Stop once the end of the text is reached (otherwise the tail is re-chunked forever)
if end >= len(text):
    break
# Move start back by the overlap for the next chunk
start = end - self.chunk_overlap
chunk_index += 1
return chunks
class TokenChunker(Chunker):
"""Token-based chunking."""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 50,
tokenizer: Any = None
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.tokenizer = tokenizer
def _tokenize(self, text: str) -> list[str]:
"""Tokenize text."""
if self.tokenizer:
return self.tokenizer.encode(text)
# Simple word tokenization
return text.split()
def _detokenize(self, tokens: list) -> str:
"""Convert tokens back to text."""
if self.tokenizer:
return self.tokenizer.decode(tokens)
return " ".join(tokens)
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split into token-based chunks."""
tokens = self._tokenize(text)
chunks = []
start = 0
chunk_index = 0
while start < len(tokens):
end = min(start + self.chunk_size, len(tokens))
chunk_tokens = tokens[start:end]
chunk_content = self._detokenize(chunk_tokens)
chunks.append(Chunk(
content=chunk_content,
metadata=metadata or {},
start_index=start,
end_index=end,
chunk_index=chunk_index
))
# Stop once the end of the token sequence is reached (avoids duplicating the tail)
if end >= len(tokens):
    break
start = end - self.chunk_overlap
chunk_index += 1
return chunks
class SentenceChunker(Chunker):
"""Sentence-aware chunking."""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
min_chunk_size: int = 100
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
def _split_sentences(self, text: str) -> list[str]:
"""Split text into sentences."""
import re
# Split on sentence boundaries
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split into sentence-aligned chunks."""
sentences = self._split_sentences(text)
chunks = []
current_chunk = []
current_length = 0
chunk_index = 0
for sentence in sentences:
sentence_length = len(sentence)
# If adding this sentence exceeds chunk size
if current_length + sentence_length > self.chunk_size and current_chunk:
# Save current chunk
chunk_content = " ".join(current_chunk)
chunks.append(Chunk(
content=chunk_content,
metadata=metadata or {},
chunk_index=chunk_index
))
chunk_index += 1
# Start new chunk with overlap
overlap_sentences = []
overlap_length = 0
for s in reversed(current_chunk):
if overlap_length + len(s) <= self.chunk_overlap:
overlap_sentences.insert(0, s)
overlap_length += len(s)
else:
break
current_chunk = overlap_sentences
current_length = overlap_length
current_chunk.append(sentence)
current_length += sentence_length
# Don't forget last chunk
if current_chunk:
chunk_content = " ".join(current_chunk)
if len(chunk_content) >= self.min_chunk_size:
chunks.append(Chunk(
content=chunk_content,
metadata=metadata or {},
chunk_index=chunk_index
))
return chunks
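To see how these baseline chunkers differ, it helps to run the same text through each and compare the output. The snippet below is a minimal usage sketch of the classes above; the sample text and size parameters are illustrative, not recommendations.

# Usage sketch: compare the baseline chunkers on the same text (illustrative parameters)
sample_text = "Retrieval quality depends on chunk boundaries. " * 200

chunkers = {
    "fixed": FixedSizeChunker(chunk_size=500, chunk_overlap=100),
    "token": TokenChunker(chunk_size=128, chunk_overlap=16),
    "sentence": SentenceChunker(chunk_size=500, chunk_overlap=100),
}

for name, chunker in chunkers.items():
    chunks = chunker.chunk(sample_text, metadata={"source": "example.txt"})
    sizes = [len(c.content) for c in chunks]
    print(f"{name}: {len(chunks)} chunks, avg {sum(sizes) // len(sizes)} chars, "
          f"~{chunks[0].token_count} tokens in first chunk")

Note that the token_count property is only a rough estimate (character length divided by four), so it will drift from the counts a real tokenizer would report.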
Recursive Chunking
from dataclasses import dataclass
from typing import Any, Optional
class RecursiveChunker(Chunker):
"""Recursively split on different separators."""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
separators: list[str] = None
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Default separators in order of preference
self.separators = separators or [
"\n\n", # Paragraphs
"\n", # Lines
". ", # Sentences
", ", # Clauses
" ", # Words
"" # Characters
]
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Recursively split text."""
chunks = self._split_recursive(text, self.separators)
# Add metadata and indices
result = []
for i, chunk_content in enumerate(chunks):
result.append(Chunk(
content=chunk_content,
metadata=metadata or {},
chunk_index=i
))
return result
def _split_recursive(
self,
text: str,
separators: list[str]
) -> list[str]:
"""Recursively split on separators."""
if not text:
return []
# If text fits in chunk, return it
if len(text) <= self.chunk_size:
return [text]
# Try each separator
for separator in separators:
if separator == "":
# Character-level split as last resort
return self._split_by_size(text)
if separator in text:
splits = text.split(separator)
# Merge small splits
chunks = self._merge_splits(splits, separator)
# Recursively process chunks that are still too large
result = []
remaining_separators = separators[separators.index(separator) + 1:]
for chunk in chunks:
if len(chunk) > self.chunk_size:
result.extend(
self._split_recursive(chunk, remaining_separators)
)
else:
result.append(chunk)
return result
# No separator found, split by size
return self._split_by_size(text)
def _merge_splits(
self,
splits: list[str],
separator: str
) -> list[str]:
"""Merge small splits into chunks."""
chunks = []
current = []
current_length = 0
for split in splits:
split_length = len(split) + len(separator)
if current_length + split_length > self.chunk_size and current:
chunks.append(separator.join(current))
# Overlap
overlap = []
overlap_length = 0
for s in reversed(current):
if overlap_length + len(s) <= self.chunk_overlap:
overlap.insert(0, s)
overlap_length += len(s)
else:
break
current = overlap
current_length = overlap_length
current.append(split)
current_length += split_length
if current:
chunks.append(separator.join(current))
return chunks
def _split_by_size(self, text: str) -> list[str]:
"""Split by character size."""
chunks = []
start = 0
while start < len(text):
    end = min(start + self.chunk_size, len(text))
    chunks.append(text[start:end])
    # Stop at the end of the text; otherwise step forward, keeping the overlap
    if end >= len(text):
        break
    start = end - self.chunk_overlap
return chunks
class MarkdownChunker(Chunker):
"""Chunk markdown documents by structure."""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
include_headers: bool = True
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.include_headers = include_headers
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split markdown by headers."""
import re
# Split by headers
header_pattern = r'^(#{1,6})\s+(.+)$'
lines = text.split('\n')
sections = []
current_section = {"headers": [], "content": []}
for line in lines:
header_match = re.match(header_pattern, line)
if header_match:
# Save current section
if current_section["content"]:
sections.append(current_section)
level = len(header_match.group(1))
header_text = header_match.group(2)
# Start new section
current_section = {
"headers": current_section["headers"][:level-1] + [header_text],
"content": []
}
else:
current_section["content"].append(line)
# Don't forget last section
if current_section["content"]:
sections.append(current_section)
# Convert sections to chunks
chunks = []
chunk_index = 0
for section in sections:
content = "\n".join(section["content"]).strip()
if not content:
continue
# Add header context if enabled
if self.include_headers and section["headers"]:
header_context = " > ".join(section["headers"])
content = f"[{header_context}]\n\n{content}"
# Split large sections
if len(content) > self.chunk_size:
sub_chunker = RecursiveChunker(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
sub_chunks = sub_chunker.chunk(content)
for sub_chunk in sub_chunks:
sub_chunk.chunk_index = chunk_index
sub_chunk.metadata = {
**(metadata or {}),
"headers": section["headers"]
}
chunks.append(sub_chunk)
chunk_index += 1
else:
chunks.append(Chunk(
content=content,
metadata={
**(metadata or {}),
"headers": section["headers"]
},
chunk_index=chunk_index
))
chunk_index += 1
return chunks
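As a usage sketch, here is how the recursive and markdown chunkers above might be exercised on a small document; the sample content and parameters are illustrative.

# Usage sketch: structure-aware chunking of a small markdown document (illustrative)
doc = (
    "# Setup\n"
    "Install the package and configure credentials.\n\n"
    "## Configuration\n"
    + "Set the API key in your environment. " * 40 + "\n\n"
    "# Usage\n"
    "Call the client with your query.\n"
)

md_chunker = MarkdownChunker(chunk_size=800, chunk_overlap=100)
for chunk in md_chunker.chunk(doc, metadata={"source": "guide.md"}):
    print(chunk.chunk_index, chunk.metadata.get("headers"), len(chunk.content))

# The generic recursive splitter handles the same text without header awareness
rec_chunks = RecursiveChunker(chunk_size=800, chunk_overlap=100).chunk(doc)
print("recursive:", len(rec_chunks), "chunks")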
Semantic Chunking
from dataclasses import dataclass
from typing import Any, Optional
import numpy as np
class SemanticChunker(Chunker):
"""Chunk based on semantic similarity."""
def __init__(
self,
embedding_model: Any,
similarity_threshold: float = 0.5,
min_chunk_size: int = 100,
max_chunk_size: int = 2000
):
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size
async def chunk_async(
self,
text: str,
metadata: dict = None
) -> list[Chunk]:
"""Split based on semantic boundaries."""
# Split into sentences
sentences = self._split_sentences(text)
if len(sentences) <= 1:
return [Chunk(content=text, metadata=metadata or {})]
# Get embeddings for all sentences
embeddings = await self.embedding_model.embed(sentences)
embeddings = np.array(embeddings)
# Find semantic boundaries
boundaries = self._find_boundaries(embeddings)
# Create chunks from boundaries
chunks = []
chunk_index = 0
start = 0
for boundary in boundaries:
chunk_sentences = sentences[start:boundary + 1]
chunk_content = " ".join(chunk_sentences)
# Respect size limits
if len(chunk_content) > self.max_chunk_size:
# Split large chunk
sub_chunks = self._split_large_chunk(chunk_sentences)
for sub_content in sub_chunks:
chunks.append(Chunk(
content=sub_content,
metadata=metadata or {},
chunk_index=chunk_index
))
chunk_index += 1
elif len(chunk_content) >= self.min_chunk_size:
chunks.append(Chunk(
content=chunk_content,
metadata=metadata or {},
chunk_index=chunk_index
))
chunk_index += 1
start = boundary + 1
# Handle remaining sentences
if start < len(sentences):
remaining = " ".join(sentences[start:])
if len(remaining) >= self.min_chunk_size:
chunks.append(Chunk(
content=remaining,
metadata=metadata or {},
chunk_index=chunk_index
))
return chunks
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Sync wrapper."""
import asyncio
return asyncio.run(self.chunk_async(text, metadata))
def _split_sentences(self, text: str) -> list[str]:
"""Split into sentences."""
import re
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
def _find_boundaries(self, embeddings: np.ndarray) -> list[int]:
"""Find semantic boundaries using similarity drops."""
boundaries = []
for i in range(len(embeddings) - 1):
# Cosine similarity between consecutive sentences
sim = np.dot(embeddings[i], embeddings[i + 1]) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i + 1])
)
# If similarity drops below threshold, mark boundary
if sim < self.similarity_threshold:
boundaries.append(i)
return boundaries
def _split_large_chunk(self, sentences: list[str]) -> list[str]:
"""Split large chunk into smaller pieces."""
chunks = []
current = []
current_length = 0
for sentence in sentences:
if current_length + len(sentence) > self.max_chunk_size and current:
chunks.append(" ".join(current))
current = []
current_length = 0
current.append(sentence)
current_length += len(sentence)
if current:
chunks.append(" ".join(current))
return chunks
class ClusteringChunker(Chunker):
"""Chunk using clustering on sentence embeddings."""
def __init__(
self,
embedding_model: Any,
num_clusters: int = None,
min_cluster_size: int = 3
):
self.embedding_model = embedding_model
self.num_clusters = num_clusters
self.min_cluster_size = min_cluster_size
async def chunk_async(
self,
text: str,
metadata: dict = None
) -> list[Chunk]:
"""Cluster sentences into chunks."""
from sklearn.cluster import KMeans
sentences = self._split_sentences(text)
if len(sentences) <= self.min_cluster_size:
return [Chunk(content=text, metadata=metadata or {})]
# Get embeddings
embeddings = await self.embedding_model.embed(sentences)
embeddings = np.array(embeddings)
# Determine number of clusters
n_clusters = self.num_clusters or max(2, len(sentences) // 5)
n_clusters = min(n_clusters, len(sentences))
# Cluster
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
labels = kmeans.fit_predict(embeddings)
# Group sentences by cluster
clusters = {}
for i, label in enumerate(labels):
if label not in clusters:
clusters[label] = []
clusters[label].append((i, sentences[i]))
# Create chunks from clusters (maintaining order within cluster)
chunks = []
chunk_index = 0
for label in sorted(clusters.keys()):
cluster_sentences = clusters[label]
cluster_sentences.sort(key=lambda x: x[0]) # Sort by original order
chunk_content = " ".join(s for _, s in cluster_sentences)
chunks.append(Chunk(
content=chunk_content,
metadata={
**(metadata or {}),
"cluster": int(label)
},
chunk_index=chunk_index
))
chunk_index += 1
return chunks
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Sync wrapper."""
import asyncio
return asyncio.run(self.chunk_async(text, metadata))
def _split_sentences(self, text: str) -> list[str]:
"""Split into sentences."""
import re
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
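The semantic chunkers above expect an embedding model object exposing an async embed(texts) method that returns one vector per input sentence. The sketch below wires SemanticChunker to a toy stand-in so the flow can be run end to end; ToyEmbedder and its pseudo-embeddings are hypothetical placeholders for a real embedding client, and the threshold values are illustrative.

# Usage sketch: SemanticChunker driven by a toy embedding model.
# ToyEmbedder is a hypothetical stand-in; a real embedding client would return
# actual sentence embeddings through the same async embed(list[str]) interface.
import asyncio
import numpy as np

class ToyEmbedder:
    async def embed(self, texts: list[str]) -> list[list[float]]:
        rng = np.random.default_rng(0)
        # Deterministic pseudo-embeddings, for demonstration only
        return [list(rng.normal(size=8) + len(t) / 100.0) for t in texts]

async def main():
    chunker = SemanticChunker(ToyEmbedder(), similarity_threshold=0.6, min_chunk_size=10)
    text = (
        "Cats are small felines. They like to sleep. "
        "GPUs accelerate matrix math. CUDA kernels run in parallel."
    )
    for c in await chunker.chunk_async(text):
        print(c.chunk_index, "|", c.content)

asyncio.run(main())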
Specialized Chunkers
from dataclasses import dataclass
from typing import Any, Optional
import re
class CodeChunker(Chunker):
"""Chunk code by logical units."""
def __init__(
self,
chunk_size: int = 1500,
language: str = "python"
):
self.chunk_size = chunk_size
self.language = language
# Language-specific patterns
self.patterns = {
"python": {
"class": r'^class\s+\w+',
"function": r'^def\s+\w+',
"async_function": r'^async\s+def\s+\w+'
},
"javascript": {
"class": r'^class\s+\w+',
"function": r'^function\s+\w+',
"arrow": r'^const\s+\w+\s*=\s*\([^)]*\)\s*=>'
},
"java": {
"class": r'^(public|private|protected)?\s*class\s+\w+',
"method": r'^(public|private|protected)?\s*(static)?\s*\w+\s+\w+\s*\('
}
}
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split code into logical chunks."""
lines = text.split('\n')
chunks = []
current_chunk = []
current_context = None
chunk_index = 0
patterns = self.patterns.get(self.language, self.patterns["python"])
for line in lines:
# Check for new logical unit
is_boundary = False
new_context = None
for unit_type, pattern in patterns.items():
if re.match(pattern, line.strip()):
is_boundary = True
new_context = f"{unit_type}: {line.strip()[:50]}"
break
# Start new chunk at boundary if current is large enough
if is_boundary and current_chunk:
chunk_content = '\n'.join(current_chunk)
if len(chunk_content) > 50: # Min size
chunks.append(Chunk(
content=chunk_content,
metadata={
**(metadata or {}),
"context": current_context,
"language": self.language
},
chunk_index=chunk_index
))
chunk_index += 1
current_chunk = []
current_chunk.append(line)
if new_context:
current_context = new_context
# Force split if too large
chunk_content = '\n'.join(current_chunk)
if len(chunk_content) > self.chunk_size:
chunks.append(Chunk(
content=chunk_content,
metadata={
**(metadata or {}),
"context": current_context,
"language": self.language
},
chunk_index=chunk_index
))
chunk_index += 1
current_chunk = []
# Don't forget last chunk
if current_chunk:
chunk_content = '\n'.join(current_chunk)
chunks.append(Chunk(
content=chunk_content,
metadata={
**(metadata or {}),
"context": current_context,
"language": self.language
},
chunk_index=chunk_index
))
return chunks
class TableChunker(Chunker):
"""Chunk documents with tables."""
def __init__(
self,
chunk_size: int = 1000,
preserve_tables: bool = True
):
self.chunk_size = chunk_size
self.preserve_tables = preserve_tables
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split while preserving tables."""
# Find tables (markdown format)
table_pattern = r'(\|[^\n]+\|\n)+'
chunks = []
chunk_index = 0
last_end = 0
for match in re.finditer(table_pattern, text):
# Chunk text before table
before_text = text[last_end:match.start()].strip()
if before_text:
text_chunks = self._chunk_text(before_text, metadata)
for tc in text_chunks:
tc.chunk_index = chunk_index
chunks.append(tc)
chunk_index += 1
# Handle table
table_text = match.group(0)
if self.preserve_tables or len(table_text) <= self.chunk_size:
# Keep table as single chunk
chunks.append(Chunk(
content=table_text,
metadata={
**(metadata or {}),
"type": "table"
},
chunk_index=chunk_index
))
chunk_index += 1
else:
# Split large table by rows
rows = table_text.strip().split('\n')
header = rows[0] if rows else ""
current_rows = [header]
current_length = len(header)
for row in rows[1:]:
if current_length + len(row) > self.chunk_size and len(current_rows) > 1:
chunks.append(Chunk(
content='\n'.join(current_rows),
metadata={
**(metadata or {}),
"type": "table_part"
},
chunk_index=chunk_index
))
chunk_index += 1
current_rows = [header]
current_length = len(header)
current_rows.append(row)
current_length += len(row)
if len(current_rows) > 1:
chunks.append(Chunk(
content='\n'.join(current_rows),
metadata={
**(metadata or {}),
"type": "table_part"
},
chunk_index=chunk_index
))
chunk_index += 1
last_end = match.end()
# Chunk remaining text
remaining = text[last_end:].strip()
if remaining:
text_chunks = self._chunk_text(remaining, metadata)
for tc in text_chunks:
tc.chunk_index = chunk_index
chunks.append(tc)
chunk_index += 1
return chunks
def _chunk_text(self, text: str, metadata: dict) -> list[Chunk]:
"""Chunk regular text."""
chunker = RecursiveChunker(chunk_size=self.chunk_size)
return chunker.chunk(text, metadata)
class HTMLChunker(Chunker):
"""Chunk HTML documents by structure."""
def __init__(
self,
chunk_size: int = 1000,
preserve_structure: bool = True
):
self.chunk_size = chunk_size
self.preserve_structure = preserve_structure
def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
"""Split HTML by semantic elements."""
from html.parser import HTMLParser
# Simple extraction of text content
class TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self.sections = []
self.current_section = {"tag": None, "content": []}
self.section_tags = {"h1", "h2", "h3", "h4", "p", "div", "section", "article"}
def handle_starttag(self, tag, attrs):
if tag in self.section_tags:
if self.current_section["content"]:
self.sections.append(self.current_section)
self.current_section = {"tag": tag, "content": []}
def handle_data(self, data):
text = data.strip()
if text:
self.current_section["content"].append(text)
def handle_endtag(self, tag):
pass
extractor = TextExtractor()
extractor.feed(text)
if extractor.current_section["content"]:
extractor.sections.append(extractor.current_section)
# Convert sections to chunks
chunks = []
chunk_index = 0
for section in extractor.sections:
content = " ".join(section["content"])
if not content:
continue
if len(content) > self.chunk_size:
# Split large sections
sub_chunker = RecursiveChunker(chunk_size=self.chunk_size)
sub_chunks = sub_chunker.chunk(content)
for sc in sub_chunks:
sc.chunk_index = chunk_index
sc.metadata = {
**(metadata or {}),
"html_tag": section["tag"]
}
chunks.append(sc)
chunk_index += 1
else:
chunks.append(Chunk(
content=content,
metadata={
**(metadata or {}),
"html_tag": section["tag"]
},
chunk_index=chunk_index
))
chunk_index += 1
return chunks
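A brief usage sketch of the code-aware chunker on a small Python source string follows; the snippet and size limits are illustrative.

# Usage sketch: split a small Python file into logical units (illustrative)
source = '''\
import math

def area(radius):
    return math.pi * radius ** 2

class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return area(self.radius)
'''

code_chunker = CodeChunker(chunk_size=800, language="python")
for chunk in code_chunker.chunk(source, metadata={"path": "geometry.py"}):
    print(chunk.metadata.get("context"), "->", len(chunk.content), "chars")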
Chunking Pipeline
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
class DocumentType(Enum):
"""Document types."""
TEXT = "text"
MARKDOWN = "markdown"
CODE = "code"
HTML = "html"
PDF = "pdf"
@dataclass
class ChunkingConfig:
"""Chunking configuration."""
chunk_size: int = 1000
chunk_overlap: int = 200
strategy: str = "recursive" # fixed, recursive, semantic
preserve_structure: bool = True
class ChunkingPipeline:
"""Complete chunking pipeline."""
def __init__(
self,
config: ChunkingConfig = None,
embedding_model: Any = None
):
self.config = config or ChunkingConfig()
self.embedding_model = embedding_model
# Initialize chunkers
self.chunkers = {
DocumentType.TEXT: RecursiveChunker(
chunk_size=self.config.chunk_size,
chunk_overlap=self.config.chunk_overlap
),
DocumentType.MARKDOWN: MarkdownChunker(
chunk_size=self.config.chunk_size,
chunk_overlap=self.config.chunk_overlap
),
DocumentType.CODE: CodeChunker(
chunk_size=self.config.chunk_size
),
DocumentType.HTML: HTMLChunker(
chunk_size=self.config.chunk_size
)
}
if embedding_model:
self.semantic_chunker = SemanticChunker(
embedding_model=embedding_model,
max_chunk_size=self.config.chunk_size
)
def detect_type(self, text: str, filename: str = None) -> DocumentType:
"""Detect document type."""
if filename:
ext = filename.split('.')[-1].lower()
if ext in ['py', 'js', 'ts', 'java', 'cpp', 'c', 'go', 'rs']:
return DocumentType.CODE
elif ext in ['md', 'markdown']:
return DocumentType.MARKDOWN
elif ext in ['html', 'htm']:
return DocumentType.HTML
elif ext == 'pdf':
return DocumentType.PDF
# Content-based detection
if text.startswith('<'):
    return DocumentType.HTML
return DocumentType.TEXT
def chunk(
    self,
    text: str,
    doc_type: DocumentType = None,
    filename: str = None,
    metadata: dict = None
) -> list[Chunk]:
"""Chunk document."""
# Detect type if not provided
if doc_type is None:
doc_type = self.detect_type(text, filename)
# Get appropriate chunker
chunker = self.chunkers.get(doc_type, self.chunkers[DocumentType.TEXT])
# Add document metadata
doc_metadata = {
**(metadata or {}),
"doc_type": doc_type.value,
"filename": filename
}
# Chunk
chunks = chunker.chunk(text, doc_metadata)
return chunks
async def chunk_semantic(
self,
text: str,
metadata: dict = None
) -> list[Chunk]:
"""Chunk using semantic boundaries."""
if not self.embedding_model:
raise ValueError("Embedding model required for semantic chunking")
return await self.semantic_chunker.chunk_async(text, metadata)
def chunk_with_context(
self,
text: str,
context_size: int = 100,
metadata: dict = None
) -> list[Chunk]:
"""Chunk with surrounding context."""
# First, chunk normally
chunks = self.chunk(text, metadata=metadata)
# Add context to each chunk
for i, chunk in enumerate(chunks):
# Get context from adjacent chunks
prev_context = ""
next_context = ""
if i > 0:
prev_content = chunks[i-1].content
prev_context = prev_content[-context_size:] if len(prev_content) > context_size else prev_content
if i < len(chunks) - 1:
next_content = chunks[i+1].content
next_context = next_content[:context_size] if len(next_content) > context_size else next_content
# Copy the metadata dict so chunks don't share (and overwrite) one reference
chunk.metadata = {
    **chunk.metadata,
    "prev_context": prev_context,
    "next_context": next_context
}
return chunks
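Putting the pipeline together, a typical flow detects the document type from the filename, picks a chunker, and optionally attaches neighboring context. The example below is a minimal sketch; the filenames, sizes, and sample text are illustrative.

# Usage sketch: route a document through the pipeline (illustrative parameters)
config = ChunkingConfig(chunk_size=800, chunk_overlap=100)
pipeline = ChunkingPipeline(config)

markdown_text = (
    "# Title\n\nSome introductory paragraph.\n\n## Details\n"
    + "More detail here. " * 80
)

chunks = pipeline.chunk(markdown_text, filename="notes.md", metadata={"source": "wiki"})
print(len(chunks), "chunks, doc_type:", chunks[0].metadata.get("doc_type"))

# Attach a slice of the neighboring chunks to each chunk's metadata
ctx_chunks = pipeline.chunk_with_context(markdown_text, context_size=80)
print(repr(ctx_chunks[0].metadata.get("next_context", "")[:40]))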
Production Chunking Service
from fastapi import FastAPI, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class ChunkRequest(BaseModel):
text: str
chunk_size: int = 1000
chunk_overlap: int = 200
strategy: str = "recursive"
doc_type: Optional[str] = None
class ChunkResponse(BaseModel):
chunks: list[dict]
total_chunks: int
avg_chunk_size: int
# Initialize pipeline
pipeline = ChunkingPipeline(ChunkingConfig())
@app.post("/v1/chunk")
async def chunk_text(request: ChunkRequest) -> ChunkResponse:
"""Chunk text."""
# Update config
config = ChunkingConfig(
chunk_size=request.chunk_size,
chunk_overlap=request.chunk_overlap,
strategy=request.strategy
)
local_pipeline = ChunkingPipeline(config)
doc_type = None
if request.doc_type:
    try:
        doc_type = DocumentType(request.doc_type)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Unknown doc_type: {request.doc_type}")
chunks = local_pipeline.chunk(request.text, doc_type=doc_type)
return ChunkResponse(
chunks=[
{
"content": c.content,
"metadata": c.metadata,
"chunk_index": c.chunk_index,
"token_count": c.token_count
}
for c in chunks
],
total_chunks=len(chunks),
avg_chunk_size=sum(len(c.content) for c in chunks) // len(chunks) if chunks else 0
)
@app.post("/v1/chunk/file")
async def chunk_file(
file: UploadFile = File(...),
chunk_size: int = 1000,
chunk_overlap: int = 200
):
"""Chunk uploaded file."""
content = await file.read()
text = content.decode('utf-8')
config = ChunkingConfig(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
local_pipeline = ChunkingPipeline(config)
chunks = local_pipeline.chunk(text, filename=file.filename)
return {
"filename": file.filename,
"chunks": [
{
"content": c.content,
"metadata": c.metadata,
"chunk_index": c.chunk_index
}
for c in chunks
],
"total_chunks": len(chunks)
}
@app.get("/v1/strategies")
async def list_strategies():
"""List chunking strategies."""
return {
"strategies": [
{"name": "fixed", "description": "Fixed character size"},
{"name": "recursive", "description": "Recursive splitting on separators"},
{"name": "sentence", "description": "Sentence-aligned chunks"},
{"name": "semantic", "description": "Semantic boundary detection"},
{"name": "markdown", "description": "Markdown structure-aware"},
{"name": "code", "description": "Code-aware chunking"}
]
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- LangChain Text Splitters: https://python.langchain.com/docs/modules/data_connection/document_transformers/
- LlamaIndex Node Parsers: https://docs.llamaindex.ai/en/stable/module_guides/loading/node_parsers/
- Chunking Strategies: https://www.pinecone.io/learn/chunking-strategies/
- Semantic Chunking: https://github.com/FullStackRetrieval-com/RetrievalTutorials
Conclusion
Chunking strategy significantly impacts RAG quality. Start with recursive chunking: it respects document structure while keeping chunk sizes consistent. For structured documents like markdown or code, use specialized chunkers that preserve logical units. Semantic chunking finds natural topic boundaries but requires embedding computation.

Consider your query patterns. If users ask about specific topics, semantic chunking groups related content together; if users ask about specific sections, structure-aware chunking preserves that context. Add overlap between chunks to maintain context across boundaries, and include metadata such as headers, section titles, and document structure to help with retrieval and answer generation. Monitor retrieval quality and adjust chunk sizes based on results: too small and you lose context, too large and you include irrelevant content.

The key insight is that there is no universal best chunking strategy. The right approach depends on your documents, your queries, and how the LLM uses the retrieved context. Experiment with different strategies and measure retrieval quality to find what works best for your use case.