GadaaLabs
Data Science Fundamentals — From Theory to Production Models
Lesson 12

End-to-End Data Science Pipeline — From Problem to Production

28 min

A model is not a deliverable. A deployed, monitored, reproducible system that helps humans make better decisions is a deliverable. The gap between a notebook with 0.85 AUC and a system that runs reliably in production is where most data science projects fail. This lesson assembles everything from the course into one complete pipeline and makes the operational concerns explicit.

Project Lifecycle

Problem definition   → What decision does this enable? What is the cost of a false positive vs false negative?
Data                 → Where does it come from? What are the refresh cadences? Who owns it?
EDA                  → Class balance, missing values, distributions, correlations, leakage risks
Feature engineering  → Transforms, encodings, aggregations (all inside a Pipeline)
Model selection      → Multiple algorithms compared on cross-validation ROC-AUC
Evaluation           → Confusion matrix, ROC, PR curve, calibration, SHAP
Serialise            → joblib for sklearn Pipeline (ONNX for neural nets)
Serving              → FastAPI with Pydantic validation, health endpoint, async error handling
Monitoring           → Prediction distribution, feature drift, delayed label performance

Step 1: Generate the Synthetic Dataset

python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(42)
n   = 1200

# Synthetic SaaS churn dataset
df = pd.DataFrame({
    # Usage
    "tenure_months":      np.clip(rng.exponential(18, n), 1, 60).astype(int),
    "monthly_spend":      np.clip(rng.normal(120, 60, n), 10, 500),
    "support_tickets":    rng.integers(0, 15, n),
    "feature_usage_pct":  np.clip(rng.beta(2, 5, n), 0, 1),  # right-skewed usage
    "last_login_days_ago":rng.integers(0, 90, n),
    "num_integrations":   rng.integers(0, 12, n),
    # Account metadata
    "contract_type":      rng.choice(["monthly", "annual", "multi-year"], n,
                                      p=[0.5, 0.35, 0.15]),
    "plan_tier":          rng.choice(["starter", "growth", "enterprise"], n,
                                      p=[0.55, 0.30, 0.15]),
    "is_enterprise":      rng.binomial(1, 0.15, n),
})

# Inject ~4% missing values in two numeric columns
for col in ["monthly_spend", "last_login_days_ago"]:
    df[col] = df[col].astype(float)  # integer columns cannot hold NaN
    missing_mask = rng.random(n) < 0.04
    df.loc[missing_mask, col] = np.nan

# Target: churned (driven by tenure, support tickets, low usage, monthly contract)
log_odds = (
    -1.5
    - 0.05 * df["tenure_months"].fillna(df["tenure_months"].median())
    + 0.12 * df["support_tickets"]
    - 2.0  * df["feature_usage_pct"]
    + 0.02 * df["last_login_days_ago"].fillna(30)
    - 0.15 * df["num_integrations"]
    + 0.8  * (df["contract_type"] == "monthly").astype(int)
    - 0.5  * df["is_enterprise"]
)
prob_churn = 1 / (1 + np.exp(-log_odds))
df["churned"] = rng.binomial(1, prob_churn)

print(f"Dataset shape: {df.shape}")
print(f"Churn rate:    {df['churned'].mean():.2%}")
print(f"Missing values:\n{df.isnull().sum()[df.isnull().sum() > 0]}")

Step 2: Exploratory Data Analysis

python
import matplotlib.pyplot as plt
import seaborn as sns

X_raw = df.drop(columns=["churned"])
y     = df["churned"]

# Class balance
print(f"Churned:     {y.sum()} ({y.mean():.2%})")
print(f"Not churned: {(~y.astype(bool)).sum()} ({(1-y.mean()):.2%})")

# Missing value summary
missing_pct = (df.isnull().sum() / len(df) * 100).round(2)
print("\nMissing % per column:")
print(missing_pct[missing_pct > 0])

# Numeric feature distributions split by churn
numeric_cols = ["tenure_months", "monthly_spend", "support_tickets",
                "feature_usage_pct", "last_login_days_ago", "num_integrations"]

fig, axes = plt.subplots(2, 3, figsize=(15, 8))
for ax, col in zip(axes.ravel(), numeric_cols):
    df.groupby("churned")[col].plot.hist(ax=ax, bins=30, alpha=0.6,
                                          legend=True, density=True)
    ax.set_title(col)
plt.suptitle("Numeric feature distributions by churn")
plt.tight_layout()
plt.show()

# Correlation heatmap (numeric only, fill NaN for correlation calc)
corr = df[numeric_cols + ["churned"]].fillna(df[numeric_cols].median()).corr()
fig, ax = plt.subplots(figsize=(9, 7))
sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", center=0,
            linewidths=0.5, ax=ax)
ax.set_title("Pearson Correlation Heatmap")
plt.tight_layout()
plt.show()
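
The lifecycle lists leakage risks under EDA, but none of the plots above tests for them directly. One rough screen, sketched here under assumptions, is to score each numeric column on its own: a single feature with a ROC-AUC near 1.0 usually deserves a closer look. The helper `single_feature_auc` and the toy column `refund_issued` are hypothetical and not part of the lesson's dataset.

```python
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score


def single_feature_auc(frame: pd.DataFrame, target: str) -> pd.Series:
    """ROC-AUC of each numeric column used alone as a score (hypothetical helper)."""
    y = frame[target]
    scores = {}
    for col in frame.select_dtypes(include="number").columns.drop(target):
        x = frame[col].fillna(frame[col].median())
        auc_val = roc_auc_score(y, x)
        # Direction does not matter for a leakage screen, so fold around 0.5
        scores[col] = max(auc_val, 1 - auc_val)
    return pd.Series(scores).sort_values(ascending=False)


# Toy data: `refund_issued` is a made-up column that is only recorded after churn
rng = np.random.default_rng(0)
toy = pd.DataFrame({"support_tickets": rng.integers(0, 15, 500)})
p_churn = 0.1 + 0.04 * toy["support_tickets"].to_numpy()
toy["churned"] = rng.binomial(1, p_churn)
toy["refund_issued"] = toy["churned"] * rng.binomial(1, 0.98, 500)

aucs = single_feature_auc(toy, "churned")
print(aucs)
print("Suspicious:", list(aucs[aucs > 0.95].index))
```

The 0.95 cut-off is an arbitrary starting point. A flagged column is not proof of leakage, only a prompt to check when and how that value gets recorded.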

Step 3: Feature Engineering Pipeline

python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PowerTransformer
from sklearn.impute import SimpleImputer

numeric_features     = ["tenure_months", "monthly_spend", "support_tickets",
                        "feature_usage_pct", "last_login_days_ago",
                        "num_integrations", "is_enterprise"]
categorical_features = ["contract_type", "plan_tier"]

numeric_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("power",   PowerTransformer(method="yeo-johnson", standardize=False)),
    ("scaler",  StandardScaler()),
])

categorical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("ohe",     OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipeline,     numeric_features),
    ("cat", categorical_pipeline, categorical_features),
])

X = df.drop(columns=["churned"])
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

Step 4: Model Comparison

python
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, StratifiedKFold
import xgboost as xgb

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

candidates = {
    "LogisticRegression": Pipeline([
        ("prep", preprocessor),
        ("clf",  LogisticRegression(C=1.0, max_iter=1000, random_state=42)),
    ]),
    "RandomForest": Pipeline([
        ("prep", preprocessor),
        ("clf",  RandomForestClassifier(n_estimators=300, max_features="sqrt",
                                         n_jobs=-1, random_state=42)),
    ]),
    "XGBClassifier": Pipeline([
        ("prep", preprocessor),
        ("clf",  xgb.XGBClassifier(n_estimators=300, max_depth=4, learning_rate=0.05,
                                    subsample=0.8, colsample_bytree=0.8,
                                    min_child_weight=5, tree_method="hist",
                                    eval_metric="auc", random_state=42, verbosity=0)),
    ]),
}

results = {}
for name, pipe in candidates.items():
    scores = cross_val_score(pipe, X_train, y_train, cv=cv, scoring="roc_auc", n_jobs=-1)
    results[name] = scores
    print(f"{name:25s}: {scores.mean():.4f} ± {scores.std():.4f}")

best_name = max(results, key=lambda k: results[k].mean())
print(f"\nBest model: {best_name}")

Step 5: Full Evaluation of Best Model

python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    confusion_matrix, classification_report,
    roc_curve, auc, precision_recall_curve,
    average_precision_score, RocCurveDisplay,
    PrecisionRecallDisplay, CalibrationDisplay,
)

best_pipeline = candidates[best_name]
best_pipeline.fit(X_train, y_train)
y_prob = best_pipeline.predict_proba(X_test)[:, 1]
y_pred = (y_prob >= 0.4).astype(int)  # lower threshold: prefer recall for churn

# Confusion matrix
cm = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(cm)

# Classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=["retained", "churned"]))

# Four evaluation plots
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# ROC curve
fpr, tpr, _ = roc_curve(y_test, y_prob)
roc_auc     = auc(fpr, tpr)
axes[0, 0].plot(fpr, tpr, color="#7c3aed", lw=2, label=f"AUC = {roc_auc:.4f}")
axes[0, 0].plot([0, 1], [0, 1], "k--", lw=1)
axes[0, 0].set_title("ROC Curve")
axes[0, 0].set_xlabel("False Positive Rate")
axes[0, 0].set_ylabel("True Positive Rate")
axes[0, 0].legend()

# PR curve
precision, recall, _ = precision_recall_curve(y_test, y_prob)
ap = average_precision_score(y_test, y_prob)
axes[0, 1].plot(recall, precision, color="#06b6d4", lw=2, label=f"AP = {ap:.4f}")
baseline = y_test.mean()
axes[0, 1].axhline(baseline, ls="--", color="gray", label=f"Baseline ({baseline:.2f})")
axes[0, 1].set_title("Precision-Recall Curve")
axes[0, 1].set_xlabel("Recall")
axes[0, 1].set_ylabel("Precision")
axes[0, 1].legend()

# Calibration curve
CalibrationDisplay.from_predictions(
    y_test, y_prob, n_bins=10,
    ax=axes[1, 0], name=best_name,
    color="#7c3aed",
)
axes[1, 0].set_title("Calibration Curve")

# Prediction score distribution by true label
axes[1, 1].hist(y_prob[y_test == 0], bins=40, alpha=0.6,
                 color="#06b6d4", label="Retained", density=True)
axes[1, 1].hist(y_prob[y_test == 1], bins=40, alpha=0.6,
                 color="#f87171", label="Churned", density=True)
axes[1, 1].axvline(0.4, ls="--", color="black", label="Decision threshold=0.4")
axes[1, 1].set_title("Score Distribution by True Label")
axes[1, 1].legend()

plt.suptitle(f"{best_name} — Full Evaluation", fontsize=14)
plt.tight_layout()
plt.show()
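
The 0.4 threshold above is a judgment call in favour of recall. If the business can put rough numbers on each error type, the threshold can instead be chosen to minimise expected cost. The sketch below assumes illustrative costs (a wasted retention offer costs 1, a missed churner costs 5) and uses toy scores in place of `y_test` and `y_prob`; `expected_cost` and `best_threshold` are hypothetical helpers.

```python
import numpy as np


def expected_cost(y_true, y_prob, threshold, cost_fp, cost_fn):
    """Average per-customer cost of acting on `y_prob >= threshold`."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_prob) >= threshold
    fp = np.sum(y_pred & (y_true == 0))   # contacted, but would have stayed
    fn = np.sum(~y_pred & (y_true == 1))  # not contacted, and churned
    return (cost_fp * fp + cost_fn * fn) / len(y_true)


def best_threshold(y_true, y_prob, cost_fp, cost_fn):
    """Grid-search the threshold with the lowest expected cost."""
    grid = np.linspace(0.05, 0.95, 19)
    costs = [expected_cost(y_true, y_prob, t, cost_fp, cost_fn) for t in grid]
    best = int(np.argmin(costs))
    return float(grid[best]), float(costs[best])


# Toy scores standing in for y_test / y_prob
rng = np.random.default_rng(0)
y_true = rng.binomial(1, 0.25, 2000)
y_prob = np.clip(0.25 + 0.35 * (y_true - 0.25) + rng.normal(0, 0.15, 2000), 0, 1)

# Assumed costs: false positive = 1, false negative = 5
t_star, cost = best_threshold(y_true, y_prob, cost_fp=1.0, cost_fn=5.0)
print(f"Cost-minimising threshold: {t_star:.2f} (expected cost {cost:.3f})")
```

When a missed churner costs more than a wasted offer, the chosen threshold moves below 0.5. Pick the threshold on a validation split rather than on the final test set, otherwise the reported test metrics are optimistic.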

Step 6: SHAP Summary

python
import pandas as pd
import shap

prep = best_pipeline.named_steps["prep"]

# Get feature names after OHE
ohe_cols = (
    prep.named_transformers_["cat"]
    .named_steps["ohe"]
    .get_feature_names_out(categorical_features)
)
feature_names = numeric_features + list(ohe_cols)

# Preprocess with the fitted pipeline; DataFrames carry the feature names into SHAP
X_train_processed = pd.DataFrame(prep.transform(X_train), columns=feature_names)
X_test_processed  = pd.DataFrame(prep.transform(X_test),  columns=feature_names)

raw_clf = best_pipeline.named_steps["clf"]
if hasattr(raw_clf, "coef_"):
    # A linear winner (LogisticRegression) is not supported by TreeExplainer
    explainer = shap.LinearExplainer(raw_clf, X_train_processed)
else:
    explainer = shap.TreeExplainer(raw_clf)
shap_values = explainer(X_test_processed)

# RandomForest yields one set of values per class; keep the churn class
if shap_values.values.ndim == 3:
    shap_values = shap_values[:, :, 1]

shap.summary_plot(shap_values, X_test_processed, max_display=12, show=True)

# Waterfall for the highest-probability churner
top_churner_idx = int(np.argmax(y_prob))
shap.plots.waterfall(shap_values[top_churner_idx], max_display=10, show=True)

Step 7: Serialise the Pipeline

python
import joblib
import hashlib
import os

model_path = "churn_pipeline_v1.joblib"
joblib.dump(best_pipeline, model_path, compress=3)
print(f"Model saved: {model_path} ({os.path.getsize(model_path) / 1024:.1f} KB)")

# Store SHA256 for integrity verification at load time
with open(model_path, "rb") as f:
    sha256 = hashlib.sha256(f.read()).hexdigest()
print(f"SHA256: {sha256}")

# Load helper: verify the checksum before unpickling.
# joblib.load executes pickled code, so only load artifacts you trust.
def load_model(path: str, expected_sha256: str = None):
    if expected_sha256:
        with open(path, "rb") as f:
            actual = hashlib.sha256(f.read()).hexdigest()
        if actual != expected_sha256:
            raise ValueError("Checksum mismatch: model file may be corrupted")
    return joblib.load(path)

pipeline = load_model(model_path, sha256)
# The reloaded pipeline must reproduce the in-memory predictions
assert np.allclose(pipeline.predict_proba(X_test)[:, 1], y_prob)
print("Model round-trip OK")
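
A checksum printed to the console is easy to lose. One option, shown here as a sketch rather than a prescribed layout, is to write a small JSON manifest next to the artifact that records the SHA256 and the installed library versions. The function names and the `.manifest.json` suffix are assumptions made for illustration.

```python
import hashlib
import json
import platform
from importlib import metadata
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash the file in chunks so large artifacts need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def installed_version(package: str) -> str:
    """Version of an installed distribution, or a marker if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return "not installed"


def write_manifest(model_path: str,
                   packages=("scikit-learn", "xgboost", "joblib")) -> Path:
    """Write <artifact>.manifest.json beside the model file (hypothetical layout)."""
    path = Path(model_path)
    manifest = {
        "artifact": path.name,
        "sha256": file_sha256(path),
        "python": platform.python_version(),
        "packages": {p: installed_version(p) for p in packages},
    }
    out = path.with_name(path.name + ".manifest.json")
    out.write_text(json.dumps(manifest, indent=2))
    return out


# Only runs if the artifact from this step exists in the working directory
if Path("churn_pipeline_v1.joblib").exists():
    print(write_manifest("churn_pipeline_v1.joblib").read_text())
```

At load time the serving code can read the manifest and pass its `sha256` value to `load_model` instead of relying on a hard-coded string.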

Step 8: FastAPI Prediction Endpoint

python
# src/serve.py
from contextlib import asynccontextmanager
from typing import Literal
import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

MODEL_PATH = "churn_pipeline_v1.joblib"
_model = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load model at startup; release at shutdown."""
    _model["pipeline"] = joblib.load(MODEL_PATH)
    # Warm-up: run a dummy prediction to avoid cold-start latency
    dummy = pd.DataFrame([{
        "tenure_months": 12, "monthly_spend": 99.0, "support_tickets": 2,
        "feature_usage_pct": 0.4, "last_login_days_ago": 7, "num_integrations": 3,
        "contract_type": "annual", "plan_tier": "growth", "is_enterprise": 0,
    }])
    _model["pipeline"].predict_proba(dummy)
    print("Model loaded and warmed up.")
    yield
    _model.clear()
    print("Model released.")

app = FastAPI(title="Churn Prediction API", version="1.0.0", lifespan=lifespan)

class ChurnRequest(BaseModel):
    tenure_months:      int   = Field(..., ge=1, le=120)
    monthly_spend:      float = Field(..., ge=0)
    support_tickets:    int   = Field(..., ge=0, le=50)
    feature_usage_pct:  float = Field(..., ge=0.0, le=1.0)
    last_login_days_ago:int   = Field(..., ge=0, le=365)
    num_integrations:   int   = Field(..., ge=0, le=50)
    contract_type:      Literal["monthly", "annual", "multi-year"]
    plan_tier:          Literal["starter", "growth", "enterprise"]
    is_enterprise:      Literal[0, 1]

class ChurnResponse(BaseModel):
    churn_probability: float
    churn_predicted:   bool
    risk_band:         Literal["low", "medium", "high"]

@app.get("/health")
async def health():
    return {"status": "ok"}

@app.get("/ready")
async def ready():
    if "pipeline" not in _model:
        raise HTTPException(503, "Model not loaded")
    return {"status": "ready"}

@app.post("/predict", response_model=ChurnResponse)
async def predict(req: ChurnRequest):
    try:
        row = pd.DataFrame([req.model_dump()])
        prob = float(_model["pipeline"].predict_proba(row)[0, 1])
    except Exception as exc:
        raise HTTPException(500, f"Inference error: {exc}") from exc

    if prob < 0.3:
        band = "low"
    elif prob < 0.6:
        band = "medium"
    else:
        band = "high"

    return ChurnResponse(
        churn_probability=round(prob, 4),
        churn_predicted=prob >= 0.4,
        risk_band=band,
    )

# Run: uvicorn src.serve:app --host 0.0.0.0 --port 8000
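
FastAPI turns a Pydantic validation failure into an HTTP 422 response before `predict` runs. To see what the schema rejects without starting a server, the same kind of bounds can be exercised directly. This sketch assumes Pydantic v2 and uses `ChurnRequestSketch`, a hypothetical cut-down copy of `ChurnRequest` with three fields.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError


class ChurnRequestSketch(BaseModel):
    """Cut-down copy of the request schema, for illustration only."""
    tenure_months: int = Field(..., ge=1, le=120)
    feature_usage_pct: float = Field(..., ge=0.0, le=1.0)
    contract_type: Literal["monthly", "annual", "multi-year"]


def validation_errors(payload: dict) -> list[str]:
    """Return the names of the fields that fail validation (empty if valid)."""
    try:
        ChurnRequestSketch.model_validate(payload)
    except ValidationError as exc:
        return [str(err["loc"][0]) for err in exc.errors()]
    return []


good = {"tenure_months": 12, "feature_usage_pct": 0.4, "contract_type": "annual"}
bad = {"tenure_months": 0, "feature_usage_pct": 1.7, "contract_type": "weekly"}

print("good payload errors:", validation_errors(good))
print("bad payload errors: ", validation_errors(bad))
```

Checks like these are cheap to keep as unit tests, so a change to the bounds in the schema shows up in review rather than in production traffic.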

Model Card

markdown
# Model Card: Churn Prediction v1.0

## Model Details
- **Type**: XGBoost classifier (sklearn Pipeline with preprocessing)
- **Version**: 1.0.0
- **Created**: 2026-03-29
- **Owner**: GadaaLabs Data Science Team
- **Artifact**: `churn_pipeline_v1.joblib`

## Intended Use
- **Primary use**: Predict 30-day churn probability for SaaS customers
- **Intended users**: Customer success team, automated retention workflows
- **Out-of-scope**: Credit decisions, employment decisions, any regulatory context

## Training Data
- Synthetic dataset, 1200 customers, 9 features
- Churn rate: ~24%
- Split: 80% train / 20% test, stratified on churn

## Evaluation Results (Test Set)
| Metric | Value |
|--------|-------|
| ROC-AUC | ~0.87 |
| Average Precision | ~0.76 |
| Decision threshold | 0.40 |

## Features Used
tenure_months, monthly_spend, support_tickets, feature_usage_pct,
last_login_days_ago, num_integrations, contract_type, plan_tier, is_enterprise

## Fairness Considerations
No protected attributes are included in the feature set. Model should be
audited for proxy discrimination via SHAP analysis before production use.

## Limitations
Trained on synthetic data. Retrain on real customer data before deployment.
Performance may degrade on customer segments not represented in training.

## Monitoring Requirements
- Daily prediction distribution monitoring (PSI < 0.2)
- Weekly feature drift check on numeric features
- Monthly performance evaluation when labels arrive
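
The PSI threshold in the card can be checked with a few lines of NumPy. The sketch below uses the common definition, PSI = Σ (aᵢ − eᵢ) · ln(aᵢ / eᵢ) summed over bins, with decile edges taken from the baseline sample. The function `psi`, the bin count, and the toy beta-distributed samples are illustrative assumptions, not the lesson's monitoring code.

```python
import numpy as np


def psi(expected, actual, n_bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index between a baseline sample and a new sample."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)

    # Interior quantile edges come from the baseline only; the outer bins are
    # open-ended, so values outside the baseline range are still counted
    inner = np.unique(np.quantile(expected, np.linspace(0, 1, n_bins + 1))[1:-1])
    k = len(inner) + 1

    e_idx = np.searchsorted(inner, expected, side="right")
    a_idx = np.searchsorted(inner, actual, side="right")
    e_frac = np.bincount(e_idx, minlength=k) / len(expected)
    a_frac = np.bincount(a_idx, minlength=k) / len(actual)

    # Clip empty bins so the logarithm stays finite
    e_frac = np.clip(e_frac, eps, None)
    a_frac = np.clip(a_frac, eps, None)
    return float(np.sum((a_frac - e_frac) * np.log(a_frac / e_frac)))


# Toy samples: a baseline, a fresh draw from the same distribution, and a shifted one
rng = np.random.default_rng(0)
baseline = rng.beta(2, 5, 5000)
same = rng.beta(2, 5, 5000)
shifted = rng.beta(4, 3, 5000)

print(f"PSI, same distribution:    {psi(baseline, same):.4f}")
print(f"PSI, shifted distribution: {psi(baseline, shifted):.4f}")
```

The same function works for predicted probabilities and for individual numeric features, provided the baseline sample is stored at training time.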

20-Item Pre-Delivery Checklist

python
CHECKLIST = """
Pre-delivery checklist for Churn Prediction v1.0
=================================================
Data & Features
  [ ] 1.  All features present in training are present in serving payload schema
  [ ] 2.  No training data leakage (target not used in feature computation)
  [ ] 3.  All preprocessing steps are inside the sklearn Pipeline (fit on train only)
  [ ] 4.  Missing value strategy documented and tested on production-like data
  [ ] 5.  No features derived from data recorded AFTER the prediction target event

Model Quality
  [ ] 6.  CV ROC-AUC evaluated on stratified 5-fold (not simple random split)
  [ ] 7.  PR curve and calibration curve reviewed (not just AUC)
  [ ] 8.  Performance evaluated on at least one demographic/segment slice
  [ ] 9.  SHAP summary plot reviewed — no unexpected top features
  [ ] 10. Decision threshold chosen based on business cost of FP vs FN

Serialisation & Reproducibility
  [ ] 11. Model artifact saved with SHA256 checksum recorded
  [ ] 12. Exact versions of all training dependencies pinned (requirements.txt)
  [ ] 13. Training script is version-controlled and re-runs to the same metric
  [ ] 14. Training random seeds are fixed and documented

Serving
  [ ] 15. FastAPI server has /health and /ready endpoints
  [ ] 16. Pydantic schema validates all input fields with correct types and bounds
  [ ] 17. Server returns meaningful HTTP 422/500 on invalid/failed input
  [ ] 18. Model loaded at startup (not per request); warm-up inference run at startup

Monitoring & Operations
  [ ] 19. Prediction distribution baseline recorded for drift monitoring
  [ ] 20. Alerting configured for PSI > 0.2 on any input feature
"""
print(CHECKLIST)

Key Takeaways

  • Start every project by defining the business decision the model enables and the cost of each error type — these determine the evaluation metric and the decision threshold.
  • All preprocessing must live inside a sklearn Pipeline fitted only on training data; fitting any transform (imputer, scaler, encoder) on data that includes the test set is data leakage, even if the effect seems minor.
  • Compare multiple algorithm families (linear, tree ensemble, gradient boosting) with the same preprocessing pipeline and the same CV strategy before committing to hyperparameter tuning.
  • ROC-AUC alone is insufficient — always examine the precision-recall curve for imbalanced problems and the calibration curve if predicted probabilities will be used directly.
  • Serialise with joblib for sklearn pipelines; record the SHA256 checksum and pin all library versions to make the artifact reproducible and verifiable.
  • A FastAPI serving endpoint should load the model once at startup via lifespan, run a warm-up prediction to avoid cold-start latency, and validate all inputs through Pydantic before passing them to the model.
  • A model card documents intended use, evaluation results, features used, fairness considerations, and monitoring requirements — it is the artifact that travels with the model and enables governance.
  • The 20-item checklist covers five common production failure modes: leakage, poor evaluation, non-reproducibility, serving errors, and silent model decay.