# Polars Patterns

Best practices for writing efficient Polars code in Dagster pipelines.
## Lazy Evaluation

Always prefer lazy operations and collect once at the end:

```python
# GOOD - Lazy operations, collect once
result = (
    pl.scan_parquet(path)
    .filter(pl.col("price") > 1000)
    .select(["id", "price", "date"])
    .sort("date")
    .collect()  # Execute once
)

# BAD - Eager operations
df = pl.read_parquet(path)  # Loads all data
df = df.filter(pl.col("price") > 1000)
df = df.select(["id", "price", "date"])
```
## Consolidate `with_columns` Chains

Multiple expressions in a single `with_columns` call run in parallel:

```python
# GOOD - Parallel execution
result = df.with_columns(
    (pl.col("price") * 0.1).alias("tax"),
    pl.col("name").str.to_uppercase(),
    pl.col("date").dt.year().alias("year"),
)

# BAD - Sequential execution
result = df.with_columns((pl.col("price") * 0.1).alias("tax"))
result = result.with_columns(pl.col("name").str.to_uppercase())
result = result.with_columns(pl.col("date").dt.year().alias("year"))
```
**When to split:** only split `with_columns` when later expressions depend on columns created by earlier ones.
## Sort Inside Aggregations

`sort()` before `group_by()` doesn't guarantee order within groups:

```python
# GOOD - Sort inside aggregation
result = df.group_by("category").agg(
    pl.struct("id", "value", "date")
    .sort_by("date")
    .alias("items_by_date")
)

# BAD - Sort before group_by
result = df.sort("date").group_by("category").agg(
    pl.struct("id", "value", "date").alias("items_by_date")
)
```
## Semi-Joins Over `is_in()`

Semi-joins stay lazy and avoid early materialization:

```python
# GOOD - Semi-join stays lazy
valid_sales = sales.join(
    valid_products.select("product_id"),
    on="product_id",
    how="semi",
)

# BAD - is_in() forces collection
valid_ids = valid_products.collect()["product_id"]  # Materializes!
valid_sales = sales.filter(pl.col("product_id").is_in(valid_ids))
```
## Streaming with LazyFrames

Return a LazyFrame from transform assets to enable streaming writes:

```python
@dg.asset
def my_transform(context) -> pl.LazyFrame:
    """Returns a LazyFrame for streaming via sink_parquet."""
    result = (
        pl.scan_parquet(path)
        .filter(...)
        .with_columns(...)
    )
    # Don't collect! Return lazy for streaming
    return result
```

The IO manager then uses `sink_parquet()` instead of `write_parquet()`, never materializing the full DataFrame.
## Visualization in Metadata

Add charts and tables to Dagster asset metadata:

```python
from cogapp_libs.dagster import altair_to_metadata, table_preview_to_metadata

@dg.asset
def my_output(context, data: pl.LazyFrame) -> pl.DataFrame:
    result = data.collect()

    # Bar chart
    chart = result.plot.bar(x="category", y="count")

    # Add to metadata
    context.add_output_metadata({
        **altair_to_metadata(chart, "distribution"),
        **table_preview_to_metadata(result.head(5), "preview", "Top 5"),
    })
    return result
```
## Multi-Table Joins

The sales enrichment join (sales + artworks + artists) is the most common pattern in the codebase:

```python
result = (
    sales.join(artworks, on="artwork_id", how="left", suffix="_aw")
    .join(artists, on="artist_id", how="left", suffix="_ar")
    .select(
        "sale_id", "sale_date", "sale_price_usd", "buyer_country",
        "artwork_id", "title", pl.col("year").alias("artwork_year"),
        pl.col("price_usd").alias("list_price_usd"), "medium",
        "artist_id", pl.col("name").str.strip_chars().str.to_uppercase().alias("artist_name"),
        "nationality",
    )
)
```
**Helper opportunity: `join_sales_enrichment`**

This 3-table join with identical column selection appears in 5 Polars asset files. A helper would centralise the join logic and column mapping:

```python
def join_sales_enrichment(
    sales: pl.LazyFrame, artworks: pl.LazyFrame, artists: pl.LazyFrame,
) -> pl.LazyFrame:
    """Join sales with artworks and artists, applying standard column renames."""
    return (
        sales.join(artworks, on="artwork_id", how="left", suffix="_aw")
        .join(artists, on="artist_id", how="left", suffix="_ar")
        .select(
            "sale_id", "sale_date", "sale_price_usd", "buyer_country",
            "artwork_id", "title", pl.col("year").alias("artwork_year"),
            pl.col("price_usd").alias("list_price_usd"), "medium",
            "artist_id", normalize_artist_name("name"),
            "nationality",
        )
    )
```

Currently duplicated in: `polars/assets.py`, `polars/assets_fs.py`, `polars/assets_ops.py`, `polars/assets_multi.py`
**Helper opportunity: `extract_value_counts`**

Converting Polars value counts to a plain dict for metadata is a 3-line pattern repeated in every output asset:

```python
def extract_value_counts(df: pl.DataFrame, col: str) -> dict[str, int]:
    """Convert a column's value counts to a plain dict for metadata."""
    vc = df[col].value_counts().sort(col)
    return dict(zip(vc[col].to_list(), vc["count"].to_list()))

# Usage:
# context.add_output_metadata({"tier_distribution": extract_value_counts(result, "price_tier")})
```

Currently duplicated in: `polars/assets.py`, `polars/assets_fs.py`, `polars/assets_ops.py`, `duckdb/assets.py`, `original/assets.py`
## Quick Reference

| Pattern | Do | Don't |
|---|---|---|
| Evaluation | `scan_parquet().collect()` | `read_parquet()` |
| Multiple columns | Single `with_columns()` | Chained `with_columns()` |
| Sorted groups | `sort_by()` in `agg()` | `sort()` before `group_by()` |
| Filtering by list | `how="semi"` join | `is_in()` with DataFrame |
| Streaming | Return LazyFrame | Return DataFrame |
## External Resources

- Polars User Guide: pola.rs/docs
- Polars API Reference: pola.rs/api/python
- Polars Lazy API Guide
- Polars Expression Guide
- Polars GitHub: github.com/pola-rs/polars