Polars Patterns

Best practices for writing efficient Polars code in Dagster pipelines.

Lazy Evaluation

Always prefer lazy operations and collect once at the end:

# GOOD - Lazy operations, collect once
result = (
    pl.scan_parquet(path)
    .filter(pl.col("price") > 1000)
    .select(["id", "price", "date"])
    .sort("date")
    .collect()  # Execute once
)

# BAD - Eager operations
df = pl.read_parquet(path)  # Loads all data
df = df.filter(pl.col("price") > 1000)
df = df.select(["id", "price", "date"])

Consolidate with_columns Chains

Multiple expressions passed in a single with_columns call are evaluated in parallel:

# GOOD - Parallel execution
result = df.with_columns(
    (pl.col("price") * 0.1).alias("tax"),
    pl.col("name").str.to_uppercase(),
    pl.col("date").dt.year().alias("year"),
)

# BAD - Sequential execution
result = df.with_columns((pl.col("price") * 0.1).alias("tax"))
result = result.with_columns(pl.col("name").str.to_uppercase())
result = result.with_columns(pl.col("date").dt.year().alias("year"))

When to split

Only split with_columns when later expressions depend on earlier ones:

result = (
    df.with_columns(pl.col("a").fill_null(0))  # First: fill nulls
    .with_columns((pl.col("a") > 0).alias("has_a"))  # Then: use filled value
)

Sort Inside Aggregations

Sorting the whole frame before group_by() does not guarantee row order within groups (particularly under the streaming engine). Sort inside the aggregation instead:

# GOOD - Sort inside aggregation
result = df.group_by("category").agg(
    pl.struct("id", "value", "date")
      .sort_by("date")
      .alias("items_by_date")
)

# BAD - Sort before group_by
result = df.sort("date").group_by("category").agg(
    pl.struct("id", "value", "date").alias("items_by_date")
)

Semi-Joins Over is_in()

Semi-joins stay lazy and avoid early materialization:

# GOOD - Semi-join stays lazy
valid_sales = sales.join(
    valid_products.select("product_id"),
    on="product_id",
    how="semi",
)

# BAD - is_in() forces collection
valid_ids = valid_products.collect()["product_id"]  # Materializes!
valid_sales = sales.filter(pl.col("product_id").is_in(valid_ids))

Streaming with LazyFrames

Return LazyFrame from transform assets to enable streaming writes:

@dg.asset
def my_transform(context) -> pl.LazyFrame:
    """Returns LazyFrame for streaming via sink_parquet."""
    result = (
        pl.scan_parquet(path)
        .filter(...)
        .with_columns(...)
    )
    # Don't collect! Return lazy for streaming
    return result

The IO manager uses sink_parquet() instead of write_parquet(), never materializing the full DataFrame.

Visualization in Metadata

Add charts and tables to Dagster asset metadata:

from cogapp_libs.dagster import altair_to_metadata, table_preview_to_metadata

@dg.asset
def my_output(context, data: pl.LazyFrame) -> pl.DataFrame:
    result = data.collect()

    # Bar chart
    chart = result.plot.bar(x="category", y="count")

    # Add to metadata
    context.add_output_metadata({
        **altair_to_metadata(chart, "distribution"),
        **table_preview_to_metadata(result.head(5), "preview", "Top 5"),
    })

    return result

Multi-Table Joins

The sales enrichment join (sales + artworks + artists) is the most common pattern in the codebase:

result = (
    sales.join(artworks, on="artwork_id", how="left", suffix="_aw")
    .join(artists, on="artist_id", how="left", suffix="_ar")
    .select(
        "sale_id", "sale_date", "sale_price_usd", "buyer_country",
        "artwork_id", "title", pl.col("year").alias("artwork_year"),
        pl.col("price_usd").alias("list_price_usd"), "medium",
        "artist_id", pl.col("name").str.strip_chars().str.to_uppercase().alias("artist_name"),
        "nationality",
    )
)

Helper opportunity: join_sales_enrichment

This 3-table join with identical column selection appears in 5 Polars asset files. A helper would centralise the join logic and column mapping:

def join_sales_enrichment(
    sales: pl.LazyFrame, artworks: pl.LazyFrame, artists: pl.LazyFrame,
) -> pl.LazyFrame:
    """Join sales with artworks and artists, applying standard column renames."""
    return (
        sales.join(artworks, on="artwork_id", how="left", suffix="_aw")
        .join(artists, on="artist_id", how="left", suffix="_ar")
        .select(
            "sale_id", "sale_date", "sale_price_usd", "buyer_country",
            "artwork_id", "title", pl.col("year").alias("artwork_year"),
            pl.col("price_usd").alias("list_price_usd"), "medium",
            "artist_id", normalize_artist_name("name"),
            "nationality",
        )
    )

Currently duplicated in: polars/assets.py, polars/assets_fs.py, polars/assets_ops.py, polars/assets_multi.py

Helper opportunity: extract_value_counts

Converting Polars value counts to a plain dict for metadata is a 3-line pattern repeated in every output asset:

def extract_value_counts(df: pl.DataFrame, col: str) -> dict[str, int]:
    """Convert a column's value counts to a plain dict for metadata."""
    vc = df[col].value_counts().sort(col)
    return dict(zip(vc[col].to_list(), vc["count"].to_list()))

# Usage: context.add_output_metadata({"tier_distribution": extract_value_counts(result, "price_tier")})

Currently duplicated in: polars/assets.py, polars/assets_fs.py, polars/assets_ops.py, duckdb/assets.py, original/assets.py


Quick Reference

| Pattern | Do | Don't |
| --- | --- | --- |
| Evaluation | scan_parquet().collect() | read_parquet() |
| Multiple columns | Single with_columns() | Chained with_columns() |
| Sorted groups | sort_by() in agg() | sort() before group_by() |
| Filtering by list | how="semi" join | is_in() with DataFrame |
| Streaming | Return LazyFrame | Return DataFrame |

External Resources