A data pipeline that works 95% of the time is a liability. When it fails silently — dropping rows, letting nulls through, writing a corrupted partition — your model trains on garbage and you find out three weeks later when business metrics crater. Production data pipelines for ML must be idempotent, validated at every boundary, and versioned so you can reproduce any training run from any point in time.
ETL vs ELT
ETL (Extract, Transform, Load) transforms data before loading it into the destination. ELT (Extract, Load, Transform) loads raw data first, then transforms at query time inside the destination system.
When to use ETL: you need to enforce a strict schema at the destination, the raw data contains sensitive fields that must be masked or dropped before storage, or the destination is not powerful enough to run complex transformations (e.g., an OLTP database).
When to use ELT: the destination is a cloud data warehouse (BigQuery, Snowflake, Redshift) with massive query compute, you want to preserve raw data for future feature engineering, or your transformation logic changes frequently and you want to avoid re-running the full extraction.
For ML specifically, ELT is increasingly common: land raw data in a data lake (S3/GCS/ADLS), transform in SQL or Spark inside the warehouse, and materialise feature tables. This preserves optionality — you can always re-derive features from raw events.
```python
import pandas as pd

# clean_event / is_valid are domain-specific helpers assumed to exist

# ETL: transform before loading (tight schema enforcement)
def etl_pipeline(raw_events: list[dict]) -> pd.DataFrame:
    cleaned = [clean_event(e) for e in raw_events]   # transform
    validated = [e for e in cleaned if is_valid(e)]  # filter
    df = pd.DataFrame(validated)
    df.to_parquet("s3://data-lake/processed/events/")  # load clean
    return df

# ELT: load raw first, transform at query time
def elt_pipeline(raw_events: list[dict]) -> None:
    df = pd.DataFrame(raw_events)  # minimal parsing
    df["ingested_at"] = pd.Timestamp.utcnow()
    df.to_parquet("s3://data-lake/raw/events/")  # load raw
    # transformation happens later in SQL/dbt/Spark
```
Batch vs Micro-batch vs Streaming
Batch processing runs on a schedule (hourly, daily). It is simple to implement, easy to backfill, and cost-efficient for workloads where latency of minutes-to-hours is acceptable. Most ML training pipelines are batch.
Micro-batch (Spark Structured Streaming, Flink in batch mode) processes data in small windows (seconds to minutes). Useful when you need near-real-time features but cannot afford the complexity of true streaming.
Streaming (Kafka + Flink, Kafka Streams) processes each event as it arrives with sub-second latency. Required for fraud detection, real-time recommendation, or any use case where stale features cause business harm. The operational complexity is significantly higher: you must handle out-of-order events, watermarks, state management, and exactly-once semantics.
```python
import json
from datetime import datetime

# Batch: process yesterday's data
def batch_job(date: str) -> None:
    df = read_parquet(f"s3://raw/events/dt={date}/")
    features = compute_features(df)
    features.to_parquet(f"s3://features/dt={date}/")

# Micro-batch pattern: process in time windows
def micro_batch_job(window_start: datetime, window_end: datetime) -> None:
    df = read_events_between(window_start, window_end)
    features = compute_features(df)
    features.to_parquet(f"s3://features/ts={window_start.isoformat()}/")

# Streaming: event-by-event (pseudocode for Kafka consumer)
def streaming_consumer(consumer: KafkaConsumer) -> None:
    for message in consumer:
        event = json.loads(message.value)
        feature_vector = compute_online_features(event)
        redis_client.set(
            f"features:{event['user_id']}",
            json.dumps(feature_vector),
            ex=3600,  # TTL 1 hour
        )
```
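The out-of-order handling mentioned under streaming can be sketched with a bounded reordering buffer — a stdlib toy version of what Flink's watermark machinery gives you. The function name `reorder_events` and its `max_out_of_orderness` parameter are illustrative; timestamps are in arbitrary integer units:

```python
import heapq
from itertools import count
from typing import Any, Iterable, Iterator, Tuple

def reorder_events(
    events: Iterable[Tuple[int, Any]],
    max_out_of_orderness: int,
) -> Iterator[Tuple[int, Any]]:
    """Yield (timestamp, payload) pairs in event-time order, tolerating
    events that arrive up to max_out_of_orderness time units late.
    Events later than that may still be emitted, but out of order."""
    heap: list = []
    seq = count()  # tie-breaker so payloads are never compared
    watermark = float("-inf")
    for ts, payload in events:
        heapq.heappush(heap, (ts, next(seq), payload))
        # Watermark: everything at or before it is assumed complete
        watermark = max(watermark, ts - max_out_of_orderness)
        while heap and heap[0][0] <= watermark:
            t, _, p = heapq.heappop(heap)
            yield t, p
    while heap:  # input exhausted: drain whatever is still buffered
        t, _, p = heapq.heappop(heap)
        yield t, p
```

The trade-off is latency for correctness: a larger `max_out_of_orderness` tolerates more disorder but delays every emission by that much.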
Building Idempotent Pipelines
An idempotent pipeline produces the same output when run with the same input, regardless of how many times it runs. This is not optional — pipelines fail and get retried, and non-idempotent pipelines corrupt data on retry.
Three rules for idempotency:
Write to a deterministic output path. Include the date/version/run-id in the output path so each run writes to its own location. Never append to an existing file.
Delete before write. If the output path already exists, delete it before writing. This makes a re-run equivalent to a fresh run.
Use atomic writes. Write to a temporary path, validate, then rename. This prevents partially-written files from being consumed downstream.
```python
import shutil
from pathlib import Path

import pandas as pd

def idempotent_write(df: pd.DataFrame, output_path: str) -> None:
    """Write a DataFrame idempotently using delete-before-write."""
    path = Path(output_path)
    temp_path = path.parent / f"_tmp_{path.name}"

    # Write to temp first
    df.to_parquet(temp_path, index=False)

    # Validate temp output
    written = pd.read_parquet(temp_path)
    assert len(written) == len(df), "Row count mismatch after write"

    # Atomic: delete existing, rename temp to final
    if path.exists():
        if path.is_dir():
            shutil.rmtree(path)
        else:
            path.unlink()
    temp_path.rename(path)

def process_partition(date: str, input_path: str, output_path: str) -> None:
    """Process a date partition idempotently."""
    out = Path(output_path) / f"dt={date}"
    # Idempotent: always recompute from scratch
    df = pd.read_parquet(f"{input_path}/dt={date}/")
    features = compute_features(df)
    idempotent_write(features, str(out))
    print(f"Written {len(features)} rows to {out}")
```
Data Validation with Great Expectations
Great Expectations (GX) is the de facto standard library for data validation in ML pipelines. You define expectations about your data — assertions about schema, nullability, ranges, and patterns — and run them as a blocking checkpoint before any transformation or training.
```python
import great_expectations as gx

# Initialise a data context (stores config, expectations, results)
context = gx.get_context()

# Create a datasource for a Pandas DataFrame
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("events")

# Define an expectation suite
suite = context.add_expectation_suite("events.basic_quality")

# Common expectations for ML data
validator = context.get_validator(
    batch_request=data_asset.build_batch_request(dataframe=df),
    expectation_suite_name="events.basic_quality",
)

# Completeness
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("event_timestamp")
validator.expect_column_values_to_not_be_null("label")

# Range checks (clip outliers or flag corrupt data)
validator.expect_column_values_to_be_between(
    "session_duration_seconds", min_value=0, max_value=86400
)
validator.expect_column_values_to_be_between(
    "purchase_amount_usd", min_value=0, max_value=50000
)

# Format checks
validator.expect_column_values_to_match_regex(
    "user_id", r"^usr_[a-f0-9]{16}$"
)
validator.expect_column_values_to_match_regex(
    "event_timestamp", r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
)

# Cardinality
validator.expect_column_distinct_values_to_be_in_set(
    "event_type", {"page_view", "add_to_cart", "purchase", "search"}
)

# Save expectations
validator.save_expectation_suite(discard_failed_expectations=False)

# Run a checkpoint (validation + optionally save results)
checkpoint = context.add_or_update_checkpoint(
    name="events_checkpoint",
    validations=[{
        "batch_request": data_asset.build_batch_request(dataframe=df),
        "expectation_suite_name": "events.basic_quality",
    }],
)
result = checkpoint.run()

if not result["success"]:
    failed = [
        r["expectation_config"]["expectation_type"]
        for r in result.list_validation_results()[0]["results"]
        if not r["success"]
    ]
    raise ValueError(f"Data validation failed: {failed}")

print("Data validation passed.")
```
Schema Evolution Handling
Schemas change. New features get added, old columns get renamed, types change. A pipeline that breaks on every schema change is fragile.
Additive changes (new columns, new enum values) are backwards compatible — old code ignores new fields. Handle these gracefully by selecting only the columns you need.
Breaking changes (column renamed, type changed, column removed) require explicit migration. Version your schema explicitly and handle both old and new versions during a transition window.
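For the additive case, it is usually enough to project onto the columns the pipeline actually needs. A minimal sketch — the column names here are illustrative:

```python
import pandas as pd

REQUIRED_COLUMNS = ["user_id", "event_type", "event_timestamp"]

def select_required(df: pd.DataFrame) -> pd.DataFrame:
    """Tolerate additive schema changes by keeping only the columns this
    pipeline needs, failing loudly if a required column disappears."""
    missing = set(REQUIRED_COLUMNS) - set(df.columns)
    if missing:
        raise KeyError(f"Required columns missing: {sorted(missing)}")
    return df[REQUIRED_COLUMNS].copy()
```

New upstream columns are then invisible to downstream code, while a removed column fails fast instead of propagating NaNs.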
```python
from dataclasses import dataclass
from typing import Optional

import pandas as pd

@dataclass
class EventSchemaV1:
    user_id: str
    session_id: str
    event_type: str
    timestamp: str

@dataclass
class EventSchemaV2:
    user_id: str
    session_uuid: str  # renamed from session_id
    event_type: str
    event_timestamp: int  # changed: string → unix epoch
    client_version: Optional[str] = None  # new optional column

def parse_event(raw: dict, schema_version: int) -> dict:
    """Parse a raw event dict into a normalised schema."""
    if schema_version == 1:
        return {
            "user_id": raw["user_id"],
            "session_uuid": raw["session_id"],  # map old name
            "event_type": raw["event_type"],
            "event_timestamp": int(
                pd.Timestamp(raw["timestamp"]).timestamp()
            ),
            "client_version": None,
        }
    elif schema_version == 2:
        return {
            "user_id": raw["user_id"],
            "session_uuid": raw["session_uuid"],
            "event_type": raw["event_type"],
            "event_timestamp": raw["event_timestamp"],
            "client_version": raw.get("client_version"),
        }
    else:
        raise ValueError(f"Unknown schema version: {schema_version}")

def infer_schema_version(raw: dict) -> int:
    return 2 if "session_uuid" in raw else 1
```
Data Partitioning for Training Data
How you partition training data has a significant impact on training speed, reproducibility, and the validity of your evaluation.
By date: the most common partition for temporal data. Always train on data before date X and evaluate on data after date X — never shuffle across time, or you leak future information into training.
By label: useful when one class is rare. Store positive and negative examples in separate partitions so you can easily sample at any ratio.
Stratified: ensure each partition has the same class distribution as the full dataset. Required when downstream consumers need balanced data but the raw data is imbalanced.
```python
from pathlib import Path

import pandas as pd
from sklearn.model_selection import StratifiedKFold

def partition_by_date(
    df: pd.DataFrame, output_dir: str, date_col: str = "event_date"
) -> None:
    """Partition training data by date for temporal splits."""
    for date, group in df.groupby(date_col):
        out_path = Path(output_dir) / f"dt={date}"
        out_path.mkdir(parents=True, exist_ok=True)
        group.to_parquet(out_path / "data.parquet", index=False)

def temporal_train_test_split(
    df: pd.DataFrame, cutoff_date: str, date_col: str = "event_date"
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split by time — never shuffle across the boundary."""
    train = df[df[date_col] < cutoff_date].copy()
    test = df[df[date_col] >= cutoff_date].copy()
    assert len(train) + len(test) == len(df)
    return train, test

def stratified_partition(
    df: pd.DataFrame, output_dir: str, label_col: str, n_splits: int = 5
) -> None:
    """Write stratified k-fold partitions for cross-validation."""
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    for fold, (train_idx, val_idx) in enumerate(
        skf.split(df, df[label_col])
    ):
        train_df = df.iloc[train_idx]
        val_df = df.iloc[val_idx]
        fold_dir = Path(output_dir) / f"fold={fold}"
        fold_dir.mkdir(parents=True, exist_ok=True)
        train_df.to_parquet(fold_dir / "train.parquet", index=False)
        val_df.to_parquet(fold_dir / "val.parquet", index=False)
```
DVC for Data Versioning
DVC (Data Version Control) versions large files and datasets alongside your code in Git, without storing the files in Git itself. The actual data lives in remote storage (S3, GCS, Azure Blob); Git tracks a small .dvc metadata file.
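For reference, the `.dvc` pointer committed to Git is a small YAML file along these lines (the hash and size below are invented for illustration):

```yaml
outs:
- md5: 3f1a9d2c4b5e6f7a8b9c0d1e2f3a4b5c
  size: 104857600
  path: events_2025_03.parquet
```

Git diffs on this file show exactly when the dataset's content changed, even though the bytes live in object storage.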
```bash
# Initialise DVC in an existing Git repo
dvc init
git add .dvc .dvcignore
git commit -m "init: add DVC"

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-store
git add .dvc/config
git commit -m "config: add DVC S3 remote"

# Track a dataset
dvc add data/raw/events_2025_03.parquet
git add data/raw/events_2025_03.parquet.dvc .gitignore
git commit -m "data: add March 2025 events dataset"

# Push data to remote
dvc push

# On another machine: pull the data
git pull
dvc pull  # downloads the file from S3

# Pin to a specific data version in code
dvc get . data/raw/events_2025_03.parquet --rev v1.2.0
```
```python
# Read data lineage programmatically
import subprocess

import yaml  # PyYAML, installed alongside DVC

def get_data_lineage(dvc_file: str) -> dict:
    """Return the DVC metadata (hash, size, path) for a tracked file
    by reading its .dvc pointer file directly."""
    with open(dvc_file) as f:
        return yaml.safe_load(f)

def reproduce_pipeline(target: str) -> None:
    """Reproduce a DVC pipeline stage (runs only if inputs changed)."""
    subprocess.run(["dvc", "repro", target], check=True)

def get_stage_dependencies(stage_name: str) -> list[str]:
    """List all data dependencies of a pipeline stage."""
    result = subprocess.run(
        ["dvc", "dag", "--dot"], capture_output=True, text=True
    )
    # Parse DOT format to extract edges pointing at the stage
    return [
        line.split("->")[0].strip().strip('"')
        for line in result.stdout.splitlines()
        if "->" in line and stage_name in line
    ]
```
Handling Late-Arriving Data
In real-world systems, data does not arrive in event-time order. A mobile app event logged at 14:00 may arrive at your pipeline at 14:45 due to network issues, retries, or the device being offline. Processing pipelines must handle this gracefully.
Grace period: after the nominal window closes, keep the window open for an additional time (the grace period) to accept late arrivals. A typical grace period is 30 minutes to 2 hours for ad-click data, and up to 24 hours for mobile events.
Watermark: the watermark is the estimated time up to which all events have arrived. Events with event_time less than the watermark are considered complete; events beyond the watermark are still accumulating. Flink and Spark Structured Streaming implement watermarks natively.
```python
from datetime import datetime, timedelta

import pandas as pd

def process_with_grace_period(
    events: list[dict],
    window_start: datetime,
    window_end: datetime,
    grace_period_hours: int = 2,
) -> pd.DataFrame:
    """
    Process events for a window, accepting late arrivals up to
    grace_period_hours after the window nominally closed.
    Assumes each event carries event_time (when it happened) and
    arrival_time (when the pipeline received it).
    """
    arrival_cutoff = window_end + timedelta(hours=grace_period_hours)

    def in_window(e: dict) -> bool:
        return (
            window_start
            <= datetime.fromisoformat(e["event_time"])
            < window_end
        )

    accepted = [
        e for e in events
        if in_window(e)
        and datetime.fromisoformat(e["arrival_time"]) <= arrival_cutoff
    ]
    late_arrivals = [
        e for e in events
        if in_window(e)
        and datetime.fromisoformat(e["arrival_time"]) > arrival_cutoff
    ]
    if late_arrivals:
        print(f"Warning: {len(late_arrivals)} events arrived after grace period")
    return pd.DataFrame(accepted)

def compute_watermark(
    events: list[dict],
    allowed_lateness_seconds: int = 3600,
) -> datetime:
    """
    Watermark = max(event_time) - allowed_lateness.
    Events with event_time < watermark are considered complete.
    """
    if not events:
        return datetime.min
    max_event_time = max(
        datetime.fromisoformat(e["event_time"]) for e in events
    )
    return max_event_time - timedelta(seconds=allowed_lateness_seconds)
```
Prefect-Style Pipeline with Decorators
Prefect (and similar orchestrators like Airflow, Dagster) use decorators to mark functions as tasks and flows. Even without installing an orchestrator, you can structure your pipeline code so it is drop-in compatible.
```python
import functools
import logging
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)

# Minimal task/flow decorator implementation
# (compatible with the @task/@flow pattern without requiring Prefect)

def task(retries: int = 0, retry_delay_seconds: int = 5):
    """Decorator that adds retry logic and logging to a pipeline step."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> Any:
            for attempt in range(retries + 1):
                try:
                    start = time.time()
                    logger.info(f"[TASK] {fn.__name__} starting (attempt {attempt + 1})")
                    result = fn(*args, **kwargs)
                    elapsed = time.time() - start
                    logger.info(f"[TASK] {fn.__name__} completed in {elapsed:.2f}s")
                    return result
                except Exception as exc:
                    logger.warning(f"[TASK] {fn.__name__} failed: {exc}")
                    if attempt < retries:
                        time.sleep(retry_delay_seconds)
                    else:
                        raise
        return wrapper
    return decorator

def flow(name: str):
    """Decorator that marks a function as an orchestrated pipeline flow."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> Any:
            logger.info(f"[FLOW] {name} started")
            start = time.time()
            result = fn(*args, **kwargs)
            elapsed = time.time() - start
            logger.info(f"[FLOW] {name} finished in {elapsed:.2f}s")
            return result
        return wrapper
    return decorator
```
Complete Pipeline: Ingest to Partition
Assembling all the pieces, these are the principles a production-ready pipeline must satisfy:
Use ELT when your destination has powerful query compute; use ETL when you must enforce schema or mask sensitive fields at landing time.
Choose batch for training pipelines (simplicity, cost), micro-batch for near-real-time features, and streaming only when sub-minute latency is a genuine business requirement.
Idempotency requires deterministic output paths, delete-before-write semantics, and atomic renames — without these, retries corrupt data.
Great Expectations catches data quality regressions before they reach training; run it as a blocking checkpoint, not a logging step.
Handle schema changes explicitly by versioning your schema and maintaining migration logic during transition windows — additive changes are safe, breaking changes require coordination.
Partition training data by time and never shuffle across the temporal boundary; this is the most common source of evaluation leakage in ML pipelines.
DVC gives you reproducible data lineage alongside your Git history — every training run should be traceable to a specific data version.
Build your pipeline with @task/@flow patterns from day one so adoption of a real orchestrator (Prefect, Airflow, Dagster) is a drop-in swap.
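Putting the principles above together, here is a minimal end-to-end sketch. The task names (`ingest`, `validate`, `partition`), the JSON output format, and the stripped-down `task` decorator (standing in for an orchestrator's) are all illustrative:

```python
import functools
import json
import tempfile
from pathlib import Path

def task(retries: int = 0):
    """Minimal retry wrapper, a stand-in for @task from an orchestrator."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise
        return wrapper
    return decorator

@task(retries=2)
def ingest(raw_events: list[dict]) -> list[dict]:
    # ELT-style: minimal parsing, keep everything
    return [dict(e) for e in raw_events]

@task()
def validate(events: list[dict]) -> list[dict]:
    # Blocking check, in the spirit of a GX checkpoint
    bad = [e for e in events if "user_id" not in e or "event_date" not in e]
    if bad:
        raise ValueError(f"{len(bad)} invalid events")
    return events

@task()
def partition(events: list[dict], output_dir: str) -> list[str]:
    # Idempotent: each date partition is rewritten from scratch
    by_date: dict[str, list[dict]] = {}
    for e in events:
        by_date.setdefault(e["event_date"], []).append(e)
    written = []
    for date, group in sorted(by_date.items()):
        out = Path(output_dir) / f"dt={date}" / "data.json"
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(json.dumps(group))  # full overwrite on re-run
        written.append(str(out))
    return written

def flow_ingest_to_partition(raw_events: list[dict], output_dir: str) -> list[str]:
    """Ingest → validate → partition, failing fast on bad data."""
    return partition(validate(ingest(raw_events)), output_dir)
```

Swapping the decorator for Prefect's `@task`/`@flow` (or Dagster ops, or Airflow tasks) leaves the function bodies untouched, which is the point of structuring the code this way from day one.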