Loading a dataset is the easiest step in any analysis. Understanding what you have loaded is the hardest. Most analysts run .head(), glance at a few rows, and proceed. Expert analysts treat first contact as a distinct, structured phase that produces documented output before any transformation begins.
This lesson covers two things: how to load data from every common source with production-quality parameters, and how to build an automated profiling system that gives you a complete picture of any DataFrame in seconds.
Loading from CSV — Every Parameter That Matters
pd.read_csv has over 50 parameters. Most analysts use none of them. The parameters below are the ones that actually affect analysis correctness.
```python
import pandas as pd
import numpy as np
from pathlib import Path

# Basic load — adequate for small clean files only
df_basic = pd.read_csv("data/orders.csv")

# Production load — explicit types prevent silent errors
orders = pd.read_csv(
    "data/orders.csv",
    # Explicitly specify dtypes for known columns.
    # Without this, pandas infers — often incorrectly for IDs and codes.
    dtype={
        "order_id": "int64",
        "customer_id": "int64",
        "product_id": "str",    # Product codes like "SKU-001" must stay string
        "quantity": "int32",
        "status": "category",   # Repeated string values → category saves memory
    },
    # Parse date columns during load — faster than post-load pd.to_datetime
    parse_dates=["order_date", "shipped_date", "delivered_date"],
    # Treat these strings as NaN in addition to the default set
    na_values=["N/A", "n/a", "NONE", "none", "-", "--", "unknown", "Unknown"],
    # Suppress DtypeWarning: mixed types in a column.
    # IMPORTANT: Always investigate the warning before suppressing it.
    # low_memory=False reads the whole file before inferring types — slower but accurate.
    low_memory=False,
    # Skip completely empty rows
    skip_blank_lines=True,
    # Encoding — always specify; latin-1 handles most Western European legacy data
    encoding="utf-8",  # Change to "latin-1" if you see UnicodeDecodeError
)

print(f"Loaded: {orders.shape[0]:,} rows × {orders.shape[1]} columns")
print(f"Memory: {orders.memory_usage(deep=True).sum() / 1e6:.1f} MB")
```
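To see why explicit dtypes matter, here is a minimal sketch using hypothetical inline data: product codes with leading zeros survive only when the column is forced to string.

```python
import io
import pandas as pd

# Hypothetical inline data: product codes with leading zeros
csv_data = "order_id,product_id\n1,00451\n2,00078\n"

# Default inference: product_id becomes int64 and the zeros are lost
inferred = pd.read_csv(io.StringIO(csv_data))
print(inferred["product_id"].tolist())   # [451, 78] — codes corrupted

# Explicit dtype preserves the codes exactly
explicit = pd.read_csv(io.StringIO(csv_data), dtype={"product_id": "str"})
print(explicit["product_id"].tolist())   # ['00451', '00078']
```

The corruption is silent: no warning is raised, and a later join against a system that stores the zero-padded codes will simply match nothing.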
Loading Large Files in Chunks
When a file exceeds available RAM, read it in chunks:
```python
from typing import Callable

import pandas as pd


def load_large_csv_filtered(
    path: str,
    chunk_size: int = 100_000,
    filter_fn: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
) -> pd.DataFrame:
    """
    Load a large CSV in chunks, apply an optional filter to each chunk,
    and concatenate into a single DataFrame.

    Args:
        path: Path to the CSV file.
        chunk_size: Rows per chunk.
        filter_fn: Optional function applied to each chunk before appending.

    Returns:
        Filtered DataFrame.
    """
    chunks = []
    rows_read = 0
    with pd.read_csv(path, chunksize=chunk_size) as reader:
        for chunk in reader:
            rows_read += len(chunk)
            if filter_fn is not None:
                chunk = filter_fn(chunk)
            if len(chunk) > 0:
                chunks.append(chunk)
    result = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
    print(f"Read {rows_read:,} rows total, kept {len(result):,} after filter.")
    return result


# Example: load only 2023 orders from a 10M row file
def filter_2023(chunk: pd.DataFrame) -> pd.DataFrame:
    chunk["order_date"] = pd.to_datetime(chunk["order_date"], errors="coerce")
    return chunk[chunk["order_date"].dt.year == 2023]


# orders_2023 = load_large_csv_filtered("data/orders_full.csv", filter_fn=filter_2023)
```
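The same chunked pattern can be exercised without a large file on disk. A minimal sketch, using a hypothetical inline CSV via StringIO as a stand-in:

```python
import io
import pandas as pd

# Hypothetical inline CSV standing in for a large file on disk: ids 0..9
csv_data = "id,value\n" + "\n".join(f"{i},{i * 10}" for i in range(10))

kept = []
with pd.read_csv(io.StringIO(csv_data), chunksize=4) as reader:
    for chunk in reader:
        # Keep only rows with even ids — the per-chunk filter step
        kept.append(chunk[chunk["id"] % 2 == 0])

result = pd.concat(kept, ignore_index=True)
print(result["id"].tolist())  # [0, 2, 4, 6, 8]
```

Only one chunk is ever resident in memory at a time; the filtered pieces are small, so the final concat is cheap.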
Loading from Excel
Excel files introduce complexity that CSV files do not: multiple sheets, merged header rows, data that starts on row 3, trailing summary rows. Handle all of it explicitly.
```python
import pandas as pd

# Single sheet, default header row 0
df = pd.read_excel("data/sales_report.xlsx", sheet_name="Orders")

# Multiple sheets at once → returns dict of DataFrames
sheets = pd.read_excel("data/sales_report.xlsx", sheet_name=None)  # None = all sheets
print("Sheets found:", list(sheets.keys()))

# Header on row 2 (0-indexed → row index 1)
df_awkward = pd.read_excel(
    "data/legacy_report.xlsx",
    sheet_name="Revenue",
    header=1,          # Row index 1 has the column names; row 0 (a title row) is discarded
    skipfooter=3,      # Skip the last 3 rows (e.g., "Total" summary rows)
    usecols="A:H",     # Read only columns A through H
    dtype={"Revenue": float, "Units": int},
)

# Combine multiple sheets with source tracking
def load_all_sheets(path: str) -> pd.DataFrame:
    """Load all sheets from an Excel workbook and add a 'source_sheet' column."""
    all_sheets = pd.read_excel(path, sheet_name=None)
    frames = []
    for sheet_name, df in all_sheets.items():
        df = df.copy()
        df["source_sheet"] = sheet_name
        frames.append(df)
    combined = pd.concat(frames, ignore_index=True)
    print(f"Combined {len(all_sheets)} sheets: {combined.shape}")
    return combined
```

Note: `header=1` already discards everything above the header row, so combining it with `skiprows=[0]` is redundant and shifts the header one row further down than intended.
Loading from SQL Databases
SQL databases are the most common data source in production analytics. Use SQLAlchemy as the connection layer — it gives you database portability and query safety.
```python
import pandas as pd
from sqlalchemy import create_engine, text

# Connection strings by database type:
#   PostgreSQL: postgresql+psycopg2://user:pass@host:5432/dbname
#   MySQL:      mysql+pymysql://user:pass@host:3306/dbname
#   SQLite:     sqlite:///path/to/file.db
#   BigQuery:   bigquery://project/dataset   (requires sqlalchemy-bigquery)

engine = create_engine(
    "postgresql+psycopg2://analyst:secret@db.internal:5432/ecommerce",
    # Connection pool settings for repeated queries
    pool_size=5,
    max_overflow=2,
    pool_pre_ping=True,  # Test connections before using (handles stale connections)
)

# Simple query
orders = pd.read_sql(
    "SELECT * FROM orders WHERE order_date >= '2023-01-01'",
    con=engine,
    parse_dates=["order_date", "shipped_date"],
)

# Parameterised query — NEVER use f-strings for user input (SQL injection risk)
start_date = "2023-01-01"
end_date = "2023-09-30"
orders = pd.read_sql(
    text("SELECT * FROM orders WHERE order_date BETWEEN :start AND :end"),
    con=engine,
    params={"start": start_date, "end": end_date},
    parse_dates=["order_date"],
)

# Chunking large tables — avoids loading millions of rows into RAM at once
def load_sql_chunked(
    query: str,
    engine,
    chunk_size: int = 50_000,
) -> pd.DataFrame:
    """Load a large SQL result set in chunks."""
    chunks = []
    for chunk in pd.read_sql(query, con=engine, chunksize=chunk_size):
        chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True)

# Best practice: push filtering and aggregation to the database.
# BAD: Load 10M rows, filter in Python
# bad_df = pd.read_sql("SELECT * FROM events", engine)
# filtered = bad_df[bad_df["event_type"] == "purchase"]
# GOOD: Filter in SQL, load only what you need
# good_df = pd.read_sql(
#     "SELECT * FROM events WHERE event_type = 'purchase' AND event_date >= '2023-01-01'",
#     engine,
# )
```
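The parameterised pattern works unchanged against any SQLAlchemy backend. A self-contained sketch using an in-memory SQLite database as a stand-in for the production warehouse (table name and values are hypothetical):

```python
import pandas as pd
from sqlalchemy import create_engine, text

# In-memory SQLite database — nothing touches disk
engine = create_engine("sqlite:///:memory:")

# Seed a small hypothetical orders table
seed = pd.DataFrame({
    "order_id": [1, 2, 3],
    "order_date": ["2022-12-31", "2023-01-15", "2023-06-01"],
    "revenue": [10.0, 20.0, 30.0],
})
seed.to_sql("orders", engine, index=False)

# Same parameterised query shape as the PostgreSQL example above
orders = pd.read_sql(
    text("SELECT * FROM orders WHERE order_date >= :start"),
    con=engine,
    params={"start": "2023-01-01"},
    parse_dates=["order_date"],
)
print(len(orders))  # 2 — only the 2023 orders come back
```

Because the date filter runs inside the database, only the matching rows ever cross the wire, which is exactly the "push filtering to the source" rule above.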
Loading from JSON and Nested JSON
JSON is the lingua franca of APIs. Nested JSON — objects within objects within arrays — is the norm, not the exception.
```python
import json

import pandas as pd

# Flat JSON (list of records)
orders = pd.read_json("data/orders.json")

# Nested JSON — use json_normalize
with open("data/orders_nested.json") as f:
    raw = json.load(f)

# Example structure:
# [
#   {
#     "order_id": 1001,
#     "customer": {"id": 42, "name": "Alice", "tier": "premium"},
#     "items": [{"sku": "A1", "qty": 2, "price": 29.99}],
#     "shipping": {"city": "Berlin", "country": "DE"}
#   },
#   ...
# ]

# Flatten the nested customer and shipping objects, and expand
# the "items" list into separate rows, keeping parent fields
orders_flat = pd.json_normalize(
    raw,
    sep=".",  # Columns from nested dicts become "customer.id", "customer.name", etc.
    record_path="items",
    meta=[
        "order_id",
        ["customer", "id"],
        ["customer", "tier"],
        ["shipping", "city"],
        ["shipping", "country"],
    ],
    meta_prefix="",   # No prefix for meta fields
    errors="ignore",  # Skip records missing optional nested fields
)

# Rename flattened columns
orders_flat = orders_flat.rename(columns={
    "customer.id": "customer_id",
    "customer.tier": "customer_tier",
    "shipping.city": "city",
    "shipping.country": "country",
})

print(orders_flat.head(3))
print(orders_flat.dtypes)
```
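The row multiplication that `record_path` causes is easiest to see on a tiny hypothetical payload: one order with two line items becomes two rows, with the parent fields repeated on each.

```python
import pandas as pd

# Minimal hypothetical payload: one order, two line items
raw = [{
    "order_id": 1001,
    "customer": {"id": 42, "tier": "premium"},
    "items": [
        {"sku": "A1", "qty": 2},
        {"sku": "B7", "qty": 1},
    ],
}]

flat = pd.json_normalize(
    raw,
    record_path="items",                     # One output row per item
    meta=["order_id", ["customer", "id"]],   # Parent fields carried onto each row
    sep=".",
)
print(flat.shape)          # (2, 4)
print(list(flat.columns))  # ['sku', 'qty', 'order_id', 'customer.id']
```

This multiplication is correct for item-level analysis, but summing an order-level field like revenue over the result would double-count — always check row counts before and after normalizing.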
Loading from APIs with Pagination
Most production APIs paginate results. A robust loader handles pagination automatically.
```python
import time
from typing import Any

import pandas as pd
import requests


def fetch_paginated_api(
    base_url: str,
    endpoint: str,
    headers: dict[str, str],
    params: dict[str, Any] | None = None,
    page_param: str = "page",
    page_size_param: str = "per_page",
    page_size: int = 100,
    results_key: str = "data",
    max_pages: int = 500,
    rate_limit_sleep: float = 0.1,
) -> pd.DataFrame:
    """
    Fetch all pages from a paginated REST API endpoint.

    Args:
        base_url: Root URL, e.g. "https://api.example.com"
        endpoint: Path, e.g. "/v1/orders"
        headers: Auth headers, e.g. {"Authorization": "Bearer TOKEN"}
        params: Additional query params
        page_param: Name of the page number parameter
        page_size_param: Name of the page size parameter
        page_size: Records per request
        results_key: JSON key that contains the list of records
        max_pages: Safety limit to prevent infinite loops
        rate_limit_sleep: Seconds to sleep between requests

    Returns:
        All records as a DataFrame.
    """
    params = params or {}
    params[page_size_param] = page_size
    all_records: list[dict] = []
    page = 1
    pages_fetched = 0
    while page <= max_pages:
        params[page_param] = page
        response = requests.get(
            f"{base_url}{endpoint}",
            headers=headers,
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        records = payload.get(results_key, [])
        if not records:
            break  # No more data
        pages_fetched += 1
        all_records.extend(records)
        print(f"  Page {page}: fetched {len(records)} records (total: {len(all_records):,})")
        # Some APIs signal last page explicitly
        if not payload.get("has_more", True):
            break
        page += 1
        time.sleep(rate_limit_sleep)
    df = pd.DataFrame(all_records)
    print(f"Loaded {len(df):,} records from {pages_fetched} pages.")
    return df


# Usage example (commented out — requires live API)
# orders_api = fetch_paginated_api(
#     base_url="https://api.gadaalabs.com",
#     endpoint="/v1/orders",
#     headers={"Authorization": f"Bearer {API_TOKEN}"},
#     params={"status": "completed", "start_date": "2023-01-01"},
# )
```
First Contact Checklist
After loading, run this checklist on every new dataset before doing anything else:
```python
import pandas as pd
import numpy as np


def first_contact(df: pd.DataFrame, name: str = "DataFrame") -> None:
    """
    Run the standard first-contact checklist on a newly loaded DataFrame.
    Prints a structured summary to stdout.
    """
    print("=" * 60)
    print(f"FIRST CONTACT: {name}")
    print("=" * 60)

    # 1. Shape
    rows, cols = df.shape
    print(f"\n[Shape] {rows:,} rows × {cols} columns")

    # 2. Memory
    mem_mb = df.memory_usage(deep=True).sum() / 1e6
    print(f"[Memory] {mem_mb:.1f} MB")

    # 3. Column names and types
    print("\n[Dtypes]")
    dtype_summary = df.dtypes.value_counts()
    for dtype, count in dtype_summary.items():
        print(f"  {str(dtype):<15} {count} columns")

    # 4. Null counts
    null_counts = df.isnull().sum()
    null_cols = null_counts[null_counts > 0]
    if len(null_cols) > 0:
        print(f"\n[Nulls] {len(null_cols)} columns have nulls:")
        for col, count in null_cols.sort_values(ascending=False).items():
            pct = count / rows * 100
            print(f"  {col:<30} {count:>8,} ({pct:.1f}%)")
    else:
        print("\n[Nulls] None — all columns complete.")

    # 5. Duplicate rows
    n_dupes = df.duplicated().sum()
    print(f"\n[Duplicates] {n_dupes:,} fully duplicate rows ({n_dupes / rows * 100:.2f}%)")

    # 6. Numeric summary
    numeric_cols = df.select_dtypes(include="number").columns
    if len(numeric_cols) > 0:
        print(f"\n[Numeric Summary] {len(numeric_cols)} numeric columns:")
        print(df[numeric_cols].describe().round(2).to_string())

    # 7. Categorical/object summary
    cat_cols = df.select_dtypes(include=["object", "category"]).columns
    if len(cat_cols) > 0:
        print(f"\n[Categorical Columns] {len(cat_cols)} columns:")
        for col in cat_cols[:10]:  # Show first 10 to avoid flooding output
            n_unique = df[col].nunique()
            top_val = df[col].value_counts().index[0] if n_unique > 0 else "N/A"
            print(f"  {col:<30} unique: {n_unique:>6,} top: '{top_val}'")

    # 8. Date columns
    date_cols = df.select_dtypes(include=["datetime64"]).columns
    if len(date_cols) > 0:
        print("\n[Date Columns]:")
        for col in date_cols:
            print(f"  {col:<30} min: {df[col].min()} max: {df[col].max()}")

    print(f"\n{'=' * 60}\n")


# Simulate a dataset for demonstration
np.random.seed(0)
n = 2000
demo_orders = pd.DataFrame({
    "order_id": range(1001, 1001 + n),
    "customer_id": np.random.choice(range(1, 501), size=n),
    "product_id": np.random.choice(["SKU-001", "SKU-002", "SKU-003", "SKU-004"], size=n),
    "order_date": pd.date_range("2023-01-01", periods=n, freq="2h"),
    "revenue": np.random.exponential(scale=80, size=n).round(2),
    "quantity": np.random.randint(1, 6, size=n),
    "status": np.random.choice(["completed", "cancelled", "refunded"], p=[0.75, 0.15, 0.10], size=n),
    "channel": np.random.choice(["organic", "paid", "email", "direct"], size=n),
    "country": np.random.choice(["US", "DE", "UK", "FR", None], p=[0.4, 0.2, 0.2, 0.1, 0.1], size=n),
})

first_contact(demo_orders, name="orders")
```
Automated Column Profiling
The first_contact function gives an overview. The profile_dataframe function goes deeper — one rich summary per column, formatted for logging and comparison.
```python
from typing import Any

import pandas as pd
import numpy as np


def profile_column(series: pd.Series) -> dict[str, Any]:
    """
    Generate a detailed profile for a single Series.
    Returns a dict suitable for appending to a profiling report DataFrame.
    """
    n_total = len(series)
    n_null = series.isnull().sum()
    n_non_null = n_total - n_null
    n_unique = series.nunique(dropna=True)

    profile: dict[str, Any] = {
        "column": series.name,
        "dtype": str(series.dtype),
        "n_total": n_total,
        "n_null": int(n_null),
        "null_pct": round(n_null / n_total * 100, 2) if n_total > 0 else 0.0,
        "n_unique": int(n_unique),
        "cardinality_ratio": round(n_unique / n_non_null, 4) if n_non_null > 0 else None,
        "completeness_pct": round(n_non_null / n_total * 100, 2) if n_total > 0 else 0.0,
    }

    if pd.api.types.is_numeric_dtype(series):
        desc = series.describe()
        profile.update({
            "min": round(float(desc["min"]), 4) if not pd.isna(desc["min"]) else None,
            "max": round(float(desc["max"]), 4) if not pd.isna(desc["max"]) else None,
            "mean": round(float(desc["mean"]), 4) if not pd.isna(desc["mean"]) else None,
            "median": round(float(series.median()), 4),
            "std": round(float(desc["std"]), 4) if not pd.isna(desc["std"]) else None,
            "skewness": round(float(series.skew()), 4),
            "kurtosis": round(float(series.kurt()), 4),
            "n_zeros": int((series == 0).sum()),
            "n_negative": int((series < 0).sum()),
            "sample_values": series.dropna().sample(min(5, n_non_null), random_state=42).tolist(),
            "top_categories": None,
        })
    elif pd.api.types.is_datetime64_any_dtype(series):
        profile.update({
            "min": str(series.min()),
            "max": str(series.max()),
            "mean": None,
            "median": None,
            "std": None,
            "skewness": None,
            "kurtosis": None,
            "n_zeros": None,
            "n_negative": None,
            "sample_values": [str(v) for v in series.dropna().sample(min(5, n_non_null), random_state=42).tolist()],
            "top_categories": None,
        })
    else:
        # Categorical / object / string
        value_counts = series.value_counts(dropna=True)
        top_cats = value_counts.head(5).to_dict()
        profile.update({
            "min": None,
            "max": None,
            "mean": None,
            "median": None,
            "std": None,
            "skewness": None,
            "kurtosis": None,
            "n_zeros": None,
            "n_negative": None,
            "sample_values": list(value_counts.head(5).index),
            "top_categories": {str(k): int(v) for k, v in top_cats.items()},
        })

    return profile


def profile_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Profile every column in a DataFrame.

    Returns:
        A DataFrame where each row is a column profile.
    """
    profiles = [profile_column(df[col]) for col in df.columns]
    report = pd.DataFrame(profiles).set_index("column")
    return report


# Generate profiling report
profile_report = profile_dataframe(demo_orders)

# Show high-level view
print("Profiling Report:")
print(profile_report[["dtype", "null_pct", "completeness_pct", "n_unique", "cardinality_ratio"]].to_string())

# Columns with high null rates — flag for quality review
high_null = profile_report[profile_report["null_pct"] > 5]
print(f"\nColumns with >5% nulls: {list(high_null.index)}")

# High cardinality columns — may indicate ID-like columns that shouldn't be categorical
high_card = profile_report[profile_report["cardinality_ratio"] > 0.5]
print(f"High-cardinality columns: {list(high_card.index)}")
```
Detecting Schema Drift
In recurring analyses, data arrives from pipelines that can change without warning. Schema drift — unexpected column additions, removals, or type changes — breaks your code silently. Detect it explicitly.
```python
from typing import NamedTuple

import pandas as pd


class SchemaDriftReport(NamedTuple):
    missing_columns: list[str]                    # Expected but not present
    extra_columns: list[str]                      # Present but not expected
    type_mismatches: dict[str, tuple[str, str]]   # col → (expected, actual)
    is_clean: bool


def check_schema_drift(
    df: pd.DataFrame,
    expected_schema: dict[str, str],  # {column_name: expected_dtype_string}
) -> SchemaDriftReport:
    """
    Compare actual DataFrame schema against expected schema.

    Args:
        df: Loaded DataFrame.
        expected_schema: Dict of {col_name: dtype_string},
            e.g. {"order_id": "int64", "order_date": "datetime64[ns]"}

    Returns:
        SchemaDriftReport with details of any drift detected.
    """
    actual_schema = {col: str(dtype) for col, dtype in df.dtypes.items()}

    missing = [col for col in expected_schema if col not in actual_schema]
    extra = [col for col in actual_schema if col not in expected_schema]

    mismatches = {}
    for col in expected_schema:
        if col in actual_schema and actual_schema[col] != expected_schema[col]:
            mismatches[col] = (expected_schema[col], actual_schema[col])

    is_clean = not missing and not extra and not mismatches
    return SchemaDriftReport(
        missing_columns=missing,
        extra_columns=extra,
        type_mismatches=mismatches,
        is_clean=is_clean,
    )


# Define expected schema for the orders table
ORDERS_EXPECTED_SCHEMA = {
    "order_id": "int64",
    "customer_id": "int64",
    "product_id": "object",
    "order_date": "datetime64[ns]",
    "revenue": "float64",
    "quantity": "int64",
    "status": "object",
    "channel": "object",
    "country": "object",
}

drift = check_schema_drift(demo_orders, ORDERS_EXPECTED_SCHEMA)
if drift.is_clean:
    print("Schema check PASSED — no drift detected.")
else:
    print("Schema check FAILED:")
    if drift.missing_columns:
        print(f"  Missing columns: {drift.missing_columns}")
    if drift.extra_columns:
        print(f"  Extra columns: {drift.extra_columns}")
    if drift.type_mismatches:
        print("  Type mismatches:")
        for col, (exp, act) in drift.type_mismatches.items():
            print(f"    {col}: expected {exp}, got {act}")
```
Memory Profiling and Downcasting
DataFrames often consume 3–10× more memory than necessary because pandas chooses conservative types by default. Downcasting is free performance.
```python
import pandas as pd
import numpy as np


def downcast_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Downcast numeric columns to their smallest valid type.
    Convert low-cardinality string columns to category dtype.
    Returns a memory-optimised copy of the DataFrame.
    """
    df = df.copy()
    before_mb = df.memory_usage(deep=True).sum() / 1e6

    # Downcast integers
    int_cols = df.select_dtypes(include=["int64", "int32"]).columns
    for col in int_cols:
        df[col] = pd.to_numeric(df[col], downcast="integer")

    # Downcast floats
    float_cols = df.select_dtypes(include=["float64", "float32"]).columns
    for col in float_cols:
        df[col] = pd.to_numeric(df[col], downcast="float")

    # Convert low-cardinality object columns to category
    # Rule of thumb: unique values < 5% of total rows → good candidate
    obj_cols = df.select_dtypes(include="object").columns
    for col in obj_cols:
        n_unique = df[col].nunique(dropna=True)
        n_total = len(df)
        if n_unique / n_total < 0.05:
            df[col] = df[col].astype("category")

    after_mb = df.memory_usage(deep=True).sum() / 1e6
    reduction_pct = (before_mb - after_mb) / before_mb * 100
    print(f"Memory before: {before_mb:.1f} MB")
    print(f"Memory after:  {after_mb:.1f} MB")
    print(f"Reduction:     {reduction_pct:.1f}%")
    return df


# Apply to demo dataset
demo_orders_opt = downcast_dataframe(demo_orders)

print("\nDtype changes after optimisation:")
for col in demo_orders.columns:
    before = str(demo_orders[col].dtype)
    after = str(demo_orders_opt[col].dtype)
    if before != after:
        print(f"  {col:<20} {before} → {after}")
```
Saving the Profiling Output
Profiling results should be saved as a structured artifact, not just printed. This creates an audit trail and enables comparison across pipeline runs.
```python
import json
from datetime import datetime
from pathlib import Path

import pandas as pd


def save_profiling_report(
    df: pd.DataFrame,
    dataset_name: str,
    output_dir: str = "outputs/profiling",
) -> Path:
    """
    Generate and save a profiling report for a DataFrame.
    Returns the path to the saved report.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Generate profile
    profile_report = profile_dataframe(df)

    # Timestamp the file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_filename = f"{dataset_name}_profile_{timestamp}.csv"
    report_path = output_path / report_filename

    # Save the profile DataFrame as CSV (excluding nested dicts for simplicity)
    profile_csv = profile_report.drop(columns=["top_categories", "sample_values"], errors="ignore")
    profile_csv.to_csv(report_path)

    # Save metadata as JSON
    metadata = {
        "dataset": dataset_name,
        "generated_at": timestamp,
        "shape": {"rows": int(df.shape[0]), "cols": int(df.shape[1])},
        "memory_mb": round(df.memory_usage(deep=True).sum() / 1e6, 2),
        "null_summary": {
            col: int(count) for col, count in df.isnull().sum().items() if count > 0
        },
        "dtypes": {col: str(dt) for col, dt in df.dtypes.items()},
    }
    meta_path = output_path / f"{dataset_name}_meta_{timestamp}.json"
    with open(meta_path, "w") as f:
        json.dump(metadata, f, indent=2)

    print(f"Profile saved:  {report_path}")
    print(f"Metadata saved: {meta_path}")
    return report_path


# Save profile for the demo orders dataset
# save_profiling_report(demo_orders, dataset_name="orders")
```
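Because the saved report is a plain CSV indexed by column name, comparing pipeline runs reduces to a DataFrame diff. A minimal sketch with two hypothetical profile snapshots, flagging columns whose null rate moved sharply between runs:

```python
import pandas as pd

# Two hypothetical profile snapshots from successive pipeline runs
run_1 = pd.DataFrame(
    {"dtype": ["int64", "object"], "null_pct": [0.0, 2.5]},
    index=pd.Index(["order_id", "country"], name="column"),
)
run_2 = pd.DataFrame(
    {"dtype": ["int64", "object"], "null_pct": [0.0, 11.0]},
    index=pd.Index(["order_id", "country"], name="column"),
)

# Columns whose null rate moved by more than 5 percentage points
delta = (run_2["null_pct"] - run_1["null_pct"]).abs()
drifted = delta[delta > 5].index.tolist()
print(drifted)  # ['country']
```

In practice the two frames would come from `pd.read_csv(..., index_col="column")` on saved report files; aligning on the column index means added or removed columns surface as NaN deltas, which you can flag separately.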
Key Takeaways
Use pd.read_csv with explicit dtype, parse_dates, na_values, and low_memory=False on every production load. Silent type inference is a source of subtle downstream errors.
For files that exceed RAM, use chunksize to load and filter in chunks before concatenating. Always push filtering to the source (SQL WHERE clause) when possible.
JSON normalization with pd.json_normalize handles nested structures correctly; always inspect the result for unexpected column explosion or row multiplication.
Paginated API loading requires an explicit loop with a safety maximum page count and rate-limit sleep. Encapsulate this in a reusable function.
The first_contact checklist — shape, memory, dtypes, nulls, duplicates, numeric describe, categorical top values, date ranges — must run on every new dataset before any transformation.
profile_dataframe produces a column-level audit trail. Save it as a CSV artifact so you can diff schema and quality changes across pipeline runs.
Schema drift detection — checking expected vs actual column names and types — should be a standard step in any recurring analysis or data pipeline. Silent schema changes are a leading cause of incorrect analysis.
Downcasting numerics and converting low-cardinality strings to category dtype typically reduces memory usage by 30–70% at zero analytical cost.