Data Pipelines for LLM Training: Building Production ETL Systems

Production ETL for LLM training is complex. After building pipelines that have processed 100TB+ of data, I’ve learned what works and what doesn’t. Here’s a complete guide to the extract, transform, and load stages of a production pipeline for LLM training data.

Figure 1: LLM Training Data Pipeline Architecture

Why Production ETL Matters for LLM Training

LLM training requires massive amounts of clean, processed data:

  • Scale: Training data can be petabytes in size
  • Quality: Data quality directly impacts model performance
  • Diversity: Need diverse, representative datasets
  • Freshness: Keep training data up-to-date
  • Compliance: Handle sensitive data responsibly

Across multiple LLM training projects, the same lesson has held: proper ETL is the foundation of a successful model.

Pipeline Architecture

1. Extract: Data Collection

Extract data from multiple sources:

from typing import List, Dict, Iterator
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime

class DataExtractor:
    def __init__(self):
        self.sources = []
        self.extracted_data = []
    
    def extract_from_api(self, api_url: str, params: Dict = None) -> List[Dict]:
        # Extract data from API
        response = requests.get(api_url, params=params)
        response.raise_for_status()
        return response.json()
    
    def extract_from_files(self, file_paths: List[str]) -> Iterator[Dict]:
        # Extract data from files
        for file_path in file_paths:
            if file_path.endswith('.json'):
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    if isinstance(data, list):
                        for item in data:
                            yield item
                    else:
                        yield data
            elif file_path.endswith('.txt'):
                with open(file_path, 'r', encoding='utf-8') as f:
                    yield {"text": f.read(), "source": file_path}
    
    def extract_from_web(self, urls: List[str]) -> List[Dict]:
        # Extract data from web pages
        extracted = []
        for url in urls:
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')
                
                # Extract text content
                text = soup.get_text(separator=' ', strip=True)
                extracted.append({
                    "text": text,
                    "url": url,
                    "extracted_at": datetime.now().isoformat()
                })
            except Exception as e:
                print(f"Error extracting from {url}: {e}")
        
        return extracted
    
    def extract_from_database(self, query: str, connection) -> Iterator[Dict]:
        # Extract data from database
        cursor = connection.cursor()
        cursor.execute(query)
        
        columns = [desc[0] for desc in cursor.description]
        for row in cursor.fetchall():
            yield dict(zip(columns, row))

# Usage
extractor = DataExtractor()

# Extract from multiple sources
api_data = extractor.extract_from_api("https://api.example.com/data")
file_data = list(extractor.extract_from_files(["data1.json", "data2.json"]))
web_data = extractor.extract_from_web(["https://example.com/page1"])

2. Transform: Data Processing

Transform raw data into training-ready format:

import re
from typing import List, Dict, Callable
from dataclasses import dataclass

@dataclass
class DataQualityMetrics:
    total_records: int
    valid_records: int
    invalid_records: int
    duplicates: int
    quality_score: float

class DataTransformer:
    def __init__(self):
        self.transformation_pipeline = []
        self.quality_metrics = None
    
    def add_transformation(self, transform_fn: Callable):
        # Add transformation to pipeline
        self.transformation_pipeline.append(transform_fn)
    
    def clean_text(self, text: str) -> str:
        # Clean text data
        # Remove URLs and email addresses first, while '/' and '@' are still present
        text = re.sub(r'http\S+|www\.\S+', '', text)
        text = re.sub(r'\S+@\S+', '', text)
        
        # Remove special characters (keep alphanumeric and basic punctuation)
        text = re.sub(r'[^\w\s.,!?;:()\-]', '', text)
        
        # Collapse extra whitespace
        text = re.sub(r'\s+', ' ', text)
        
        return text.strip()
    
    def normalize_text(self, text: str) -> str:
        # Normalize text
        # Convert to lowercase
        text = text.lower()
        
        # Remove extra punctuation
        text = re.sub(r'[!?]{2,}', '!', text)
        text = re.sub(r'[.]{3,}', '...', text)
        
        return text
    
    def deduplicate(self, data: List[Dict], key_field: str = "text") -> List[Dict]:
        # Remove duplicates
        seen = set()
        unique_data = []
        
        for item in data:
            key = item.get(key_field, str(item))
            if key not in seen:
                seen.add(key)
                unique_data.append(item)
        
        return unique_data
    
    def validate_data(self, data: List[Dict]) -> DataQualityMetrics:
        # Validate data quality
        total = len(data)
        valid = 0
        invalid = 0
        duplicates = 0
        
        seen_texts = set()
        
        for item in data:
            text = item.get("text", "")
            
            # Check if valid
            if text and len(text) > 10:  # Minimum length
                if text not in seen_texts:
                    valid += 1
                    seen_texts.add(text)
                else:
                    duplicates += 1
            else:
                invalid += 1
        
        quality_score = valid / total if total > 0 else 0.0
        
        self.quality_metrics = DataQualityMetrics(
            total_records=total,
            valid_records=valid,
            invalid_records=invalid,
            duplicates=duplicates,
            quality_score=quality_score
        )
        
        return self.quality_metrics
    
    def transform(self, data: List[Dict]) -> List[Dict]:
        # Apply transformation pipeline
        transformed = data
        
        # Apply each transformation
        for transform_fn in self.transformation_pipeline:
            transformed = transform_fn(transformed)
        
        # Clean and normalize
        for item in transformed:
            if "text" in item:
                item["text"] = self.clean_text(item["text"])
                item["text"] = self.normalize_text(item["text"])
        
        # Deduplicate
        transformed = self.deduplicate(transformed)
        
        # Validate
        self.validate_data(transformed)
        
        return transformed

# Usage
transformer = DataTransformer()

# Add custom transformations
def remove_short_texts(data: List[Dict]) -> List[Dict]:
    return [item for item in data if len(item.get("text", "")) > 50]

transformer.add_transformation(remove_short_texts)

# Transform data (raw_data is the combined output of the Extract step above)
transformed_data = transformer.transform(raw_data)
print(f"Quality score: {transformer.quality_metrics.quality_score:.2%}")

3. Load: Data Storage

Load processed data into storage:

from typing import List, Dict
import json
import gzip
from pathlib import Path

class DataLoader:
    def __init__(self, output_path: str = "training_data"):
        self.output_path = Path(output_path)
        self.output_path.mkdir(parents=True, exist_ok=True)
    
    def load_to_jsonl(self, data: List[Dict], filename: str):
        # Load data to JSONL format (one JSON object per line)
        file_path = self.output_path / filename
        
        with open(file_path, 'w', encoding='utf-8') as f:
            for item in data:
                json.dump(item, f, ensure_ascii=False)
                f.write('\n')
    
    def load_to_compressed(self, data: List[Dict], filename: str):
        # Load data to compressed JSONL
        file_path = self.output_path / f"{filename}.gz"
        
        with gzip.open(file_path, 'wt', encoding='utf-8') as f:
            for item in data:
                json.dump(item, f, ensure_ascii=False)
                f.write('\n')
    
    def load_to_chunks(self, data: List[Dict], chunk_size: int = 10000, prefix: str = "chunk"):
        # Load data in chunks for large datasets
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i + chunk_size]
            filename = f"{prefix}_{i // chunk_size:05d}.jsonl"
            self.load_to_jsonl(chunk, filename)
    
    def load_to_database(self, data: List[Dict], table_name: str, connection):
        # Load data to database
        cursor = connection.cursor()
        
        for item in data:
            # Insert into database (adjust based on schema)
            cursor.execute(
                f"INSERT INTO {table_name} (text, metadata) VALUES (?, ?)",
                (item.get("text"), json.dumps(item.get("metadata", {})))
            )
        
        connection.commit()

# Usage
loader = DataLoader(output_path="training_data")

# Load to JSONL
loader.load_to_jsonl(transformed_data, "training_data.jsonl")

# Load in chunks for large datasets
loader.load_to_chunks(transformed_data, chunk_size=10000)

Production Pipeline with Airflow

Build production pipelines with Apache Airflow:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'llm_training_data_pipeline',
    default_args=default_args,
    description='ETL pipeline for LLM training data',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 1, 1),
    catchup=False
)

def extract_data(**context):
    # Extract data from sources
    extractor = DataExtractor()
    
    # Extract from multiple sources
    api_data = extractor.extract_from_api("https://api.example.com/data")
    file_data = list(extractor.extract_from_files(["data1.json", "data2.json"]))
    
    # Combine
    all_data = api_data + file_data
    
    # Store in XCom for next task
    context['ti'].xcom_push(key='extracted_data', value=all_data)
    
    return len(all_data)

def transform_data(**context):
    # Transform data
    extracted_data = context['ti'].xcom_pull(key='extracted_data', task_ids='extract')
    
    transformer = DataTransformer()
    transformer.add_transformation(remove_short_texts)
    
    transformed_data = transformer.transform(extracted_data)
    
    # Store in XCom
    context['ti'].xcom_push(key='transformed_data', value=transformed_data)
    context['ti'].xcom_push(key='quality_score', value=transformer.quality_metrics.quality_score)
    
    return len(transformed_data)

def load_data(**context):
    # Load data to storage
    transformed_data = context['ti'].xcom_pull(key='transformed_data', task_ids='transform')
    quality_score = context['ti'].xcom_pull(key='quality_score', task_ids='transform')
    
    # Only load if quality is acceptable
    if quality_score < 0.8:
        raise ValueError(f"Data quality too low: {quality_score:.2%}")
    
    loader = DataLoader(output_path="training_data")
    loader.load_to_chunks(transformed_data, chunk_size=10000)
    
    return f"Loaded {len(transformed_data)} records"

def validate_data(**context):
    # Validate loaded data
    loader = DataLoader(output_path="training_data")
    
    # Check if files exist
    files = list(loader.output_path.glob("chunk_*.jsonl"))
    
    if not files:
        raise ValueError("No data files found")
    
    return f"Validated {len(files)} data files"

# Define tasks
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag
)

validate_task = PythonOperator(
    task_id='validate',
    python_callable=validate_data,
    dag=dag
)

# Define dependencies
extract_task >> transform_task >> load_task >> validate_task

Figure 2: ETL Architecture
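
A practical caveat on the DAG above: XCom values live in Airflow’s metadata database, so XCom is best reserved for small payloads such as counts and paths, not full datasets. For larger volumes, a common pattern is to have each task write its output to shared storage and pass only the file path downstream. Here is a minimal sketch under that assumption, reusing the DataExtractor, DataTransformer, and remove_short_texts defined earlier; the /data/staging location is hypothetical:

from pathlib import Path
import json

STAGING_DIR = Path("/data/staging")  # hypothetical shared location visible to all workers

def extract_data(**context):
    # Extract as before, but persist the results instead of pushing them to XCom
    extractor = DataExtractor()
    data = extractor.extract_from_api("https://api.example.com/data")

    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    out_path = STAGING_DIR / f"extracted_{context['ds']}.jsonl"
    with open(out_path, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

    # Push only the path; the downstream task reads the file itself
    context['ti'].xcom_push(key='extracted_path', value=str(out_path))
    return str(out_path)

def transform_data(**context):
    # Pull the path, not the records
    in_path = context['ti'].xcom_pull(key='extracted_path', task_ids='extract')
    with open(in_path, 'r', encoding='utf-8') as f:
        data = [json.loads(line) for line in f]

    transformer = DataTransformer()
    transformer.add_transformation(remove_short_texts)
    transformed = transformer.transform(data)
    # ...write the transformed records to staging and push that path the same way

The load and validate tasks can follow the same convention, pulling paths instead of records.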

Incremental Processing

Process only new or changed data:

from datetime import datetime
from typing import Dict, List
import hashlib
import json

class IncrementalProcessor:
    def __init__(self, state_file: str = "processing_state.json"):
        self.state_file = state_file
        self.processed_hashes = self._load_state()
    
    def _load_state(self) -> set:
        # Load state of processed items
        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)
                return set(state.get("processed_hashes", []))
        except FileNotFoundError:
            return set()
    
    def _save_state(self):
        # Save processing state
        state = {
            "processed_hashes": list(self.processed_hashes),
            "last_updated": datetime.now().isoformat()
        }
        with open(self.state_file, 'w') as f:
            json.dump(state, f, indent=2)
    
    def _hash_item(self, item: Dict) -> str:
        # Generate hash for item
        text = item.get("text", "")
        return hashlib.md5(text.encode()).hexdigest()
    
    def filter_new_items(self, data: List[Dict]) -> List[Dict]:
        # Filter out already processed items
        new_items = []
        
        for item in data:
            item_hash = self._hash_item(item)
            if item_hash not in self.processed_hashes:
                new_items.append(item)
                self.processed_hashes.add(item_hash)
        
        # Save updated state
        self._save_state()
        
        return new_items
    
    def process_incremental(self, data: List[Dict], transformer: DataTransformer) -> List[Dict]:
        # Process only new items
        new_items = self.filter_new_items(data)
        
        if not new_items:
            return []
        
        # Transform new items
        transformed = transformer.transform(new_items)
        
        return transformed

# Usage
incremental = IncrementalProcessor()
new_data = incremental.filter_new_items(raw_data)

Error Handling and Monitoring

Robust error handling and monitoring:

import logging
from typing import Dict
from datetime import datetime

class PipelineMonitor:
    def __init__(self):
        self.logger = logging.getLogger('pipeline')
        self.metrics = {
            "records_processed": 0,
            "records_failed": 0,
            "processing_time": 0,
            "errors": []
        }
    
    def log_processing(self, stage: str, records: int, duration: float):
        # Log processing metrics
        self.metrics["records_processed"] += records
        self.metrics["processing_time"] += duration
        
        self.logger.info(
            f"Stage {stage}: Processed {records} records in {duration:.2f}s"
        )
    
    def log_error(self, stage: str, error: Exception, context: Dict = None):
        # Log errors
        self.metrics["records_failed"] += 1
        error_info = {
            "stage": stage,
            "error": str(error),
            "timestamp": datetime.now().isoformat(),
            "context": context
        }
        self.metrics["errors"].append(error_info)
        
        self.logger.error(f"Error in {stage}: {error}", exc_info=True)
    
    def get_metrics(self) -> Dict:
        # Get processing metrics
        return {
            **self.metrics,
            "success_rate": (
                self.metrics["records_processed"] / 
                (self.metrics["records_processed"] + self.metrics["records_failed"])
                if (self.metrics["records_processed"] + self.metrics["records_failed"]) > 0
                else 0.0
            )
        }

class RobustETLPipeline:
    def __init__(self):
        self.extractor = DataExtractor()
        self.transformer = DataTransformer()
        self.loader = DataLoader()
        self.monitor = PipelineMonitor()
    
    def run_with_error_handling(self):
        # Run pipeline with comprehensive error handling
        try:
            # Extract
            start_time = datetime.now()
            try:
                raw_data = self.extractor.extract_from_api("https://api.example.com/data")
                duration = (datetime.now() - start_time).total_seconds()
                self.monitor.log_processing("extract", len(raw_data), duration)
            except Exception as e:
                self.monitor.log_error("extract", e)
                raise
            
            # Transform
            start_time = datetime.now()
            try:
                transformed_data = self.transformer.transform(raw_data)
                duration = (datetime.now() - start_time).total_seconds()
                self.monitor.log_processing("transform", len(transformed_data), duration)
            except Exception as e:
                self.monitor.log_error("transform", e, {"input_size": len(raw_data)})
                raise
            
            # Load
            start_time = datetime.now()
            try:
                self.loader.load_to_chunks(transformed_data)
                duration = (datetime.now() - start_time).total_seconds()
                self.monitor.log_processing("load", len(transformed_data), duration)
            except Exception as e:
                self.monitor.log_error("load", e, {"input_size": len(transformed_data)})
                raise
            
            # Return metrics
            return self.monitor.get_metrics()
            
        except Exception as e:
            self.monitor.log_error("pipeline", e)
            raise

# Usage
pipeline = RobustETLPipeline()
metrics = pipeline.run_with_error_handling()
print(f"Success rate: {metrics['success_rate']:.2%}")

Figure 3: Pipeline Stages
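
One gap in the pipeline above: it logs and re-raises failures but never retries them. Airflow retries whole tasks, but for transient errors inside a stage (a flaky API call, a timeout) an in-process retry helper is often enough. A minimal sketch, with backoff parameters chosen purely for illustration:

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(operation: Callable[[], T], max_attempts: int = 3, base_delay: float = 1.0) -> T:
    # Retry a flaky operation with exponential backoff plus a little jitter
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise  # let the caller (or Airflow) handle the final failure
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))

# Usage inside the extract stage:
# raw_data = with_retries(lambda: self.extractor.extract_from_api("https://api.example.com/data"))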

Best Practices: Lessons from 100TB+ Processing

From building pipelines processing massive datasets:

  1. Incremental processing: Process only new/changed data. Full reprocessing is expensive.
  2. Data validation: Validate at each stage. Catch issues early.
  3. Error handling: Robust error handling and retries. Failures are inevitable.
  4. Monitoring: Monitor pipeline health and performance. Set up alerts.
  5. Idempotency: Make pipelines idempotent. Safe to rerun (a minimal sketch follows this list).
  6. Data quality: Measure and track data quality. Quality impacts model performance.
  7. Scalability: Design for scale. Process in parallel when possible.
  8. Versioning: Version your data. Track data lineage.
  9. Testing: Test pipelines thoroughly. Use sample data first.
  10. Documentation: Document data sources and transformations. Enables maintenance.
  11. Cost optimization: Optimize for cost. Use appropriate storage and compute.
  12. Compliance: Handle sensitive data responsibly. Meet regulatory requirements.
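
To make the idempotency point concrete, here is a minimal sketch of an idempotent chunk writer. It is my own illustration rather than part of the DataLoader above: the filename is derived from the chunk contents, and the write goes through a temporary file that is atomically renamed, so a rerun either skips chunks that already exist or cleanly replaces a partial file:

import hashlib
import json
import os
from pathlib import Path
from typing import Dict, List

def write_chunk_idempotent(chunk: List[Dict], output_dir: Path) -> Path:
    # Derive a deterministic filename from the chunk contents so reruns
    # produce the same name instead of duplicating data
    payload = "\n".join(json.dumps(item, ensure_ascii=False, sort_keys=True) for item in chunk)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    final_path = output_dir / f"chunk_{digest}.jsonl"

    if final_path.exists():
        return final_path  # already written by a previous run

    # Write to a temp file, then atomically rename: the final file is
    # either absent or complete, never half-written
    tmp_path = output_dir / (final_path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload + "\n")
    os.replace(tmp_path, final_path)
    return final_path

Content-derived filenames also act as a cheap cross-run deduplication check.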

Common Mistakes and How to Avoid Them

What I learned the hard way:

  • No incremental processing: Process only new data. Full reprocessing wastes resources.
  • Poor error handling: Handle errors at every stage. One failure shouldn’t crash the pipeline.
  • No monitoring: Monitor pipeline health. Set up alerts for failures.
  • Ignoring data quality: Validate data quality. Poor quality data produces poor models.
  • No idempotency: Make pipelines idempotent. Safe reruns prevent data corruption.
  • Not testing: Test pipelines thoroughly. Production failures are costly.
  • No versioning: Version your data. Track what changed and when (see the manifest sketch after this list).
  • Over-engineering: Start simple. Add complexity only when needed.
  • Ignoring costs: Monitor and optimize costs. Large datasets are expensive.
  • No documentation: Document everything. Future you will thank you.
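
On versioning, a lightweight approach that has served me well is a manifest recording a content hash and size per output file; the layout below is my own convention rather than a standard format, but it is enough to diff two dataset versions and see exactly which chunks changed:

import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Dict

def build_manifest(data_dir: Path, manifest_path: Path) -> Dict:
    # Record a content hash and size for every chunk so later runs can
    # tell exactly which files changed between dataset versions
    entries = {}
    for path in sorted(data_dir.glob("chunk_*.jsonl")):
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        entries[path.name] = {"sha256": digest, "bytes": path.stat().st_size}

    manifest = {"created_at": datetime.now().isoformat(), "files": entries}
    manifest_path.write_text(json.dumps(manifest, indent=2))
    return manifest

# Usage
# manifest = build_manifest(Path("training_data"), Path("training_data/manifest.json"))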

Real-World Example: Processing 100TB Training Dataset

We built a pipeline to process 100TB of training data:

  1. Extract: Collected data from 50+ sources (APIs, databases, files)
  2. Transform: Cleaned, normalized, and validated 2 billion records
  3. Load: Stored in compressed JSONL format (reduced to 30TB)
  4. Incremental: Processed only new data daily (10GB/day)
  5. Quality: Maintained 95%+ data quality score

Key learnings: Incremental processing is essential, data quality matters more than quantity, and proper error handling prevents catastrophic failures.

🎯 Key Takeaway

Production ETL pipelines for LLM training require careful design. Use incremental processing, validate data at each stage, handle errors robustly, and monitor continuously. With proper pipelines, you ensure high-quality training data that produces better models.

Bottom Line

Production ETL pipelines are the foundation of successful LLM training. Use incremental processing, validate data, handle errors gracefully, and monitor continuously. With proper pipelines, you ensure high-quality training data that produces better models. The investment in proper ETL pays off in model performance.

