Getting Started with Dagster - For New Developers

A hands-on guide to understanding and working with Dagster in the honey-duck project.

What is Dagster?

Dagster is a data orchestration platform that helps you build, test, and monitor data pipelines. Think of it as a smart way to organize your data processing code with:

  • Visual lineage: See how data flows through your pipeline
  • Incremental execution: Only recompute what changed
  • Observability: Track what's happening in real-time
  • Testing: Test your pipeline logic before deploying
  • Scheduling: Run pipelines on a schedule or trigger manually

Core Concepts (5-Minute Version)

1. Assets - The Building Blocks

An asset is a piece of data you want to create and keep track of. In honey-duck:

@dg.asset
def sales_transform(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """This function creates the 'sales_transform' asset."""
    # Your transformation logic here
    result = load_and_transform_sales()
    return result

Key points:

  • The function name becomes the asset name
  • The return value is the asset's data
  • Dagster tracks when it was created and what it depends on

2. Dependencies - How Assets Connect

Assets can depend on other assets:

@dg.asset
def sales_output(
    context: dg.AssetExecutionContext,
    sales_transform: pl.DataFrame,  # ← Dependency!
) -> pl.DataFrame:
    """This asset depends on sales_transform."""
    # sales_transform is automatically loaded and passed in
    return sales_transform.filter(pl.col("price") > 1000)

Dagster automatically:

  • Runs sales_transform before sales_output
  • Passes the data between them
  • Shows the connection in the UI
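Conceptually, Dagster resolves this run order by topologically sorting the asset graph. A stdlib-only sketch of that idea (the dependency map below is illustrative, not Dagster's actual resolver):

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each asset -> the assets it depends on.
deps = {
    "dlt_harvest_sales": set(),
    "sales_transform": {"dlt_harvest_sales"},
    "sales_output": {"sales_transform"},
}

# static_order() yields upstream assets before their dependents,
# which is exactly the materialization order Dagster needs.
run_order = list(TopologicalSorter(deps).static_order())
```

Any asset with no unmaterialized upstream dependencies can run first; everything downstream waits for its inputs.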

3. Context - Your Pipeline Toolbox

The context parameter gives you access to:

@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Logging messages")  # Logging
    context.add_output_metadata({"rows": 100})  # Metadata
    # ... your code

4. IO Managers - How Data is Stored

IO Managers handle saving/loading data between assets:

# Dagster automatically:
# 1. Saves sales_transform as Parquet (PolarsParquetIOManager)
# 2. Loads it when sales_output needs it
# 3. Shows file paths in the UI

You don't need to write file I/O code - Dagster handles it!
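To see what an IO manager does under the hood, here is a stdlib-only toy that follows the same save/load contract. (Real IO managers subclass dagster.IOManager and receive context objects; the JSON storage and method shapes here are simplifications for illustration.)

```python
import json
from pathlib import Path

class ToyIOManager:
    """Conceptual sketch: Dagster calls handle_output after an asset
    runs, and load_input before a downstream asset that needs it."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def handle_output(self, asset_name, value):
        # Mirrors the storage layout: <base_dir>/<asset>/<asset>
        path = self.base_dir / asset_name / asset_name
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(value))

    def load_input(self, asset_name):
        path = self.base_dir / asset_name / asset_name
        return json.loads(path.read_text())
```

The PolarsParquetIOManager used in honey-duck does the same dance, but with Parquet files and DataFrames instead of JSON.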

Your First Asset (Step by Step)

Let's create a simple asset that filters artworks by price:

Step 1: Create the Asset

Add to src/honey_duck/defs/polars/my_first_asset.py:

import dagster as dg
import polars as pl
from cogapp_libs.dagster import read_harvest_tables_lazy, add_dataframe_metadata
from honey_duck.defs.shared.resources import PathsResource

@dg.asset(
    kinds={"polars"},
    group_name="tutorial",
)
def expensive_artworks(
    context: dg.AssetExecutionContext,
    paths: PathsResource,
) -> pl.DataFrame:
    """Find artworks priced over $1 million."""

    # Load data from harvest
    tables = read_harvest_tables_lazy(
        paths.harvest_dir,
        ("artworks_raw", ["artwork_id", "title", "price_usd"]),
        asset_name="expensive_artworks",
    )

    # Filter for expensive artworks
    result = (
        tables["artworks_raw"]
        .filter(pl.col("price_usd") > 1_000_000)
        .sort("price_usd", descending=True)
        .collect()
    )

    # Add metadata (shows up in UI)
    add_dataframe_metadata(
        context,
        result,
        total_value=float(result["price_usd"].sum()),
        avg_price=float(result["price_usd"].mean()),
    )

    context.log.info(f"Found {len(result)} expensive artworks")

    return result

Step 2: Register the Asset

The asset is automatically registered -- Dagster discovers assets in the defs/ folder. No manual registration needed.

Step 3: Materialize It

Start Dagster UI:

uv run dg dev

Open http://localhost:3000 and:

  1. Go to the Assets tab
  2. Find expensive_artworks
  3. Click Materialize
  4. Watch it run!

Step 4: View the Results

After materialization:

  • Click on the asset to see metadata
  • Click "Show Markdown" on the preview
  • See logs in the Logs tab
  • View the file location in Metadata

Common Patterns

Pattern 1: Reading from Harvest Tables

from cogapp_libs.dagster import read_harvest_tables_lazy
from honey_duck.defs.shared.resources import PathsResource

@dg.asset
def my_asset(
    context: dg.AssetExecutionContext,
    paths: PathsResource,  # Injected by Dagster
) -> pl.DataFrame:
    # Read multiple tables at once
    tables = read_harvest_tables_lazy(
        paths.harvest_dir,  # Use injected resource
        ("sales_raw", ["sale_id", "sale_price_usd"]),  # Validates columns
        ("artworks_raw", ["artwork_id", "title"]),
        asset_name="my_asset",  # For error messages
    )

    # Use lazy operations, then collect
    return (
        tables["sales_raw"]
        .join(tables["artworks_raw"], on="artwork_id")
        .filter(pl.col("sale_price_usd") > 1000)
        .collect()  # ← Execute here
    )
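The lazy-then-collect style above can be pictured with a stdlib analogy: operations are recorded, not executed, until collect() runs them all at once. (LazyPipeline is a toy for illustration, not part of Polars; real LazyFrames also optimize the recorded plan before running it.)

```python
class LazyPipeline:
    """Toy model of a lazy frame: filters are recorded as steps and
    only applied when collect() is called."""

    def __init__(self, rows):
        self.rows = rows
        self.steps = []

    def filter(self, predicate):
        self.steps.append(predicate)
        return self  # chainable, like LazyFrame.filter

    def collect(self):
        # Nothing touched the data until now.
        rows = self.rows
        for predicate in self.steps:
            rows = [r for r in rows if predicate(r)]
        return rows

sales = LazyPipeline([{"sale_price_usd": 500}, {"sale_price_usd": 2000}])
result = sales.filter(lambda r: r["sale_price_usd"] > 1000).collect()
```

Deferring work like this is what lets Polars push filters down and avoid loading columns you never use.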

Pattern 2: Adding Metadata

from cogapp_libs.dagster import add_dataframe_metadata, track_timing

@dg.asset
def my_transform(context: dg.AssetExecutionContext) -> pl.DataFrame:
    # Track execution time automatically
    with track_timing(context, "transformation"):
        result = expensive_operation()

    # Add rich metadata
    add_dataframe_metadata(
        context,
        result,
        unique_customers=result["customer_id"].n_unique(),
        total_revenue=float(result["revenue"].sum()),
        date_range=f"{result['date'].min()} to {result['date'].max()}",
    )

    return result

Pattern 3: Writing JSON Output

from cogapp_libs.dagster import write_json_output
from honey_duck.defs.shared.resources import OutputPathsResource

@dg.asset(kinds={"polars", "json"})
def my_output(
    context: dg.AssetExecutionContext,
    my_transform: pl.DataFrame,
    output_paths: OutputPathsResource,  # Injected by Dagster
) -> pl.DataFrame:
    """Filter and write to JSON."""
    filtered = my_transform.filter(pl.col("amount") > 100)

    # Write to configured output path
    write_json_output(
        filtered,
        output_paths.sales_polars,  # Use configured path
        context,
    )

    return filtered

Pattern 4: External Dependencies

When your asset depends on external files (not other assets):

from honey_duck.defs.shared.helpers import STANDARD_HARVEST_DEPS

@dg.asset(
    deps=STANDARD_HARVEST_DEPS,  # CSV/SQLite files
    kinds={"dlt"},
)
def my_harvest(context: dg.AssetExecutionContext):
    # This asset depends on dlt_harvest_* assets
    # which load from CSV files
    pass

Understanding the Asset Graph

In Dagster UI, you'll see nodes and edges:

csv_sales → dlt_harvest_sales → sales_transform → sales_output
                                               ↘ sales_joined
  • Nodes (boxes): Assets
  • Edges (arrows): Dependencies
  • Colors:
      • Green: Successfully materialized
      • Gray: Not yet materialized
      • Yellow: In progress
      • Red: Failed

Tip: Click "View lineage" to see the full dependency graph!

Testing Your Assets

Method 1: In the UI

  1. Materialize single asset: Click asset → Materialize
  2. Materialize with dependencies: Check "Materialize upstream assets"
  3. View results: Click asset → See metadata/preview/logs

Method 2: In Tests

# tests/test_my_asset.py
from dagster import materialize
from honey_duck.defs.polars.my_first_asset import expensive_artworks

def test_expensive_artworks():
    # Materialize the asset
    result = materialize([expensive_artworks])
    assert result.success

    # Check output
    output = result.output_for_node("expensive_artworks")
    assert len(output) > 0
    assert all(output["price_usd"] > 1_000_000)

Method 3: Via CLI

# Materialize specific asset
uv run dg launch --assets expensive_artworks

# Materialize an asset plus its upstream dependencies
uv run dg launch --assets "+sales_output"

# Run full job
uv run dg launch --job polars_pipeline

Debugging Tips

Check Logs

@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Starting processing...")
    context.log.debug(f"Data shape: {df.shape}")
    context.log.warning("Low data quality detected")
    # context.log.error("Critical error!")  # For errors

View logs in:

  • Dagster UI: Asset page → Logs tab
  • Terminal: when running dg dev
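context.log follows standard Python log levels, so messages below the active level are filtered out. A stdlib sketch of that hierarchy:

```python
import logging

# Same level hierarchy that context.log uses: DEBUG < INFO < WARNING < ERROR.
logger = logging.getLogger("my_asset")
logger.setLevel(logging.INFO)

# At INFO level, info/warning/error pass through but debug is dropped.
info_enabled = logger.isEnabledFor(logging.INFO)
debug_enabled = logger.isEnabledFor(logging.DEBUG)
```

This is why context.log.debug output may not appear in the UI by default: it sits below the configured level.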

Inspect Intermediate Data

@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    result = transform_data()

    # Add preview to metadata
    context.add_output_metadata({
        "preview": dg.MetadataValue.md(
            result.head(10).to_pandas().to_markdown()
        ),
    })

    return result

Check File Paths

Assets stored by IO managers are in:

data/output/storage/
  ├── expensive_artworks/
  │   └── expensive_artworks
  └── sales_transform/
      └── sales_transform

Read them directly for debugging:

import polars as pl
df = pl.read_parquet("data/output/storage/expensive_artworks/expensive_artworks")
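When you are not sure which assets have been materialized, a quick directory walk lists everything the IO manager has written (a small helper sketch; the storage path is the one shown above):

```python
from pathlib import Path

def list_materialized(storage_dir):
    """Return relative paths of all files an IO manager has written."""
    storage = Path(storage_dir)
    return sorted(
        str(p.relative_to(storage))
        for p in storage.rglob("*")
        if p.is_file()
    )

# e.g. list_materialized("data/output/storage")
```

Each entry corresponds to one materialized asset, so an empty result means nothing has run yet.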

Common Mistakes & Fixes

Mistake 1: Forgetting to Return Data

@dg.asset
def my_asset(context):
    result = transform_data()
    # Forgot to return!  ← BUG

Fix:

@dg.asset
def my_asset(context):
    result = transform_data()
    return result  # ← Always return!

Mistake 2: Wrong Parameter Name

@dg.asset
def sales_output(context, sales_data: pl.DataFrame):  # ← Wrong name
    # Dagster looks for an asset named "sales_data", not "sales_transform"
    ...

Fix:

@dg.asset
def sales_output(context, sales_transform: pl.DataFrame):  # ← Match asset name
    pass

Mistake 3: Modifying Input Data

@dg.asset
def my_asset(context, input_data: pl.DataFrame):
    input_data = input_data.filter(...)  # May cause issues
    return input_data

Fix:

@dg.asset
def my_asset(context, input_data: pl.DataFrame):
    result = input_data.filter(...)  # Create new variable
    return result

Mistake 4: Not Using Helper Functions

@dg.asset
def my_asset(context):
    # Manual file reading - error-prone!
    df = pl.read_parquet("data/output/dlt/harvest_parquet/sales_raw.parquet")

Fix:

@dg.asset
def my_asset(context, paths: PathsResource):
    # Use helper - automatic validation!
    tables = read_harvest_tables_lazy(
        paths.harvest_dir,
        ("sales_raw", ["sale_id", "price"]),
        asset_name="my_asset",
    )

Next Steps

1. Explore Existing Assets

Read these files to see patterns:

  • src/honey_duck/defs/polars/assets.py - Clean Polars implementation
  • src/honey_duck/defs/duckdb/assets.py - SQL-based approach
  • See Polars Patterns for detailed pattern documentation

2. Learn Multi-Assets

For related outputs:

@dg.multi_asset(
    outs={
        "sales_summary": dg.AssetOut(),
        "sales_details": dg.AssetOut(),
    }
)
def sales_pipeline(context):
    # Create both assets in one function
    summary = create_summary()
    details = create_details()

    yield dg.Output(summary, output_name="sales_summary")
    yield dg.Output(details, output_name="sales_details")

3. Add Asset Checks

Validate data quality:

from dagster import AssetCheckResult, asset_check

@asset_check(asset=expensive_artworks)
def check_price_range(expensive_artworks: pl.DataFrame):
    """Ensure all artworks are actually expensive."""
    # The asset's value is loaded and passed in, just like a dependency
    min_price = expensive_artworks["price_usd"].min()

    return AssetCheckResult(
        passed=bool(min_price > 1_000_000),
        metadata={"min_price": float(min_price)},
    )

4. Add Schedules

Run assets on a schedule:

from dagster import ScheduleDefinition

daily_schedule = ScheduleDefinition(
    name="daily_pipeline",
    cron_schedule="0 2 * * *",  # 2 AM daily
    target=[expensive_artworks],
)
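Cron expressions have five fields; a tiny helper (hypothetical, not part of Dagster) makes the schedule above easier to read:

```python
def describe_cron(expr):
    """Label the five standard cron fields of an expression."""
    fields = ("minute", "hour", "day_of_month", "month", "day_of_week")
    return dict(zip(fields, expr.split()))

# "0 2 * * *" -> minute 0, hour 2, every day: 2 AM daily
schedule = describe_cron("0 2 * * *")
```

Reading the fields back this way is a quick sanity check before wiring a cron string into a ScheduleDefinition.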

Resources

Quick Reference

# Start Dagster UI
uv run dg dev

# Materialize asset
uv run dg launch --assets my_asset

# Run job
uv run dg launch --job polars_pipeline

# Run tests
uv run pytest

# Check asset graph
# → Open http://localhost:3000 → Assets → View lineage

Welcome to Dagster. Start small, experiment in the UI, and build up from there.