GadaaLabs
Machine Learning Engineering — Production ML Systems
Lesson 11

A/B Testing & Shadow Deployment for ML Models

22 min

Offline evaluation metrics — AUC, precision/recall, calibration — tell you how a model performs on historical data. They cannot tell you whether the model improves the business metric you actually care about. The only way to know that is to run a controlled experiment in production. But ML A/B testing has failure modes that traditional software A/B testing does not, and most teams discover them after they've run a flawed experiment.

Why ML A/B Testing Is Hard

Position and novelty bias: users react to where results appear (position) and to any change simply because it is new (novelty); the initial metric lift decays after the first few days. Always run experiments long enough to see past the novelty window.

Slow feedback loops: for churn prediction, the label (did they churn?) arrives 30+ days after the prediction. You cannot evaluate in real time. This forces either proxy metrics (engagement, feature usage) or very long experiments.

Carryover effects: if the same user is exposed to both models across sessions (e.g. due to inconsistent assignment), their behaviour is contaminated. User-level assignment is mandatory.

Selection bias: routing traffic based on request properties (e.g. high-value customers get the new model) is not randomisation. It confounds the treatment effect with the routing criterion, so any observed difference cannot be attributed to the model.


Experiment Design

Randomisation Unit

Always randomise at the user level. If you randomise at the session or request level, the same user will see outputs from different models across sessions, contaminating their behaviour and violating the stable unit treatment value assumption (SUTVA).

python
import hashlib

def assign_variant(user_id: str, experiment_name: str, control_pct: float = 0.5) -> str:
    """
    Deterministic, consistent assignment: the same user_id always maps
    to the same variant for the same experiment. Uses a hash to avoid
    a database lookup on every request.
    """
    seed    = f"{experiment_name}:{user_id}"
    digest  = int(hashlib.md5(seed.encode()).hexdigest(), 16)
    bucket  = (digest % 10000) / 10000.0   # [0, 1) in 10000 buckets

    return "control" if bucket < control_pct else "treatment"

# Consistent: same user always in same bucket
assert assign_variant("user_123", "churn_model_v2") == \
       assign_variant("user_123", "churn_model_v2")
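
A companion safeguard once traffic is flowing: if the observed split deviates from the configured `control_pct` by more than chance allows, the assignment or logging path is broken (a sample ratio mismatch) and no downstream result should be trusted. A hedged sketch — the function name and alpha level are my own choices, not from the lesson's codebase:

```python
from scipy import stats

def has_sample_ratio_mismatch(n_control: int, n_treatment: int,
                              control_pct: float = 0.5,
                              alpha: float = 0.001) -> bool:
    """Chi-square goodness-of-fit test: does the observed split deviate
    from the configured one more than chance allows?"""
    total = n_control + n_treatment
    expected = [total * control_pct, total * (1 - control_pct)]
    _, p_value = stats.chisquare([n_control, n_treatment], f_exp=expected)
    return bool(p_value < alpha)   # True => investigate before trusting results

# 10,093 vs 9,907 out of 20,000 is within noise for a 50/50 split
print(has_sample_ratio_mismatch(10_093, 9_907))   # False
```

A very small alpha is conventional here because a true mismatch is usually a gross engineering bug, not a subtle effect.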

Stratification

Stratify assignment by key variables (plan tier, tenure cohort) to ensure the treatment and control groups are balanced on important confounds.

python
import pandas as pd
from sklearn.model_selection import StratifiedShuffleSplit

def stratified_assignment(
    user_df: pd.DataFrame,
    stratify_cols: list[str],
    treatment_pct: float = 0.5,
    random_state:  int   = 42,
) -> pd.DataFrame:
    """Assign users to treatment/control with stratification."""
    user_df = user_df.copy()            # avoid mutating the caller's frame
    label_col = "__strat_label__"
    user_df[label_col] = (
        user_df[stratify_cols]
        .astype(str)
        .agg("_".join, axis=1)
    )
    sss = StratifiedShuffleSplit(
        n_splits=1, test_size=treatment_pct, random_state=random_state
    )
    control_idx, treatment_idx = next(sss.split(user_df, user_df[label_col]))
    user_df["variant"] = "control"
    user_df.iloc[treatment_idx, user_df.columns.get_loc("variant")] = "treatment"
    return user_df.drop(columns=[label_col])
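
It is worth verifying balance empirically after assignment. A hedged sketch of a covariate balance check (`check_balance` is my own helper; it assumes both variants appear in every stratum):

```python
import pandas as pd

def check_balance(df: pd.DataFrame, variant_col: str,
                  stratify_cols: list[str]) -> pd.DataFrame:
    """Per-stratum variant shares; a large abs_diff signals imbalance."""
    shares = (
        df.groupby(stratify_cols)[variant_col]
          .value_counts(normalize=True)
          .unstack(fill_value=0.0)
    )
    shares["abs_diff"] = (shares["treatment"] - shares["control"]).abs()
    return shares

# Toy frame with perfectly balanced strata
toy = pd.DataFrame({
    "plan_tier": ["basic", "basic", "pro", "pro"] * 25,
    "variant":   ["control", "treatment"] * 50,
})
print(check_balance(toy, "variant", ["plan_tier"]))
```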

Sample Size Calculator

python
from statsmodels.stats.power import TTestIndPower
import numpy as np

def calculate_sample_size(
    baseline_rate:   float,
    minimum_detectable_effect: float,  # MDE in absolute terms
    alpha:           float = 0.05,
    power:           float = 0.80,
    two_tailed:      bool  = True,
) -> dict:
    """
    Calculate the required sample size per arm using a two-sample t-test.
    For proportions, the standard deviation is sqrt(p*(1-p)).
    """
    std_pooled = np.sqrt(
        2 * baseline_rate * (1 - baseline_rate)
    )
    effect_size = minimum_detectable_effect / std_pooled

    analysis  = TTestIndPower()
    n_per_arm = analysis.solve_power(
        effect_size = effect_size,
        alpha       = alpha / (2 if two_tailed else 1),
        power       = power,
        ratio       = 1.0,
    )

    return {
        "n_per_arm":      int(np.ceil(n_per_arm)),
        "n_total":        int(np.ceil(n_per_arm)) * 2,
        "effect_size":    round(effect_size, 4),
        "baseline_rate":  baseline_rate,
        "mde_absolute":   minimum_detectable_effect,
        "alpha":          alpha,
        "power":          power,
    }

# Example: churn model, baseline churn rate 25%, MDE = 2pp absolute
result = calculate_sample_size(
    baseline_rate=0.25,
    minimum_detectable_effect=0.02,
    alpha=0.05,
    power=0.80,
)
print(f"Required sample size: {result['n_per_arm']:,} per arm ({result['n_total']:,} total)")

Shadow Deployment

Shadow mode runs both models simultaneously but only serves the champion's predictions to users. The challenger's predictions are logged to a shadow table for offline comparison. Zero production risk.

python
# src/shadow_server.py
import asyncio
import pandas as pd
import sqlite3
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import os

CHAMPION_PATH   = os.getenv("CHAMPION_PATH",   "models/champion.joblib")
CHALLENGER_PATH = os.getenv("CHALLENGER_PATH", "models/challenger.joblib")
SHADOW_DB       = os.getenv("SHADOW_DB",       "shadow_predictions.db")

_state = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    _state["champion"]   = joblib.load(CHAMPION_PATH)
    _state["challenger"] = joblib.load(CHALLENGER_PATH)
    with sqlite3.connect(SHADOW_DB) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS shadow_predictions (
                id               INTEGER PRIMARY KEY AUTOINCREMENT,
                ts               TEXT NOT NULL,
                user_id          TEXT,
                champion_score   REAL NOT NULL,
                challenger_score REAL NOT NULL
            )
        """)
        conn.commit()
    yield
    _state.clear()

app = FastAPI(title="Shadow Deployment Server", lifespan=lifespan)

class PredictRequest(BaseModel):
    user_id:          str
    tenure_months:    int
    monthly_spend:    float
    support_tickets:  int
    feature_usage_pct: float
    last_login_days_ago: int
    num_integrations: int
    contract_type:    str
    plan_tier:        str
    is_enterprise:    int

@app.post("/predict")
async def predict(req: PredictRequest):
    row = pd.DataFrame([req.model_dump(exclude={"user_id"})])

    # Champion inference (served to user)
    champion_prob = float(_state["champion"].predict_proba(row)[0, 1])

    # Challenger inference in background (logged only, not served).
    # Keep a reference to the task so it is not garbage-collected mid-flight.
    task = asyncio.create_task(_log_challenger(req, row, champion_prob))
    _state.setdefault("bg_tasks", set()).add(task)
    task.add_done_callback(_state["bg_tasks"].discard)

    return {
        "churn_probability": round(champion_prob, 4),
        "churn_predicted":   champion_prob >= 0.4,
    }

async def _log_challenger(req: PredictRequest, row: pd.DataFrame, champion_prob: float):
    """Run challenger inference and persist both scores for offline comparison."""
    try:
        challenger_prob = float(
            await asyncio.get_running_loop().run_in_executor(
                None, lambda: _state["challenger"].predict_proba(row)[0, 1]
            )
        )
        with sqlite3.connect(SHADOW_DB) as conn:
            conn.execute(
                "INSERT INTO shadow_predictions (ts, user_id, champion_score, challenger_score) VALUES (?,?,?,?)",
                (datetime.utcnow().isoformat(), req.user_id, champion_prob, challenger_prob),
            )
            conn.commit()
    except Exception as exc:
        print(f"[shadow] Challenger inference failed: {exc}")

Daily Shadow Comparison

python
import sqlite3
import pandas as pd
from scipy import stats

def compare_shadow_predictions(db_path: str = "shadow_predictions.db") -> dict:
    """Compare champion vs challenger score distributions from shadow logs."""
    df = pd.read_sql("SELECT * FROM shadow_predictions", sqlite3.connect(db_path))

    ks_stat, ks_p = stats.ks_2samp(df["champion_score"], df["challenger_score"])
    correlation   = df["champion_score"].corr(df["challenger_score"])

    analysis = {
        "n_predictions":          len(df),
        "champion_mean":          df["champion_score"].mean(),
        "challenger_mean":        df["challenger_score"].mean(),
        "mean_diff":              df["challenger_score"].mean() - df["champion_score"].mean(),
        "ks_stat":                ks_stat,
        "ks_p_value":             ks_p,
        "score_correlation":      correlation,
        "distributions_differ":   ks_p < 0.05,
    }
    print(f"Shadow analysis ({len(df):,} predictions):")
    for k, v in analysis.items():
        print(f"  {k:<30} {v}")
    return analysis
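
Score distributions can agree closely while the models still flip decisions near the serving threshold (0.4 in the server above). A small check on the logged scores — `decision_disagreement` is my own helper, and the data here is simulated:

```python
import numpy as np

def decision_disagreement(champion: np.ndarray, challenger: np.ndarray,
                          threshold: float = 0.4) -> float:
    """Fraction of requests where the two models disagree on the churn decision."""
    return float(np.mean((champion >= threshold) != (challenger >= threshold)))

# Illustrative data: challenger systematically scores 5pp lower
rng = np.random.default_rng(7)
champ = rng.uniform(0.0, 1.0, 10_000)
chall = np.clip(champ - 0.05, 0.0, 1.0)
print(f"Decision disagreement: {decision_disagreement(champ, chall):.1%}")
```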

Sequential Testing with mSPRT

Classical fixed-horizon A/B tests require you to decide on the sample size upfront and resist the temptation to peek at results early. Sequential testing with the mixture Sequential Probability Ratio Test (mSPRT) lets you stop early when the evidence is strong.
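
To see concretely why peeking breaks fixed-horizon tests, a small simulation under the null (the rates and peek cadence are illustrative choices, not from the lesson):

```python
import numpy as np

# With NO true effect, peeking at a fixed-horizon z-test every 250 users and
# stopping at the first p < 0.05 inflates the false positive rate well above
# the nominal 5%
rng = np.random.default_rng(0)
n_sims, n_max, peek_every = 500, 5000, 250
false_positives = 0

for _ in range(n_sims):
    c = rng.binomial(1, 0.25, n_max)   # control outcomes, baseline rate 25%
    t = rng.binomial(1, 0.25, n_max)   # treatment outcomes, same rate (null true)
    for n in range(peek_every, n_max + 1, peek_every):
        pc, pt = c[:n].mean(), t[:n].mean()
        se = np.sqrt(pc * (1 - pc) / n + pt * (1 - pt) / n)
        if se > 0 and abs(pt - pc) / se > 1.96:   # "significant" at this peek
            false_positives += 1
            break

print(f"False positive rate with peeking: {false_positives / n_sims:.1%}")
```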

python
import numpy as np
from scipy import stats

class SequentialTest:
    """
    Mixture Sequential Probability Ratio Test (mSPRT) for proportions.
    Provides anytime-valid p-values: you can look at any time without
    inflating the false positive rate.
    """
    def __init__(self, alpha: float = 0.05, tau: float = 0.005):
        self.alpha       = alpha
        self.tau         = tau   # variance of the mixing distribution
        self.n_control   = 0
        self.n_treatment = 0
        self.sum_control = 0.0
        self.sum_treatment = 0.0

    def update(self, control_outcome: int, treatment_outcome: int):
        self.n_control   += 1
        self.n_treatment += 1
        self.sum_control   += control_outcome
        self.sum_treatment += treatment_outcome

    @property
    def p_control(self) -> float:
        return self.sum_control   / self.n_control   if self.n_control   else 0.0

    @property
    def p_treatment(self) -> float:
        return self.sum_treatment / self.n_treatment if self.n_treatment else 0.0

    def is_significant(self) -> bool:
        if self.n_control < 50 or self.n_treatment < 50:
            return False
        # Two-sided z-test as a conservative approximation to mSPRT
        se    = np.sqrt(self.p_control*(1-self.p_control)/self.n_control +
                        self.p_treatment*(1-self.p_treatment)/self.n_treatment)
        if se == 0:
            return False
        z     = (self.p_treatment - self.p_control) / se
        p_val = 2 * (1 - stats.norm.cdf(abs(z)))
        # Heuristic alpha-spending: shrink alpha as n grows so that repeated
        # looks stay conservative (stands in for the exact mSPRT threshold)
        adj_alpha = self.alpha / max(1, np.log(np.e * self.n_control))
        return p_val < adj_alpha

    @property
    def summary(self) -> dict:
        return {
            "n_control":    self.n_control,
            "n_treatment":  self.n_treatment,
            "p_control":    round(self.p_control,   4),
            "p_treatment":  round(self.p_treatment, 4),
            "lift":         round(self.p_treatment - self.p_control, 4),
            "significant":  self.is_significant(),
        }

CUPED Variance Reduction

CUPED (Controlled-experiment Using Pre-Experiment Data) reduces the variance of the treatment effect estimator by regressing out a pre-experiment covariate correlated with the metric.

python
import numpy as np
from scipy import stats

def cuped_adjustment(
    y_experiment: np.ndarray,
    y_pre:        np.ndarray,
) -> np.ndarray:
    """
    Returns CUPED-adjusted experiment metric.
    y_pre should be the same metric measured before the experiment started.
    Variance reduction = 1 - corr(y_experiment, y_pre)^2
    """
    theta = np.cov(y_experiment, y_pre)[0, 1] / np.var(y_pre)
    return y_experiment - theta * (y_pre - y_pre.mean())

rng = np.random.default_rng(42)
n   = 1000

# Simulate: pre-period metric carries over; treatment lowers the metric by 5pp
y_pre_ctrl = rng.normal(0.25, 0.10, n)
y_pre_trt  = rng.normal(0.25, 0.10, n)
y_ctrl     = y_pre_ctrl + rng.normal(0, 0.05, n)          # no treatment effect
y_trt      = y_pre_trt  + rng.normal(-0.05, 0.05, n)      # -5pp treatment effect

# Raw t-test
raw_t, raw_p = stats.ttest_ind(y_ctrl, y_trt)

# CUPED-adjusted t-test
y_ctrl_adj = cuped_adjustment(y_ctrl, y_pre_ctrl)
y_trt_adj  = cuped_adjustment(y_trt,  y_pre_trt)
adj_t, adj_p = stats.ttest_ind(y_ctrl_adj, y_trt_adj)

var_reduction = 1 - np.var(y_trt_adj) / np.var(y_trt)
print(f"Raw t-test:    t={raw_t:.3f}, p={raw_p:.4f}")
print(f"CUPED t-test:  t={adj_t:.3f}, p={adj_p:.4f}")
print(f"Variance reduction: {var_reduction:.1%}")

Key Takeaways

  • Always assign users (not sessions or requests) to experiment variants using a deterministic hash — consistency prevents contamination and makes analysis straightforward.
  • Calculate the required sample size before starting the experiment; running underpowered experiments produces noisy results that waste time and can produce false positives.
  • Shadow deployment is the safest way to compare a challenger model: zero user-facing risk, full score distribution comparison, and challenger logging can run indefinitely until you have enough data.
  • Sequential testing with mSPRT allows early stopping when the evidence is strong without inflating the false positive rate — use it when you need fast iteration cycles.
  • CUPED variance reduction typically cuts required sample sizes by 30-70% by using pre-experiment data as a covariate — this is the single highest-leverage technique for accelerating ML experiments.
  • Slow feedback loops (churn, LTV) require proxy metrics or very long experiment windows; document the metric and its lag explicitly before starting the experiment.
  • Interleaving (merging results from both models into a single ranked list) is often an order of magnitude more statistically efficient than A/B testing for ranking tasks — worth implementing if your model is a ranker.
  • Canary release and A/B testing serve different purposes: canary is a deployment safety check (is the new server stable?); A/B testing is a business value validation (does the new model improve the metric?). Run both.
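
The interleaving idea from the takeaways can be sketched as team-draft interleaving. This is an illustrative simplification (the function name and turn-taking details are my own), not a production implementation:

```python
import random

def team_draft_interleave(ranking_a: list[str], ranking_b: list[str],
                          seed: int = 0) -> tuple[list[str], dict[str, str]]:
    """Simplified team-draft interleaving: teams take turns (ties broken at
    random) placing their highest-ranked document not yet in the merged list.
    Clicks on the merged list are then credited to the owning team."""
    rng = random.Random(seed)
    merged: list[str] = []
    team_of: dict[str, str] = {}
    count = {"A": 0, "B": 0}
    pool = {"A": list(ranking_a), "B": list(ranking_b)}
    while pool["A"] or pool["B"]:
        # Team with fewer placements picks next; coin flip on ties
        if not pool["B"] or (pool["A"] and (count["A"] < count["B"] or
                             (count["A"] == count["B"] and rng.random() < 0.5))):
            team = "A"
        else:
            team = "B"
        # Skip documents the other team already placed
        while pool[team] and pool[team][0] in team_of:
            pool[team].pop(0)
        if not pool[team]:
            continue   # this team is exhausted; the other picks next round
        doc = pool[team].pop(0)
        merged.append(doc)
        team_of[doc] = team
        count[team] += 1
    return merged, team_of

merged, team_of = team_draft_interleave(["d1", "d2", "d3"], ["d2", "d4", "d1"])
print(merged)
```

Each click is credited to the team that placed the document, so a single session yields a within-user comparison of the two rankers — the source of interleaving's efficiency gain.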