Skip to content

Best Practices for Dagster Development

Guidelines and recommendations for writing maintainable, performant Dagster pipelines in honey-duck.

Code Organization

DO: Keep Assets Focused and Single-Purpose

# GOOD - Clear, focused asset
@dg.asset(kinds={"polars"}, group_name="transform")
def sales_with_discounts(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """Add discount calculations to sales data."""
    # `...` stands in for the real table/column arguments
    tables = read_harvest_tables_lazy(...)
    # One transformation (a discount column = 10% of sale price), collected once
    result = tables["sales_raw"].with_columns(
        (pl.col("sale_price_usd") * 0.1).alias("discount")
    ).collect()
    return result
# BAD - Doing too many things
@dg.asset
def process_everything(context):
    """Process sales, artworks, artists, media, and generate reports."""
    # 200 lines of mixed logic...
    # Hard to test, debug, and reuse

Why: Single-purpose assets are easier to test, debug, and understand.


DO: Use Descriptive Asset Names

# GOOD - Clear what it does
# Each name states the subset or enrichment the asset produces
@dg.asset
def high_value_sales_over_1m(context) -> pl.DataFrame:
    pass

@dg.asset
def artworks_with_primary_images(context) -> pl.DataFrame:
    pass
# BAD - Vague names
# Neither name says what data the asset contains
@dg.asset
def process_data(context) -> pl.DataFrame:
    pass

@dg.asset
def step2(context) -> pl.DataFrame:
    pass

Why: Clear names make the asset graph self-documenting.


DO: Organize Assets with Group Names

# Assets that share a group_name are displayed together in the UI
@dg.asset(
    kinds={"polars"},
    group_name="transform_polars",  # ← Groups in UI
)
def sales_transform_polars(context) -> pl.DataFrame:
    pass

@dg.asset(
    kinds={"polars", "json"},
    group_name="output_polars",  # ← Different group
)
def sales_output_polars(context) -> pl.DataFrame:
    pass

Why: Groups help organize assets in the UI and make navigation easier.


Data Processing

DO: Use Lazy Evaluation (Polars)

# GOOD - Lazy operations, collect once
@dg.asset
def my_transform(context) -> pl.DataFrame:
    # Nothing is read until .collect(); the optimizer sees the whole chain first
    result = (
        pl.scan_parquet(path)          # ← Lazy
        .filter(pl.col("price") > 1000)
        .select(["id", "price", "date"])
        .sort("date")
        .collect()                      # ← Execute once
    )
    return result
# BAD - Eager operations, multiple materializations
@dg.asset
def my_transform(context) -> pl.DataFrame:
    # Each line below produces a new, fully materialized DataFrame
    df = pl.read_parquet(path)         # ← Loads all data
    df = df.filter(pl.col("price") > 1000)  # ← Filters in memory
    df = df.select(["id", "price", "date"])
    df = df.sort("date")
    return df

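Why: Lazy evaluation lets Polars optimize the entire query plan. It pushes filters and column selection down to the scan and skips intermediate DataFrames, which is often much faster and uses far less memory on large inputs.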


Polars Patterns

For detailed Polars optimization patterns (consolidating with_columns, sort_by in aggregations, semi-joins, etc.), see the dedicated Polars Patterns guide.


DO: Validate Data Early

# GOOD - Validate inputs
@dg.asset
def sales_enriched(context) -> pl.DataFrame:
    """Load sales, keep positive prices, and fail if no rows remain."""
    tables = read_harvest_tables_lazy(
        HARVEST_PARQUET_DIR,
        ("sales_raw", ["sale_id", "sale_price_usd"]),  # ← Validates columns
        asset_name="sales_enriched",
    )

    # NOTE: this filter drops non-positive prices without reporting them;
    # only the empty-result check below actually fails fast.
    result = tables["sales_raw"].filter(
        pl.col("sale_price_usd") > 0  # ← Business rule validation
    ).collect()

    # Validate outputs
    if len(result) == 0:
        raise ValueError("No valid sales data after filtering")

    return result

Why: Fail fast with clear error messages rather than silent data corruption.


DO: Add Rich Metadata

# GOOD - Rich metadata
@dg.asset
def sales_transform(context) -> pl.DataFrame:
    with track_timing(context, "transformation"):
        result = transform_sales()

    # NOTE(review): on an empty result, mean() returns None and float(None)
    # raises TypeError — guard these metrics if empty output is possible.
    add_dataframe_metadata(
        context,
        result,
        # Business metrics
        total_revenue=float(result["sale_price_usd"].sum()),
        avg_sale=float(result["sale_price_usd"].mean()),
        unique_customers=result["customer_id"].n_unique(),
        date_range=f"{result['sale_date'].min()} to {result['sale_date'].max()}",
        # Data quality metrics
        null_prices=int(result["sale_price_usd"].is_null().sum()),
    )

    return result

Why: Metadata makes debugging easier and provides observability in the UI.

See Logging & Reporting for metadata patterns, notification sensors, and the df_preview_metadata helper suggestion.


Error Handling

DO: Use Descriptive Error Messages

# GOOD - Clear, actionable error
@dg.asset
def validate_sales(context) -> pl.DataFrame:
    result = load_sales()

    invalid_prices = result.filter(pl.col("price") < 0)
    if len(invalid_prices) > 0:
        # head(5) keeps the message short while still pointing at real rows
        sample_ids = invalid_prices["sale_id"].head(5).to_list()
        raise ValueError(
            f"Found {len(invalid_prices)} sales with negative prices. "
            f"Sample IDs: {sample_ids}. "
            f"Check data source for corruption."
        )

    return result
# BAD - Vague error
@dg.asset
def validate_sales(context) -> pl.DataFrame:
    result = load_sales()
    if len(result.filter(pl.col("price") < 0)) > 0:
        raise ValueError("Invalid data")  # ← What's invalid? How to fix?

    return result

Why: Specific errors save hours of debugging time.


DO: Log Progress for Long Operations

# GOOD - Progress logging
@dg.asset
def process_large_dataset(context) -> pl.DataFrame:
    context.log.info("Loading data from source...")
    data = load_data()

    # The `:,` format spec adds thousands separators (e.g. 1,234,567)
    context.log.info(f"Loaded {len(data):,} rows")

    context.log.info("Applying transformations...")
    result = transform(data)

    context.log.info(f"Transformations complete. Output: {len(result):,} rows")

    return result

Why: Logs help you understand where time is spent and identify bottlenecks.


Performance

DO: Profile Before Optimizing

# GOOD - Measure, then optimize
@dg.asset
def my_transform(context) -> pl.DataFrame:
    # NOTE(review): this assumes load_data() returns an eager DataFrame. With a
    # LazyFrame, filter/group_by only build a plan, so time the .collect() instead.
    with track_timing(context, "loading"):
        data = load_data()

    with track_timing(context, "filtering"):
        filtered = data.filter(...)

    with track_timing(context, "aggregating"):
        result = filtered.group_by(...).agg(...)

    # Check metadata to see which step is slow
    return result

Why: Optimization without measurement is guesswork. Measure first!


DO: Use Appropriate Data Types

# GOOD - Efficient types (every column must have the same length)
df = pl.DataFrame({
    "id": pl.Series([1, 2, 3], dtype=pl.UInt32),      # Not Int64
    "price": pl.Series([1.5, 2.5, 3.5], dtype=pl.Float32), # Not Float64 (~7 significant digits)
    "category": pl.Series(["A", "B", "A"], dtype=pl.Categorical),  # Not String
})

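Why: Smaller types = less memory = faster processing. Use UInt32 for IDs (as long as they stay below ~4.29 billion) and Categorical for low-cardinality labels. Use Float32 only where ~7 significant digits suffice: a $12,345,678.91 sale stored as Float32 becomes 12,345,679, so keep monetary values in Float64 or pl.Decimal.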


DO: Filter Early, Select Late

# GOOD - Filter first (reduces data), select last
# With scan_parquet, Polars pushes filters and column selection down to the
# scan; compare the two versions' plans with .explain() before .collect().
result = (
    pl.scan_parquet(path)
    .filter(pl.col("price") > 1000)     # ← Reduce rows first
    .filter(pl.col("country") == "US")
    .select(["id", "price", "date"])    # ← Reduce columns last
    .collect()
)
# BAD - Select first, filter last
result = (
    pl.scan_parquet(path)
    .select(["id", "price", "date", "country", "many", "other", "cols"])  # ← Too many columns
    .filter(pl.col("price") > 1000)     # ← Filtering large dataset
    .filter(pl.col("country") == "US")
    .collect()
)

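Why: Filtering early reduces data volume, making subsequent operations faster. In lazy queries (`scan_*`), Polars' predicate and projection pushdown already move filters and column selection to the scan, so both versions above may compile to a similar plan; check with `.explain()`. Ordering matters most for eager DataFrames. In either mode, select only the columns you actually need.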


Testing

DO: Write Tests for Business Logic

# tests/test_sales_logic.py
def test_discount_calculation():
    """Test that discounts are calculated correctly."""
    # Arrange
    sales = pl.DataFrame({
        "sale_id": [1, 2],
        "price": [100.0, 200.0],
    })

    # Act
    result = apply_discount(sales, discount_pct=10)

    # Assert
    # NOTE(review): exact float equality is fragile for computed values;
    # consider pytest.approx or polars.testing.assert_series_equal.
    assert result["discounted_price"].to_list() == [90.0, 180.0]

Why: Tests catch bugs early and make refactoring safer.


DO: Test Edge Cases

def test_handles_empty_dataframe():
    """Test that pipeline handles empty input gracefully."""
    empty_df = pl.DataFrame(schema={"sale_id": pl.Int64, "price": pl.Float64})
    # Columns/dtypes process_sales should return — adjust if it adds columns
    expected_schema = {"sale_id": pl.Int64, "price": pl.Float64}

    # Should not crash
    result = process_sales(empty_df)

    assert len(result) == 0
    assert result.schema == expected_schema

Why: Edge cases often cause production failures.


Documentation

DO: Write Helpful Docstrings

# GOOD - Clear docstring
@dg.asset(kinds={"polars"}, group_name="transform")
def sales_with_price_tiers(
    context: dg.AssetExecutionContext,
    sales_transform: pl.DataFrame,
) -> pl.DataFrame:
    """Categorize sales into price tiers (budget/mid/premium).

    Applies the following tiers:
    - Budget: < $500k
    - Mid: >= $500k and < $3M
    - Premium: >= $3M

    Args:
        sales_transform: Output of the upstream ``sales_transform`` asset.

    Returns:
        DataFrame with additional 'price_tier' column
    """
    pass
# BAD - No docstring
@dg.asset
def process(context, data):
    pass  # What does this do? What are the tiers?

Why: Good docstrings help teammates (and future you) understand the code.


DO: Document Business Rules

@dg.asset
def filter_valid_sales(context) -> pl.DataFrame:
    """Filter sales to include only valid transactions.

    Business rules (as of 2024-01):
    - Sale price must be positive
    - Sale date must be within last 10 years
    - Must have valid artwork_id reference
    - Excludes test/demo sales (keeps only rows where buyer_country != 'XX')

    See: https://wiki.company.com/sales-validation-rules
    """
    pass

Why: Business logic changes over time. Documentation preserves context.


Dependencies

DO: Use deps for External Dependencies

# GOOD - Clear external dependency
from honey_duck.defs.shared.helpers import STANDARD_HARVEST_DEPS

# deps declares upstream dependencies without passing their values in as inputs
@dg.asset(
    deps=STANDARD_HARVEST_DEPS,  # ← External CSV/SQLite files
    kinds={"dlt"},
)
def my_harvest(context):
    """Load data from external CSV files."""
    pass

Why: Makes it clear which assets depend on external data sources.


DO: Keep Dependency Graphs Shallow

# GOOD - Shallow graph (3 levels)
csv_sales → harvest_sales → sales_transform → sales_output

# BAD - Deep graph (9+ levels)
raw → clean → validate → enrich → normalize → categorize →
    aggregate → summarize → filter → output

Why: Shallow graphs are easier to understand and faster to recompute.


Configuration

DO: Use Environment Variables for Config

# GOOD - Environment-based config
import os
from pathlib import Path  # Path is used by both examples below

OUTPUT_PATH = Path(os.getenv("SALES_OUTPUT_PATH", "data/output/json/sales.json"))
THRESHOLD = int(os.getenv("SALES_THRESHOLD", "1000"))
# BAD - Hardcoded values
OUTPUT_PATH = Path("/home/user/data/sales.json")  # ← Breaks on other machines
THRESHOLD = 1000  # ← Can't override without code change

Why: Environment variables make code portable and configurable.


DO: Use Constants for Business Rules

# src/honey_duck/defs/shared/constants.py
# Underscores are digit separators: 10_000 == 10000
MIN_SALE_VALUE_USD = 10_000
PRICE_TIER_BUDGET_MAX_USD = 500_000
PRICE_TIER_MID_MAX_USD = 3_000_000

# In asset code
from honey_duck.defs.shared.constants import MIN_SALE_VALUE_USD

result = df.filter(pl.col("price") >= MIN_SALE_VALUE_USD)

Why: Centralized constants are easier to update and audit.

Helper opportunity: compute_price_tier

The price tier CASE/WHEN logic is duplicated 6 times across Polars and DuckDB assets, each using the same constants. A shared expression helper would be a single source of truth:

# In shared/expressions.py
from .constants import PRICE_TIER_BUDGET_MAX_USD, PRICE_TIER_MID_MAX_USD

def price_tier_expr() -> pl.Expr:
    """Polars expression for price tier categorization.

    Reads ``list_price_usd`` and yields a ``price_tier`` column:
    "budget" below PRICE_TIER_BUDGET_MAX_USD, "mid" below
    PRICE_TIER_MID_MAX_USD, otherwise "premium".
    """
    return (
        pl.when(pl.col("list_price_usd") < PRICE_TIER_BUDGET_MAX_USD)
        .then(pl.lit("budget"))
        .when(pl.col("list_price_usd") < PRICE_TIER_MID_MAX_USD)
        .then(pl.lit("mid"))
        .otherwise(pl.lit("premium"))
        .alias("price_tier")
    )

# Same thresholds as price_tier_expr(). Interpolating with an f-string is safe
# here only because the values are trusted module constants, never user input.
PRICE_TIER_SQL = f"""
    CASE
        WHEN list_price_usd < {PRICE_TIER_BUDGET_MAX_USD} THEN 'budget'
        WHEN list_price_usd < {PRICE_TIER_MID_MAX_USD} THEN 'mid'
        ELSE 'premium'
    END AS price_tier
"""

# Usage (Polars): df.with_columns(price_tier_expr())
# Usage (DuckDB): f"SELECT *, {PRICE_TIER_SQL} FROM artworks"

Currently duplicated in: polars/assets.py, polars/assets_fs.py, polars/assets_ops.py, polars/assets_multi.py, duckdb/assets.py, duckdb_soda/assets.py

Helper opportunity: normalize_artist_name

Artist name normalization (strip whitespace + uppercase) appears 6 times across assets. A shared expression keeps the business rule in one place:

def normalize_artist_name(col: str = "artist_name") -> pl.Expr:
    """Strip whitespace and uppercase artist names.

    Args:
        col: Name of the string column to normalize.

    Returns:
        Expression that strips leading/trailing whitespace from ``col`` and
        uppercases it.
    """
    return pl.col(col).str.strip_chars().str.to_uppercase()

# Usage: df.with_columns(normalize_artist_name().alias("artist_name"))

Currently duplicated in: polars/assets.py, polars/assets_fs.py, polars/assets_ops.py, polars/assets_multi.py, duckdb/assets.py, original/assets.py


Anti-Patterns to Avoid

DON'T: Modify Input Data In-Place

# BAD
@dg.asset
def my_asset(context, input_df: pl.DataFrame) -> pl.DataFrame:
    # insert_column mutates input_df itself, so the caller's object changes too
    input_df.insert_column(0, pl.Series("flag", [True] * input_df.height))
    return input_df
# GOOD
@dg.asset
def my_asset(context, input_df: pl.DataFrame) -> pl.DataFrame:
    result = input_df.with_columns(...)  # ← Returns a new frame; input_df is untouched
    return result

DON'T: Use Magic Numbers

# BAD
df = df.filter(pl.col("price") > 1000000)  # What does 1M mean?
df = df.filter(pl.col("age_days") < 365)   # Why 365?
# GOOD
# Named constants explain the rule and give one place to change it
PREMIUM_PRICE_THRESHOLD = 1_000_000  # Prices above 1M are "premium"
MAX_AGE_DAYS = 365  # Only include sales from last year

df = df.filter(pl.col("price") > PREMIUM_PRICE_THRESHOLD)
df = df.filter(pl.col("age_days") < MAX_AGE_DAYS)

DON'T: Ignore Errors Silently

# BAD
try:
    result = risky_operation()
except Exception:
    result = None  # ← Silent failure!
# GOOD
try:
    result = risky_operation()
except ValueError as e:
    # Only the expected exception is caught; anything else propagates unchanged
    context.log.error(f"Operation failed: {e}")
    raise  # Re-raise for visibility

DON'T: Create God Assets

# BAD - One asset doing everything
@dg.asset
def complete_pipeline(context):
    # Load
    # Transform
    # Validate
    # Aggregate
    # Write
    # Notify
    # 500 lines of code...
    ...  # placeholder: a comment-only function body is a SyntaxError
# GOOD - Split into focused assets
# Each parameter name matches an upstream asset, which is how Dagster wires the dependency
@dg.asset
def load_data(context): ...

@dg.asset
def transform_data(context, load_data): ...

@dg.asset
def validate_data(context, transform_data): ...

Quick Checklist

Before committing code, check:

  • [ ] Asset has a clear, descriptive name
  • [ ] Asset has a docstring explaining what it does
  • [ ] Uses helper functions (read_harvest_tables_lazy, add_dataframe_metadata)
  • [ ] Returns correct type (usually pl.DataFrame)
  • [ ] Adds useful metadata (record counts, business metrics)
  • [ ] Uses lazy operations where possible (Polars scan_*)
  • [ ] Validates inputs and outputs
  • [ ] Has logging for long operations
  • [ ] Has tests for business logic
  • [ ] No hardcoded paths or magic numbers
  • [ ] Follows honey-duck naming conventions

Remember: Good code is code that's easy to understand, test, and change. When in doubt, prefer clarity over cleverness.