GadaaLabs
Data Science Fundamentals
Lesson 8

Building a Complete DS Pipeline

Notebook code that runs top-to-bottom once is not a pipeline — it is a script. A real pipeline is a reusable, reproducible object that applies the same transformations to training data, validation data, and future production data in a guaranteed-consistent way. Scikit-learn's Pipeline and ColumnTransformer give you exactly that.

Why Pipelines Matter

Without a pipeline:

  • You might accidentally fit a scaler on test data (leakage).
  • Preprocessing code gets copy-pasted and can diverge between training and serving.
  • Hyperparameter tuning loops become error-prone boilerplate.
  • Serialising "a model" requires separately serialising and loading multiple objects.

With a pipeline, the entire transformation + estimation chain is a single, serialisable object.
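
To make the leakage point concrete, here is a minimal sketch on a small synthetic array (not the Titanic data used in the rest of the lesson). It checks that a scaler placed inside a Pipeline learns its statistics from the training rows only. The data and the names X_demo, y_demo, and demo_pipe are assumptions made up for illustration.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, invented for this illustration
rng = np.random.default_rng(0)
X_demo = rng.normal(loc=5.0, scale=2.0, size=(200, 3))
y_demo = (X_demo[:, 0] + rng.normal(size=200) > 5.0).astype(int)

X_tr, X_te, y_tr, y_te = train_test_split(
    X_demo, y_demo, test_size=0.25, random_state=0, stratify=y_demo
)

demo_pipe = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression()),
])
demo_pipe.fit(X_tr, y_tr)  # the scaler only ever sees X_tr here

# Compare the fitted scaler's means with training-only and full-data means
scaler_means = demo_pipe.named_steps["scaler"].mean_
matches_train = np.allclose(scaler_means, X_tr.mean(axis=0))
matches_full = np.allclose(scaler_means, X_demo.mean(axis=0))
print(f"Matches training means:  {matches_train}")
print(f"Matches full-data means: {matches_full}")
```

Calling demo_pipe.predict(X_te) afterwards reuses those training statistics. The test rows are only transformed, never used for fitting.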

Dataset Setup

python
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score
import joblib
import warnings
warnings.filterwarnings("ignore")

# Titanic from OpenML — realistic messy dataset
titanic = fetch_openml("titanic", version=1, as_frame=True)
df = titanic.frame.copy()

# Select and prepare target
df["survived"] = (df["survived"] == "1").astype(int)

NUMERIC_FEATURES    = ["age", "fare", "sibsp", "parch"]
CATEGORICAL_FEATURES = ["pclass", "sex", "embarked"]
TARGET              = "survived"

X = df[NUMERIC_FEATURES + CATEGORICAL_FEATURES]
y = df[TARGET]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.20, random_state=42, stratify=y
)

print(f"Train: {X_train.shape}, Test: {X_test.shape}")
print(f"Null counts:\n{X_train.isnull().sum()}")

Building the ColumnTransformer

ColumnTransformer applies different transformation sequences to different column subsets — numeric and categorical features require completely different preprocessing:

python
# Numeric pipeline: impute with median → scale
numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler",  StandardScaler())
])

# Categorical pipeline: impute with most frequent → one-hot encode
categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])

# Combine into a single ColumnTransformer
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, NUMERIC_FEATURES),
    ("cat", categorical_transformer, CATEGORICAL_FEATURES)
])

Assembling the Full Pipeline

python
# Full pipeline: preprocessor → model
pipeline = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier",   GradientBoostingClassifier(n_estimators=200, max_depth=3,
                                                 learning_rate=0.1, random_state=42))
])

# Train — a single call handles ALL preprocessing and model training
pipeline.fit(X_train, y_train)

# Predict — same pipeline applies preprocessing automatically
y_pred = pipeline.predict(X_test)
y_prob = pipeline.predict_proba(X_test)[:, 1]

print(classification_report(y_test, y_pred, target_names=["Died", "Survived"]))
print(f"AUC-ROC: {roc_auc_score(y_test, y_prob):.4f}")

Hyperparameter Tuning with GridSearchCV

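GridSearchCV searches over a parameter grid using cross-validation. Inside a pipeline, parameters are addressed with the syntax stepname__parametername (a double underscore). Nested steps chain the same way, so preprocessor__num__imputer__strategy reaches the imputer inside the numeric sub-pipeline of the preprocessor: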

python
param_grid = {
    "classifier__n_estimators":  [100, 200],
    "classifier__max_depth":     [2, 3, 4],
    "classifier__learning_rate": [0.05, 0.10, 0.20],
    "preprocessor__num__imputer__strategy": ["mean", "median"]
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=cv,
    scoring="roc_auc",
    n_jobs=-1,
    verbose=1,
    refit=True   # refit the best model on all training data
)

grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV AUC:     {grid_search.best_score_:.4f}")

best_pipeline = grid_search.best_estimator_
y_prob_best   = best_pipeline.predict_proba(X_test)[:, 1]
print(f"Test AUC:        {roc_auc_score(y_test, y_prob_best):.4f}")
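
Before launching a search it helps to know how many fits it will trigger. The sketch below counts the candidates in the grid above using only the standard library (it does not call scikit-learn) and shows how each double-underscore key breaks down into a path through the nested estimator. The names candidates, n_candidates, and n_fits are illustrative.

```python
from itertools import product

# Same grid as in the lesson
param_grid = {
    "classifier__n_estimators":  [100, 200],
    "classifier__max_depth":     [2, 3, 4],
    "classifier__learning_rate": [0.05, 0.10, 0.20],
    "preprocessor__num__imputer__strategy": ["mean", "median"],
}
n_splits = 5  # number of folds used by StratifiedKFold in the lesson

# Every combination of values is one candidate configuration
candidates = [
    dict(zip(param_grid, values))
    for values in product(*param_grid.values())
]
n_candidates = len(candidates)
n_fits = n_candidates * n_splits  # cross-validation fits, excluding the final refit

print(f"{n_candidates} candidates x {n_splits} folds = {n_fits} fits")

# Each "__" descends one level; the last part is the parameter name
for key in param_grid:
    *path, param = key.split("__")
    print(" -> ".join(path), "sets", param)
```

With this grid the search runs 36 candidates over 5 folds, so 180 fits plus one refit of the best configuration. On the lesson's pipeline object, pipeline.get_params().keys() lists every valid parameter name, which is a quick way to catch a misspelled key.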

Inspecting Feature Importance Through the Pipeline

python
# Access the fitted model inside the pipeline
best_clf = best_pipeline.named_steps["classifier"]

# Recover feature names from the preprocessor
cat_feature_names = (
    best_pipeline.named_steps["preprocessor"]
    .named_transformers_["cat"]
    .named_steps["encoder"]
    .get_feature_names_out(CATEGORICAL_FEATURES)
)
all_feature_names = NUMERIC_FEATURES + list(cat_feature_names)

importance_df = (
    pd.DataFrame({
        "feature":    all_feature_names,
        "importance": best_clf.feature_importances_
    })
    .sort_values("importance", ascending=False)
    .head(10)
)

print(importance_df.to_string(index=False))

Pipeline steps and their responsibilities:

| Pipeline Step | Responsibility | Key Parameters |
|---|---|---|
| SimpleImputer | Fill missing values | strategy: mean, median, most_frequent |
| StandardScaler | Zero-mean, unit-variance scaling | with_mean, with_std |
| OneHotEncoder | Categorical → binary columns | handle_unknown="ignore" |
| ColumnTransformer | Route columns to correct transformers | transformers, remainder |
| GradientBoostingClassifier | Model training | n_estimators, max_depth, learning_rate |

Serialising and Loading the Pipeline

A trained pipeline is serialised with joblib — a single file contains the entire preprocessing chain plus the model:

python
# Save to disk
joblib.dump(best_pipeline, "titanic_pipeline_v1.pkl")

# Load and use — no code changes needed
loaded_pipeline = joblib.load("titanic_pipeline_v1.pkl")

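# The loaded pipeline is ready to predict on raw, unprocessed data.
# Values must use the same types as the training columns: pclass loads as a
# numeric column from OpenML, so the string "1" would be treated as an unseen
# category and silently encoded as all zeros.
new_passenger = pd.DataFrame([{
    "age": 28, "fare": 52.0, "sibsp": 0, "parch": 0,
    "pclass": 1, "sex": "female", "embarked": "S"
}])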

prob = loaded_pipeline.predict_proba(new_passenger)[0, 1]
print(f"Survival probability: {prob:.3f}")
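
One way to guard the save and load step is to store the scikit-learn version next to the model and confirm that the restored pipeline reproduces the original predictions. The sketch below does this on a small synthetic problem. The bundle layout (a dict with "model" and "sklearn_version" keys) and the file name are assumptions for illustration, not a joblib convention.

```python
import os
import tempfile

import joblib
import numpy as np
import sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Small synthetic problem, invented for this illustration
rng = np.random.default_rng(42)
X_small = rng.normal(size=(60, 4))
y_small = (X_small[:, 0] > 0).astype(int)

small_pipe = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression()),
]).fit(X_small, y_small)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "small_pipeline.joblib")  # hypothetical file name

    # Save the library version with the model so a mismatch is detectable at load time
    joblib.dump({"model": small_pipe, "sklearn_version": sklearn.__version__}, path)
    bundle = joblib.load(path)

if bundle["sklearn_version"] != sklearn.__version__:
    raise RuntimeError("Pipeline was saved with a different scikit-learn version")

restored = bundle["model"]
same_output = np.allclose(
    restored.predict_proba(X_small), small_pipe.predict_proba(X_small)
)
print(f"Round trip preserved predictions: {same_output}")
```

joblib files are pickle-based, so loading one can execute arbitrary code. Only load pipeline files that come from a source you trust.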

Complete Pipeline Checklist

Before shipping any pipeline to production, verify:

  1. Leakage audit: All transformers are fitted only on training data (fit_transform on train, transform on test).
  2. Unknown category handling: OneHotEncoder(handle_unknown="ignore") prevents failures on unseen categories.
  3. Null robustness: SimpleImputer is present for every column that can contain nulls.
  4. Version pinning: Log the scikit-learn version alongside the saved model file; pickled estimators are not guaranteed to load or behave identically under a different version.
  5. Input validation: Assert that the input DataFrame has the expected columns and dtypes before predict().
  6. Performance baseline: Compare against a trivial model (always predict majority class) to confirm the pipeline adds value.
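
Items 5 and 6 of the checklist can be sketched in a few lines. The helper validate_input below is hypothetical (it is not part of scikit-learn or pandas), the column lists mirror the lesson's feature names, and the baseline labels are made up. Treat it as a starting point rather than a complete validation layer.

```python
import pandas as pd
from sklearn.dummy import DummyClassifier

# Column names mirror the lesson's feature lists
EXPECTED_COLUMNS = ["age", "fare", "sibsp", "parch", "pclass", "sex", "embarked"]
NUMERIC_COLUMNS = ["age", "fare", "sibsp", "parch"]

def validate_input(frame: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: check columns and numeric dtypes before predict()."""
    missing = [c for c in EXPECTED_COLUMNS if c not in frame.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    non_numeric = [
        c for c in NUMERIC_COLUMNS
        if not pd.api.types.is_numeric_dtype(frame[c])
    ]
    if non_numeric:
        raise TypeError(f"Expected numeric dtype for: {non_numeric}")
    # Return the expected columns in a fixed order, dropping any extras
    return frame[EXPECTED_COLUMNS]

good = pd.DataFrame([{
    "age": 28, "fare": 52.0, "sibsp": 0, "parch": 0,
    "pclass": 1, "sex": "female", "embarked": "S",
}])
validated = validate_input(good)

# Majority-class baseline on made-up labels: a useful model should beat this score
labels = [0, 0, 0, 1, 1]
dummy_features = [[0]] * len(labels)  # DummyClassifier ignores feature values
baseline = DummyClassifier(strategy="most_frequent").fit(dummy_features, labels)
baseline_accuracy = baseline.score(dummy_features, labels)
print(f"Baseline accuracy: {baseline_accuracy:.2f}")
```

In practice the baseline would be fitted on y_train and scored on the same split and metric as the real pipeline, so the two numbers are directly comparable.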

Summary

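  • A scikit-learn Pipeline chains preprocessing and modelling steps into a single object that is serialisable with joblib and, because its transformers are fitted only on the data passed to fit(), keeps test-set statistics out of preprocessing.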
  • ColumnTransformer applies different transformation sequences to numeric and categorical columns simultaneously.
  • Parameters within a pipeline are tuned via GridSearchCV using the stepname__parametername syntax; refit=True trains the final model on all training data.
  • The trained pipeline applies the same preprocessing to any new DataFrame automatically — no separate transformation code required at serving time.
  • Always inspect feature importances and validate on the held-out test set exactly once, after all tuning is complete, to report an honest generalisation estimate.