GauravChakraborty

Python for Data Ops

A production-focused guide to Python DataOps: pandas vs Polars trade-offs, pyarrow I/O patterns, pipeline architecture, testing strategies, orchestration options (Airflow, Prefect, Dagster), structured logging, and the full deployment checklist.


Python didn't win the data engineering war by being the fastest language; it won by being the most composable. A pandas DataFrame converts to a pyarrow table, which serializes to Parquet, which lands in S3, which gets queried by Spark, DuckDB, and Trino alike, with little glue code and no custom format translation. That composability is why Python sits at the center of most modern data stacks, from ingestion to serving.

But "works in a notebook" is not the same as "runs reliably in production." This tutorial covers the full operational stack: the right libraries for the right workloads, pipeline architecture patterns, testing strategies, environment management, observability, orchestration, and the deployment practices that separate prototype pipelines from systems that survive on-call rotations.


The Modern Python Data Stack

Before writing code, understand the layers:

| Layer | Tool | When to Use |
|-------|------|-------------|
| In-memory analysis | pandas | ≤10M rows, exploratory, team familiarity |
| High-performance dataframes | Polars | >10M rows, production ETL, CPU-bound transforms |
| Columnar I/O | pyarrow | Reading/writing Parquet, Arrow IPC, zero-copy interchange |
| Out-of-core / distributed | DuckDB | SQL-first, larger-than-RAM analytics, local dev |
| Cluster compute | PySpark / Dask | Multi-node, petabyte-scale, existing Spark infra |
| Pipeline orchestration | Airflow / Prefect / Dagster | Scheduling, dependencies, retries, observability |
| Data quality | Great Expectations / Soda | Schema contracts, anomaly detection |

The common mistake is reaching for PySpark or Dask before exhausting single-node solutions. DuckDB on a 32-core machine can often process larger-than-RAM datasets faster than a poorly tuned Spark cluster, with no infrastructure to manage.


pandas: Strengths, Limits, and Where It Breaks

pandas is the right default for most analytical work. Its API is expressive, its ecosystem integration is unmatched, and every data scientist already knows it. But it has hard limits that matter in production:

Memory model: pandas copies data aggressively. A single groupby can produce 3–4× memory spikes. On a 16GB machine, this caps you around 4–5GB of input data.

Single-threaded by default: pandas operations use one CPU core. A server with 32 cores executes df.apply(func) on one of them.

String handling: by default, pandas 2.x stores strings as Python objects (pointers to heap-allocated strings). This is 3–5× slower and larger than fixed-width or dictionary-encoded columnar strings.

import pandas as pd

# Demonstrating pandas memory overhead for object-dtype strings.
# Exact figures depend on the Python and pandas versions.
df = pd.DataFrame({'name': ['Alice'] * 1_000_000})
print(round(df.memory_usage(deep=True).sum() / 1e6, 1), "MB")  # on the order of 60 MB

# With categorical encoding (like columnar dict encoding)
df['name'] = df['name'].astype('category')
print(round(df.memory_usage(deep=True).sum() / 1e6, 1), "MB")  # about 1 MB

Use category dtype for low-cardinality string columns. Use int32 instead of int64 when values fit. Use float32 for ML features. These micro-decisions compound at scale.
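The dtype advice above can be sketched on a synthetic frame. The column names and sizes here are invented for illustration, and the range check is an assumption about what "when values fit" should mean in practice.

```python
import numpy as np
import pandas as pd

n = 100_000
df = pd.DataFrame({
    "country": ["US", "DE", "IN", "BR"] * (n // 4),   # low-cardinality strings
    "quantity": np.arange(n, dtype="int64") % 100,    # small integers
    "score": np.linspace(0.0, 1.0, n),                # float64 by default
})

# Confirm the integer range fits before narrowing the type.
if df["quantity"].max() > np.iinfo(np.int32).max:
    raise ValueError("quantity does not fit in int32")

slim = df.astype({"country": "category", "quantity": "int32", "score": "float32"})

before_mb = df.memory_usage(deep=True).sum() / 1e6
after_mb = slim.memory_usage(deep=True).sum() / 1e6
print(f"{before_mb:.1f} MB -> {after_mb:.1f} MB")
```

Note that float32 keeps only about seven significant digits, so it suits model features better than monetary amounts.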


Polars: When to Switch

Polars is a Rust-native dataframe library with a lazy execution engine, multi-threading by default, and an Arrow-native memory model. It's not a drop-in replacement for pandas (the API is different), but for production ETL on multi-core machines it is often much faster, with 5–20× speedups typical of CPU-bound transforms. Benchmark your own workload before committing.

import polars as pl

# Lazy evaluation — Polars optimizes the query plan before executing
result = (
    pl.scan_parquet("s3://data-lake/events/*.parquet")
    .filter(pl.col("event_type") == "purchase")
    .group_by("user_id")
    .agg([
        pl.col("amount").sum().alias("total_spent"),
        pl.col("event_id").count().alias("purchase_count"),
        pl.col("created_at").max().alias("last_purchase"),
    ])
    .filter(pl.col("total_spent") > 100)
    .collect()  # Executes only here — predicate pushdown applied automatically
)

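The .scan_parquet() + .collect() pattern lets Polars push both the filter and the column selection into the Parquet reader: row groups whose statistics rule out event_type == "purchase" can be skipped, and only the columns the query references are read. This is query-level optimization for free.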

When to use Polars over pandas:

  • ETL pipelines processing >5M rows regularly
  • CPU-bound transformations (string parsing, regex, complex aggregations)
  • Production pipelines where latency matters
  • When you can afford the migration — Polars' API is stricter but more predictable

When to stick with pandas:

  • Existing large codebase with heavy pandas dependency
  • Team that isn't familiar with Polars yet
  • Heavy scikit-learn / statsmodels usage (these are built around NumPy arrays and pandas DataFrames)
  • Quick exploratory analysis where performance isn't a concern

pyarrow for Production I/O

pyarrow is the reference Python library for reading and writing columnar formats, and typically among the fastest. Its Arrow memory format is also the shared layer that lets pandas, Polars, DuckDB, and Spark exchange data with little or no serialization overhead.

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

# Reading with column selection and predicate pushdown
table = pq.read_table(
    "events.parquet",
    columns=["user_id", "event_type", "amount"],
    filters=[
        ("event_type", "=", "purchase"),
        ("amount", ">", 0),
    ]
)

# Convert to pandas (this copies for most column types; zero-copy applies only in limited cases)
df = table.to_pandas()

# Writing with settings suited to analytics
pq.write_table(
    table,
    "output.parquet",
    compression="zstd",
    compression_level=3,
    row_group_size=128 * 1024,  # Max rows per row group (131,072 rows), not bytes
    use_dictionary=True,         # Dictionary encoding for low-cardinality columns
    write_statistics=True,       # Column stats enable predicate pushdown
)

For datasets written once and read many times (typical in data lakes), invest in write-time optimizations: ZSTD compression, statistics, appropriate row group sizing. These add a modest cost at write time and can substantially reduce read costs.


Production Pipeline Architecture

The biggest architectural decision in DataOps isn't which library to use—it's how to structure data flow so pipelines are safe to re-run, easy to test, and diagnosable when they fail.

The Three Non-Negotiable Properties

1. Idempotency — Running the pipeline twice produces the same result. This means:

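import time

# Bad: every run adds another file to the partition, so re-runs duplicate rows
def write_daily_summary(df, date):
    df.to_parquet(f"s3://bucket/summary/{date}/output-{time.time_ns()}.parquet")

# Good: every run writes the same deterministic key, replacing the previous output
def write_daily_summary(df, date, s3):
    # s3 is assumed to be an s3fs.S3FileSystem (or another fsspec filesystem)
    output_path = f"s3://bucket/summary/date={date}/output.parquet"

    # Write to a temp key first, then move it into place
    temp_path = f"{output_path}.tmp"
    df.to_parquet(temp_path)
    # On S3 a move is copy-then-delete, not an atomic rename. The final object
    # still only becomes visible once the copy has completed.
    s3.mv(temp_path, output_path)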

2. Parameterization — Every pipeline run takes an explicit date/run ID. No implicit "today":

# Bad: uses datetime.now() — different output on each run
def extract_events():
    date = datetime.now().strftime("%Y-%m-%d")
    return read_events(date)

# Good: explicit parameter — backfill is free
def extract_events(date: str) -> pd.DataFrame:
    return read_events(date)

3. Fail loudly — Corruption is worse than failure. Never silently continue on bad data:

def transform_revenue(df: pd.DataFrame) -> pd.DataFrame:
    result = df.copy()
    # Assumes exchange_rate is quoted as USD per unit of the source currency
    result["revenue_usd"] = result["amount"] * result["exchange_rate"]

    # Check invariants and fail the pipeline if violated.
    # Explicit raises are used because `python -O` strips assert statements.
    if result["revenue_usd"].isna().any():
        raise ValueError("Null revenue detected after transform")
    if result["revenue_usd"].min() < 0:
        raise ValueError(f"Negative revenue: {result['revenue_usd'].min()}")

    return result
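The three properties can be seen working together in a small, standard-library-only sketch. It writes CSV to a local directory instead of Parquet to S3 so that it runs anywhere; `write_partition`, `backfill`, and the placeholder rows are hypothetical names and data for illustration.

```python
import csv
import os
import tempfile
from datetime import date, timedelta


def write_partition(rows: list, run_date: str, root: str) -> str:
    """Write one date partition; re-running replaces the file instead of appending."""
    if not rows:
        raise ValueError(f"No rows for {run_date}")  # fail loudly, write nothing
    part_dir = os.path.join(root, f"date={run_date}")
    os.makedirs(part_dir, exist_ok=True)
    final_path = os.path.join(part_dir, "output.csv")

    # Write to a temp file in the same directory, then swap it into place.
    fd, tmp_path = tempfile.mkstemp(dir=part_dir, suffix=".tmp")
    with os.fdopen(fd, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=sorted(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    os.replace(tmp_path, final_path)  # atomic on a local POSIX filesystem
    return final_path


def backfill(start: date, end: date, root: str) -> list:
    """Run the same parameterized step for every date in [start, end]."""
    paths = []
    day = start
    while day <= end:
        run_date = day.isoformat()
        rows = [{"date": run_date, "user_id": 1, "revenue_usd": 10.0}]  # placeholder data
        paths.append(write_partition(rows, run_date, root))
        day += timedelta(days=1)
    return paths


root = tempfile.mkdtemp()
first = backfill(date(2024, 1, 1), date(2024, 1, 3), root)
second = backfill(date(2024, 1, 1), date(2024, 1, 3), root)  # safe to re-run
print(sorted(os.listdir(root)))
```

Because the date is an argument and the output key is deterministic, a backfill is just a loop, and running it twice leaves exactly one file per partition.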

Error Handling That Distinguishes Failure Modes

Not all errors are the same. Your handling should reflect the severity:

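import logging
from typing import Optional

import pandas as pd
import pyarrow as pa

# SchemaError, _validate_schema, and EVENTS_SCHEMA are project-specific and assumed to be defined elsewhere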

logger = logging.getLogger(__name__)

def extract_partition(source: str, date: str) -> Optional[pd.DataFrame]:
    """Returns None for missing data, raises for corrupted data."""
    try:
        df = pd.read_parquet(f"{source}/date={date}/")
        _validate_schema(df, expected_schema=EVENTS_SCHEMA)
        return df

    except FileNotFoundError:
        # Recoverable — upstream may be delayed. Log and continue.
        logger.warning("partition_missing", extra={"source": source, "date": date})
        return None

    except SchemaError as e:
        # Not recoverable — schema change requires human decision.
        logger.critical("schema_mismatch", extra={"source": source, "error": str(e)})
        raise  # Stops the pipeline. Alerts fire. Human investigates.

    except pa.ArrowInvalid as e:
        # Corrupted data — do not propagate downstream.
        logger.error("corrupt_parquet", extra={"source": source, "date": date, "error": str(e)})
        raise

    except Exception as e:
        # Unknown — treat as critical until proven otherwise.
        logger.exception("unexpected_error", extra={"source": source})
        raise

Rule of thumb: if a downstream consumer would produce wrong answers on this data, raise. If the pipeline can skip and remain correct, return a sentinel.
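One caveat with the logging calls above: the standard library's default formatter does not print fields passed through `extra=`, so they are easy to lose. A minimal sketch of a JSON formatter that keeps them follows; `JsonFormatter` and the logger name are illustrative, not from any library.

```python
import io
import json
import logging

# Attribute names every LogRecord carries; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {
    "message",
    "asctime",
}


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including `extra` fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "logger": record.name,
        }
        payload.update(
            {k: v for k, v in record.__dict__.items() if k not in _STANDARD_ATTRS}
        )
        return json.dumps(payload, default=str)


stream = io.StringIO()  # stands in for stdout or a log file
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("pipeline.extract")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

logger.warning("partition_missing", extra={"source": "events", "date": "2024-01-15"})
record = json.loads(stream.getvalue())
print(record)
```

With this in place, the `source` and `date` fields become filterable keys rather than text buried in a message string.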

Retry Logic for Transient Failures

Network errors, rate limits, and object store throttling are transient. Use exponential backoff:

import time
from functools import wraps

def retry(max_attempts=4, base_delay=2.0, exceptions=(Exception,)):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Retry {attempt + 1}/{max_attempts} after {delay}s: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=4, exceptions=(IOError, TimeoutError))
def read_from_s3(path: str) -> pd.DataFrame:
    return pd.read_parquet(f"s3://{path}")
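When many workers hit the same throttled service, identical backoff schedules can make them retry in lockstep. A common variation, sketched here, adds random jitter to each delay. This is not the decorator above but a hypothetical `retry_with_jitter` with an injectable `sleep` so the behavior can be exercised without waiting.

```python
import random
import time
from functools import wraps


def retry_with_jitter(max_attempts=4, base_delay=2.0, exceptions=(Exception,), sleep=time.sleep):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise
                    # Full jitter: wait a random amount up to the exponential cap.
                    cap = base_delay * (2 ** attempt)
                    sleep(random.uniform(0, cap))
        return wrapper
    return decorator


delays = []        # record the requested sleeps instead of actually waiting
calls = {"n": 0}


@retry_with_jitter(max_attempts=4, base_delay=2.0, exceptions=(TimeoutError,), sleep=delays.append)
def flaky_read() -> str:
    """Simulated I/O call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated throttling")
    return "ok"


outcome = flaky_read()
print(outcome, calls["n"], len(delays))
```

Keep the `exceptions` tuple narrow: retrying on errors that will never succeed, such as a missing file, only delays the failure.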

Testing Data Pipelines

Data pipeline tests are different from application tests. You're validating both logic correctness and data contract assumptions.

Layer 1: Unit Tests — Transform Logic

Test pure transformation functions with synthetic, minimal DataFrames:

import pytest
import pandas as pd
from my_pipeline.transforms import normalize_revenue, deduplicate_events

def test_normalize_revenue_converts_to_usd():
    input_df = pd.DataFrame({
        "amount": [100.0, 200.0],
        "currency": ["EUR", "GBP"],
        "exchange_rate": [1.08, 1.27],
    })
    result = normalize_revenue(input_df)
    assert "revenue_usd" in result.columns
    assert result["revenue_usd"].iloc[0] == pytest.approx(108.0)

def test_deduplicate_keeps_latest_event():
    input_df = pd.DataFrame({
        "event_id": [1, 1, 2],
        "timestamp": ["2024-01-01 10:00", "2024-01-01 10:05", "2024-01-01 11:00"],
    })
    result = deduplicate_events(input_df)
    assert len(result) == 2
    assert result.loc[result["event_id"] == 1, "timestamp"].iloc[0] == "2024-01-01 10:05"

These run in milliseconds. Run them on every commit.
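For reference, here is one possible pair of implementations that would satisfy the tests above. The real `my_pipeline.transforms` module is not shown in this article, so these are assumptions: `exchange_rate` is taken to mean USD per unit of the source currency, and timestamps are assumed to sort chronologically.

```python
import pandas as pd


def normalize_revenue(df: pd.DataFrame) -> pd.DataFrame:
    """Add revenue_usd, assuming exchange_rate is USD per unit of `currency`."""
    result = df.copy()  # never mutate the caller's frame
    result["revenue_usd"] = result["amount"] * result["exchange_rate"]
    return result


def deduplicate_events(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the most recent row per event_id."""
    return (
        df.sort_values("timestamp")
        .drop_duplicates(subset="event_id", keep="last")
        .reset_index(drop=True)
    )


revenue = normalize_revenue(pd.DataFrame({
    "amount": [100.0, 200.0],
    "currency": ["EUR", "GBP"],
    "exchange_rate": [1.08, 1.27],
}))
deduped = deduplicate_events(pd.DataFrame({
    "event_id": [1, 1, 2],
    "timestamp": ["2024-01-01 10:00", "2024-01-01 10:05", "2024-01-01 11:00"],
}))
print(revenue["revenue_usd"].tolist(), len(deduped))
```

Both functions are pure: they take a frame and return a new one, which is what makes them testable with three-row inputs.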

Layer 2: Integration Tests — Full Pipeline Flow

Test full pipeline runs with realistic fixture data:

@pytest.fixture
def sample_events(tmp_path):
    """Write realistic sample data to a temp Parquet file."""
    df = pd.read_parquet("tests/fixtures/sample_events_1000rows.parquet")
    out = tmp_path / "events.parquet"
    df.to_parquet(out)
    return str(out)

@pytest.mark.integration
def test_daily_pipeline_produces_valid_output(sample_events, tmp_path):
    output_path = str(tmp_path / "output.parquet")
    
    run_daily_pipeline(
        input_path=sample_events,
        output_path=output_path,
        date="2024-01-15",
    )
    
    result = pd.read_parquet(output_path)
    assert not result.empty
    assert set(result.columns) == {"date", "user_id", "total_revenue", "event_count"}
    assert result["total_revenue"].min() >= 0
    assert result["date"].unique().tolist() == ["2024-01-15"]

Keep sample fixtures small (1K–10K rows). Large fixture files slow CI and signal a test design problem.

Layer 3: Data Quality — Production Assertions

Run these inline during production runs:

def assert_output_quality(df: pd.DataFrame, date: str) -> None:
    """Called after every transform in production. Fails fast before writing."""
    errors = []
    
    if df.empty:
        errors.append("Output is empty")
    if df["revenue_usd"].isna().any():
        errors.append(f"Null revenue in {df['revenue_usd'].isna().sum()} rows")
    if df["revenue_usd"].min() < 0:
        errors.append(f"Negative revenue: min={df['revenue_usd'].min()}")
    if df["date"].nunique() != 1 or df["date"].iloc[0] != date:
        errors.append(f"Wrong date partition: found {df['date'].unique()}")
    if df.duplicated(subset=["user_id", "date"]).any():
        errors.append(f"Duplicate user/date keys: {df.duplicated(subset=['user_id', 'date']).sum()} rows")
    
    if errors:
        raise DataQualityError(f"Quality check failed for {date}:\n" + "\n".join(f"  - {e}" for e in errors))

Dependency and Environment Management

Reproducibility is not optional in production. If your pipeline runs differently on your laptop vs CI vs prod, you don't have a pipeline — you have a script.

pyproject.toml + uv (Modern Standard)

[project]
name = "data-pipeline"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
    "pandas>=2.2,<3.0",
    "polars>=0.20",
    "pyarrow>=16.0",
    "pydantic-settings>=2.0",
    "structlog>=24.0",
    "boto3>=1.34",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-mock>=3.12",
    "ruff>=0.4",
    "mypy>=1.10",
]

Use uv to manage environments — it's 10–100× faster than pip for resolution and installation:

# Generate a lock file from pyproject.toml (exact versions, with hashes)
uv pip compile pyproject.toml --generate-hashes -o requirements.txt

# Create a virtual environment and install exactly what the lock file lists
uv venv
uv pip sync requirements.txt

The lock file goes in version control. Never deploy without one. A pipeline that installed fine last week might break today if an upstream package released a breaking change.

Docker for Full Reproducibility

For production deployments, Docker eliminates "works on my machine":

FROM python:3.11-slim

WORKDIR /app

# Install deps first (cached layer, rebuilds only when deps change)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

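# Copy source after deps (avoids re-installing on code changes)
COPY src/ ./src/

# Assumes the package lives at src/pipeline/; this makes it importable for `python -m`
ENV PYTHONPATH=/app/src

ENTRYPOINT ["python", "-m", "pipeline.main"]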

Configuration Management

Hard-coded paths, credentials, and thresholds are technical debt that bites when you least expect it. Use pydantic-settings for type-safe config:

from pydantic_settings import BaseSettings
from pydantic import Field

class PipelineConfig(BaseSettings):
    # Required — will raise if not set
    data_lake_bucket: str
    warehouse_connection_string: str
    
    # Optional with defaults
    batch_size: int = Field(default=50_000, gt=0, le=500_000)
    max_retries: int = Field(default=4, ge=1)
    compression: str = Field(default="zstd")
    
    model_config = {
        "env_prefix": "PIPELINE_",
        "env_file": ".env",
        "env_file_encoding": "utf-8",
    }

# Usage
config = PipelineConfig()
df.to_parquet(
    f"s3://{config.data_lake_bucket}/output/",
    compression=config.compression,
)

Set PIPELINE_DATA_LAKE_BUCKET=dev-bucket in dev, =prod-bucket in prod. Same code, no branches.


Structured Logging for Operators

Your logs are for the person debugging at 2am. Make them queryable.

import structlog

log = structlog.get_logger()

def process_partition(date: str, source: str) -> dict:
    log = structlog.get_logger().bind(date=date, source=source, pipeline="daily_revenue")
    
    log.info("extract_start")
    
    df = extract(source, date)
    log.info("extract_complete", rows=len(df), size_mb=round(df.memory_usage(deep=True).sum() / 1e6, 1))
    
    result = transform(df)
    log.info("transform_complete", output_rows=len(result))
    
    write(result, date)
    log.info("partition_written", path=f"s3://bucket/date={date}/")
    
    return {"rows": len(result), "date": date}

With structured logging (JSON output), you can query:

  • jq 'select(.event == "extract_complete" and .rows < 1000)' — detect suspiciously small extracts
  • jq 'select(.pipeline == "daily_revenue") | .size_mb' — track input volume over time

Once a JSON renderer is configured, this output can be shipped to Splunk, Datadog, or CloudWatch with little extra work.


Orchestration Landscape

Once you have multiple pipelines with inter-dependencies, you need an orchestrator. The three main options each have distinct philosophies:

Apache Airflow — The Industry Standard

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    schedule="0 6 * * *",  # Daily at 6am UTC
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Run for missed historical dates (backfill support)
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
)
def daily_revenue_pipeline():
    @task
    def extract(date: str) -> str:
        df = read_events(date)
        path = write_staging(df, date)
        return path

    @task
    def transform(staging_path: str) -> str:
        df = pd.read_parquet(staging_path)
        result = normalize_revenue(df)
        return write_transformed(result)

    @task
    def load(transformed_path: str):
        df = pd.read_parquet(transformed_path)
        write_to_warehouse(df)

    load(transform(extract("{{ ds }}")))  # {{ ds }} renders to the run's logical date (YYYY-MM-DD)

daily_revenue_pipeline()

Use Airflow when: you have an existing Airflow deployment, need batch scheduling, or your organization is already standardized on it.

Prefect — Developer-Friendly

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_events(date: str) -> pd.DataFrame:
    return read_events(date)

@task(retries=3, retry_delay_seconds=30)
def transform_revenue(df: pd.DataFrame) -> pd.DataFrame:
    return normalize_revenue(df)

@flow(name="daily-revenue", log_prints=True)
def daily_revenue(date: str):
    raw = extract_events(date)
    transformed = transform_revenue(raw)
    write_to_warehouse(transformed)

# Trigger manually or via Prefect Cloud schedules
daily_revenue(date="2024-01-15")

Use Prefect when: you want simpler local development, built-in caching, and a less config-heavy setup than Airflow.

Dagster — Data-Asset Native

Dagster models pipelines as data assets rather than tasks — closer to how data engineers actually think:

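from dagster import DailyPartitionsDefinition, asset

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def raw_events(context) -> pd.DataFrame:
    date = context.partition_key
    return read_events(date)

# The upstream asset is matched by parameter name. Sharing the partitions
# definition ties each daily_revenue partition to the same day's raw_events.
@asset(partitions_def=daily_partitions)
def daily_revenue(raw_events: pd.DataFrame) -> pd.DataFrame:
    return normalize_revenue(raw_events)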

Use Dagster when: you want lineage tracking, data-catalog integration, and a modern developer experience. It is a strong choice for greenfield projects.


Performance Profiling — Measure Before Optimizing

Python pipelines are often I/O-bound rather than CPU-bound. Confirm which one you have before optimizing:

import cProfile
import pstats

with cProfile.Profile() as pr:
    run_pipeline(date="2024-01-15")

stats = pstats.Stats(pr)
stats.sort_stats("cumulative")
stats.print_stats(20)  # Top 20 bottlenecks
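A quicker first check than a full profile is to compare wall-clock time with CPU time per stage. A stage whose wall time far exceeds its CPU time is mostly waiting on I/O. The sketch below uses a hypothetical `stage_timer` helper and simulated workloads.

```python
import time
from contextlib import contextmanager


@contextmanager
def stage_timer(name: str, results: dict):
    """Record wall-clock and CPU seconds spent inside the with-block."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        yield
    finally:
        results[name] = {
            "wall_s": time.perf_counter() - wall_start,
            "cpu_s": time.process_time() - cpu_start,
        }


timings = {}

with stage_timer("simulated_io", timings):
    time.sleep(0.2)  # waiting: wall time passes, almost no CPU time is used

with stage_timer("simulated_cpu", timings):
    sum(i * i for i in range(300_000))  # computing: CPU time tracks wall time

for stage, t in timings.items():
    print(f"{stage}: wall={t['wall_s']:.3f}s cpu={t['cpu_s']:.3f}s")
```

`time.process_time()` sums CPU time across all threads in the process, so a multi-threaded library can report more CPU seconds than wall seconds.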

For line-level profiling:

pip install line-profiler
# Decorate the functions you want measured with @profile first
kernprof -l -v pipeline.py

For sampling-based profiling in production (low overhead):

pip install py-spy
py-spy top --pid $(pgrep -f "pipeline.py")

Common findings and fixes:

| Bottleneck | Fix |
|---|---|
| Slow Parquet reads | Add column filters, predicate pushdown |
| df.apply(func, axis=1) | Vectorize with numpy/pandas ops |
| Repeated small writes | Batch into chunks ≥ 10MB |
| String operations on object dtype | Cast to category or use Polars |
| pandas merge on large frames | Use pyarrow hash joins or Polars |
| to_sql() row-by-row inserts | Use COPY / bulk insert |
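The row-wise `apply` fix is usually the cheapest win in that table. A small sketch of the before and after, on made-up columns:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "amount": np.arange(1, 10_001, dtype="float64"),
    "exchange_rate": np.full(10_000, 1.25),
})

# Row-wise: calls a Python function once per row.
slow = df.apply(lambda row: row["amount"] * row["exchange_rate"], axis=1)

# Vectorized: one expression evaluated over whole columns in compiled code.
fast = df["amount"] * df["exchange_rate"]

print(np.allclose(slow.to_numpy(), fast.to_numpy()))
```

Both produce the same values; the difference is that the vectorized form avoids building a Series object and making a Python call for every row.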


The Production Deployment Checklist

Before merging to main:

Code Quality

  • [ ] All functions have type hints (mypy --strict passes)
  • [ ] Ruff/black formatting passes
  • [ ] No datetime.now() without explicit timezone

Testing

  • [ ] Unit tests cover happy path, edge cases, and error paths
  • [ ] Integration test passes with fixture data
  • [ ] Data quality assertions cover all output columns

Reliability

  • [ ] Pipeline is idempotent (safe to re-run)
  • [ ] Date is a parameter, not hardcoded
  • [ ] Retry logic on external I/O calls
  • [ ] Failure modes are deliberate: raise on bad data, return a sentinel only when skipping is safe

Operations

  • [ ] Structured logging at extract, transform, and load
  • [ ] Dependencies pinned in lock file
  • [ ] Config externalized from code
  • [ ] Backfill procedure documented (can you run 90 days?)

Data

  • [ ] Output schema documented
  • [ ] Quality assertions inline
  • [ ] Output partition overwritten atomically

Conclusion: The Boring Pipeline Wins

The highest-performing data teams I've worked with don't use the most sophisticated tools—they use the right tools with ruthless consistency. pandas where it's fast enough, Polars where performance matters, pyarrow for I/O, DuckDB for ad-hoc SQL over files. Dagster or Prefect for orchestration. Docker for isolation. uv for speed.

The pipelines that run without incident for years aren't the most clever. They're the most boring: explicit dates, idempotent writes, structured logs, inline assertions, pinned dependencies. Every deviation from these principles is future on-call debt.

Write pipelines you'd be comfortable debugging at 3am. That's the real production standard.