
Dagster Helpers

Utilities for building Dagster assets with less boilerplate.

External Documentation

Data Loading

read_harvest_table_lazy

Read harvest table from Parquet files as Polars LazyFrame.

Convenience wrapper around read_parquet_table_lazy for harvest tables. Automatically handles the schema subdirectory structure.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `harvest_dir` | `str \| Path` | Path to harvest Parquet directory (e.g., `data/output/dlt/harvest_parquet`) | *required* |
| `table_name` | `str` | Name of table to read | *required* |
| `schema` | `str` | Schema/subdirectory name | `'raw'` |
| `required_columns` | `list[str] \| None` | Optional list of required column names | `None` |
| `asset_name` | `str` | Name of calling asset (for error context) | `'unknown'` |

Returns:

| Type | Description |
| --- | --- |
| `LazyFrame` | LazyFrame with data from Parquet files |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If harvest directory doesn't exist |
| `MissingTableError` | If table doesn't exist (lists available tables) |
| `MissingColumnError` | If required columns are missing (lists available columns) |

Example

```python
from pathlib import Path

sales = read_harvest_table_lazy(
    Path("data/output/dlt/harvest_parquet"),
    "sales_raw",
    asset_name="sales_transform",
)
result = sales.filter(pl.col("sale_price_usd") > 1000).collect()
```

Source code in cogapp_libs/dagster/validation.py
def read_harvest_table_lazy(
    harvest_dir: str | Path,
    table_name: str,
    schema: str = "raw",
    required_columns: list[str] | None = None,
    asset_name: str = "unknown",
) -> pl.LazyFrame:
    """Read harvest table from Parquet files as Polars LazyFrame.

    Convenience wrapper around read_parquet_table_lazy for harvest tables.
    Automatically handles the schema subdirectory structure.

    Args:
        harvest_dir: Path to harvest Parquet directory (e.g., data/output/dlt/harvest_parquet)
        table_name: Name of table to read
        schema: Schema/subdirectory name (default: "raw")
        required_columns: Optional list of required column names
        asset_name: Name of calling asset (for error context)

    Returns:
        LazyFrame with data from Parquet files

    Raises:
        FileNotFoundError: If harvest directory doesn't exist
        MissingTableError: If table doesn't exist (lists available tables)
        MissingColumnError: If required columns are missing (lists available columns)

    Example:
        ```python
        from pathlib import Path
        sales = read_harvest_table_lazy(
            Path("data/output/dlt/harvest_parquet"),
            "sales_raw",
            asset_name="sales_transform"
        )
        result = sales.filter(pl.col("sale_price_usd") > 1000).collect()
        ```
    """
    harvest_dir = Path(harvest_dir)
    return read_parquet_table_lazy(
        harvest_dir / schema,
        table_name,
        required_columns=required_columns,
        asset_name=asset_name,
    )

read_harvest_tables_lazy

Read multiple harvest tables in one call with validation.

Convenience function to batch-read multiple related tables. Reduces boilerplate when you need to join or process multiple tables together.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `harvest_dir` | `str \| Path` | Path to harvest Parquet directory (e.g., `data/output/dlt/harvest_parquet`) | *required* |
| `*table_specs` | `tuple[str, list[str] \| None]` | Variable number of (table_name, required_columns) tuples. required_columns can be None to skip column validation. | `()` |
| `schema` | `str` | Schema/subdirectory name | `'raw'` |
| `asset_name` | `str` | Name of calling asset (for error context) | `'unknown'` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, LazyFrame]` | Dictionary mapping table names to LazyFrames |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If harvest directory doesn't exist |
| `MissingTableError` | If any table doesn't exist (lists available tables) |
| `MissingColumnError` | If required columns are missing (lists available columns) |

Example

```python
tables = read_harvest_tables_lazy(
    Path("data/output/dlt/harvest_parquet"),
    ("sales_raw", ["sale_id", "sale_price_usd"]),
    ("artworks_raw", ["artwork_id", "title"]),
    ("artists_raw", None),  # No column validation
    asset_name="sales_transform",
)
sales = tables["sales_raw"]
artworks = tables["artworks_raw"]
result = sales.join(artworks, on="artwork_id").collect()
```

Source code in cogapp_libs/dagster/validation.py
def read_harvest_tables_lazy(
    harvest_dir: str | Path,
    *table_specs: tuple[str, list[str] | None],
    schema: str = "raw",
    asset_name: str = "unknown",
) -> dict[str, pl.LazyFrame]:
    """Read multiple harvest tables in one call with validation.

    Convenience function to batch-read multiple related tables. Reduces
    boilerplate when you need to join or process multiple tables together.

    Args:
        harvest_dir: Path to harvest Parquet directory (e.g., data/output/dlt/harvest_parquet)
        *table_specs: Variable number of (table_name, required_columns) tuples.
                      required_columns can be None to skip column validation.
        schema: Schema/subdirectory name (default: "raw")
        asset_name: Name of calling asset (for error context)

    Returns:
        Dictionary mapping table names to LazyFrames

    Raises:
        FileNotFoundError: If harvest directory doesn't exist
        MissingTableError: If any table doesn't exist (lists available tables)
        MissingColumnError: If required columns are missing (lists available columns)

    Example:
        ```python
        tables = read_harvest_tables_lazy(
            Path("data/output/dlt/harvest_parquet"),
            ("sales_raw", ["sale_id", "sale_price_usd"]),
            ("artworks_raw", ["artwork_id", "title"]),
            ("artists_raw", None),  # No column validation
            asset_name="sales_transform"
        )
        sales = tables["sales_raw"]
        artworks = tables["artworks_raw"]
        result = sales.join(artworks, on="artwork_id").collect()
        ```
    """
    result = {}
    for table_name, required_columns in table_specs:
        result[table_name] = read_harvest_table_lazy(
            harvest_dir,
            table_name,
            schema=schema,
            required_columns=required_columns,
            asset_name=asset_name,
        )
    return result

read_parquet_table_lazy

Read table from Parquet files as Polars LazyFrame with validation.

Validates table existence and required columns before returning. Provides actionable error messages listing available tables/columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parquet_dir` | `str \| Path` | Path to directory containing Parquet files (e.g., `harvest_parquet/`) | *required* |
| `table_name` | `str` | Name of table to read (subdirectory name) | *required* |
| `required_columns` | `list[str] \| None` | Optional list of required column names | `None` |
| `asset_name` | `str` | Name of calling asset (for error context) | `'unknown'` |

Returns:

| Type | Description |
| --- | --- |
| `LazyFrame` | LazyFrame with data from Parquet files |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If parquet directory doesn't exist |
| `MissingTableError` | If table directory doesn't exist (lists available tables) |
| `MissingColumnError` | If required columns are missing (lists available columns) |

Example

```python
sales = read_parquet_table_lazy(
    "/path/to/harvest_parquet",
    "sales_raw",
    required_columns=["sale_id", "sale_price_usd"],
    asset_name="sales_transform",
)
result = sales.filter(pl.col("sale_price_usd") > 1000).collect()
```

Source code in cogapp_libs/dagster/validation.py
def read_parquet_table_lazy(
    parquet_dir: str | Path,
    table_name: str,
    required_columns: list[str] | None = None,
    asset_name: str = "unknown",
) -> pl.LazyFrame:
    """Read table from Parquet files as Polars LazyFrame with validation.

    Validates table existence and required columns before returning.
    Provides actionable error messages listing available tables/columns.

    Args:
        parquet_dir: Path to directory containing Parquet files (e.g., harvest_parquet/)
        table_name: Name of table to read (subdirectory name)
        required_columns: Optional list of required column names
        asset_name: Name of calling asset (for error context)

    Returns:
        LazyFrame with data from Parquet files

    Raises:
        FileNotFoundError: If parquet directory doesn't exist
        MissingTableError: If table directory doesn't exist (lists available tables)
        MissingColumnError: If required columns are missing (lists available columns)

    Example:
        ```python
        sales = read_parquet_table_lazy(
            "/path/to/harvest_parquet",
            "sales_raw",
            required_columns=["sale_id", "sale_price_usd"],
            asset_name="sales_transform"
        )
        result = sales.filter(pl.col("sale_price_usd") > 1000).collect()
        ```
    """
    parquet_dir = Path(parquet_dir)

    # Check parquet directory exists
    if not parquet_dir.exists():
        error: Exception = FileNotFoundError(
            f"[{asset_name}] Parquet directory not found at {parquet_dir}. "
            f"Did you run the harvest job first?"
        )
        raise_as_dagster_failure(error)

    # Check table directory exists
    table_dir = parquet_dir / table_name
    if not table_dir.exists():
        available_tables = [d.name for d in parquet_dir.iterdir() if d.is_dir()]
        error = MissingTableError(asset_name, table_name, available_tables)
        raise_as_dagster_failure(error)

    # Read all Parquet files in table directory
    parquet_files = list(table_dir.glob("*.parquet"))
    if not parquet_files:
        error = FileNotFoundError(f"[{asset_name}] No Parquet files found in {table_dir}")
        raise_as_dagster_failure(error)

    # Read using scan_parquet which supports multiple files
    df = pl.scan_parquet(table_dir / "*.parquet")

    # Validate required columns if specified
    if required_columns:
        actual_columns = df.collect_schema().names()
        missing = set(required_columns) - set(actual_columns)
        if missing:
            error = MissingColumnError(asset_name, missing, actual_columns)
            raise_as_dagster_failure(error)

    return df

Validation

validate_dataframe

Validate DataFrame has required columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | DataFrame to validate | *required* |
| `required_columns` | `list[str]` | List of required column names | *required* |
| `asset_name` | `str` | Name of asset (for error messages) | *required* |

Raises:

| Type | Description |
| --- | --- |
| `Failure` | If required columns are missing (with metadata) |

Example

```python
validate_dataframe(
    sales_df,
    ["sale_id", "sale_price_usd"],
    "sales_transform",
)
```

Source code in cogapp_libs/dagster/validation.py
def validate_dataframe(
    df: pl.DataFrame,
    required_columns: list[str],
    asset_name: str,
) -> None:
    """Validate DataFrame has required columns.

    Args:
        df: DataFrame to validate
        required_columns: List of required column names
        asset_name: Name of asset (for error messages)

    Raises:
        dagster.Failure: If required columns are missing (with metadata)

    Example:
        ```python
        validate_dataframe(
            sales_df,
            ["sale_id", "sale_price_usd"],
            "sales_transform"
        )
        ```
    """
    missing = set(required_columns) - set(df.columns)
    if missing:
        error = MissingColumnError(asset_name, missing, df.columns)
        raise_as_dagster_failure(error)

Metadata

add_dataframe_metadata

Add standard metadata for a DataFrame result.

Automatically includes:

- Record count
- Column list
- Preview (first 5 rows as markdown table)
- Any extra metadata provided

Passes through all Dagster parameters to maintain full compatibility with multi-output assets and dynamic partitions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `AssetExecutionContext` | Asset execution context | *required* |
| `df` | `DataFrame` | Result DataFrame | *required* |
| `output_name` | `str \| None` | (Optional) For multi-output assets: specify which output this metadata is for | `None` |
| `mapping_key` | `str \| None` | (Optional) For dynamic partitions: specify partition key | `None` |
| `**extra_metadata` | `MetadataValue \| str \| int \| float \| bool \| None` | Additional metadata to include | `{}` |

Example

```python
# Simple usage
add_dataframe_metadata(
    context,
    result,
    unique_artworks=result["artwork_id"].n_unique(),
    total_value=float(result["sale_price_usd"].sum()),
)

# With multi-output asset
add_dataframe_metadata(
    context,
    result,
    output_name="sales_joined",
    unique_artworks=result["artwork_id"].n_unique(),
)
```

Source code in cogapp_libs/dagster/helpers.py
def add_dataframe_metadata(
    context: dg.AssetExecutionContext,
    df: pl.DataFrame,
    output_name: str | None = None,
    mapping_key: str | None = None,
    **extra_metadata: dg.MetadataValue | str | int | float | bool | None,
) -> None:
    """Add standard metadata for a DataFrame result.

    Automatically includes:
    - Record count
    - Column list
    - Preview (first 5 rows as markdown table)
    - Any extra metadata provided

    Passes through all Dagster parameters to maintain full compatibility with
    multi-output assets and dynamic partitions.

    Args:
        context: Asset execution context
        df: Result DataFrame
        output_name: (Optional) For multi-output assets - specify which output this metadata is for
        mapping_key: (Optional) For dynamic partitions - specify partition key
        **extra_metadata: Additional metadata to include

    Example:
        ```python
        # Simple usage
        add_dataframe_metadata(
            context,
            result,
            unique_artworks=result["artwork_id"].n_unique(),
            total_value=float(result["sale_price_usd"].sum()),
        )

        # With multi-output asset
        add_dataframe_metadata(
            context,
            result,
            output_name="sales_joined",
            unique_artworks=result["artwork_id"].n_unique(),
        )
        ```
    """
    metadata = {
        "record_count": len(df),
        "columns": df.columns,
        "preview": dg.MetadataValue.md(df.head(5).to_pandas().to_markdown(index=False)),
        **extra_metadata,
    }
    context.add_output_metadata(metadata, output_name=output_name, mapping_key=mapping_key)

track_timing

Context manager to track and log execution time.

Automatically adds processing_time_ms to asset metadata and logs completion message. Use this to eliminate manual timing boilerplate.

Passes through all Dagster parameters to maintain full compatibility with multi-output assets and dynamic partitions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `AssetExecutionContext \| OpExecutionContext` | Asset execution context | *required* |
| `operation` | `str` | Description of operation (e.g., "transform", "processing") | `'processing'` |
| `log_message` | `str \| None` | Optional custom log message template. Use `{elapsed_ms}` placeholder. | `None` |
| `output_name` | `str \| None` | (Optional) For multi-output assets: specify which output this metadata is for | `None` |
| `mapping_key` | `str \| None` | (Optional) For dynamic partitions: specify partition key | `None` |

Example

```python
@dg.asset(kinds={"polars"})
def my_asset(context: dg.AssetExecutionContext) -> pl.DataFrame:
    with track_timing(context, "transformation"):
        result = expensive_operation()
        # Automatically logs: "Completed transformation in 123.4ms"
        # Automatically adds processing_time_ms to metadata
    return result

# With multi-output asset
with track_timing(context, "loading", output_name="sales_joined"):
    result = load_data()
```

Source code in cogapp_libs/dagster/helpers.py
def __init__(
    self,
    context: dg.AssetExecutionContext | dg.OpExecutionContext,
    operation: str = "processing",
    log_message: str | None = None,
    output_name: str | None = None,
    mapping_key: str | None = None,
):
    self.context = context
    self.operation = operation
    self.log_message = log_message
    self.output_name = output_name
    self.mapping_key = mapping_key
    self.start_time = None
    self.elapsed_ms = None

write_json_output

Write DataFrame to JSON using DuckDB's native COPY command.

Creates parent directories if needed. Adds record_count, path, and preview to asset metadata automatically. Accepts both pandas and polars DataFrames.

Uses DuckDB's native JSON export for better performance - avoids the DataFrame → pandas → json serialization path.

Passes through all Dagster parameters to maintain full compatibility with multi-output assets and dynamic partitions.
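
With `ARRAY true`, DuckDB's `COPY ... (FORMAT JSON, ARRAY true)` writes the whole result as one JSON array of row objects rather than newline-delimited JSON. A stdlib-only sketch of the resulting file shape (the rows here are illustrative, not from a real harvest):

```python
import json
import tempfile
from pathlib import Path

# Illustrative rows standing in for the DataFrame contents
rows = [
    {"sale_id": 1, "sale_price_usd": 1200.0},
    {"sale_id": 2, "sale_price_usd": 950.0},
]

# COPY ... (FORMAT JSON, ARRAY true) produces a single JSON array of objects
output_path = Path(tempfile.mkdtemp()) / "sales.json"
output_path.write_text(json.dumps(rows, indent=2))

loaded = json.loads(output_path.read_text())
print(len(loaded), loaded[0]["sale_id"])  # 2 1
```

Because the output is a single array, downstream consumers can load the whole file with one `json.loads` call instead of parsing line by line.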

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame \| 'pl.DataFrame'` | DataFrame to write (pandas or polars) | *required* |
| `output_path` | `str \| Path` | Path to write JSON file | *required* |
| `context` | `AssetExecutionContext` | Dagster asset execution context | *required* |
| `extra_metadata` | `dict \| None` | Additional metadata to include (optional) | `None` |
| `output_name` | `str \| None` | (Optional) For multi-output assets: specify which output this metadata is for | `None` |
| `mapping_key` | `str \| None` | (Optional) For dynamic partitions: specify partition key | `None` |

Source code in cogapp_libs/dagster/io.py
def write_json_output(
    df: pd.DataFrame | "pl.DataFrame",
    output_path: str | Path,
    context: dg.AssetExecutionContext,
    extra_metadata: dict | None = None,
    output_name: str | None = None,
    mapping_key: str | None = None,
) -> None:
    """Write DataFrame to JSON using DuckDB's native COPY command.

    Creates parent directories if needed. Adds record_count, path, and preview
    to asset metadata automatically. Accepts both pandas and polars DataFrames.

    Uses DuckDB's native JSON export for better performance - avoids the
    DataFrame → pandas → json serialization path.

    Passes through all Dagster parameters to maintain full compatibility with
    multi-output assets and dynamic partitions.

    Args:
        df: DataFrame to write (pandas or polars)
        output_path: Path to write JSON file
        context: Dagster asset execution context
        extra_metadata: Additional metadata to include (optional)
        output_name: (Optional) For multi-output assets - specify which output this metadata is for
        mapping_key: (Optional) For dynamic partitions - specify partition key
    """
    # Convert string to Path if needed
    if isinstance(output_path, str):
        output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Use DuckDB's native JSON export
    conn = duckdb.connect(":memory:")
    conn.register("_df", df)
    conn.execute(f"COPY (SELECT * FROM _df) TO '{output_path}' (FORMAT JSON, ARRAY true)")

    # Get preview as pandas for markdown rendering
    preview_df = conn.sql("SELECT * FROM _df LIMIT 10").df()
    result = conn.sql("SELECT COUNT(*) FROM _df").fetchone()
    record_count = result[0] if result else 0
    conn.close()

    context.add_output_metadata(
        {
            "record_count": record_count,
            "json_output": dg.MetadataValue.path(str(output_path)),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            **(extra_metadata or {}),
        },
        output_name=output_name,
        mapping_key=mapping_key,
    )

Visualization

altair_to_metadata

Convert an Altair chart to Dagster metadata with embedded PNG.

Creates a base64-encoded PNG image and wraps it in markdown for display in the Dagster UI asset metadata panel.
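
The embedding itself is plain base64 wrapped in a markdown image tag with a `data:` URI; a minimal stdlib sketch of the same technique (the PNG bytes here are just a placeholder header, not a real chart):

```python
import base64

# Placeholder PNG signature bytes; a real chart.save() would produce full image data
png_bytes = b"\x89PNG\r\n\x1a\n"

# Encode as base64 and wrap in a markdown image tag with a data: URI
b64_data = base64.b64encode(png_bytes).decode("utf-8")
md_content = f"![distribution](data:image/png;base64,{b64_data})"
print(md_content)  # ![distribution](data:image/png;base64,iVBORw0KGgo=)
```

Markdown renderers that support data URIs (including the Dagster UI) display this inline without any external file hosting.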

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `chart` | `Chart` | Altair Chart object (e.g., from `df.plot.bar()`) | *required* |
| `title` | `str` | Metadata key name | `'chart'` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, MetadataValue]` | Dictionary with single key containing MetadataValue.md() with embedded image |

Example

```python
chart = result.plot.bar(x="category", y="count")
context.add_output_metadata(altair_to_metadata(chart, "distribution"))
```

Source code in cogapp_libs/dagster/helpers.py
def altair_to_metadata(chart: alt.Chart, title: str = "chart") -> dict[str, dg.MetadataValue]:
    """Convert an Altair chart to Dagster metadata with embedded PNG.

    Creates a base64-encoded PNG image and wraps it in markdown for display
    in the Dagster UI asset metadata panel.

    Args:
        chart: Altair Chart object (e.g., from df.plot.bar())
        title: Metadata key name (default: "chart")

    Returns:
        Dictionary with single key containing MetadataValue.md() with embedded image

    Example:
        ```python
        chart = result.plot.bar(x="category", y="count")
        context.add_output_metadata(altair_to_metadata(chart, "distribution"))
        ```
    """
    # Save chart to PNG bytes
    png_bytes = io.BytesIO()
    chart.save(png_bytes, format="png")
    png_bytes.seek(0)

    # Encode as base64 and wrap in markdown
    b64_data = base64.b64encode(png_bytes.read()).decode("utf-8")
    md_content = f"![{title}](data:image/png;base64,{b64_data})"

    return {title: dg.MetadataValue.md(md_content)}

table_preview_to_metadata

Convert a Polars DataFrame to a markdown table for Dagster metadata.

Creates a clean markdown table preview suitable for Dagster UI display.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | Polars DataFrame to preview | *required* |
| `title` | `str` | Metadata key name | `'preview'` |
| `header` | `str \| None` | Optional header text to display above the table | `None` |
| `max_rows` | `int` | Maximum rows to include | `10` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, MetadataValue]` | Dictionary with single key containing MetadataValue.md() with markdown table |

Example

```python
context.add_output_metadata(
    table_preview_to_metadata(
        result.head(5),
        title="top_sales",
        header="Top 5 Sales by Value",
    )
)
```

Source code in cogapp_libs/dagster/helpers.py
def table_preview_to_metadata(
    df: pl.DataFrame,
    title: str = "preview",
    header: str | None = None,
    max_rows: int = 10,
) -> dict[str, dg.MetadataValue]:
    """Convert a Polars DataFrame to a markdown table for Dagster metadata.

    Creates a clean markdown table preview suitable for Dagster UI display.

    Args:
        df: Polars DataFrame to preview
        title: Metadata key name (default: "preview")
        header: Optional header text to display above the table
        max_rows: Maximum rows to include (default: 10)

    Returns:
        Dictionary with single key containing MetadataValue.md() with markdown table

    Example:
        ```python
        context.add_output_metadata(
            table_preview_to_metadata(
                result.head(5),
                title="top_sales",
                header="Top 5 Sales by Value"
            )
        )
        ```
    """
    md_table = df.head(max_rows).to_pandas().to_markdown(index=False)
    if header:
        md_content = f"### {header}\n\n{md_table}"
    else:
        md_content = md_table
    return {title: dg.MetadataValue.md(md_content)}

Column Lineage

Helpers for tracking column-level data flow and displaying example values in the Dagster UI.

Lineage DSL

Build column lineage definitions with a compact, declarative syntax.

build_lineage

Build column lineage from compact definitions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `passthrough` | `dict[str, AssetKey] \| None` | `{output_col: source_asset}`: same column name in source | `None` |
| `rename` | `dict[str, tuple[AssetKey, str]] \| None` | `{output_col: (source_asset, source_col)}`: renamed columns | `None` |
| `computed` | `dict[str, list[tuple[AssetKey, str]]] \| None` | `{output_col: [(asset, col), ...]}`: derived from multiple columns | `None` |

Returns:

| Type | Description |
| --- | --- |
| `TableColumnLineage` | TableColumnLineage for use with dagster/column_lineage metadata |

Example

```python
SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")
lineage = build_lineage(
    passthrough={"sale_id": SALES_RAW, "title": ARTWORKS_RAW},
    rename={"artwork_year": (ARTWORKS_RAW, "year")},
    computed={"price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")]},
)
```

Source code in cogapp_libs/dagster/lineage.py
def build_lineage(
    passthrough: dict[str, dg.AssetKey] | None = None,
    rename: dict[str, tuple[dg.AssetKey, str]] | None = None,
    computed: dict[str, list[tuple[dg.AssetKey, str]]] | None = None,
) -> dg.TableColumnLineage:
    """Build column lineage from compact definitions.

    Args:
        passthrough: {output_col: source_asset} - same column name in source
        rename: {output_col: (source_asset, source_col)} - renamed columns
        computed: {output_col: [(asset, col), ...]} - derived from multiple columns

    Returns:
        TableColumnLineage for use with dagster/column_lineage metadata

    Example:
        >>> SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
        >>> ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")
        >>> lineage = build_lineage(
        ...     passthrough={"sale_id": SALES_RAW, "title": ARTWORKS_RAW},
        ...     rename={"artwork_year": (ARTWORKS_RAW, "year")},
        ...     computed={"price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")]},
        ... )
    """
    deps_by_column: dict[str, list[dg.TableColumnDep]] = {}

    # Passthrough: same column name
    if passthrough:
        for col, asset_key in passthrough.items():
            deps_by_column[col] = [dg.TableColumnDep(asset_key=asset_key, column_name=col)]

    # Rename: different column name in source
    if rename:
        for output_col, (asset_key, source_col) in rename.items():
            deps_by_column[output_col] = [
                dg.TableColumnDep(asset_key=asset_key, column_name=source_col)
            ]

    # Computed: derived from multiple columns
    if computed:
        for output_col, deps in computed.items():
            deps_by_column[output_col] = [
                dg.TableColumnDep(asset_key=asset_key, column_name=col) for asset_key, col in deps
            ]

    return dg.TableColumnLineage(deps_by_column=deps_by_column)

Example:

```python
from cogapp_libs.dagster.lineage import build_lineage
import dagster as dg

SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")
ARTISTS_RAW = dg.AssetKey("dlt_harvest_artists_raw")

SALES_TRANSFORM_LINEAGE = build_lineage(
    passthrough={
        "sale_id": SALES_RAW,
        "artwork_id": SALES_RAW,
        "title": ARTWORKS_RAW,
        "artist_name": ARTISTS_RAW,
    },
    rename={
        "artwork_year": (ARTWORKS_RAW, "year"),
        "list_price_usd": (ARTWORKS_RAW, "price_usd"),
    },
    computed={
        "price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")],
    },
)

context.add_output_metadata({
    "dagster/column_lineage": SALES_TRANSFORM_LINEAGE,
})
```

passthrough_lineage

Build lineage for passthrough assets (output columns match input).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `AssetKey` | Source asset key | *required* |
| `columns` | `list[str]` | List of column names that pass through unchanged | *required* |
| `renames` | `dict[str, str] \| None` | `{output_col: source_col}` for renamed columns | `None` |

Returns:

| Type | Description |
| --- | --- |
| `TableColumnLineage` | TableColumnLineage for use with dagster/column_lineage metadata |

Example

```python
lineage = passthrough_lineage(
    source=dg.AssetKey("sales_transform"),
    columns=["sale_id", "title", "artist_name"],
    renames={"price_difference": "price_diff"},
)
```

Source code in cogapp_libs/dagster/lineage.py
def passthrough_lineage(
    source: dg.AssetKey,
    columns: list[str],
    renames: dict[str, str] | None = None,
) -> dg.TableColumnLineage:
    """Build lineage for passthrough assets (output columns match input).

    Args:
        source: Source asset key
        columns: List of column names that pass through unchanged
        renames: {output_col: source_col} for renamed columns

    Returns:
        TableColumnLineage for use with dagster/column_lineage metadata

    Example:
        >>> lineage = passthrough_lineage(
        ...     source=dg.AssetKey("sales_transform"),
        ...     columns=["sale_id", "title", "artist_name"],
        ...     renames={"price_difference": "price_diff"},
        ... )
    """
    deps_by_column: dict[str, list[dg.TableColumnDep]] = {}

    for col in columns:
        deps_by_column[col] = [dg.TableColumnDep(asset_key=source, column_name=col)]

    if renames:
        for output_col, source_col in renames.items():
            deps_by_column[output_col] = [
                dg.TableColumnDep(asset_key=source, column_name=source_col)
            ]

    return dg.TableColumnLineage(deps_by_column=deps_by_column)

Example:

```python
from cogapp_libs.dagster.lineage import passthrough_lineage
import dagster as dg

# For output assets where columns pass through from a single source
SALES_OUTPUT_LINEAGE = passthrough_lineage(
    source=dg.AssetKey("sales_transform_soda"),
    columns=["sale_id", "artwork_id", "title", "artist_name", "pct_change"],
    renames={"price_difference": "price_diff"},  # output_col: source_col
)
```

DuckDB View Registration

register_harvest_views

Register DuckDB views for harvest parquet files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `conn` | `DuckDBPyConnection` | DuckDB connection | *required* |
| `harvest_dir` | `str` | Base harvest directory (e.g., `"data/harvest"`) | *required* |
| `views` | `list[str] \| None` | View names to register (default: all from HARVEST_VIEWS) | `None` |

Example

```python
with duckdb.connect() as conn:
    register_harvest_views(conn, "data/harvest", ["sales", "artworks"])
    result = conn.sql("SELECT * FROM sales LIMIT 5")
```

Source code in cogapp_libs/dagster/lineage.py
def register_harvest_views(
    conn: duckdb.DuckDBPyConnection,
    harvest_dir: str,
    views: list[str] | None = None,
) -> None:
    """Register DuckDB views for harvest parquet files.

    Args:
        conn: DuckDB connection
        harvest_dir: Base harvest directory (e.g., "data/harvest")
        views: View names to register (default: all from HARVEST_VIEWS)

    Example:
        >>> with duckdb.connect() as conn:
        ...     register_harvest_views(conn, "data/harvest", ["sales", "artworks"])
        ...     result = conn.sql("SELECT * FROM sales LIMIT 5")
    """
    views_to_register = views if views is not None else list(HARVEST_VIEWS.keys())
    for view_name in views_to_register:
        subdir = HARVEST_VIEWS.get(view_name, view_name)
        parquet_glob = f"{harvest_dir}/raw/{subdir}/**/*.parquet"
        conn.execute(f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM '{parquet_glob}'")

Simplifies registering harvest parquet files as DuckDB views.

Example:

```python
from cogapp_libs.dagster.lineage import register_harvest_views

with duckdb.get_connection() as conn:
    # Register views for sales, artworks, and artists
    register_harvest_views(conn, paths.harvest_dir, ["sales", "artworks", "artists"])

    # Now query using simple view names
    conn.sql("SELECT * FROM sales JOIN artworks USING (artwork_id)")
```

Default view mappings (defined in HARVEST_VIEWS):

| View Name | Harvest Subdirectory |
| --- | --- |
| `sales` | `raw/sales_raw/**/*.parquet` |
| `artworks` | `raw/artworks_raw/**/*.parquet` |
| `artists` | `raw/artists_raw/**/*.parquet` |
| `media` | `raw/media/**/*.parquet` |
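
Given the `HARVEST_VIEWS.get(view_name, view_name)` lookup in the source above, `HARVEST_VIEWS` is presumably a plain dict mapping view names to subdirectory names under `raw/`. A hypothetical reconstruction consistent with the table (the real definition lives in `cogapp_libs/dagster/lineage.py`):

```python
# Hypothetical reconstruction of HARVEST_VIEWS based on the mappings above;
# consult cogapp_libs/dagster/lineage.py for the authoritative definition
HARVEST_VIEWS = {
    "sales": "sales_raw",
    "artworks": "artworks_raw",
    "artists": "artists_raw",
    "media": "media",
}

# Unknown view names fall back to themselves, so ad-hoc tables still resolve
print(HARVEST_VIEWS.get("sales", "sales"))        # sales_raw
print(HARVEST_VIEWS.get("exhibits", "exhibits"))  # exhibits
```

The `.get(view_name, view_name)` fallback means you can register views for tables outside this mapping, as long as the view name matches the subdirectory name.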

Metadata Collection

Helpers that consolidate lineage, stats, examples, and timing into a single call.

collect_parquet_metadata

Add standard parquet asset metadata to context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `AssetExecutionContext` | Dagster asset execution context | *required* |
| `conn` | `DuckDBPyConnection` | DuckDB connection (for running stats query and getting examples) | *required* |
| `output_path` | `str` | Path to the output parquet file | *required* |
| `lineage` | `TableColumnLineage` | Column lineage definition | *required* |
| `stats_sql` | `str` | SQL returning stat columns (first row used as metadata) | *required* |
| `example_id` | `tuple[str, int \| str] \| None` | (id_field, id_value) for example row lookup | `None` |
| `extra_metadata` | `dict \| None` | Additional metadata to include | `None` |
| `elapsed_ms` | `float \| None` | Processing time in milliseconds | `None` |

Example

```python
collect_parquet_metadata(
    context, conn, "data/sales.parquet",
    lineage=SALES_LINEAGE,
    stats_sql="SELECT count(*) as record_count, sum(value) as total FROM 'data/sales.parquet'",
    example_id=("sale_id", 2),
)
```

Source code in cogapp_libs/dagster/lineage.py
def collect_parquet_metadata(
    context: dg.AssetExecutionContext,
    conn: duckdb.DuckDBPyConnection,
    output_path: str,
    lineage: dg.TableColumnLineage,
    stats_sql: str,
    example_id: tuple[str, int | str] | None = None,
    extra_metadata: dict | None = None,
    elapsed_ms: float | None = None,
) -> None:
    """Add standard parquet asset metadata to context.

    Args:
        context: Dagster asset execution context
        conn: DuckDB connection (for running stats query and getting examples)
        output_path: Path to the output parquet file
        lineage: Column lineage definition
        stats_sql: SQL returning stat columns (first row used as metadata)
        example_id: (id_field, id_value) for example row lookup
        extra_metadata: Additional metadata to include
        elapsed_ms: Processing time in milliseconds

    Example:
        >>> collect_parquet_metadata(
        ...     context, conn, "data/sales.parquet",
        ...     lineage=SALES_LINEAGE,
        ...     stats_sql="SELECT count(*) as record_count, sum(value) as total FROM 'data/sales.parquet'",
        ...     example_id=("sale_id", 2),
        ... )
    """
    metadata: dict[str, Any] = {
        "dagster/column_lineage": lineage,
    }

    # Get stats from SQL
    result = conn.sql(stats_sql)
    row = result.fetchone()
    if row:
        for col, val in zip(result.columns, row):
            # Convert numeric types for JSON serialization
            if isinstance(val, (int, float)) and val is not None:
                metadata[col] = float(val) if isinstance(val, float) else val
            elif val is not None:
                metadata[col] = val

    # Get example row
    if example_id:
        id_field, id_value = example_id
        examples = get_example_row(conn, output_path, id_field, id_value)
        metadata["lineage_examples"] = examples

    # Add timing
    if elapsed_ms is not None:
        metadata["processing_time_ms"] = round(elapsed_ms, 2)

    # Merge extra metadata
    if extra_metadata:
        metadata.update(extra_metadata)

    context.add_output_metadata(metadata)

Example:

from cogapp_libs.dagster.lineage import collect_parquet_metadata, register_harvest_views

@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_transform_soda(context, duckdb, paths, output_paths) -> str:
    start = time.perf_counter()
    output_path = Path(output_paths.transforms_dir) / "sales.parquet"

    with duckdb.get_connection() as conn:
        register_harvest_views(conn, paths.harvest_dir, ["sales", "artworks", "artists"])
        conn.sql(TRANSFORM_SQL).write_parquet(str(output_path), compression="zstd")

        collect_parquet_metadata(
            context, conn, str(output_path),
            lineage=SALES_TRANSFORM_LINEAGE,
            stats_sql=f"SELECT count(*) AS record_count, sum(value) AS total FROM '{output_path}'",
            example_id=("sale_id", 2),
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )

    return str(output_path)

collect_json_output_metadata

Add standard JSON output asset metadata to context.

Parameters:

Name Type Description Default
context AssetExecutionContext

Dagster asset execution context

required
conn DuckDBPyConnection

DuckDB connection

required
input_path str

Path to input parquet file (for example row)

required
output_path str

Path to output JSON file

required
lineage TableColumnLineage

Column lineage definition

required
example_id tuple[str, int | str] | None

(id_field, id_value) for example row lookup

None
example_renames dict[str, str] | None

{output_col: input_col} for renamed columns in examples

None
extra_metadata dict | None

Additional metadata to include

None
elapsed_ms float | None

Processing time in milliseconds

None
Example

collect_json_output_metadata(
    context, conn,
    input_path="data/sales_transform.parquet",
    output_path="data/output/sales.json",
    lineage=SALES_OUTPUT_LINEAGE,
    example_id=("sale_id", 2),
    example_renames={"price_difference": "price_diff"},
)

Source code in cogapp_libs/dagster/lineage.py
def collect_json_output_metadata(
    context: dg.AssetExecutionContext,
    conn: duckdb.DuckDBPyConnection,
    input_path: str,
    output_path: str,
    lineage: dg.TableColumnLineage,
    example_id: tuple[str, int | str] | None = None,
    example_renames: dict[str, str] | None = None,
    extra_metadata: dict | None = None,
    elapsed_ms: float | None = None,
) -> None:
    """Add standard JSON output asset metadata to context.

    Args:
        context: Dagster asset execution context
        conn: DuckDB connection
        input_path: Path to input parquet file (for example row)
        output_path: Path to output JSON file
        lineage: Column lineage definition
        example_id: (id_field, id_value) for example row lookup
        example_renames: {output_col: input_col} for renamed columns in examples
        extra_metadata: Additional metadata to include
        elapsed_ms: Processing time in milliseconds

    Example:
        >>> collect_json_output_metadata(
        ...     context, conn,
        ...     input_path="data/sales_transform.parquet",
        ...     output_path="data/output/sales.json",
        ...     lineage=SALES_OUTPUT_LINEAGE,
        ...     example_id=("sale_id", 2),
        ...     example_renames={"price_difference": "price_diff"},
        ... )
    """
    metadata: dict[str, Any] = {
        "dagster/column_lineage": lineage,
        "json_output": dg.MetadataValue.path(output_path),
    }

    # Get example row from input and apply renames
    if example_id:
        id_field, id_value = example_id
        examples = get_example_row(conn, input_path, id_field, id_value)
        if example_renames:
            for output_col, input_col in example_renames.items():
                examples[output_col] = examples.pop(input_col, None)
        metadata["lineage_examples"] = examples

    # Add timing
    if elapsed_ms is not None:
        metadata["processing_time_ms"] = round(elapsed_ms, 2)

    # Merge extra metadata
    if extra_metadata:
        metadata.update(extra_metadata)

    context.add_output_metadata(metadata)

Example:

from cogapp_libs.dagster.lineage import collect_json_output_metadata

@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_output_soda(context, duckdb, sales_transform_soda: str, output_paths) -> str:
    start = time.perf_counter()

    with duckdb.get_connection() as conn:
        # Write JSON output...

        collect_json_output_metadata(
            context, conn,
            input_path=sales_transform_soda,
            output_path=output_paths.sales_json,
            lineage=SALES_OUTPUT_LINEAGE,
            example_id=("sale_id", 2),
            example_renames={"price_difference": "price_diff"},  # Handle column renames
            extra_metadata={"record_count": count, "filter_threshold": "$50,000"},
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )

    return sales_transform_soda

Example Row Helpers

get_example_row

Get one example row from a parquet file for lineage display.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

DuckDB connection

required
path str

Path to parquet file (can include globs)

required
id_field str | None

Optional field name to filter by (e.g., "sale_id")

None
id_value int | str | None

Optional value to match (e.g., 2)

None

Returns:

Type Description
dict[str, str | None]

Dict mapping column names to formatted values. If id_field/id_value provided, returns that specific record; otherwise returns the first row.

Example

examples = get_example_row(conn, "data/sales.parquet", "sale_id", 2)
# {"sale_id": "2", "price": "$86.3M", "title": "Day Dream"}

Source code in cogapp_libs/dagster/lineage.py
def get_example_row(
    conn: duckdb.DuckDBPyConnection,
    path: str,
    id_field: str | None = None,
    id_value: int | str | None = None,
) -> dict[str, str | None]:
    """Get one example row from a parquet file for lineage display.

    Args:
        conn: DuckDB connection
        path: Path to parquet file (can include globs)
        id_field: Optional field name to filter by (e.g., "sale_id")
        id_value: Optional value to match (e.g., 2)

    Returns:
        Dict mapping column names to formatted values.
        If id_field/id_value provided, returns that specific record.
        Otherwise returns the first row.

    Example:
        >>> examples = get_example_row(conn, "data/sales.parquet", "sale_id", 2)
        >>> # {"sale_id": "2", "price": "$86.3M", "title": "Day Dream"}
    """
    try:
        if id_field and id_value is not None:
            val = f"'{id_value}'" if isinstance(id_value, str) else id_value
            result = conn.sql(f"SELECT * FROM '{path}' WHERE {id_field} = {val} LIMIT 1")
        else:
            result = conn.sql(f"SELECT * FROM '{path}' LIMIT 1")

        row = result.fetchone()
        if not row:
            return {}
        return {col: format_value(val) for col, val in zip(result.columns, row)}
    except Exception:
        return {}

Example:

from cogapp_libs.dagster.lineage import get_example_row

with duckdb.get_connection() as conn:
    # Get a specific row by ID
    examples = get_example_row(conn, "data/sales.parquet", "sale_id", 2)
    # {"sale_id": "2", "sale_price_usd": "$86.3M", "title": "Day Dream"}

context.add_output_metadata({
    "lineage_examples": examples,
})

format_value

Format a value for display in lineage examples.

Formats numbers with appropriate units:

- >= 1M: $1.2M
- >= 1K: $1.5K
- floats: $123.45
- ints: 1,234

Parameters:

Name Type Description Default
val Any

Any value to format

required

Returns:

Type Description
str | None

Formatted string or None

Source code in cogapp_libs/dagster/lineage.py
def format_value(val: Any) -> str | None:
    """Format a value for display in lineage examples.

    Formats numbers with appropriate units:
    - >= 1M: $1.2M
    - >= 1K: $1.5K
    - floats: $123.45
    - ints: 1,234

    Args:
        val: Any value to format

    Returns:
        Formatted string or None
    """
    if val is None:
        return None
    if isinstance(val, (int, float)):
        abs_val = abs(val)
        sign = "-" if val < 0 else ""
        if abs_val >= 1_000_000:
            return f"{sign}${abs_val / 1_000_000:.1f}M"
        if abs_val >= 1_000:
            return f"{sign}${abs_val / 1_000:.1f}K"
        if isinstance(val, float):
            return f"{sign}${abs_val:.2f}"
        return f"{val:,}"
    return str(val)

Formats values for display in lineage visualizations:

  • 86300000 → $86.3M
  • 1500 → $1.5K
  • -560000 → -$560.0K
  • 123.45 → $123.45

add_lineage_examples_to_dlt_results

Add lineage examples to dlt MaterializeResults.

Simplifies adding example row data to dlt harvest assets.

Parameters:

Name Type Description Default
results list[MaterializeResult | AssetMaterialization]

List of MaterializeResult from dlt.run()

required
harvest_dir str

Base directory for harvest parquet files

required
config dict[str, dict]

Dict mapping asset names to config with keys:

- path: Relative path/glob to parquet files
- id_field: Optional field name to filter by
- id_value: Optional value to match

required

Yields:

Type Description
MaterializeResult | AssetMaterialization

MaterializeResult with lineage_examples metadata added

Example
ASSET_CONFIG = {
    "dlt_harvest_sales_raw": {
        "path": "raw/sales_raw/**/*.parquet",
        "id_field": "sale_id",
        "id_value": 2,
    },
}

@dg.multi_asset(specs=HARVEST_ASSET_SPECS)
def dlt_harvest_assets(context, dlt, paths, database):
    results = list(dlt.run(...))
    yield from add_lineage_examples_to_dlt_results(
        results, paths.harvest_dir, ASSET_CONFIG
    )
Source code in cogapp_libs/dagster/lineage.py
def add_lineage_examples_to_dlt_results(
    results: list[dg.MaterializeResult | dg.AssetMaterialization],
    harvest_dir: str,
    config: dict[str, dict],
) -> Iterator[dg.MaterializeResult | dg.AssetMaterialization]:
    """Add lineage examples to dlt MaterializeResults.

    Simplifies adding example row data to dlt harvest assets.

    Args:
        results: List of MaterializeResult from dlt.run()
        harvest_dir: Base directory for harvest parquet files
        config: Dict mapping asset names to config with keys:
            - path: Relative path/glob to parquet files
            - id_field: Optional field name to filter by
            - id_value: Optional value to match

    Yields:
        MaterializeResult with lineage_examples metadata added

    Example:
        ```python
        ASSET_CONFIG = {
            "dlt_harvest_sales_raw": {
                "path": "raw/sales_raw/**/*.parquet",
                "id_field": "sale_id",
                "id_value": 2,
            },
        }

        @dg.multi_asset(specs=HARVEST_ASSET_SPECS)
        def dlt_harvest_assets(context, dlt, paths, database):
            results = list(dlt.run(...))
            yield from add_lineage_examples_to_dlt_results(
                results, paths.harvest_dir, ASSET_CONFIG
            )
        ```
    """
    with duckdb.connect(":memory:") as conn:
        for result in results:
            asset_key = result.asset_key
            if asset_key:
                asset_name = asset_key.to_user_string()
                cfg = config.get(asset_name)
                if cfg:
                    parquet_path = f"{harvest_dir}/{cfg['path']}"
                    examples = get_example_row(
                        conn,
                        parquet_path,
                        cfg.get("id_field"),
                        cfg.get("id_value"),
                    )
                    if examples:
                        metadata = dict(result.metadata) if result.metadata else {}
                        metadata["lineage_examples"] = examples
                        yield dg.MaterializeResult(asset_key=asset_key, metadata=metadata)
                        continue
            yield result

Example:

from cogapp_libs.dagster.lineage import add_lineage_examples_to_dlt_results

ASSET_CONFIG = {
    "dlt_harvest_sales_raw": {
        "path": "raw/sales_raw/**/*.parquet",
        "id_field": "sale_id",
        "id_value": 2,
    },
}

@dg.multi_asset(specs=HARVEST_SPECS)
def dlt_harvest_assets(context, dlt, paths):
    results = list(dlt.run(...))
    yield from add_lineage_examples_to_dlt_results(
        results, paths.harvest_dir, ASSET_CONFIG
    )

DuckDB Asset Factories

Declarative factories for building DuckDB-based Dagster assets with minimal boilerplate.

duckdb_transform_asset

Create a DuckDB SQL transform asset.

The asset executes SQL and writes results to Parquet. Sources (harvest views and upstream transforms) are registered as DuckDB views, so SQL can reference them by name.

Parameters:

Name Type Description Default
name str

Asset name (also used for output parquet filename)

required
sql str

SQL query to execute (can reference views by name)

required
harvest_views list[str] | None

Harvest tables to register as views (e.g., ["sales", "artworks"])

None
upstream list[str] | None

Upstream transform assets to depend on (registered as views)

None
lineage TableColumnLineage | None

Column lineage definition for Dagster catalog

None
example_id tuple[str, int | str] | None

(id_field, id_value) for example row in metadata

None
stats_sql str | None

Custom stats SQL (default: count + numeric sums from output)

None
group_name str

Dagster asset group

'transform'
kinds set[str] | None

Asset kinds (default: {"duckdb", "sql"})

None
check_fn Any | None

Optional asset check function

None

Returns:

Type Description
AssetsDefinition

AssetsDefinition configured with ParquetPathIOManager

Example

# First transform: reads from harvest
sales_enriched = duckdb_transform_asset(
    name="sales_enriched",
    sql="SELECT s.*, a.title FROM sales s JOIN artworks a USING (artwork_id)",
    harvest_views=["sales", "artworks"],
    lineage=SALES_ENRICHED_LINEAGE,
    example_id=("sale_id", 2),
)

# Chained transform: reads from upstream
sales_filtered = duckdb_transform_asset(
    name="sales_filtered",
    sql="SELECT * FROM sales_enriched WHERE sale_price_usd > 1000",
    upstream=["sales_enriched"],
)

# Mixed sources: upstream + harvest
sales_with_artists = duckdb_transform_asset(
    name="sales_with_artists",
    sql="SELECT e.*, ar.name FROM sales_enriched e JOIN artists ar USING (artist_id)",
    upstream=["sales_enriched"],
    harvest_views=["artists"],
)

Source code in cogapp_libs/dagster/duckdb.py
def duckdb_transform_asset(
    name: str,
    sql: str,
    *,
    harvest_views: list[str] | None = None,
    upstream: list[str] | None = None,
    lineage: dg.TableColumnLineage | None = None,
    example_id: tuple[str, int | str] | None = None,
    stats_sql: str | None = None,
    group_name: str = "transform",
    kinds: set[str] | None = None,
    check_fn: Any | None = None,
) -> dg.AssetsDefinition:
    """Create a DuckDB SQL transform asset.

    The asset executes SQL and writes results to Parquet. Sources (harvest views
    and upstream transforms) are registered as DuckDB views, so SQL can reference
    them by name.

    Args:
        name: Asset name (also used for output parquet filename)
        sql: SQL query to execute (can reference views by name)
        harvest_views: Harvest tables to register as views (e.g., ["sales", "artworks"])
        upstream: Upstream transform assets to depend on (registered as views)
        lineage: Column lineage definition for Dagster catalog
        example_id: (id_field, id_value) for example row in metadata
        stats_sql: Custom stats SQL (default: count + numeric sums from output)
        group_name: Dagster asset group
        kinds: Asset kinds (default: {"duckdb", "sql"})
        check_fn: Optional asset check function

    Returns:
        AssetsDefinition configured with ParquetPathIOManager

    Example:
        # First transform: reads from harvest
        sales_enriched = duckdb_transform_asset(
            name="sales_enriched",
            sql="SELECT s.*, a.title FROM sales s JOIN artworks a USING (artwork_id)",
            harvest_views=["sales", "artworks"],
            lineage=SALES_ENRICHED_LINEAGE,
            example_id=("sale_id", 2),
        )

        # Chained transform: reads from upstream
        sales_filtered = duckdb_transform_asset(
            name="sales_filtered",
            sql="SELECT * FROM sales_enriched WHERE sale_price_usd > 1000",
            upstream=["sales_enriched"],
        )

        # Mixed sources: upstream + harvest
        sales_with_artists = duckdb_transform_asset(
            name="sales_with_artists",
            sql="SELECT e.*, ar.name FROM sales_enriched e JOIN artists ar USING (artist_id)",
            upstream=["sales_enriched"],
            harvest_views=["artists"],
        )
    """
    asset_kinds = kinds or {"duckdb", "sql"}
    harvest_views = harvest_views or []
    upstream = upstream or []

    # Build deps: harvest deps if using harvest_views, plus any upstream assets
    deps: list[dg.AssetDep | str] = []
    if harvest_views:
        deps.extend(
            [
                dg.AssetDep("dlt_harvest_sales_raw"),
                dg.AssetDep("dlt_harvest_artworks_raw"),
                dg.AssetDep("dlt_harvest_artists_raw"),
                dg.AssetDep("dlt_harvest_media"),
            ]
        )

    # Upstream assets are handled via function parameters, not deps
    # (they're passed through ParquetPathIOManager)

    # Import resource types for proper Dagster injection
    # These are imported here to avoid circular imports at module level
    from honey_duck.defs.shared.resources import OutputPathsResource, PathsResource

    # Build the asset function dynamically based on upstream dependencies
    if upstream:
        # Has upstream dependencies - they come as string paths
        def make_asset_fn(
            upstream_names: list[str],
            paths_cls: type,
            output_paths_cls: type,
        ):
            # Create function with explicit upstream parameters
            def _asset_fn(context, duckdb, paths, output_paths, **kwargs) -> str:
                return _execute_transform(
                    context=context,
                    duckdb=duckdb,
                    paths=paths,
                    output_paths=output_paths,
                    name=name,
                    sql=sql,
                    harvest_views=harvest_views,
                    upstream_assets=kwargs,
                    lineage=lineage,
                    example_id=example_id,
                    stats_sql=stats_sql,
                )

            # Set proper annotations for Dagster resource injection
            # NOTE: upstream assets must be POSITIONAL_OR_KEYWORD (not KEYWORD_ONLY)
            # for Dagster to auto-wire them via the IO manager
            import inspect

            params = [
                inspect.Parameter(
                    "context",
                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
                    annotation=dg.AssetExecutionContext,
                ),
                inspect.Parameter(
                    "duckdb", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=DuckDBResource
                ),
                inspect.Parameter(
                    "paths", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=paths_cls
                ),
                inspect.Parameter(
                    "output_paths",
                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
                    annotation=output_paths_cls,
                ),
            ]
            # Add upstream assets as positional parameters for auto-wiring
            for upstream_name in upstream_names:
                params.append(
                    inspect.Parameter(
                        upstream_name, inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str
                    )
                )
            _asset_fn.__signature__ = inspect.Signature(params, return_annotation=str)  # type: ignore
            _asset_fn.__annotations__ = {
                "context": dg.AssetExecutionContext,
                "duckdb": DuckDBResource,
                "paths": paths_cls,
                "output_paths": output_paths_cls,
                "return": str,
                **{name: str for name in upstream_names},
            }
            return _asset_fn

        asset_fn = make_asset_fn(upstream, PathsResource, OutputPathsResource)
    else:
        # No upstream - simpler signature with proper type annotations
        def asset_fn(
            context: dg.AssetExecutionContext,
            duckdb: DuckDBResource,
            paths: PathsResource,
            output_paths: OutputPathsResource,
        ) -> str:
            return _execute_transform(
                context=context,
                duckdb=duckdb,
                paths=paths,
                output_paths=output_paths,
                name=name,
                sql=sql,
                harvest_views=harvest_views,
                upstream_assets={},
                lineage=lineage,
                example_id=example_id,
                stats_sql=stats_sql,
            )

    # Create the asset
    asset_def = dg.asset(
        name=name,
        kinds=asset_kinds,
        deps=deps if deps else None,
        group_name=group_name,
        io_manager_key="parquet_path_io_manager",
    )(asset_fn)

    return asset_def

Example:

from cogapp_libs.dagster.duckdb import duckdb_transform_asset

sales_transform_soda = duckdb_transform_asset(
    name="sales_transform_soda",
    sql="""
        SELECT s.sale_id, s.sale_date, s.sale_price_usd,
               aw.title, ar.name.trim().upper() AS artist_name
        FROM sales s
        LEFT JOIN artworks aw USING (artwork_id)
        LEFT JOIN artists ar USING (artist_id)
        ORDER BY s.sale_date DESC
    """,
    harvest_views=["sales", "artworks", "artists"],
    lineage=SALES_TRANSFORM_LINEAGE,
    example_id=("sale_id", 2),
    group_name="transform_soda",
    kinds={"duckdb", "sql"},
)

duckdb_output_asset

Create a DuckDB output asset that writes to JSON.

Reads from an upstream transform's parquet and writes filtered/transformed output to JSON format.

Parameters:

Name Type Description Default
name str

Asset name

required
source str

Upstream transform asset name (provides parquet path)

required
output_path_attr str | None

Attribute name on OutputPathsResource for output path (e.g., "sales_soda" for OutputPathsResource.sales_soda)

None
output_format str

Output format ("json" supported)

'json'
where str | None

Optional SQL WHERE clause for filtering

None
select str

SQL SELECT clause (default "*", use for renames/excludes)

'*'
lineage TableColumnLineage | None

Column lineage definition

None
example_id tuple[str, int | str] | None

(id_field, id_value) for example row

None
example_renames dict[str, str] | None

Column renames to apply to example data

None
group_name str

Dagster asset group

'output'
kinds set[str] | None

Asset kinds (default: {"duckdb", "json"})

None
deps list[str] | None

Additional asset dependencies (e.g., for ordering)

None

Returns:

Type Description
AssetsDefinition

AssetsDefinition configured with ParquetPathIOManager

Example

sales_output = duckdb_output_asset(
    name="sales_output_soda",
    source="sales_transform_soda",
    output_path_attr="sales_soda",
    where="sale_price_usd >= 50000",
    select="* EXCLUDE (price_diff), price_diff AS price_difference",
    lineage=SALES_OUTPUT_LINEAGE,
    example_id=("sale_id", 2),
    example_renames={"price_difference": "price_diff"},
    deps=["artworks_output_soda"],  # Ordering dependency
)

Source code in cogapp_libs/dagster/duckdb.py
def duckdb_output_asset(
    name: str,
    source: str,
    *,
    output_path_attr: str | None = None,
    output_format: str = "json",
    where: str | None = None,
    select: str = "*",
    lineage: dg.TableColumnLineage | None = None,
    example_id: tuple[str, int | str] | None = None,
    example_renames: dict[str, str] | None = None,
    group_name: str = "output",
    kinds: set[str] | None = None,
    deps: list[str] | None = None,
) -> dg.AssetsDefinition:
    """Create a DuckDB output asset that writes to JSON.

    Reads from an upstream transform's parquet and writes filtered/transformed
    output to JSON format.

    Args:
        name: Asset name
        source: Upstream transform asset name (provides parquet path)
        output_path_attr: Attribute name on OutputPathsResource for output path
            (e.g., "sales_soda" for OutputPathsResource.sales_soda)
        output_format: Output format ("json" supported)
        where: Optional SQL WHERE clause for filtering
        select: SQL SELECT clause (default "*", use for renames/excludes)
        lineage: Column lineage definition
        example_id: (id_field, id_value) for example row
        example_renames: Column renames to apply to example data
        group_name: Dagster asset group
        kinds: Asset kinds (default: {"duckdb", "json"})
        deps: Additional asset dependencies (e.g., for ordering)

    Returns:
        AssetsDefinition configured with ParquetPathIOManager

    Example:
        sales_output = duckdb_output_asset(
            name="sales_output_soda",
            source="sales_transform_soda",
            output_path_attr="sales_soda",
            where="sale_price_usd >= 50000",
            select="* EXCLUDE (price_diff), price_diff AS price_difference",
            lineage=SALES_OUTPUT_LINEAGE,
            example_id=("sale_id", 2),
            example_renames={"price_difference": "price_diff"},
            deps=["artworks_output_soda"],  # Ordering dependency
        )
    """
    # Import resource types for proper Dagster injection
    from honey_duck.defs.shared.resources import OutputPathsResource

    asset_kinds = kinds or {"duckdb", output_format}
    extra_deps = deps or []

    def asset_fn(
        context: dg.AssetExecutionContext,
        duckdb: DuckDBResource,
        output_paths: OutputPathsResource,
        **upstream: str,
    ) -> str:
        start = time.perf_counter()

        # Get source path from upstream
        source_path = upstream[source]

        # Determine output path
        if output_path_attr:
            output_path = getattr(output_paths, output_path_attr)
        else:
            # Fallback: try to derive from name (e.g., sales_output_soda -> sales_soda)
            attr_name = name.replace("_output", "")
            output_path = getattr(output_paths, attr_name, None)
            if not output_path:
                raise ValueError(
                    f"Could not determine output path for {name}. "
                    f"Please specify output_path_attr parameter."
                )
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        with duckdb.get_connection() as conn:
            # Build query
            query = f"SELECT {select} FROM '{source_path}'"
            if where:
                query += f" WHERE {where}"

            # Get counts for metadata
            total = conn.sql(f"SELECT count(*) FROM '{source_path}'").fetchone()[0]
            if where:
                filtered_count = conn.sql(
                    f"SELECT count(*) FROM '{source_path}' WHERE {where}"
                ).fetchone()[0]
            else:
                filtered_count = total

            # Write output
            conn.execute(f"COPY ({query}) TO '{output_path}' (FORMAT JSON, ARRAY true)")

            elapsed_ms = (time.perf_counter() - start) * 1000

            # Collect metadata
            if lineage:
                collect_json_output_metadata(
                    context,
                    conn,
                    input_path=source_path,
                    output_path=output_path,
                    lineage=lineage,
                    example_id=example_id,
                    example_renames=example_renames,
                    extra_metadata={
                        "record_count": filtered_count,
                        "filtered_from": total,
                    },
                    elapsed_ms=elapsed_ms,
                )
            else:
                context.add_output_metadata(
                    {
                        "record_count": filtered_count,
                        "filtered_from": total,
                        "json_output": dg.MetadataValue.path(output_path),
                        "processing_time_ms": round(elapsed_ms, 2),
                    }
                )

        context.log.info(f"Output {filtered_count} records to {name} in {elapsed_ms:.1f}ms")
        return source_path  # Return source path for downstream

    # Set up function signature for Dagster with proper type annotations
    # NOTE: upstream asset must be POSITIONAL_OR_KEYWORD (not KEYWORD_ONLY)
    # for Dagster to auto-wire it via the IO manager
    import inspect

    params = [
        inspect.Parameter(
            "context", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=dg.AssetExecutionContext
        ),
        inspect.Parameter(
            "duckdb", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=DuckDBResource
        ),
        inspect.Parameter(source, inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str),
        inspect.Parameter(
            "output_paths", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=OutputPathsResource
        ),
    ]
    asset_fn.__signature__ = inspect.Signature(params, return_annotation=str)  # type: ignore
    asset_fn.__annotations__ = {
        "context": dg.AssetExecutionContext,
        "duckdb": DuckDBResource,
        source: str,
        "output_paths": OutputPathsResource,
        "return": str,
    }

    return dg.asset(
        name=name,
        kinds=asset_kinds,
        deps=extra_deps if extra_deps else None,
        group_name=group_name,
        io_manager_key="parquet_path_io_manager",
    )(asset_fn)

Example:

from cogapp_libs.dagster.duckdb import duckdb_output_asset

sales_output_soda = duckdb_output_asset(
    name="sales_output_soda",
    source="sales_transform_soda",
    output_path_attr="sales_soda",
    where="sale_price_usd >= 50_000",
    select="* EXCLUDE (price_diff), price_diff AS price_difference",
    lineage=SALES_OUTPUT_LINEAGE,
    example_id=("sale_id", 2),
    example_renames={"price_difference": "price_diff"},
    group_name="output_soda",
    kinds={"duckdb", "json"},
)

DuckDBContext

Context manager for DuckDB operations.

Source code in cogapp_libs/dagster/duckdb.py
def __init__(
    self,
    duckdb: DuckDBResource,
    harvest_dir: str | None,
    harvest_views: list[str],
    upstream_assets: dict[str, str],
):
    import duckdb as duckdb_module

    self._duckdb = duckdb
    self._harvest_dir = harvest_dir
    self._harvest_views = harvest_views
    self._upstream_assets = upstream_assets
    self._conn: duckdb_module.DuckDBPyConnection | None = None
    self._start_time: float | None = None

elapsed_ms property

elapsed_ms: float

Elapsed time since context entry in milliseconds.

execute

execute(sql: str) -> None

Execute SQL statement.

Source code in cogapp_libs/dagster/duckdb.py
def execute(self, sql: str) -> None:
    """Execute SQL statement."""
    self._get_conn().execute(sql)

sql

sql(query: str)

Execute SQL query and return result.

Source code in cogapp_libs/dagster/duckdb.py
def sql(self, query: str):
    """Execute SQL query and return result."""
    return self._get_conn().sql(query)

fetchone

fetchone(query: str) -> tuple

Execute query and fetch one row.

Source code in cogapp_libs/dagster/duckdb.py
def fetchone(self, query: str) -> tuple:
    """Execute query and fetch one row."""
    return self._get_conn().sql(query).fetchone()

fetchall

fetchall(query: str) -> list[tuple]

Execute query and fetch all rows.

Source code in cogapp_libs/dagster/duckdb.py
def fetchall(self, query: str) -> list[tuple]:
    """Execute query and fetch all rows."""
    return self._get_conn().sql(query).fetchall()

write_parquet

write_parquet(sql: str, output_path: str | Path, compression: str = 'zstd') -> str

Execute SQL and write results to parquet.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
output_path str | Path

Path to write parquet file

required
compression str

Compression algorithm (default: zstd)

'zstd'

Returns:

Type Description
str

String path to written parquet file

Source code in cogapp_libs/dagster/duckdb.py
def write_parquet(
    self,
    sql: str,
    output_path: str | Path,
    compression: str = "zstd",
) -> str:
    """Execute SQL and write results to parquet.

    Args:
        sql: SQL query to execute
        output_path: Path to write parquet file
        compression: Compression algorithm (default: zstd)

    Returns:
        String path to written parquet file
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    self._get_conn().sql(sql).write_parquet(str(output_path), compression=compression)
    return str(output_path)

write_json

write_json(sql: str, output_path: str | Path) -> str

Execute SQL and write results to JSON array.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
output_path str | Path

Path to write JSON file

required

Returns:

Type Description
str

String path to written JSON file

Source code in cogapp_libs/dagster/duckdb.py
def write_json(
    self,
    sql: str,
    output_path: str | Path,
) -> str:
    """Execute SQL and write results to JSON array.

    Args:
        sql: SQL query to execute
        output_path: Path to write JSON file

    Returns:
        String path to written JSON file
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    self._get_conn().execute(f"COPY ({sql}) TO '{output_path}' (FORMAT JSON, ARRAY true)")
    return str(output_path)

get_example_row

get_example_row(path: str, id_field: str | None = None, id_value: int | str | None = None) -> dict[str, str | None]

Get formatted example row for lineage display.

Source code in cogapp_libs/dagster/duckdb.py
def get_example_row(
    self,
    path: str,
    id_field: str | None = None,
    id_value: int | str | None = None,
) -> dict[str, str | None]:
    """Get formatted example row for lineage display."""
    return get_example_row(self._get_conn(), path, id_field, id_value)

register_view

register_view(name: str, path: str) -> None

Register a parquet file as a DuckDB view.

Source code in cogapp_libs/dagster/duckdb.py
def register_view(self, name: str, path: str) -> None:
    """Register a parquet file as a DuckDB view."""
    self._get_conn().execute(f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM '{path}'")

For complex operations beyond what the factories support, use DuckDBContext directly:

from pathlib import Path

from cogapp_libs.dagster.duckdb import DuckDBContext

@dg.asset
def custom_transform(context, duckdb, sales_transform: str, artworks_transform: str) -> str:
    output_path = Path("data/output/custom.parquet")
    with DuckDBContext(
        duckdb,
        harvest_dir=None,
        harvest_views=[],
        upstream_assets={"sales": sales_transform, "artworks": artworks_transform},
    ) as ctx:
        # Custom SQL operations
        ctx.write_parquet("SELECT ...", output_path)

        ctx.add_parquet_metadata(output_path, lineage=LINEAGE, example_id=("id", 1))

    return str(output_path)

DuckDB Friendly SQL

DuckDB provides SQL extensions that make queries more readable and concise.

Dot Notation for String Functions

Chain string functions using method syntax instead of nested function calls:

-- Traditional SQL
SELECT upper(trim(name)) AS artist_name FROM artists

-- DuckDB friendly
SELECT name.trim().upper() AS artist_name FROM artists

Common chainable functions: upper(), lower(), trim(), ltrim(), rtrim(), replace(), substring().

GROUP BY ALL

Automatically group by all non-aggregated columns:

-- Traditional SQL
SELECT artwork_id, count(*) AS sale_count, sum(sale_price_usd) AS total
FROM sales
GROUP BY artwork_id

-- DuckDB friendly
SELECT artwork_id, count(*) AS sale_count, sum(sale_price_usd) AS total
FROM sales
GROUP BY ALL

SELECT * EXCLUDE / REPLACE

Select all columns except specific ones, or replace column definitions:

-- Exclude columns
SELECT * EXCLUDE (internal_id, created_at) FROM sales

-- Replace column values
SELECT * REPLACE (sale_price_usd / 100 AS sale_price_usd) FROM sales

-- Combine with rename
SELECT * EXCLUDE (price_diff), price_diff AS price_difference FROM sales

Numeric Underscores

Improve readability of large numbers:

SELECT * FROM sales WHERE sale_price_usd >= 1_000_000  -- $1M threshold

Aggregate with ORDER BY

Order values within aggregations:

-- Ordered list aggregation
SELECT artwork_id, list(filename ORDER BY sort_order) AS media_files
FROM media
GROUP BY ALL

-- String aggregation with order
SELECT artist_id, string_agg(title, ', ' ORDER BY year DESC) AS works
FROM artworks
GROUP BY ALL

Conditional Aggregates

Count or aggregate based on conditions:

-- Count matching rows
SELECT
    count(*) AS total_sales,
    count_if(sale_price_usd > 1_000_000) AS million_dollar_sales
FROM sales

-- Filter within aggregate
SELECT sum(sale_price_usd) FILTER (WHERE buyer_country = 'United States') AS us_sales
FROM sales

ifnull / coalesce

Handle NULL values concisely:

-- Two-argument null replacement
SELECT ifnull(sale_count, 0) AS sale_count FROM artworks

-- Multiple fallback values
SELECT coalesce(preferred_name, display_name, 'Unknown') AS name FROM artists

For more DuckDB SQL features, see the DuckDB Friendly SQL documentation.


Polars vs DuckDB Comparison

Common data transformations shown in both Polars and DuckDB SQL.

String Operations

Operation Polars DuckDB
Strip whitespace col("name").str.strip_chars() name.trim()
Uppercase col("name").str.to_uppercase() name.upper()
Lowercase col("name").str.to_lowercase() name.lower()
Replace col("name").str.replace("old", "new") name.replace('old', 'new')
Contains (bool) col("name").str.contains("pattern") name.contains('pattern')
Concatenate pl.concat_str([col("a"), col("b")], separator=" ") concat_ws(' ', a, b)
Extract first char col("name").str.slice(0, 1) name[1] or left(name, 1)

Example: Clean artist name

# Polars
df.with_columns(
    pl.col("name").str.strip_chars().str.to_uppercase().alias("artist_name")
)

# DuckDB
SELECT name.trim().upper() AS artist_name FROM artists

Null Handling

Operation Polars DuckDB
Fill with constant col("x").fill_null(0) coalesce(x, 0) or ifnull(x, 0)
Fill from column col("x").fill_null(col("y")) coalesce(x, y)
Check if null col("x").is_null() x IS NULL
Drop nulls df.drop_nulls("x") WHERE x IS NOT NULL

Example: Fill missing prices

# Polars
df.with_columns(
    pl.col("sale_price").fill_null(pl.col("list_price")).alias("final_price")
)

# DuckDB
SELECT coalesce(sale_price, list_price) AS final_price FROM sales

Filtering (Drop Rows)

Operation Polars DuckDB
Equals df.filter(col("status") == "active") WHERE status = 'active'
Not equals df.filter(col("status") != "deleted") WHERE status != 'deleted'
Greater than df.filter(col("price") > 1000) WHERE price > 1000
Multiple conditions df.filter((col("a") > 1) & (col("b") < 10)) WHERE a > 1 AND b < 10

Example: Filter high-value sales

# Polars
df.filter(pl.col("sale_price_usd") >= 50_000)

# DuckDB
SELECT * FROM sales WHERE sale_price_usd >= 50_000

Conditional Replace

Operation Polars DuckDB
When/then pl.when(cond).then(val).otherwise(col("x")) CASE WHEN cond THEN val ELSE x END
Simple if pl.when(cond).then(val).otherwise(col("x")) if(cond, val, x)

Example: Price tier categorization

# Polars
df.with_columns(
    pl.when(pl.col("price") < 1000).then(pl.lit("budget"))
      .when(pl.col("price") < 10000).then(pl.lit("mid"))
      .otherwise(pl.lit("premium"))
      .alias("price_tier")
)

# DuckDB
SELECT CASE
    WHEN price < 1000 THEN 'budget'
    WHEN price < 10_000 THEN 'mid'
    ELSE 'premium'
END AS price_tier
FROM artworks

List/Array Operations

Operation Polars DuckDB
Get first col("items").list.first() items[1] or list_extract(items, 1)
Explode to rows df.explode("items") UNNEST(items)
Aggregate to list col("x").implode() in group_by list(x) or array_agg(x)
List length col("items").list.len() len(items) or array_length(items)
Remove nulls col("items").list.drop_nulls() list_filter(items, x -> x IS NOT NULL)

Example: Aggregate media files

# Polars
df.group_by("artwork_id").agg(
    pl.col("filename").sort_by("sort_order").alias("media_files")
)

# DuckDB
SELECT artwork_id, list(filename ORDER BY sort_order) AS media_files
FROM media
GROUP BY ALL

Type Casting

Operation Polars DuckDB
To string col("x").cast(pl.Utf8) x::VARCHAR or CAST(x AS VARCHAR)
To integer col("x").cast(pl.Int64) x::BIGINT or CAST(x AS BIGINT)
To float col("x").cast(pl.Float64) x::DOUBLE or CAST(x AS DOUBLE)
To date col("x").str.to_date("%Y-%m-%d") x::DATE or strptime(x, '%Y-%m-%d')::DATE

Column Operations

Operation Polars DuckDB
Rename df.rename({"old": "new"}) SELECT old AS new
Select columns df.select(["a", "b"]) SELECT a, b
Drop columns df.drop(["x", "y"]) SELECT * EXCLUDE (x, y)
Add constant df.with_columns(pl.lit("value").alias("new")) SELECT *, 'value' AS new
Copy column df.with_columns(pl.col("a").alias("b")) SELECT *, a AS b

Honeysuckle Processor Equivalents

See the Processor Equivalents Guide for detailed examples showing how the top 15 most-used Honeysuckle processors map to native Polars and DuckDB operations.


Exceptions

PipelineError

Bases: Exception

Base exception for all pipeline errors.

MissingTableError

Bases: DataValidationError

Raised when a required database table doesn't exist.

Automatically lists available tables to help debugging.

Parameters:

Name Type Description Default
asset_name str

Name of the asset where error occurred

required
table_name str

Name of the missing table

required
available_tables list[str]

List of tables that exist in the database

required

Attributes:

Name Type Description
table_name

Name of the missing table

available_tables

List of available tables

Example

raise MissingTableError("sales_transform", "raw.sales", ["raw.artworks", "raw.artists"])

Source code in cogapp_libs/dagster/exceptions.py
def __init__(self, asset_name: str, table_name: str, available_tables: list[str]) -> None:
    self.table_name = table_name
    self.available_tables = available_tables

    message = (
        f"Table '{table_name}' not found in database. "
        f"Available tables: {available_tables}. "
        f"Did you run the harvest job first?"
    )
    super().__init__(asset_name, message)

MissingColumnError

Bases: DataValidationError

Raised when required columns are missing from a DataFrame.

Automatically lists available columns to help debugging.

Parameters:

Name Type Description Default
asset_name str

Name of the asset where error occurred

required
missing_columns set[str]

Set of column names that are missing

required
available_columns list[str]

List of columns that exist in the DataFrame

required

Attributes:

Name Type Description
missing_columns

Set of missing column names

available_columns

List of available column names

Example

raise MissingColumnError("sales_transform", {"sale_price", "buyer_id"}, ["sale_id", "artwork_id", "sale_date"])

Source code in cogapp_libs/dagster/exceptions.py
def __init__(
    self,
    asset_name: str,
    missing_columns: set[str],
    available_columns: list[str],
) -> None:
    self.missing_columns = missing_columns
    self.available_columns = available_columns

    message = (
        f"Missing required columns: {sorted(missing_columns)}. "
        f"Available columns: {sorted(available_columns)}"
    )
    super().__init__(asset_name, message)

DataValidationError

Bases: PipelineError

Base exception for data validation errors.

Parameters:

Name Type Description Default
asset_name str

Name of the asset where error occurred

required
message str

Detailed error message

required

Attributes:

Name Type Description
asset_name

Name of the asset (included in error message)

Source code in cogapp_libs/dagster/exceptions.py
def __init__(self, asset_name: str, message: str):
    self.asset_name = asset_name
    super().__init__(f"[{asset_name}] {message}")
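Since every exception derives from PipelineError, calling code can catch the whole family with one except clause. A minimal standalone re-creation of the base classes (mirroring the source shown above) demonstrates how the asset name is folded into the message:

```python
# Re-created here so the snippet runs standalone; mirrors the source above
class PipelineError(Exception):
    """Base exception for all pipeline errors."""

class DataValidationError(PipelineError):
    def __init__(self, asset_name: str, message: str):
        self.asset_name = asset_name
        # The asset name is prefixed to every message for easier log triage
        super().__init__(f"[{asset_name}] {message}")

err = DataValidationError("sales_transform", "Missing required columns: ['sale_price']")
print(str(err))  # [sales_transform] Missing required columns: ['sale_price']
```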