Document Chunking Strategies: Optimizing RAG Retrieval Quality

Introduction: RAG systems live or die by their chunking strategy. Chunk too large and you waste context window space with irrelevant content. Chunk too small and you lose semantic coherence, making it hard for the LLM to understand context. The right chunking strategy depends on your document types, query patterns, and retrieval approach. This guide covers practical chunking techniques: fixed-size chunking for simplicity, recursive splitting that respects document structure, semantic chunking that groups related content, and specialized strategies for code, tables, and structured documents. Whether you’re building a knowledge base, document Q&A system, or code search, choosing the right chunking strategy can dramatically improve retrieval quality.

Document Chunking: Fixed Size, Semantic Boundary, Recursive Split

Fixed-Size Chunking

from dataclasses import dataclass, field
from typing import Any, Optional
from abc import ABC, abstractmethod

@dataclass
class Chunk:
    """A document chunk."""
    
    content: str
    metadata: dict = field(default_factory=dict)
    start_index: int = 0
    end_index: int = 0
    chunk_index: int = 0
    
    @property
    def token_count(self) -> int:
        """Estimate token count (roughly 4 characters per token for English text)."""
        return len(self.content) // 4

class Chunker(ABC):
    """Abstract document chunker."""
    
    @abstractmethod
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split text into chunks."""
        pass

class FixedSizeChunker(Chunker):
    """Fixed-size character chunking."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split into fixed-size chunks."""
        
        chunks = []
        start = 0
        chunk_index = 0
        
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            
            chunks.append(Chunk(
                content=text[start:end],
                metadata=metadata or {},
                start_index=start,
                end_index=end,
                chunk_index=chunk_index
            ))
            chunk_index += 1
            
            # Stop once the end of the text is reached; stepping back by the
            # overlap here would re-emit the tail forever
            if end == len(text):
                break
            
            # Move start back by the overlap for the next chunk
            start = end - self.chunk_overlap
        
        return chunks

class TokenChunker(Chunker):
    """Token-based chunking."""
    
    def __init__(
        self,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        tokenizer: Any = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.tokenizer = tokenizer
    
    def _tokenize(self, text: str) -> list:
        """Tokenize text (token IDs if a tokenizer is provided, else words)."""
        
        if self.tokenizer:
            return self.tokenizer.encode(text)
        # Fall back to simple whitespace tokenization
        return text.split()
    
    def _detokenize(self, tokens: list) -> str:
        """Convert tokens back to text."""
        
        if self.tokenizer:
            return self.tokenizer.decode(tokens)
        return " ".join(tokens)
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split into token-based chunks."""
        
        tokens = self._tokenize(text)
        chunks = []
        start = 0
        chunk_index = 0
        
        while start < len(tokens):
            end = min(start + self.chunk_size, len(tokens))
            
            chunk_tokens = tokens[start:end]
            chunk_content = self._detokenize(chunk_tokens)
            
            chunks.append(Chunk(
                content=chunk_content,
                metadata=metadata or {},
                start_index=start,
                end_index=end,
                chunk_index=chunk_index
            ))
            chunk_index += 1
            
            # Stop at the end of the token stream to avoid re-emitting the tail
            if end == len(tokens):
                break
            
            start = end - self.chunk_overlap
        
        return chunks

class SentenceChunker(Chunker):
    """Sentence-aware chunking."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        min_chunk_size: int = 100
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split text into sentences."""
        
        import re
        
        # Split on sentence boundaries
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split into sentence-aligned chunks."""
        
        sentences = self._split_sentences(text)
        chunks = []
        current_chunk = []
        current_length = 0
        chunk_index = 0
        
        for sentence in sentences:
            sentence_length = len(sentence)
            
            # If adding this sentence exceeds chunk size
            if current_length + sentence_length > self.chunk_size and current_chunk:
                # Save current chunk
                chunk_content = " ".join(current_chunk)
                chunks.append(Chunk(
                    content=chunk_content,
                    metadata=metadata or {},
                    chunk_index=chunk_index
                ))
                chunk_index += 1
                
                # Start new chunk with overlap
                overlap_sentences = []
                overlap_length = 0
                
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= self.chunk_overlap:
                        overlap_sentences.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                
                current_chunk = overlap_sentences
                current_length = overlap_length
            
            current_chunk.append(sentence)
            current_length += sentence_length
        
        # Don't forget last chunk
        if current_chunk:
            chunk_content = " ".join(current_chunk)
            if len(chunk_content) >= self.min_chunk_size:
                chunks.append(Chunk(
                    content=chunk_content,
                    metadata=metadata or {},
                    chunk_index=chunk_index
                ))
        
        return chunks
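
As a sketch of how these fit together, the snippet below (with invented sample text) runs the three chunkers above on the same input and compares chunk counts and sizes:

sample = (
    "Retrieval quality depends on chunking. "
    "Fixed-size chunks are simple to compute. "
    "Sentence-aware chunks keep boundaries clean. "
) * 20

for chunker in [
    FixedSizeChunker(chunk_size=200, chunk_overlap=40),
    TokenChunker(chunk_size=50, chunk_overlap=10),
    SentenceChunker(chunk_size=200, chunk_overlap=40),
]:
    result = chunker.chunk(sample, metadata={"source": "demo"})
    sizes = [len(c.content) for c in result]
    print(f"{type(chunker).__name__}: {len(result)} chunks, "
          f"{min(sizes)}-{max(sizes)} chars each")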

Recursive Chunking

from dataclasses import dataclass
from typing import Any, Optional

class RecursiveChunker(Chunker):
    """Recursively split on different separators."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: list[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # Default separators in order of preference
        self.separators = separators or [
            "\n\n",  # Paragraphs
            "\n",    # Lines
            ". ",    # Sentences
            ", ",    # Clauses
            " ",     # Words
            ""       # Characters
        ]
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Recursively split text."""
        
        chunks = self._split_recursive(text, self.separators)
        
        # Add metadata and indices
        result = []
        for i, chunk_content in enumerate(chunks):
            result.append(Chunk(
                content=chunk_content,
                metadata=metadata or {},
                chunk_index=i
            ))
        
        return result
    
    def _split_recursive(
        self,
        text: str,
        separators: list[str]
    ) -> list[str]:
        """Recursively split on separators."""
        
        if not text:
            return []
        
        # If text fits in chunk, return it
        if len(text) <= self.chunk_size:
            return [text]
        
        # Try each separator
        for separator in separators:
            if separator == "":
                # Character-level split as last resort
                return self._split_by_size(text)
            
            if separator in text:
                splits = text.split(separator)
                
                # Merge small splits
                chunks = self._merge_splits(splits, separator)
                
                # Recursively process chunks that are still too large
                result = []
                remaining_separators = separators[separators.index(separator) + 1:]
                
                for chunk in chunks:
                    if len(chunk) > self.chunk_size:
                        result.extend(
                            self._split_recursive(chunk, remaining_separators)
                        )
                    else:
                        result.append(chunk)
                
                return result
        
        # No separator found, split by size
        return self._split_by_size(text)
    
    def _merge_splits(
        self,
        splits: list[str],
        separator: str
    ) -> list[str]:
        """Merge small splits into chunks."""
        
        chunks = []
        current = []
        current_length = 0
        
        for split in splits:
            split_length = len(split) + len(separator)
            
            if current_length + split_length > self.chunk_size and current:
                chunks.append(separator.join(current))
                
                # Overlap
                overlap = []
                overlap_length = 0
                for s in reversed(current):
                    if overlap_length + len(s) <= self.chunk_overlap:
                        overlap.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                
                current = overlap
                current_length = overlap_length
            
            current.append(split)
            current_length += split_length
        
        if current:
            chunks.append(separator.join(current))
        
        return chunks
    
    def _split_by_size(self, text: str) -> list[str]:
        """Split by character size."""
        
        chunks = []
        start = 0
        
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunks.append(text[start:end])
            # Stop at the end of the text to avoid re-emitting the tail
            if end == len(text):
                break
            start = end - self.chunk_overlap
        
        return chunks

class MarkdownChunker(Chunker):
    """Chunk markdown documents by structure."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        include_headers: bool = True
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.include_headers = include_headers
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split markdown by headers."""
        
        import re
        
        # Split by headers
        header_pattern = r'^(#{1,6})\s+(.+)$'
        lines = text.split('\n')
        
        sections = []
        current_section = {"headers": [], "content": []}
        
        for line in lines:
            header_match = re.match(header_pattern, line)
            
            if header_match:
                # Save current section
                if current_section["content"]:
                    sections.append(current_section)
                
                level = len(header_match.group(1))
                header_text = header_match.group(2)
                
                # Start new section
                current_section = {
                    "headers": current_section["headers"][:level-1] + [header_text],
                    "content": []
                }
            else:
                current_section["content"].append(line)
        
        # Don't forget last section
        if current_section["content"]:
            sections.append(current_section)
        
        # Convert sections to chunks
        chunks = []
        chunk_index = 0
        
        for section in sections:
            content = "\n".join(section["content"]).strip()
            
            if not content:
                continue
            
            # Add header context if enabled
            if self.include_headers and section["headers"]:
                header_context = " > ".join(section["headers"])
                content = f"[{header_context}]\n\n{content}"
            
            # Split large sections
            if len(content) > self.chunk_size:
                sub_chunker = RecursiveChunker(
                    chunk_size=self.chunk_size,
                    chunk_overlap=self.chunk_overlap
                )
                sub_chunks = sub_chunker.chunk(content)
                
                for sub_chunk in sub_chunks:
                    sub_chunk.chunk_index = chunk_index
                    sub_chunk.metadata = {
                        **(metadata or {}),
                        "headers": section["headers"]
                    }
                    chunks.append(sub_chunk)
                    chunk_index += 1
            else:
                chunks.append(Chunk(
                    content=content,
                    metadata={
                        **(metadata or {}),
                        "headers": section["headers"]
                    },
                    chunk_index=chunk_index
                ))
                chunk_index += 1
        
        return chunks
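
A minimal usage sketch for the markdown chunker, on an invented toy document, shows how header context is carried into each chunk:

md = """# Guide
## Setup
Install the package and configure credentials.
## Usage
Call the API with your documents.
"""

chunker = MarkdownChunker(chunk_size=500)
for c in chunker.chunk(md, metadata={"source": "guide.md"}):
    print(c.metadata["headers"], "->", c.content[:60])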

Semantic Chunking

from dataclasses import dataclass
from typing import Any, Optional
import numpy as np

class SemanticChunker(Chunker):
    """Chunk based on semantic similarity."""
    
    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.5,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    async def chunk_async(
        self,
        text: str,
        metadata: dict = None
    ) -> list[Chunk]:
        """Split based on semantic boundaries."""
        
        # Split into sentences
        sentences = self._split_sentences(text)
        
        if len(sentences) <= 1:
            return [Chunk(content=text, metadata=metadata or {})]
        
        # Get embeddings for all sentences
        embeddings = await self.embedding_model.embed(sentences)
        embeddings = np.array(embeddings)
        
        # Find semantic boundaries
        boundaries = self._find_boundaries(embeddings)
        
        # Create chunks from boundaries
        chunks = []
        chunk_index = 0
        start = 0
        
        for boundary in boundaries:
            chunk_sentences = sentences[start:boundary + 1]
            chunk_content = " ".join(chunk_sentences)
            
            # Respect size limits
            if len(chunk_content) > self.max_chunk_size:
                # Split large chunk
                sub_chunks = self._split_large_chunk(chunk_sentences)
                for sub_content in sub_chunks:
                    chunks.append(Chunk(
                        content=sub_content,
                        metadata=metadata or {},
                        chunk_index=chunk_index
                    ))
                    chunk_index += 1
            # Note: segments shorter than min_chunk_size are silently dropped
            elif len(chunk_content) >= self.min_chunk_size:
                chunks.append(Chunk(
                    content=chunk_content,
                    metadata=metadata or {},
                    chunk_index=chunk_index
                ))
                chunk_index += 1
            
            start = boundary + 1
        
        # Handle remaining sentences
        if start < len(sentences):
            remaining = " ".join(sentences[start:])
            if len(remaining) >= self.min_chunk_size:
                chunks.append(Chunk(
                    content=remaining,
                    metadata=metadata or {},
                    chunk_index=chunk_index
                ))
        
        return chunks
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Sync wrapper."""
        import asyncio
        return asyncio.run(self.chunk_async(text, metadata))
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split into sentences."""
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _find_boundaries(self, embeddings: np.ndarray) -> list[int]:
        """Find semantic boundaries using similarity drops."""
        
        boundaries = []
        
        for i in range(len(embeddings) - 1):
            # Cosine similarity between consecutive sentences
            sim = np.dot(embeddings[i], embeddings[i + 1]) / (
                np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i + 1])
            )
            
            # If similarity drops below threshold, mark boundary
            if sim < self.similarity_threshold:
                boundaries.append(i)
        
        return boundaries
    
    def _split_large_chunk(self, sentences: list[str]) -> list[str]:
        """Split large chunk into smaller pieces."""
        
        chunks = []
        current = []
        current_length = 0
        
        for sentence in sentences:
            if current_length + len(sentence) > self.max_chunk_size and current:
                chunks.append(" ".join(current))
                current = []
                current_length = 0
            
            current.append(sentence)
            current_length += len(sentence)
        
        if current:
            chunks.append(" ".join(current))
        
        return chunks

class ClusteringChunker(Chunker):
    """Chunk using clustering on sentence embeddings."""
    
    def __init__(
        self,
        embedding_model: Any,
        num_clusters: int = None,
        min_cluster_size: int = 3
    ):
        self.embedding_model = embedding_model
        self.num_clusters = num_clusters
        self.min_cluster_size = min_cluster_size
    
    async def chunk_async(
        self,
        text: str,
        metadata: dict = None
    ) -> list[Chunk]:
        """Cluster sentences into chunks."""
        
        from sklearn.cluster import KMeans
        
        sentences = self._split_sentences(text)
        
        if len(sentences) <= self.min_cluster_size:
            return [Chunk(content=text, metadata=metadata or {})]
        
        # Get embeddings
        embeddings = await self.embedding_model.embed(sentences)
        embeddings = np.array(embeddings)
        
        # Determine number of clusters
        n_clusters = self.num_clusters or max(2, len(sentences) // 5)
        n_clusters = min(n_clusters, len(sentences))
        
        # Cluster
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings)
        
        # Group sentences by cluster
        clusters = {}
        for i, label in enumerate(labels):
            if label not in clusters:
                clusters[label] = []
            clusters[label].append((i, sentences[i]))
        
        # Create chunks from clusters (maintaining order within cluster)
        chunks = []
        chunk_index = 0
        
        for label in sorted(clusters.keys()):
            cluster_sentences = clusters[label]
            cluster_sentences.sort(key=lambda x: x[0])  # Sort by original order
            
            chunk_content = " ".join(s for _, s in cluster_sentences)
            
            chunks.append(Chunk(
                content=chunk_content,
                metadata={
                    **(metadata or {}),
                    "cluster": int(label)
                },
                chunk_index=chunk_index
            ))
            chunk_index += 1
        
        return chunks
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Sync wrapper."""
        import asyncio
        return asyncio.run(self.chunk_async(text, metadata))
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split into sentences."""
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
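
Both semantic chunkers only require an object exposing an async embed(texts) method. The sketch below uses a deterministic stub embedder as a stand-in for a real embedding model; it is purely illustrative, so the boundaries it finds are arbitrary:

import hashlib
import numpy as np

class StubEmbedder:
    """Deterministic fake embeddings; swap in a real embedding model in practice."""
    
    async def embed(self, texts: list[str]) -> list[list[float]]:
        vectors = []
        for t in texts:
            # Hash the sentence to seed a reproducible random vector
            seed = int(hashlib.md5(t.encode()).hexdigest()[:8], 16)
            rng = np.random.default_rng(seed)
            v = rng.normal(size=64)
            vectors.append((v / np.linalg.norm(v)).tolist())
        return vectors

chunker = SemanticChunker(StubEmbedder(), similarity_threshold=0.3, min_chunk_size=20)
chunks = chunker.chunk(
    "First topic sentence. More on the same topic. A new topic starts here. It continues."
)
print(len(chunks), "chunks")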

Specialized Chunkers

from dataclasses import dataclass
from typing import Any, Optional
import re

class CodeChunker(Chunker):
    """Chunk code by logical units."""
    
    def __init__(
        self,
        chunk_size: int = 1500,
        language: str = "python"
    ):
        self.chunk_size = chunk_size
        self.language = language
        
        # Language-specific patterns
        self.patterns = {
            "python": {
                "class": r'^class\s+\w+',
                "function": r'^def\s+\w+',
                "async_function": r'^async\s+def\s+\w+'
            },
            "javascript": {
                "class": r'^class\s+\w+',
                "function": r'^function\s+\w+',
                "arrow": r'^const\s+\w+\s*=\s*\([^)]*\)\s*=>'
            },
            "java": {
                "class": r'^(public|private|protected)?\s*class\s+\w+',
                "method": r'^(public|private|protected)?\s*(static)?\s*\w+\s+\w+\s*\('
            }
        }
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split code into logical chunks."""
        
        lines = text.split('\n')
        chunks = []
        current_chunk = []
        current_context = None
        chunk_index = 0
        
        patterns = self.patterns.get(self.language, self.patterns["python"])
        
        for line in lines:
            # Check for new logical unit
            is_boundary = False
            new_context = None
            
            for unit_type, pattern in patterns.items():
                if re.match(pattern, line.strip()):
                    is_boundary = True
                    new_context = f"{unit_type}: {line.strip()[:50]}"
                    break
            
            # Start new chunk at boundary if current is large enough
            if is_boundary and current_chunk:
                chunk_content = '\n'.join(current_chunk)
                
                if len(chunk_content) > 50:  # Min size
                    chunks.append(Chunk(
                        content=chunk_content,
                        metadata={
                            **(metadata or {}),
                            "context": current_context,
                            "language": self.language
                        },
                        chunk_index=chunk_index
                    ))
                    chunk_index += 1
                    current_chunk = []
            
            current_chunk.append(line)
            if new_context:
                current_context = new_context
            
            # Force split if too large
            chunk_content = '\n'.join(current_chunk)
            if len(chunk_content) > self.chunk_size:
                chunks.append(Chunk(
                    content=chunk_content,
                    metadata={
                        **(metadata or {}),
                        "context": current_context,
                        "language": self.language
                    },
                    chunk_index=chunk_index
                ))
                chunk_index += 1
                current_chunk = []
        
        # Don't forget last chunk
        if current_chunk:
            chunk_content = '\n'.join(current_chunk)
            chunks.append(Chunk(
                content=chunk_content,
                metadata={
                    **(metadata or {}),
                    "context": current_context,
                    "language": self.language
                },
                chunk_index=chunk_index
            ))
        
        return chunks
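
A short sketch of the code chunker on an invented Python snippet:

source = '''
def load(path):
    with open(path) as f:
        return f.read()

class Indexer:
    def add(self, doc):
        self.docs.append(doc)
'''

chunker = CodeChunker(language="python")
for c in chunker.chunk(source, metadata={"file": "demo.py"}):
    print(c.metadata["context"], "|", len(c.content), "chars")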

class TableChunker(Chunker):
    """Chunk documents with tables."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        preserve_tables: bool = True
    ):
        self.chunk_size = chunk_size
        self.preserve_tables = preserve_tables
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split while preserving tables."""
        
        # Find tables (markdown format); (?:\n|$) also matches a table at end of text
        table_pattern = r'(?:\|[^\n]+\|(?:\n|$))+'
        
        chunks = []
        chunk_index = 0
        last_end = 0
        
        for match in re.finditer(table_pattern, text):
            # Chunk text before table
            before_text = text[last_end:match.start()].strip()
            if before_text:
                text_chunks = self._chunk_text(before_text, metadata)
                for tc in text_chunks:
                    tc.chunk_index = chunk_index
                    chunks.append(tc)
                    chunk_index += 1
            
            # Handle table
            table_text = match.group(0)
            
            if self.preserve_tables or len(table_text) <= self.chunk_size:
                # Keep table as single chunk
                chunks.append(Chunk(
                    content=table_text,
                    metadata={
                        **(metadata or {}),
                        "type": "table"
                    },
                    chunk_index=chunk_index
                ))
                chunk_index += 1
            else:
                # Split large table by rows
                rows = table_text.strip().split('\n')
                header = rows[0] if rows else ""
                
                current_rows = [header]
                current_length = len(header)
                
                for row in rows[1:]:
                    if current_length + len(row) > self.chunk_size and len(current_rows) > 1:
                        chunks.append(Chunk(
                            content='\n'.join(current_rows),
                            metadata={
                                **(metadata or {}),
                                "type": "table_part"
                            },
                            chunk_index=chunk_index
                        ))
                        chunk_index += 1
                        current_rows = [header]
                        current_length = len(header)
                    
                    current_rows.append(row)
                    current_length += len(row)
                
                if len(current_rows) > 1:
                    chunks.append(Chunk(
                        content='\n'.join(current_rows),
                        metadata={
                            **(metadata or {}),
                            "type": "table_part"
                        },
                        chunk_index=chunk_index
                    ))
                    chunk_index += 1
            
            last_end = match.end()
        
        # Chunk remaining text
        remaining = text[last_end:].strip()
        if remaining:
            text_chunks = self._chunk_text(remaining, metadata)
            for tc in text_chunks:
                tc.chunk_index = chunk_index
                chunks.append(tc)
                chunk_index += 1
        
        return chunks
    
    def _chunk_text(self, text: str, metadata: dict) -> list[Chunk]:
        """Chunk regular text."""
        chunker = RecursiveChunker(chunk_size=self.chunk_size)
        return chunker.chunk(text, metadata)
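
A sketch of the table chunker on mixed prose and a markdown table (content invented):

doc = """Quarterly results are summarized below.

| Quarter | Revenue |
| ------- | ------- |
| Q1      | 10M     |
| Q2      | 12M     |

Revenue grew steadily across both quarters.
"""

chunker = TableChunker(chunk_size=500)
for c in chunker.chunk(doc):
    print(c.metadata.get("type", "text"), "|", c.content.splitlines()[0])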

class HTMLChunker(Chunker):
    """Chunk HTML documents by structure."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        preserve_structure: bool = True
    ):
        self.chunk_size = chunk_size
        self.preserve_structure = preserve_structure
    
    def chunk(self, text: str, metadata: dict = None) -> list[Chunk]:
        """Split HTML by semantic elements."""
        
        from html.parser import HTMLParser
        
        # Simple extraction of text content
        class TextExtractor(HTMLParser):
            def __init__(self):
                super().__init__()
                self.sections = []
                self.current_section = {"tag": None, "content": []}
                self.section_tags = {"h1", "h2", "h3", "h4", "p", "div", "section", "article"}
            
            def handle_starttag(self, tag, attrs):
                if tag in self.section_tags:
                    if self.current_section["content"]:
                        self.sections.append(self.current_section)
                    self.current_section = {"tag": tag, "content": []}
            
            def handle_data(self, data):
                text = data.strip()
                if text:
                    self.current_section["content"].append(text)
            
            def handle_endtag(self, tag):
                pass
        
        extractor = TextExtractor()
        extractor.feed(text)
        
        if extractor.current_section["content"]:
            extractor.sections.append(extractor.current_section)
        
        # Convert sections to chunks
        chunks = []
        chunk_index = 0
        
        for section in extractor.sections:
            content = " ".join(section["content"])
            
            if not content:
                continue
            
            if len(content) > self.chunk_size:
                # Split large sections
                sub_chunker = RecursiveChunker(chunk_size=self.chunk_size)
                sub_chunks = sub_chunker.chunk(content)
                
                for sc in sub_chunks:
                    sc.chunk_index = chunk_index
                    sc.metadata = {
                        **(metadata or {}),
                        "html_tag": section["tag"]
                    }
                    chunks.append(sc)
                    chunk_index += 1
            else:
                chunks.append(Chunk(
                    content=content,
                    metadata={
                        **(metadata or {}),
                        "html_tag": section["tag"]
                    },
                    chunk_index=chunk_index
                ))
                chunk_index += 1
        
        return chunks
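
And a sketch of the HTML chunker on a small invented fragment:

html = "<article><h1>Title</h1><p>First paragraph.</p><p>Second paragraph.</p></article>"

chunker = HTMLChunker(chunk_size=500)
for c in chunker.chunk(html):
    print(c.metadata["html_tag"], "->", c.content)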

Chunking Pipeline

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class DocumentType(Enum):
    """Document types."""
    
    TEXT = "text"
    MARKDOWN = "markdown"
    CODE = "code"
    HTML = "html"
    PDF = "pdf"

@dataclass
class ChunkingConfig:
    """Chunking configuration."""
    
    chunk_size: int = 1000
    chunk_overlap: int = 200
    strategy: str = "recursive"  # fixed, recursive, semantic (informational; chunker selection below is type-based)
    preserve_structure: bool = True

class ChunkingPipeline:
    """Complete chunking pipeline."""
    
    def __init__(
        self,
        config: ChunkingConfig = None,
        embedding_model: Any = None
    ):
        self.config = config or ChunkingConfig()
        self.embedding_model = embedding_model
        
        # Initialize chunkers
        self.chunkers = {
            DocumentType.TEXT: RecursiveChunker(
                chunk_size=self.config.chunk_size,
                chunk_overlap=self.config.chunk_overlap
            ),
            DocumentType.MARKDOWN: MarkdownChunker(
                chunk_size=self.config.chunk_size,
                chunk_overlap=self.config.chunk_overlap
            ),
            DocumentType.CODE: CodeChunker(
                chunk_size=self.config.chunk_size
            ),
            DocumentType.HTML: HTMLChunker(
                chunk_size=self.config.chunk_size
            )
        }
        
        if embedding_model:
            self.semantic_chunker = SemanticChunker(
                embedding_model=embedding_model,
                max_chunk_size=self.config.chunk_size
            )
    
    def detect_type(self, text: str, filename: str = None) -> DocumentType:
        """Detect document type."""
        
        if filename:
            ext = filename.split('.')[-1].lower()
            
            if ext in ['py', 'js', 'ts', 'java', 'cpp', 'c', 'go', 'rs']:
                return DocumentType.CODE
            elif ext in ['md', 'markdown']:
                return DocumentType.MARKDOWN
            elif ext in ['html', 'htm']:
                return DocumentType.HTML
            elif ext == 'pdf':
                return DocumentType.PDF
        
        # Content-based detection
        if text.lstrip().startswith('<'):
            return DocumentType.HTML
        if text.lstrip().startswith('#'):
            return DocumentType.MARKDOWN
        
        return DocumentType.TEXT
    
    def chunk(
        self,
        text: str,
        doc_type: DocumentType = None,
        filename: str = None,
        metadata: dict = None
    ) -> list[Chunk]:
        """Chunk document."""
        
        # Detect type if not provided
        if doc_type is None:
            doc_type = self.detect_type(text, filename)
        
        # Get appropriate chunker
        chunker = self.chunkers.get(doc_type, self.chunkers[DocumentType.TEXT])
        
        # Add document metadata
        doc_metadata = {
            **(metadata or {}),
            "doc_type": doc_type.value,
            "filename": filename
        }
        
        # Chunk
        chunks = chunker.chunk(text, doc_metadata)
        
        return chunks
    
    async def chunk_semantic(
        self,
        text: str,
        metadata: dict = None
    ) -> list[Chunk]:
        """Chunk using semantic boundaries."""
        
        if not self.embedding_model:
            raise ValueError("Embedding model required for semantic chunking")
        
        return await self.semantic_chunker.chunk_async(text, metadata)
    
    def chunk_with_context(
        self,
        text: str,
        context_size: int = 100,
        metadata: dict = None
    ) -> list[Chunk]:
        """Chunk with surrounding context."""
        
        # First, chunk normally
        chunks = self.chunk(text, metadata=metadata)
        
        # Add context to each chunk
        for i, chunk in enumerate(chunks):
            # Get context from adjacent chunks
            prev_context = ""
            next_context = ""
            
            if i > 0:
                prev_content = chunks[i-1].content
                prev_context = prev_content[-context_size:] if len(prev_content) > context_size else prev_content
            
            if i < len(chunks) - 1:
                next_content = chunks[i+1].content
                next_context = next_content[:context_size] if len(next_content) > context_size else next_content
            
            chunk.metadata["prev_context"] = prev_context
            chunk.metadata["next_context"] = next_context
        
        return chunks
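
Tying the pieces together, a usage sketch for the pipeline (the filename and content are invented):

demo_pipeline = ChunkingPipeline(ChunkingConfig(chunk_size=800, chunk_overlap=150))

chunks = demo_pipeline.chunk(
    "# Notes\n\nSome markdown content about the system.",
    filename="notes.md"
)
for c in chunks:
    print(c.metadata["doc_type"], c.metadata.get("headers"), c.chunk_index)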

Production Chunking Service

from fastapi import FastAPI, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class ChunkRequest(BaseModel):
    text: str
    chunk_size: int = 1000
    chunk_overlap: int = 200
    strategy: str = "recursive"
    doc_type: Optional[str] = None

class ChunkResponse(BaseModel):
    chunks: list[dict]
    total_chunks: int
    avg_chunk_size: int

# Initialize pipeline
pipeline = ChunkingPipeline(ChunkingConfig())

@app.post("/v1/chunk")
async def chunk_text(request: ChunkRequest) -> ChunkResponse:
    """Chunk text."""
    
    # Update config
    config = ChunkingConfig(
        chunk_size=request.chunk_size,
        chunk_overlap=request.chunk_overlap,
        strategy=request.strategy
    )
    
    local_pipeline = ChunkingPipeline(config)
    
    doc_type = None
    if request.doc_type:
        doc_type = DocumentType(request.doc_type)
    
    chunks = local_pipeline.chunk(request.text, doc_type=doc_type)
    
    return ChunkResponse(
        chunks=[
            {
                "content": c.content,
                "metadata": c.metadata,
                "chunk_index": c.chunk_index,
                "token_count": c.token_count
            }
            for c in chunks
        ],
        total_chunks=len(chunks),
        avg_chunk_size=sum(len(c.content) for c in chunks) // len(chunks) if chunks else 0
    )

@app.post("/v1/chunk/file")
async def chunk_file(
    file: UploadFile = File(...),
    chunk_size: int = 1000,
    chunk_overlap: int = 200
):
    """Chunk uploaded file."""
    
    content = await file.read()
    text = content.decode('utf-8')
    
    config = ChunkingConfig(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    
    local_pipeline = ChunkingPipeline(config)
    chunks = local_pipeline.chunk(text, filename=file.filename)
    
    return {
        "filename": file.filename,
        "chunks": [
            {
                "content": c.content,
                "metadata": c.metadata,
                "chunk_index": c.chunk_index
            }
            for c in chunks
        ],
        "total_chunks": len(chunks)
    }

@app.get("/v1/strategies")
async def list_strategies():
    """List chunking strategies."""
    
    return {
        "strategies": [
            {"name": "fixed", "description": "Fixed character size"},
            {"name": "recursive", "description": "Recursive splitting on separators"},
            {"name": "sentence", "description": "Sentence-aligned chunks"},
            {"name": "semantic", "description": "Semantic boundary detection"},
            {"name": "markdown", "description": "Markdown structure-aware"},
            {"name": "code", "description": "Code-aware chunking"}
        ]
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Chunking strategy significantly impacts RAG quality. Start with recursive chunking—it respects document structure while ensuring consistent chunk sizes. For structured documents like markdown or code, use specialized chunkers that preserve logical units. Semantic chunking finds natural topic boundaries but requires embedding computation.

Consider your query patterns: if users ask about specific topics, semantic chunking groups related content together. If users ask about specific sections, structure-aware chunking preserves that context. Add overlap between chunks to maintain context across boundaries. Include metadata like headers, section titles, and document structure to help with retrieval and answer generation. Monitor retrieval quality and adjust chunk sizes based on results—too small and you lose context, too large and you include irrelevant content.

The key insight is that there's no universal best chunking strategy—the right approach depends on your documents, queries, and how the LLM uses the retrieved context. Experiment with different strategies and measure retrieval quality to find what works best for your use case.

