GadaaLabs
Machine Learning Engineering — Production ML Systems
Lesson 9

Production Monitoring, Data Drift & Model Decay

26 min

A deployed model is not a finished product; it is a system that continuously interacts with a changing world. The data distribution that produced your training set shifts: user behaviour evolves, upstream pipelines change their schemas, and the real-world phenomenon you modelled itself changes. Without monitoring, your model decays silently: accuracy falls, business decisions worsen, and nobody notices until a major incident.

What Can Fail in Production

Understanding the failure taxonomy determines what to monitor.

Data drift (covariate shift): P(X) changes. The input feature distribution shifts away from what the model was trained on. Example: an economic shock causes monthly_spend to drop sharply for all users — the model was never trained on this regime.

Concept drift: P(y|X) changes. The relationship between features and the target changes. Example: a competitor launches a competing product, so high-usage customers who previously stayed now churn — the same feature values now imply different outcomes.

Label drift: P(y) changes. The base rate of the target shifts. Example: a marketing campaign successfully retains customers, dropping churn from 25% to 10% — a calibrated model now over-predicts churn.

Upstream pipeline breakage: a schema change in an upstream data source introduces nulls, wrong units, or encoded values the preprocessing has never seen. In practice this is often the most common failure mode.
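Upstream breakage is usually cheapest to catch before a batch reaches the model. A minimal sketch of a pre-inference data-quality guard; the feature names, thresholds, and helper shown here are illustrative assumptions, not a fixed API:

```python
import numpy as np

def validate_batch(
    batch: dict[str, np.ndarray],
    expected_features: set[str],
    known_categories: dict[str, set],
    max_null_pct: float = 0.05,
) -> list[str]:
    """Return a list of data-quality violations for one production batch."""
    issues = []
    # Schema check: every feature the model expects must be present
    for feature in sorted(expected_features - batch.keys()):
        issues.append(f"missing feature: {feature}")
    for feature, values in batch.items():
        if values.dtype.kind == "f":
            # Null-rate check for numeric features
            null_pct = float(np.isnan(values).mean())
            if null_pct > max_null_pct:
                issues.append(f"{feature}: null rate {null_pct:.1%} exceeds {max_null_pct:.0%}")
        elif feature in known_categories:
            # Unknown-category check: values the encoder has never seen
            unknown = set(values.tolist()) - known_categories[feature]
            if unknown:
                issues.append(f"{feature}: unseen categories {sorted(unknown)}")
    return issues

issues = validate_batch(
    {"monthly_spend": np.array([120.0, np.nan, 95.0]),
     "plan": np.array(["monthly", "lifetime"])},
    expected_features={"monthly_spend", "plan"},
    known_categories={"plan": {"monthly", "annual", "multi-year"}},
)
print(issues)   # flags the 33.3% null rate and the unseen "lifetime" category
```

Running a check like this before inference turns many silent drift incidents into loud, attributable pipeline bugs.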


Monitoring Layers

| Layer | What to track | When to alert |
|-------|--------------|---------------|
| Data quality | null rate, schema violations, out-of-range values | null rate increases by > 2x; new unknown categories |
| Feature distribution | mean, std, histogram per feature | PSI > 0.2; KS p-value < 0.05 |
| Prediction distribution | score histogram, mean prediction | PSI > 0.1 on score distribution |
| Model performance | accuracy, AUC, F1 | when labels arrive — often days/weeks delayed |
| System health | latency p99, error rate, throughput | error rate > 1%; p99 > SLA |
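The alert thresholds in the table above can be captured as declarative rules evaluated against each day's metrics. A small sketch (the metric names are illustrative):

```python
# Alert rules distilled from the table above: metric name -> breach predicate
ALERT_RULES = {
    "feature_psi":  lambda v: v > 0.2,    # feature distribution shift
    "feature_ks_p": lambda v: v < 0.05,   # KS test significance
    "score_psi":    lambda v: v > 0.1,    # prediction distribution shift
    "error_rate":   lambda v: v > 0.01,   # system health
}

def evaluate_alerts(metrics: dict[str, float]) -> list[str]:
    """Return the names of all supplied metrics that breach their rule."""
    return [
        name for name, rule in ALERT_RULES.items()
        if name in metrics and rule(metrics[name])
    ]

print(evaluate_alerts({"feature_psi": 0.27, "score_psi": 0.04, "error_rate": 0.002}))
# only feature_psi breaches its threshold
```

Keeping thresholds in one place makes them reviewable and easy to tune without touching the monitoring code.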


Statistical Drift Tests

KS Test (Kolmogorov-Smirnov)

Tests whether two samples come from the same continuous distribution. The KS statistic is the maximum absolute difference between the two CDFs.

python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)

# Reference distribution (from training)
ref_spend = rng.normal(120, 40, 2000)

# Production distribution — has drifted
prod_spend_no_drift   = rng.normal(118, 41, 500)   # slight natural variation
prod_spend_drifted    = rng.normal(85,  60, 500)   # mean dropped, std increased

ks_stat_ok,      p_ok     = stats.ks_2samp(ref_spend, prod_spend_no_drift)
ks_stat_drifted, p_drift  = stats.ks_2samp(ref_spend, prod_spend_drifted)

print(f"No drift  — KS={ks_stat_ok:.3f},      p={p_ok:.4f}")
print(f"Drifted   — KS={ks_stat_drifted:.3f}, p={p_drift:.6f}")
# Drifted will show p << 0.05 → reject null that distributions are equal

Chi-Square Test for Categorical Features

python
from scipy.stats import chisquare
import numpy as np

# Reference category counts
ref_counts  = np.array([800, 350, 150])   # monthly, annual, multi-year
prod_counts = np.array([650, 280, 70])    # production — monthly share increased

# A goodness-of-fit test compares observed counts against expected counts
# (not proportions), so scale the reference distribution to the production total
expected = ref_counts / ref_counts.sum() * prod_counts.sum()

chi2, p_val = chisquare(f_obs=prod_counts, f_exp=expected)
dof = len(prod_counts) - 1
print(f"Chi-square={chi2:.2f}, p={p_val:.4f}, dof={dof}")

Population Stability Index (PSI)

PSI is the standard drift metric in credit risk and finance. It quantifies how much a distribution has shifted relative to a reference.

PSI = Σ (p_actual_i - p_expected_i) × ln(p_actual_i / p_expected_i)

Interpretation: PSI < 0.1 = no significant shift; 0.1 – 0.2 = moderate shift, investigate; > 0.2 = significant shift, retrain likely needed.
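As a quick sanity check of the formula, here is a two-bin example worked by hand:

```python
import math

# Expected (reference) vs actual (production) bin proportions
p_expected = [0.6, 0.4]
p_actual   = [0.4, 0.6]

psi = sum(
    (a - e) * math.log(a / e)
    for a, e in zip(p_actual, p_expected)
)
print(round(psi, 4))   # 0.1622: a moderate shift, worth investigating
```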

python
import numpy as np

def compute_psi(
    reference: np.ndarray,
    production: np.ndarray,
    n_bins: int = 10,
    epsilon: float = 1e-8,
) -> float:
    """
    Compute PSI for a continuous feature.
    Bins are defined by reference quantiles (distribution-aware binning).
    """
    # Define bin edges from reference quantiles; make the outer bins open-ended
    # so production values outside the reference range are still counted
    bin_edges = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
    bin_edges[0]  = -np.inf
    bin_edges[-1] = np.inf

    ref_counts  = np.histogram(reference,  bins=bin_edges)[0]
    prod_counts = np.histogram(production, bins=bin_edges)[0]

    # Convert to proportions
    ref_pct  = ref_counts  / ref_counts.sum()
    prod_pct = prod_counts / prod_counts.sum()

    # Clip to avoid log(0)
    ref_pct  = np.clip(ref_pct,  epsilon, None)
    prod_pct = np.clip(prod_pct, epsilon, None)

    psi = np.sum((prod_pct - ref_pct) * np.log(prod_pct / ref_pct))
    return float(psi)

# No drift
psi_ok = compute_psi(ref_spend, prod_spend_no_drift)
print(f"PSI (no drift):  {psi_ok:.4f}")   # expect < 0.10

# Significant drift
psi_drift = compute_psi(ref_spend, prod_spend_drifted)
print(f"PSI (drifted):   {psi_drift:.4f}")  # expect > 0.20

Jensen-Shannon Divergence

JSD is symmetric and bounded (in [0, 1] when computed with log base 2), making it easier to interpret than KL divergence. Note that scipy's jensenshannon returns the JS distance, the square root of the divergence, which is bounded the same way.

python
from scipy.spatial.distance import jensenshannon
import numpy as np

def compute_jsd(ref: np.ndarray, prod: np.ndarray, n_bins: int = 20) -> float:
    """JSD between two continuous distributions (histogram-approximated)."""
    combined = np.concatenate([ref, prod])
    bins = np.histogram_bin_edges(combined, bins=n_bins)

    p = np.histogram(ref,  bins=bins)[0] + 1  # +1 Laplace smoothing
    q = np.histogram(prod, bins=bins)[0] + 1
    p = p / p.sum()
    q = q / q.sum()

    # base=2 keeps the returned JS distance within [0, 1]
    return float(jensenshannon(p, q, base=2))

print(f"JSD (no drift):  {compute_jsd(ref_spend, prod_spend_no_drift):.4f}")
print(f"JSD (drifted):   {compute_jsd(ref_spend, prod_spend_drifted):.4f}")

DriftMonitor Class

python
import sqlite3
import numpy as np
from dataclasses import dataclass
from datetime import date
from scipy import stats
from typing import Optional

@dataclass
class FeatureDriftReport:
    feature:    str
    date:       str
    psi:        float
    ks_stat:    float
    ks_p_value: float
    mean_ref:   float
    mean_prod:  float
    std_ref:    float
    std_prod:   float
    null_pct:   float
    alert:      bool

    @property
    def drift_level(self) -> str:
        if self.psi > 0.20:
            return "HIGH"
        elif self.psi > 0.10:
            return "MODERATE"
        return "NONE"

class DriftMonitor:
    """
    Stores reference distributions from training data and computes daily drift
    metrics on production data. All statistics are persisted in SQLite.
    """

    def __init__(self, db_path: str = "drift_monitor.db"):
        self._db_path   = db_path
        self._reference : dict[str, np.ndarray] = {}
        self._dtypes    : dict[str, str] = {}
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self._db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS drift_metrics (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    date        TEXT NOT NULL,
                    feature     TEXT NOT NULL,
                    psi         REAL,
                    ks_stat     REAL,
                    ks_p_value  REAL,
                    mean_ref    REAL,
                    mean_prod   REAL,
                    std_ref     REAL,
                    std_prod    REAL,
                    null_pct    REAL,
                    alert       INTEGER,
                    UNIQUE(date, feature)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS prediction_metrics (
                    id       INTEGER PRIMARY KEY AUTOINCREMENT,
                    date     TEXT NOT NULL,
                    mean     REAL,
                    std      REAL,
                    p10      REAL,
                    p50      REAL,
                    p90      REAL,
                    psi      REAL,
                    UNIQUE(date)
                )
            """)
            conn.commit()

    def set_reference(self, feature: str, values: np.ndarray, dtype: str = "numeric"):
        """Call once with the full training set distribution for each feature."""
        self._reference[feature] = values[~np.isnan(values)]
        self._dtypes[feature]    = dtype

    def compute_feature_drift(
        self,
        feature: str,
        production_values: np.ndarray,
        today: Optional[str] = None,
    ) -> FeatureDriftReport:
        if feature not in self._reference:
            raise ValueError(f"No reference distribution set for '{feature}'")

        today = today or date.today().isoformat()
        ref   = self._reference[feature]
        null_pct = float(np.isnan(production_values).mean())
        prod = production_values[~np.isnan(production_values)]

        psi             = compute_psi(ref, prod)
        ks_stat, ks_p   = stats.ks_2samp(ref, prod)

        report = FeatureDriftReport(
            feature    = feature,
            date       = today,
            psi        = round(psi, 5),
            ks_stat    = round(float(ks_stat), 5),
            ks_p_value = round(float(ks_p),   6),
            mean_ref   = round(float(ref.mean()),  4),
            mean_prod  = round(float(prod.mean()), 4),
            std_ref    = round(float(ref.std()),   4),
            std_prod   = round(float(prod.std()),  4),
            null_pct   = round(null_pct, 4),
            alert      = psi > 0.2 or null_pct > 0.10,
        )

        with sqlite3.connect(self._db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO drift_metrics
                    (date, feature, psi, ks_stat, ks_p_value,
                     mean_ref, mean_prod, std_ref, std_prod, null_pct, alert)
                VALUES (?,?,?,?,?,?,?,?,?,?,?)
            """, (
                report.date, report.feature, report.psi, report.ks_stat,
                report.ks_p_value, report.mean_ref, report.mean_prod,
                report.std_ref, report.std_prod, report.null_pct, int(report.alert),
            ))
            conn.commit()

        return report

    def run_daily_check(
        self,
        production_data: dict[str, np.ndarray],
        today: Optional[str] = None,
    ) -> list[FeatureDriftReport]:
        reports = []
        for feature, values in production_data.items():
            if feature in self._reference:
                reports.append(self.compute_feature_drift(feature, values, today))
        return reports

    def get_alerts(self, today: Optional[str] = None) -> list[dict]:
        today = today or date.today().isoformat()
        with sqlite3.connect(self._db_path) as conn:
            rows = conn.execute(
                "SELECT feature, psi, null_pct FROM drift_metrics WHERE date=? AND alert=1",
                (today,)
            ).fetchall()
        return [{"feature": r[0], "psi": r[1], "null_pct": r[2]} for r in rows]

Prediction Distribution Monitoring

python
def monitor_predictions(
    monitor: DriftMonitor,
    ref_scores: np.ndarray,
    prod_scores: np.ndarray,
    today: Optional[str] = None,
) -> dict:
    """Monitor the model's output score distribution for drift."""
    today = today or date.today().isoformat()
    psi   = compute_psi(ref_scores, prod_scores)

    stats_dict = {
        "date":  today,
        "mean":  round(float(prod_scores.mean()), 4),
        "std":   round(float(prod_scores.std()),  4),
        "p10":   round(float(np.percentile(prod_scores, 10)), 4),
        "p50":   round(float(np.percentile(prod_scores, 50)), 4),
        "p90":   round(float(np.percentile(prod_scores, 90)), 4),
        "psi":   round(psi, 5),
        "alert": psi > 0.10,
    }

    with sqlite3.connect(monitor._db_path) as conn:
        conn.execute("""
            INSERT OR REPLACE INTO prediction_metrics
                (date, mean, std, p10, p50, p90, psi)
            VALUES (:date,:mean,:std,:p10,:p50,:p90,:psi)
        """, stats_dict)
        conn.commit()

    return stats_dict

Evidently AI Integration

python
# pip install evidently
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset
import pandas as pd

def run_evidently_report(
    reference_df: pd.DataFrame,
    production_df: pd.DataFrame,
    target_col: str = "churned",
    output_path: str = "evidently_report.html",
):
    """
    Generate an Evidently HTML report with data drift and classification metrics.
    Call this weekly or whenever a significant drift alert fires.
    """
    report = Report(metrics=[
        DataDriftPreset(drift_share=0.3),   # dataset-level drift if > 30% of features drift
        ClassificationPreset(),
    ])
    # Evidently expects columns named 'target' and 'prediction'; both frames
    # must already contain the model's 'prediction' column for the
    # classification metrics to run
    ref_ev  = reference_df.rename(columns={target_col: "target"})
    prod_ev = production_df.rename(columns={target_col: "target"})

    report.run(reference_data=ref_ev, current_data=prod_ev)
    report.save_html(output_path)
    print(f"Evidently report saved to {output_path}")
    return report

Alerting Logic

python
from typing import Callable

def build_alert_message(alerts: list[dict], report_date: str) -> str:
    lines = [f"Drift Alert — {report_date}", "=" * 40]
    for a in alerts:
        lines.append(
            f"  FEATURE: {a['feature']:30s} | PSI={a['psi']:.3f} | null%={a.get('null_pct', 0):.1%}"
        )
    lines.append("\nRecommended actions:")
    lines.append("  1. Inspect upstream data pipeline for schema changes")
    lines.append("  2. Compare feature histograms in the monitoring dashboard")
    lines.append("  3. If PSI > 0.2 on 2+ features, trigger retraining pipeline")
    return "\n".join(lines)

def alert_on_drift(
    alerts: list[dict],
    report_date: str,
    notify_fn: Callable[[str], None],
    psi_threshold: float = 0.20,
) -> bool:
    """Returns True if any alert exceeds the threshold and notification was sent."""
    critical = [a for a in alerts if a["psi"] > psi_threshold]
    if critical:
        message = build_alert_message(critical, report_date)
        notify_fn(message)
        return True
    return False

# Example usage:
# alert_on_drift(monitor.get_alerts(), "2026-03-29", lambda msg: print(msg))

Key Takeaways

  • Monitor all four failure modes separately: data drift (P(X) shifts), concept drift (P(y|X) shifts), label drift (P(y) shifts), and upstream pipeline breakage — they have different root causes and different remediation paths.
  • PSI is the industry-standard drift metric: < 0.1 is stable, 0.1-0.2 warrants investigation, > 0.2 likely requires retraining. Use reference-quantile bins to make it distribution-aware.
  • KS test detects differences in the full distribution shape; PSI quantifies how much the population has shifted. Use both — KS for statistical significance, PSI for magnitude.
  • Monitor prediction score distributions in addition to input features: a stable input distribution with a drifted score distribution signals concept drift or a silent model bug.
  • Performance monitoring with delayed labels requires storing prediction IDs and joining them to outcome events when labels arrive — design this pipeline at deployment time, not after an incident.
  • Store all monitoring statistics in a time-series database (SQLite is adequate at small scale) to enable trend analysis and early detection of gradual drift.
  • Evidently AI and similar libraries accelerate dashboard generation but add a dependency — always implement the core KS/PSI logic yourself so you understand what the tools are computing.
  • Set your retraining trigger at PSI > 0.2 on two or more features, or PSI > 0.1 on the prediction score distribution — this gives you an objective, automatable retraining criterion.
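The delayed-label pipeline from the takeaways can be sketched with two tables: predictions logged at inference time, and outcomes joined in whenever labels arrive. The schema and names here are illustrative assumptions:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE predictions (pred_id TEXT PRIMARY KEY, score REAL, pred_date TEXT)")
conn.execute("CREATE TABLE outcomes    (pred_id TEXT PRIMARY KEY, label INTEGER)")

def log_prediction(pred_id: str, score: float, pred_date: str) -> None:
    # Logged at inference time, long before any label exists
    conn.execute(
        "INSERT OR REPLACE INTO predictions (pred_id, score, pred_date) VALUES (?, ?, ?)",
        (pred_id, score, pred_date),
    )

def log_outcome(pred_id: str, label: int) -> None:
    # Logged whenever the ground-truth label finally arrives
    conn.execute(
        "INSERT OR REPLACE INTO outcomes (pred_id, label) VALUES (?, ?)",
        (pred_id, label),
    )

def labelled_predictions() -> list:
    """Join scores to outcomes; predictions still awaiting labels are excluded."""
    return conn.execute("""
        SELECT p.score, o.label
        FROM predictions p
        JOIN outcomes o ON o.pred_id = p.pred_id
    """).fetchall()

log_prediction("u-1001", 0.82, "2026-03-01")
log_prediction("u-1002", 0.15, "2026-03-01")
log_outcome("u-1001", 1)          # arrives weeks after the prediction
print(labelled_predictions())     # only the labelled prediction appears
```

Performance metrics (accuracy, AUC) are then computed on the joined rows only, which is why the prediction ID must be generated and stored at deployment time.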