Python for Data Ops
A production-focused guide to Python DataOps: pandas vs Polars trade-offs, pyarrow I/O patterns, pipeline architecture, testing strategies, orchestration options (Airflow, Prefect, Dagster), structured logging, and the full deployment checklist.
Python didn't win the data engineering war by being the fastest language; it won by being the most composable. A pandas DataFrame converts to a pyarrow table, which serializes to Parquet, which lands in S3, where Spark, DuckDB, and Trino can all query it. There is little glue code and no custom format translation along the way. That composability is why Python sits at the center of most modern data stacks, from ingestion to serving.
But "works in a notebook" is not the same as "runs reliably in production." This tutorial covers the full operational stack: the right libraries for the right workloads, pipeline architecture patterns, testing strategies, environment management, observability, orchestration, and the deployment practices that separate prototype pipelines from systems that survive on-call rotations.
The Modern Python Data Stack
Before writing code, understand the layers:
| Layer | Tool | When to Use |
|-------|------|-------------|
| In-memory analysis | pandas | ≤10M rows, exploratory, team familiarity |
| High-performance dataframes | Polars | >10M rows, production ETL, CPU-bound transforms |
| Columnar I/O | pyarrow | Reading/writing Parquet, Arrow IPC, zero-copy interchange |
| Out-of-core SQL (single node) | DuckDB | SQL-first, larger-than-RAM analytics, local dev |
| Cluster compute | PySpark / Dask | Multi-node, petabyte-scale, existing Spark infra |
| Pipeline orchestration | Airflow / Prefect / Dagster | Scheduling, dependencies, retries, observability |
| Data quality | Great Expectations / Soda | Schema contracts, anomaly detection |
The common mistake is reaching for PySpark or Dask before exhausting single-node solutions. DuckDB on a single 32-core machine can often get through hundreds of gigabytes faster than a poorly tuned Spark cluster, with no cluster infrastructure to manage.
pandas: Strengths, Limits, and Where It Breaks
pandas is the right default for most analytical work. Its API is expressive, its ecosystem integration is unmatched, and every data scientist already knows it. But it has hard limits that matter in production:
Memory model: pandas copies data aggressively. A single groupby can produce 3–4× memory spikes. On a 16GB machine, this caps you around 4–5GB of input data.
Single-threaded by default: pandas operations use one CPU core. A server with 32 cores executes df.apply(func) on one of them.
String handling: pandas stores strings as Python objects (pointers to heap-allocated strings). This is 3–5× slower and larger than fixed-width or dictionary-encoded columnar strings.
import pandas as pd

# Demonstrating pandas memory overhead (pandas 2.x, default object dtype)
df = pd.DataFrame({'name': ['Alice'] * 1_000_000})
print(round(df.memory_usage(deep=True).sum() / 1e6), "MB")  # roughly 55-60 MB reported for 1M identical strings
# With categorical encoding (like columnar dict encoding)
df['name'] = df['name'].astype('category')
print(round(df.memory_usage(deep=True).sum() / 1e6), "MB")  # ~1 MB: one int8 code per row plus a single category
Use category dtype for low-cardinality string columns. Use int32 instead of int64 when values fit. Use float32 for ML features. These micro-decisions compound at scale.
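The numeric half of that advice can be checked the same way. The sketch below builds a hypothetical two-column frame (the column names and sizes are illustrative), downcasts both columns, and compares the reported memory.

```python
import numpy as np
import pandas as pd

# Hypothetical frame: one integer ID column and one float feature column.
n = 1_000_000
df = pd.DataFrame({
    "user_id": np.arange(n, dtype="int64"),
    "score": np.linspace(0.0, 1.0, n),  # float64 by default
})
before = df.memory_usage(deep=True).sum()

# Downcast only after confirming the values fit the narrower type.
assert df["user_id"].max() <= np.iinfo("int32").max
small = df.astype({"user_id": "int32", "score": "float32"})
after = small.memory_usage(deep=True).sum()

print(f"before: {before / 1e6:.1f} MB, after: {after / 1e6:.1f} MB")
```

Both columns go from 8 bytes to 4 bytes per value, so the frame shrinks to about half its size. Note that float32 keeps only about 7 significant digits, which is usually fine for ML features but not for money.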
Polars: When to Switch
Polars is a Rust-native dataframe library with a lazy execution engine, multi-threaded by default, and an Arrow-native memory model. It's not a drop-in replacement for pandas because the API is different, but for production ETL on multi-core machines it is often several times faster, with the exact gap depending on the workload.
import polars as pl

# Lazy evaluation: Polars optimizes the query plan before executing
result = (
    pl.scan_parquet("s3://data-lake/events/*.parquet")
    .filter(pl.col("event_type") == "purchase")
    .group_by("user_id")
    .agg([
        pl.col("amount").sum().alias("total_spent"),
        pl.col("event_id").count().alias("purchase_count"),
        pl.col("created_at").max().alias("last_purchase"),
    ])
    .filter(pl.col("total_spent") > 100)
    .collect()  # Executes only here; the optimizer pushes the first filter down to the scan
)
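The .scan_parquet() + .collect() pattern lets Polars push the event_type == "purchase" predicate and the column selection into the Parquet reader: row groups whose statistics rule out a match are skipped, and only the referenced columns are decoded. You get this query-level optimization without changing the code.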
When to use Polars over pandas:
- ETL pipelines processing >5M rows regularly
- CPU-bound transformations (string parsing, regex, complex aggregations)
- Production pipelines where latency matters
- When you can afford the migration: Polars' API is stricter but more predictable
When to stick with pandas:
- Existing large codebase with heavy pandas dependency
- Team that isn't familiar with Polars yet
- Heavy scikit-learn / statsmodels usage (their pandas and NumPy integration is the most mature)
- Quick exploratory analysis where performance isn't a concern
pyarrow for Production I/O
pyarrow is among the fastest Python libraries for reading and writing columnar formats. It also provides the shared Arrow memory format that lets pandas, Polars, DuckDB, and Spark exchange data with little or no copying.
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

# Reading with column selection and predicate pushdown
table = pq.read_table(
    "events.parquet",
    columns=["user_id", "event_type", "amount"],
    filters=[
        ("event_type", "=", "purchase"),
        ("amount", ">", 0),
    ],
)
# Conversion to pandas: copies in the general case (zero-copy is only possible for some null-free numeric columns)
df = table.to_pandas()
# Writing with settings suited to analytics
pq.write_table(
    table,
    "output.parquet",
    compression="zstd",
    compression_level=3,
    row_group_size=128 * 1024,  # Counted in rows, not bytes: ~131k rows per row group
    use_dictionary=True,  # Dictionary encoding for low-cardinality columns
    write_statistics=True,  # Column stats enable predicate pushdown
)
For datasets written once and read many times (typical in data lakes), invest in write-time optimizations: ZSTD compression, statistics, appropriate row group sizing. They add a modest amount of write time and can substantially reduce read costs.
Production Pipeline Architecture
The biggest architectural decision in DataOps isn't which library to use—it's how to structure data flow so pipelines are safe to re-run, easy to test, and diagnosable when they fail.
The Three Non-Negotiable Properties
1. Idempotency: Running the pipeline twice for the same date produces the same result. In practice, each run overwrites one deterministic partition instead of appending to it:
import uuid

# Bad: every run adds another file, so re-runs duplicate data
def write_daily_summary(df, date):
    df.to_parquet(f"s3://bucket/summary/date={date}/{uuid.uuid4()}.parquet")

# Good: every run overwrites the same key for the partition
def write_daily_summary(df, date):
    output_path = f"s3://bucket/summary/date={date}/output.parquet"
    # An S3 upload becomes visible only once it completes, so readers see
    # either the old object or the new one. Avoid write-then-rename on object
    # stores: rename is implemented as copy + delete and is not atomic.
    df.to_parquet(output_path)
2. Parameterization: Every pipeline run takes an explicit date or run ID. No implicit "today":
# Bad: uses datetime.now(), so each run may produce different output
def extract_events():
    date = datetime.now().strftime("%Y-%m-%d")
    return read_events(date)

# Good: explicit parameter, so a backfill is just a loop over dates
def extract_events(date: str) -> pd.DataFrame:
    return read_events(date)
3. Fail loudly: Corruption is worse than failure. Never silently continue on bad data:
def transform_revenue(df: pd.DataFrame) -> pd.DataFrame:
    result = df.copy()
    # exchange_rate is taken to be USD per unit of the source currency
    result["revenue_usd"] = result["amount"] * result["exchange_rate"]
    # Check invariants with explicit raises; assert statements are skipped under python -O
    if result["revenue_usd"].isna().any():
        raise ValueError("Null revenue detected after transform")
    if result["revenue_usd"].min() < 0:
        raise ValueError(f"Negative revenue: {result['revenue_usd'].min()}")
    return result
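The three properties can be exercised together on a local filesystem, where an atomic swap really is available. This is a minimal sketch using only the standard library: the JSON output, the function signature, and the directory layout are assumptions for illustration, not the S3 implementation.

```python
import json
import os
import tempfile
from pathlib import Path


def write_daily_summary(rows: list[dict], date: str, root: Path) -> Path:
    """Write one partition so that re-running for the same date replaces it."""
    # Fail loudly: refuse to write data that violates the invariant.
    bad = [r for r in rows if r["revenue_usd"] < 0]
    if bad:
        raise ValueError(f"{len(bad)} rows with negative revenue for {date}")

    # Parameterized: the partition path depends only on the explicit date.
    partition = root / f"date={date}"
    partition.mkdir(parents=True, exist_ok=True)
    final_path = partition / "output.json"

    # Idempotent: write to a temp file in the same directory, then swap it in.
    # os.replace is atomic on a local POSIX filesystem; this is NOT true of
    # object stores such as S3.
    fd, tmp_name = tempfile.mkstemp(dir=partition, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(rows, f)
    os.replace(tmp_name, final_path)
    return final_path


root = Path(tempfile.mkdtemp())
rows = [{"user_id": 1, "revenue_usd": 10.0}]
first = write_daily_summary(rows, "2024-01-15", root)
second = write_daily_summary(rows, "2024-01-15", root)  # safe to re-run
print(sorted(p.name for p in first.parent.iterdir()))
```

After two runs the partition still contains a single output.json, and a batch with negative revenue raises before anything is written.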
Error Handling That Distinguishes Failure Modes
Not all errors are the same. Your handling should reflect the severity:
from typing import Optional
import logging

import pandas as pd
import pyarrow as pa

logger = logging.getLogger(__name__)

# SchemaError, _validate_schema, and EVENTS_SCHEMA are project-specific and defined elsewhere.
def extract_partition(source: str, date: str) -> Optional[pd.DataFrame]:
    """Returns None for missing data, raises for corrupted data."""
    try:
        df = pd.read_parquet(f"{source}/date={date}/")
        _validate_schema(df, expected_schema=EVENTS_SCHEMA)
        return df
    except FileNotFoundError:
        # Recoverable: upstream may be delayed. Log and continue.
        logger.warning("partition_missing", extra={"source": source, "date": date})
        return None
    except SchemaError as e:
        # Not recoverable: a schema change requires a human decision.
        logger.critical("schema_mismatch", extra={"source": source, "error": str(e)})
        raise  # Stops the pipeline. Alerts fire. Human investigates.
    except pa.ArrowInvalid as e:
        # Corrupted data: do not propagate downstream.
        logger.error("corrupt_parquet", extra={"source": source, "date": date, "error": str(e)})
        raise
    except Exception:
        # Unknown: treat as critical until proven otherwise.
        logger.exception("unexpected_error", extra={"source": source})
        raise
Rule of thumb: if a downstream consumer would produce wrong answers on this data, raise. If the pipeline can skip and remain correct, return a sentinel.
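On the calling side, that rule of thumb means the caller checks for the sentinel and does nothing special about exceptions. A minimal sketch, with a hypothetical run_backfill driver and a fake extractor standing in for real I/O:

```python
from typing import Callable, Optional


def run_backfill(
    dates: list[str],
    extract: Callable[[str], Optional[list[dict]]],
) -> dict[str, list[str]]:
    """Process each date; skip missing partitions, let real errors propagate."""
    report: dict[str, list[str]] = {"processed": [], "skipped": []}
    for date in dates:
        rows = extract(date)  # may raise: that stops the whole run on purpose
        if rows is None:
            report["skipped"].append(date)  # upstream delayed, retry later
            continue
        report["processed"].append(date)
    return report


# Hypothetical extractor: one of the three dates has no data yet.
def fake_extract(date: str) -> Optional[list[dict]]:
    available = {"2024-01-14": [{"amount": 5}], "2024-01-16": [{"amount": 7}]}
    return available.get(date)


report = run_backfill(["2024-01-14", "2024-01-15", "2024-01-16"], fake_extract)
print(report)
```

Recording skipped dates explicitly matters: a missing partition that is silently dropped looks the same as a successful run unless something reports it.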
Retry Logic for Transient Failures
Network errors, rate limits, and object store throttling are transient. Use exponential backoff:
import logging
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry(max_attempts=4, base_delay=2.0, exceptions=(Exception,)):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        raise  # Out of attempts: surface the original error
                    delay = base_delay * (2 ** attempt)  # 2s, 4s, 8s, ...
                    logger.warning(f"Retry {attempt + 1}/{max_attempts} after {delay}s: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=4, exceptions=(IOError, TimeoutError))
def read_from_s3(path: str) -> pd.DataFrame:
    return pd.read_parquet(f"s3://{path}")
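One common refinement, not part of the decorator above, is to add random jitter so that many workers hitting the same throttled service do not retry in lockstep. The sketch below is one way to do that; the retry_with_jitter name and the injectable sleep parameter (which keeps the demo and tests instant) are assumptions for illustration.

```python
import random
import time
from functools import wraps


def retry_with_jitter(max_attempts=4, base_delay=2.0, exceptions=(Exception,),
                      sleep=time.sleep):
    """Exponential backoff with full jitter: sleep a random time in [0, cap]."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the original error
                    cap = base_delay * (2 ** attempt)
                    sleep(random.uniform(0, cap))
        return wrapper
    return decorator


# Hypothetical flaky call: fails twice, then succeeds.
calls = {"n": 0}
delays = []  # records the requested delays instead of actually sleeping


@retry_with_jitter(max_attempts=4, base_delay=1.0,
                   exceptions=(TimeoutError,), sleep=delays.append)
def flaky_read():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


result = flaky_read()
print(result, calls["n"], len(delays))
```

The call succeeds on the third attempt after two recorded delays. Listing only transient exception types in exceptions keeps the decorator from retrying errors, such as schema mismatches, that will never succeed.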
Testing Data Pipelines
Data pipeline tests are different from application tests. You're validating both logic correctness and data contract assumptions.
Layer 1: Unit Tests — Transform Logic
Test pure transformation functions with synthetic, minimal DataFrames:
import pytest
import pandas as pd
from my_pipeline.transforms import normalize_revenue, deduplicate_events

def test_normalize_revenue_converts_to_usd():
    input_df = pd.DataFrame({
        "amount": [100.0, 200.0],
        "currency": ["EUR", "GBP"],
        "exchange_rate": [1.08, 1.27],
    })
    result = normalize_revenue(input_df)
    assert "revenue_usd" in result.columns
    assert result["revenue_usd"].iloc[0] == pytest.approx(108.0)

def test_deduplicate_keeps_latest_event():
    input_df = pd.DataFrame({
        "event_id": [1, 1, 2],
        "timestamp": ["2024-01-01 10:00", "2024-01-01 10:05", "2024-01-01 11:00"],
    })
    result = deduplicate_events(input_df)
    assert len(result) == 2
    assert result.loc[result["event_id"] == 1, "timestamp"].iloc[0] == "2024-01-01 10:05"
These run in milliseconds. Run them on every commit.
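For context, here is one possible shape for the two functions those tests import. The article does not show my_pipeline.transforms, so these bodies are hypothetical; they assume exchange_rate means USD per unit of the source currency and that timestamps are zero-padded strings that sort chronologically.

```python
import pandas as pd


def normalize_revenue(df: pd.DataFrame) -> pd.DataFrame:
    """Add revenue_usd, assuming exchange_rate is USD per unit of currency."""
    out = df.copy()  # pure function: never mutate the caller's frame
    out["revenue_usd"] = out["amount"] * out["exchange_rate"]
    return out


def deduplicate_events(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the row with the latest timestamp for each event_id."""
    return (
        df.sort_values("timestamp")
        .drop_duplicates(subset="event_id", keep="last")
        .reset_index(drop=True)
    )


events = pd.DataFrame({
    "event_id": [1, 1, 2],
    "timestamp": ["2024-01-01 10:00", "2024-01-01 10:05", "2024-01-01 11:00"],
})
deduped = deduplicate_events(events)
print(deduped)
```

Keeping transforms as pure functions of a DataFrame, with no I/O inside, is what makes them testable with a three-row frame.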
Layer 2: Integration Tests — Full Pipeline Flow
Test full pipeline runs with realistic fixture data:
@pytest.fixture
def sample_events(tmp_path):
    """Write realistic sample data to a temp Parquet file."""
    df = pd.read_parquet("tests/fixtures/sample_events_1000rows.parquet")
    out = tmp_path / "events.parquet"
    df.to_parquet(out)
    return str(out)

@pytest.mark.integration
def test_daily_pipeline_produces_valid_output(sample_events, tmp_path):
    output_path = str(tmp_path / "output.parquet")
    run_daily_pipeline(
        input_path=sample_events,
        output_path=output_path,
        date="2024-01-15",
    )
    result = pd.read_parquet(output_path)
    assert not result.empty
    assert set(result.columns) == {"date", "user_id", "total_revenue", "event_count"}
    assert result["total_revenue"].min() >= 0
    assert result["date"].unique().tolist() == ["2024-01-15"]
Keep sample fixtures small (1K–10K rows). Large fixture files slow CI and signal a test design problem.
Layer 3: Data Quality — Production Assertions
Run these inline during production runs:
class DataQualityError(Exception):
    """Raised when an output fails its quality checks."""

def assert_output_quality(df: pd.DataFrame, date: str) -> None:
    """Called after every transform in production. Collects all violations and raises before writing."""
    errors = []
    if df.empty:
        errors.append("Output is empty")
    if df["revenue_usd"].isna().any():
        errors.append(f"Null revenue in {df['revenue_usd'].isna().sum()} rows")
    if df["revenue_usd"].min() < 0:
        errors.append(f"Negative revenue: min={df['revenue_usd'].min()}")
    if df["date"].nunique() != 1 or df["date"].iloc[0] != date:
        errors.append(f"Wrong date partition: found {df['date'].unique()}")
    if df.duplicated(subset=["user_id", "date"]).any():
        errors.append(f"Duplicate user/date keys: {df.duplicated(subset=['user_id', 'date']).sum()} rows")
    if errors:
        raise DataQualityError(f"Quality check failed for {date}:\n" + "\n".join(f" - {e}" for e in errors))
Dependency and Environment Management
Reproducibility is not optional in production. If your pipeline runs differently on your laptop vs CI vs prod, you don't have a pipeline — you have a script.
pyproject.toml + uv (Modern Standard)
[project]
name = "data-pipeline"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
"pandas>=2.2,<3.0",
"polars>=0.20",
"pyarrow>=16.0",
"pydantic-settings>=2.0",
"structlog>=24.0",
"boto3>=1.34",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-mock>=3.12",
"ruff>=0.4",
"mypy>=1.10",
]
Use uv to manage environments — it's 10–100× faster than pip for resolution and installation:
# Generate the lock file from pyproject.toml (exact versions, with hashes)
uv pip compile pyproject.toml --generate-hashes -o requirements.txt
# Create a virtual environment and install exactly what the lock file lists
uv venv
uv pip sync requirements.txt
The lock file goes in version control. Never deploy without one. A pipeline that installed fine last week might break today if an upstream package released a breaking change.
Docker for Full Reproducibility
For production deployments, Docker eliminates "works on my machine":
FROM python:3.11-slim
WORKDIR /app
# Install deps first (cached layer, rebuilds only when deps change)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy source after deps (avoids re-installing on code changes)
COPY src/ ./src/
# Make packages under src/ importable (assumes the code lives in src/pipeline/)
ENV PYTHONPATH=/app/src
ENTRYPOINT ["python", "-m", "pipeline.main"]
Configuration Management
Hard-coded paths, credentials, and thresholds are technical debt that bites when you least expect it. Use pydantic-settings for type-safe config:
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

class PipelineConfig(BaseSettings):
    model_config = SettingsConfigDict(
        env_prefix="PIPELINE_",
        env_file=".env",
        env_file_encoding="utf-8",
    )
    # Required: instantiation raises a validation error if not set
    data_lake_bucket: str
    warehouse_connection_string: str
    # Optional with defaults
    batch_size: int = Field(default=50_000, gt=0, le=500_000)
    max_retries: int = Field(default=4, ge=1)
    compression: str = Field(default="zstd")

# Usage (df is the DataFrame produced by the pipeline)
config = PipelineConfig()
df.to_parquet(
    f"s3://{config.data_lake_bucket}/output/",
    compression=config.compression,
)
Set PIPELINE_DATA_LAKE_BUCKET=dev-bucket in dev, =prod-bucket in prod. Same code, no branches.
Structured Logging for Operators
Your logs are for the person debugging at 2am. Make them queryable.
import structlog

log = structlog.get_logger()

def process_partition(date: str, source: str) -> dict:
    # Bind context once so every event below carries date, source, and pipeline
    log = structlog.get_logger().bind(date=date, source=source, pipeline="daily_revenue")
    log.info("extract_start")
    df = extract(source, date)
    log.info("extract_complete", rows=len(df), size_mb=round(df.memory_usage(deep=True).sum() / 1e6, 1))
    result = transform(df)
    log.info("transform_complete", output_rows=len(result))
    write(result, date)
    log.info("partition_written", path=f"s3://bucket/date={date}/")
    return {"rows": len(result), "date": date}
Once structlog is configured to render JSON (for example with structlog.processors.JSONRenderer), you can query the logs with jq:
- jq 'select(.event == "extract_complete" and .rows < 1000)' to detect suspiciously small extracts
- jq 'select(.pipeline == "daily_revenue") | .size_mb' to track input volume over time
JSON logs like these can be ingested by Splunk, Datadog, or CloudWatch with minimal parsing configuration.
Orchestration Landscape
Once you have multiple pipelines with inter-dependencies, you need an orchestrator. The three main options each have distinct philosophies:
Apache Airflow — The Industry Standard
import pandas as pd
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    schedule="0 6 * * *",  # Daily at 6am UTC
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Run for missed historical dates (backfill support)
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
)
def daily_revenue_pipeline():
    @task
    def extract(date: str) -> str:
        df = read_events(date)
        path = write_staging(df, date)
        return path

    @task
    def transform(staging_path: str) -> str:
        df = pd.read_parquet(staging_path)
        result = normalize_revenue(df)
        return write_transformed(result)

    @task
    def load(transformed_path: str):
        df = pd.read_parquet(transformed_path)
        write_to_warehouse(df)

    load(transform(extract("{{ ds }}")))  # {{ ds }} = the run's logical date (YYYY-MM-DD)

daily_revenue_pipeline()
Use Airflow when: you have an existing Airflow deployment, need batch scheduling, or your organization is already standardized on it.
Prefect — Developer-Friendly
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_events(date: str) -> pd.DataFrame:
    return read_events(date)

@task(retries=3, retry_delay_seconds=30)
def transform_revenue(df: pd.DataFrame) -> pd.DataFrame:
    return normalize_revenue(df)

@flow(name="daily-revenue", log_prints=True)
def daily_revenue(date: str):
    raw = extract_events(date)
    transformed = transform_revenue(raw)
    write_to_warehouse(transformed)

# Trigger manually or via Prefect Cloud schedules
daily_revenue(date="2024-01-15")
Use Prefect when: you want simpler local development, built-in caching, and a less config-heavy setup than Airflow.
Dagster — Data-Asset Native
Dagster models pipelines as data assets rather than tasks — closer to how data engineers actually think:
import pandas as pd
from dagster import AssetIn, DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def raw_events(context) -> pd.DataFrame:
    date = context.partition_key
    return read_events(date)

@asset(ins={"raw_events": AssetIn()})
def daily_revenue(raw_events: pd.DataFrame) -> pd.DataFrame:
    return normalize_revenue(raw_events)
Use Dagster when: you want lineage tracking, data-catalog integration, and a modern developer experience. It is a strong candidate for greenfield projects.
Performance Profiling — Measure Before Optimizing
Python pipelines are often I/O-bound rather than CPU-bound, but not always. Profile to find out which before optimizing:
import cProfile
import pstats

with cProfile.Profile() as pr:
    run_pipeline(date="2024-01-15")

stats = pstats.Stats(pr)
stats.sort_stats("cumulative")
stats.print_stats(20)  # Top 20 entries by cumulative time
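Before reaching for a profiler, a coarse wall-clock breakdown per stage often answers the I/O-versus-CPU question on its own. A minimal sketch using only the standard library; the stage names and the simulated work are placeholders.

```python
import time
from contextlib import contextmanager

timings: dict[str, float] = {}


@contextmanager
def timed(stage: str):
    """Accumulate wall-clock seconds per pipeline stage."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[stage] = timings.get(stage, 0.0) + time.perf_counter() - start


# Hypothetical stages: sleep stands in for I/O wait, the sum for CPU work.
with timed("extract"):
    time.sleep(0.05)
with timed("transform"):
    total = sum(i * i for i in range(100_000))
with timed("load"):
    time.sleep(0.02)

slowest = max(timings, key=timings.get)
for stage, seconds in timings.items():
    print(f"{stage}: {seconds:.3f}s")
print("slowest stage:", slowest)
```

If extract and load dominate, the fix is usually in how data is read and written; if transform dominates, the profilers in this section show where inside it the time goes.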
For line-level profiling:
pip install line-profiler
# Decorate the functions of interest with @profile first; kernprof reports only those
kernprof -l -v pipeline.py
For sampling-based profiling in production (low overhead):
pip install py-spy
py-spy top --pid $(pgrep -f "pipeline.py")
Common findings and fixes:
| Bottleneck | Fix |
|---|---|
| Slow Parquet reads | Add column filters, predicate pushdown |
| df.apply(func, axis=1) | Vectorize with numpy/pandas ops |
| Repeated small writes | Batch into chunks ≥ 10MB |
| String operations on object dtype | Cast to category or use Polars |
| pandas merge on large frames | Use pyarrow hash joins or Polars |
| to_sql() row-by-row inserts | Use COPY / bulk insert |
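The second row of the table is usually the cheapest win. The sketch below computes the same result row by row and with whole-column operations; the order-line columns and the 100 threshold are invented for the example.

```python
import numpy as np
import pandas as pd

# Hypothetical order lines.
df = pd.DataFrame({
    "quantity": [2, 1, 5, 3],
    "unit_price": [9.99, 120.0, 3.5, 40.0],
    "discount": [0.0, 0.1, 0.0, 0.25],
})

# Row-wise: calls a Python function once per row.
slow = df.apply(
    lambda r: r["quantity"] * r["unit_price"] * (1 - r["discount"]), axis=1
)

# Vectorized: the same arithmetic on whole columns, executed in compiled code.
fast = df["quantity"] * df["unit_price"] * (1 - df["discount"])

# Conditional logic vectorizes with np.where instead of if/else per row.
df["tier"] = np.where(fast >= 100, "large", "small")

print(np.allclose(slow, fast))
print(df["tier"].tolist())
```

Both versions produce the same numbers; the difference only shows up in runtime, and it grows with the number of rows because the per-row Python call overhead disappears.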
The Production Deployment Checklist
Before merging to main:
Code Quality
- [ ] All functions have type hints (mypy --strict passes)
- [ ] Ruff/black formatting passes
- [ ] No datetime.now() without explicit timezone
Testing
- [ ] Unit tests cover happy path, edge cases, and error paths
- [ ] Integration test passes with fixture data
- [ ] Data quality assertions cover all output columns
Reliability
- [ ] Pipeline is idempotent (safe to re-run)
- [ ] Date is a parameter, not hardcoded
- [ ] Retry logic on external I/O calls
- [ ] Each failure mode deliberately either raises or returns a sentinel
Operations
- [ ] Structured logging at extract, transform, and load
- [ ] Dependencies pinned in lock file
- [ ] Config externalized from code
- [ ] Backfill procedure documented (can you run 90 days?)
Data
- [ ] Output schema documented
- [ ] Quality assertions inline
- [ ] Output partition overwritten atomically
Conclusion: The Boring Pipeline Wins
The highest-performing data teams I've worked with don't use the most sophisticated tools—they use the right tools with ruthless consistency. pandas where it's fast enough, Polars where performance matters, pyarrow for I/O, DuckDB for ad-hoc SQL over files. Dagster or Prefect for orchestration. Docker for isolation. uv for speed.
The pipelines that run without incident for years aren't the most clever. They're the most boring: explicit dates, idempotent writes, structured logs, inline assertions, pinned dependencies. Every deviation from these principles is future on-call debt.
Write pipelines you'd be comfortable debugging at 3am. That's the real production standard.