Dagster Helpers¶
Utilities for building Dagster assets with less boilerplate.
External Documentation
- Dagster: docs.dagster.io | Assets Guide
- Polars: pola.rs/docs | API Reference
- DuckDB: duckdb.org/docs
Data Loading¶
read_harvest_table_lazy¶
Read harvest table from Parquet files as Polars LazyFrame.
Convenience wrapper around read_parquet_table_lazy for harvest tables. Automatically handles the schema subdirectory structure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `harvest_dir` | `str \| Path` | Path to harvest Parquet directory (e.g., `data/output/dlt/harvest_parquet`) | *required* |
| `table_name` | `str` | Name of table to read | *required* |
| `schema` | `str` | Schema/subdirectory name | `'raw'` |
| `required_columns` | `list[str] \| None` | Optional list of required column names | `None` |
| `asset_name` | `str` | Name of calling asset (for error context) | `'unknown'` |
Returns:

| Type | Description |
|---|---|
| `LazyFrame` | LazyFrame with data from Parquet files |
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If harvest directory doesn't exist |
| `MissingTableError` | If table doesn't exist (lists available tables) |
| `MissingColumnError` | If required columns are missing (lists available columns) |
Example
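A minimal usage sketch (the table and column names are illustrative):

```python
from pathlib import Path

from cogapp_libs.dagster.validation import read_harvest_table_lazy

# Read one table, validating that key columns are present
sales = read_harvest_table_lazy(
    Path("data/output/dlt/harvest_parquet"),
    "sales_raw",
    required_columns=["sale_id", "sale_price_usd"],
    asset_name="sales_transform",
)
# From here it is an ordinary Polars LazyFrame
result = sales.select("sale_id", "sale_price_usd").collect()
```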
Source code in cogapp_libs/dagster/validation.py
read_harvest_tables_lazy¶
Read multiple harvest tables in one call with validation.
Convenience function to batch-read multiple related tables. Reduces boilerplate when you need to join or process multiple tables together.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `harvest_dir` | `str \| Path` | Path to harvest Parquet directory (e.g., `data/output/dlt/harvest_parquet`) | *required* |
| `*table_specs` | `tuple[str, list[str] \| None]` | Variable number of `(table_name, required_columns)` tuples. `required_columns` can be `None` to skip column validation. | `()` |
| `schema` | `str` | Schema/subdirectory name | `'raw'` |
| `asset_name` | `str` | Name of calling asset (for error context) | `'unknown'` |
Returns:

| Type | Description |
|---|---|
| `dict[str, LazyFrame]` | Dictionary mapping table names to LazyFrames |
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If harvest directory doesn't exist |
| `MissingTableError` | If any table doesn't exist (lists available tables) |
| `MissingColumnError` | If required columns are missing (lists available columns) |
Example

```python
tables = read_harvest_tables_lazy(
    Path("data/output/dlt/harvest_parquet"),
    ("sales_raw", ["sale_id", "sale_price_usd"]),
    ("artworks_raw", ["artwork_id", "title"]),
    ("artists_raw", None),  # No column validation
    asset_name="sales_transform",
)
sales = tables["sales_raw"]
artworks = tables["artworks_raw"]
result = sales.join(artworks, on="artwork_id").collect()
```
Source code in cogapp_libs/dagster/validation.py
read_parquet_table_lazy¶
Read table from Parquet files as Polars LazyFrame with validation.
Validates table existence and required columns before returning. Provides actionable error messages listing available tables/columns.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `parquet_dir` | `str \| Path` | Path to directory containing Parquet files (e.g., `harvest_parquet/`) | *required* |
| `table_name` | `str` | Name of table to read (subdirectory name) | *required* |
| `required_columns` | `list[str] \| None` | Optional list of required column names | `None` |
| `asset_name` | `str` | Name of calling asset (for error context) | `'unknown'` |
Returns:

| Type | Description |
|---|---|
| `LazyFrame` | LazyFrame with data from Parquet files |
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If parquet directory doesn't exist |
| `MissingTableError` | If table directory doesn't exist (lists available tables) |
| `MissingColumnError` | If required columns are missing (lists available columns) |
Example
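A minimal usage sketch (the directory layout and column names are illustrative):

```python
from pathlib import Path

from cogapp_libs.dagster.validation import read_parquet_table_lazy

# Read the "sales_raw" subdirectory, failing fast with an actionable
# error if the table or a required column is missing
sales = read_parquet_table_lazy(
    Path("data/output/dlt/harvest_parquet"),
    "sales_raw",
    required_columns=["sale_id"],
    asset_name="sales_transform",
)
```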
Source code in cogapp_libs/dagster/validation.py
Validation¶
validate_dataframe¶
Validate DataFrame has required columns.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame to validate | *required* |
| `required_columns` | `list[str]` | List of required column names | *required* |
| `asset_name` | `str` | Name of asset (for error messages) | *required* |
Raises:

| Type | Description |
|---|---|
| `Failure` | If required columns are missing (with metadata) |
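A minimal usage sketch (assuming a Polars DataFrame; the data is illustrative):

```python
import polars as pl

from cogapp_libs.dagster.validation import validate_dataframe

df = pl.DataFrame({"sale_id": [1, 2], "sale_price_usd": [100.0, 250.0]})

# Passes silently when all columns are present;
# raises dg.Failure (with metadata) otherwise
validate_dataframe(df, ["sale_id", "sale_price_usd"], "sales_transform")
```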
Source code in cogapp_libs/dagster/validation.py
Metadata¶
add_dataframe_metadata¶
Add standard metadata for a DataFrame result.
Automatically includes:

- Record count
- Column list
- Preview (first 5 rows as markdown table)
- Any extra metadata provided
Passes through all Dagster parameters to maintain full compatibility with multi-output assets and dynamic partitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `AssetExecutionContext` | Asset execution context | *required* |
| `df` | `DataFrame` | Result DataFrame | *required* |
| `output_name` | `str \| None` | (Optional) For multi-output assets - specify which output this metadata is for | `None` |
| `mapping_key` | `str \| None` | (Optional) For dynamic partitions - specify partition key | `None` |
| `**extra_metadata` | `MetadataValue \| str \| int \| float \| bool \| None` | Additional metadata to include | `{}` |
Example

```python
# Simple usage
add_dataframe_metadata(
    context,
    result,
    unique_artworks=result["artwork_id"].n_unique(),
    total_value=float(result["sale_price_usd"].sum()),
)

# With multi-output asset
add_dataframe_metadata(
    context,
    result,
    output_name="sales_joined",
    unique_artworks=result["artwork_id"].n_unique(),
)
```
Source code in cogapp_libs/dagster/helpers.py
track_timing¶
Context manager to track and log execution time.
Automatically adds processing_time_ms to asset metadata and logs completion message. Use this to eliminate manual timing boilerplate.
Passes through all Dagster parameters to maintain full compatibility with multi-output assets and dynamic partitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `AssetExecutionContext \| OpExecutionContext` | Asset execution context | *required* |
| `operation` | `str` | Description of operation (e.g., "transform", "processing") | `'processing'` |
| `log_message` | `str \| None` | Optional custom log message template. Use `{elapsed_ms}` placeholder. | `None` |
| `output_name` | `str \| None` | (Optional) For multi-output assets - specify which output this metadata is for | `None` |
| `mapping_key` | `str \| None` | (Optional) For dynamic partitions - specify partition key | `None` |
Example

```python
@dg.asset(kinds={"polars"})
def my_asset(context: dg.AssetExecutionContext) -> pl.DataFrame:
    with track_timing(context, "transformation"):
        result = expensive_operation()
        # Automatically logs: "Completed transformation in 123.4ms"
        # Automatically adds processing_time_ms to metadata
    return result

# With multi-output asset
with track_timing(context, "loading", output_name="sales_joined"):
    result = load_data()
```
Source code in cogapp_libs/dagster/helpers.py
write_json_output¶
Write DataFrame to JSON using DuckDB's native COPY command.
Creates parent directories if needed. Adds record_count, path, and preview to asset metadata automatically. Accepts both pandas and polars DataFrames.
Uses DuckDB's native JSON export for better performance - avoids the DataFrame → pandas → json serialization path.
Passes through all Dagster parameters to maintain full compatibility with multi-output assets and dynamic partitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame \| 'pl.DataFrame'` | DataFrame to write (pandas or polars) | *required* |
| `output_path` | `str \| Path` | Path to write JSON file | *required* |
| `context` | `AssetExecutionContext` | Dagster asset execution context | *required* |
| `extra_metadata` | `dict \| None` | Additional metadata to include (optional) | `None` |
| `output_name` | `str \| None` | (Optional) For multi-output assets - specify which output this metadata is for | `None` |
| `mapping_key` | `str \| None` | (Optional) For dynamic partitions - specify partition key | `None` |
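A minimal usage sketch (the path and metadata values are illustrative):

```python
from pathlib import Path

from cogapp_libs.dagster.io import write_json_output

# Inside an asset body: write the result DataFrame and record
# record_count, path, and preview metadata in one call
write_json_output(
    result,
    Path("data/output/sales.json"),
    context,
    extra_metadata={"filter_threshold": "$50,000"},
)
```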
Source code in cogapp_libs/dagster/io.py
Visualization¶
altair_to_metadata¶
Convert an Altair chart to Dagster metadata with embedded PNG.
Creates a base64-encoded PNG image and wraps it in markdown for display in the Dagster UI asset metadata panel.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `chart` | `Chart` | Altair Chart object (e.g., from `df.plot.bar()`) | *required* |
| `title` | `str` | Metadata key name | `'chart'` |
Returns:

| Type | Description |
|---|---|
| `dict[str, MetadataValue]` | Dictionary with single key containing `MetadataValue.md()` with embedded image |
Example
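A minimal usage sketch (the chart construction is illustrative; any Altair `Chart` works):

```python
import altair as alt
import pandas as pd

from cogapp_libs.dagster.helpers import altair_to_metadata

df = pd.DataFrame({"year": [2021, 2022, 2023], "sales": [10, 14, 9]})
chart = alt.Chart(df).mark_bar().encode(x="year:O", y="sales:Q")

# Inside an asset body: embed the chart in the metadata panel
context.add_output_metadata(altair_to_metadata(chart, title="sales_by_year"))
```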
Source code in cogapp_libs/dagster/helpers.py
table_preview_to_metadata¶
Convert a Polars DataFrame to a markdown table for Dagster metadata.
Creates a clean markdown table preview suitable for Dagster UI display.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | Polars DataFrame to preview | *required* |
| `title` | `str` | Metadata key name | `'preview'` |
| `header` | `str \| None` | Optional header text to display above the table | `None` |
| `max_rows` | `int` | Maximum rows to include | `10` |
Returns:

| Type | Description |
|---|---|
| `dict[str, MetadataValue]` | Dictionary with single key containing `MetadataValue.md()` with markdown table |
Example
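A minimal usage sketch (the data is illustrative):

```python
import polars as pl

from cogapp_libs.dagster.helpers import table_preview_to_metadata

df = pl.DataFrame({"sale_id": [1, 2], "title": ["Day Dream", "Nocturne"]})

# Inside an asset body: attach a markdown preview of the first rows
context.add_output_metadata(
    table_preview_to_metadata(df, title="preview", header="Top sales", max_rows=5)
)
```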
Source code in cogapp_libs/dagster/helpers.py
Column Lineage¶
Helpers for tracking column-level data flow and displaying example values in the Dagster UI.
Lineage DSL¶
Build column lineage definitions with a compact, declarative syntax.
build_lineage¶
Build column lineage from compact definitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `passthrough` | `dict[str, AssetKey] \| None` | `{output_col: source_asset}` - same column name in source | `None` |
| `rename` | `dict[str, tuple[AssetKey, str]] \| None` | `{output_col: (source_asset, source_col)}` - renamed columns | `None` |
| `computed` | `dict[str, list[tuple[AssetKey, str]]] \| None` | `{output_col: [(asset, col), ...]}` - derived from multiple columns | `None` |
Returns:

| Type | Description |
|---|---|
| `TableColumnLineage` | TableColumnLineage for use with `dagster/column_lineage` metadata |
Example

```python
SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")
lineage = build_lineage(
    passthrough={"sale_id": SALES_RAW, "title": ARTWORKS_RAW},
    rename={"artwork_year": (ARTWORKS_RAW, "year")},
    computed={"price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")]},
)
```
Source code in cogapp_libs/dagster/lineage.py
Example:

```python
from cogapp_libs.dagster.lineage import build_lineage
import dagster as dg

SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")
ARTISTS_RAW = dg.AssetKey("dlt_harvest_artists_raw")

SALES_TRANSFORM_LINEAGE = build_lineage(
    passthrough={
        "sale_id": SALES_RAW,
        "artwork_id": SALES_RAW,
        "title": ARTWORKS_RAW,
        "artist_name": ARTISTS_RAW,
    },
    rename={
        "artwork_year": (ARTWORKS_RAW, "year"),
        "list_price_usd": (ARTWORKS_RAW, "price_usd"),
    },
    computed={
        "price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")],
    },
)

context.add_output_metadata({
    "dagster/column_lineage": SALES_TRANSFORM_LINEAGE,
})
```
passthrough_lineage¶
Build lineage for passthrough assets (output columns match input).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `AssetKey` | Source asset key | *required* |
| `columns` | `list[str]` | List of column names that pass through unchanged | *required* |
| `renames` | `dict[str, str] \| None` | `{output_col: source_col}` for renamed columns | `None` |
Returns:

| Type | Description |
|---|---|
| `TableColumnLineage` | TableColumnLineage for use with `dagster/column_lineage` metadata |
Example

```python
lineage = passthrough_lineage(
    source=dg.AssetKey("sales_transform"),
    columns=["sale_id", "title", "artist_name"],
    renames={"price_difference": "price_diff"},
)
```
Source code in cogapp_libs/dagster/lineage.py
Example:

```python
from cogapp_libs.dagster.lineage import passthrough_lineage
import dagster as dg

# For output assets where columns pass through from a single source
SALES_OUTPUT_LINEAGE = passthrough_lineage(
    source=dg.AssetKey("sales_transform_soda"),
    columns=["sale_id", "artwork_id", "title", "artist_name", "pct_change"],
    renames={"price_difference": "price_diff"},  # output_col: source_col
)
```
DuckDB View Registration¶
register_harvest_views¶
Register DuckDB views for harvest parquet files.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `conn` | `DuckDBPyConnection` | DuckDB connection | *required* |
| `harvest_dir` | `str` | Base harvest directory (e.g., `"data/harvest"`) | *required* |
| `views` | `list[str] \| None` | View names to register (default: all from `HARVEST_VIEWS`) | `None` |
Example

```python
with duckdb.connect() as conn:
    register_harvest_views(conn, "data/harvest", ["sales", "artworks"])
    result = conn.sql("SELECT * FROM sales LIMIT 5")
```
Source code in cogapp_libs/dagster/lineage.py
Simplifies registering harvest parquet files as DuckDB views.
Example:
from cogapp_libs.dagster.lineage import register_harvest_views
with duckdb.get_connection() as conn:
# Register views for sales, artworks, and artists
register_harvest_views(conn, paths.harvest_dir, ["sales", "artworks", "artists"])
# Now query using simple view names
conn.sql("SELECT * FROM sales JOIN artworks USING (artwork_id)")
Default view mappings (defined in `HARVEST_VIEWS`):

| View Name | Harvest Subdirectory |
|---|---|
| `sales` | `raw/sales_raw/**/*.parquet` |
| `artworks` | `raw/artworks_raw/**/*.parquet` |
| `artists` | `raw/artists_raw/**/*.parquet` |
| `media` | `raw/media/**/*.parquet` |
Metadata Collection¶
Helpers that consolidate lineage, stats, examples, and timing into a single call.
collect_parquet_metadata¶
Add standard parquet asset metadata to context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `AssetExecutionContext` | Dagster asset execution context | *required* |
| `conn` | `DuckDBPyConnection` | DuckDB connection (for running stats query and getting examples) | *required* |
| `output_path` | `str` | Path to the output parquet file | *required* |
| `lineage` | `TableColumnLineage` | Column lineage definition | *required* |
| `stats_sql` | `str` | SQL returning stat columns (first row used as metadata) | *required* |
| `example_id` | `tuple[str, int \| str] \| None` | `(id_field, id_value)` for example row lookup | `None` |
| `extra_metadata` | `dict \| None` | Additional metadata to include | `None` |
| `elapsed_ms` | `float \| None` | Processing time in milliseconds | `None` |
Example

```python
collect_parquet_metadata(
    context, conn, "data/sales.parquet",
    lineage=SALES_LINEAGE,
    stats_sql="SELECT count(*) as record_count, sum(value) as total FROM 'data/sales.parquet'",
    example_id=("sale_id", 2),
)
```
Source code in cogapp_libs/dagster/lineage.py
Example:

```python
from cogapp_libs.dagster.lineage import collect_parquet_metadata, register_harvest_views

@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_transform_soda(context, duckdb, paths, output_paths) -> str:
    start = time.perf_counter()
    output_path = Path(output_paths.transforms_dir) / "sales.parquet"
    with duckdb.get_connection() as conn:
        register_harvest_views(conn, paths.harvest_dir, ["sales", "artworks", "artists"])
        conn.sql(TRANSFORM_SQL).write_parquet(str(output_path), compression="zstd")
        collect_parquet_metadata(
            context, conn, str(output_path),
            lineage=SALES_TRANSFORM_LINEAGE,
            stats_sql=f"SELECT count(*) AS record_count, sum(value) AS total FROM '{output_path}'",
            example_id=("sale_id", 2),
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )
    return str(output_path)
```
collect_json_output_metadata¶
Add standard JSON output asset metadata to context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `AssetExecutionContext` | Dagster asset execution context | *required* |
| `conn` | `DuckDBPyConnection` | DuckDB connection | *required* |
| `input_path` | `str` | Path to input parquet file (for example row) | *required* |
| `output_path` | `str` | Path to output JSON file | *required* |
| `lineage` | `TableColumnLineage` | Column lineage definition | *required* |
| `example_id` | `tuple[str, int \| str] \| None` | `(id_field, id_value)` for example row lookup | `None` |
| `example_renames` | `dict[str, str] \| None` | `{output_col: input_col}` for renamed columns in examples | `None` |
| `extra_metadata` | `dict \| None` | Additional metadata to include | `None` |
| `elapsed_ms` | `float \| None` | Processing time in milliseconds | `None` |
Example

```python
collect_json_output_metadata(
    context, conn,
    input_path="data/sales_transform.parquet",
    output_path="data/output/sales.json",
    lineage=SALES_OUTPUT_LINEAGE,
    example_id=("sale_id", 2),
    example_renames={"price_difference": "price_diff"},
)
```
Source code in cogapp_libs/dagster/lineage.py
Example:

```python
from cogapp_libs.dagster.lineage import collect_json_output_metadata

@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_output_soda(context, duckdb, sales_transform_soda: str, output_paths) -> str:
    start = time.perf_counter()
    with duckdb.get_connection() as conn:
        # Write JSON output...
        collect_json_output_metadata(
            context, conn,
            input_path=sales_transform_soda,
            output_path=output_paths.sales_json,
            lineage=SALES_OUTPUT_LINEAGE,
            example_id=("sale_id", 2),
            example_renames={"price_difference": "price_diff"},  # Handle column renames
            extra_metadata={"record_count": count, "filter_threshold": "$50,000"},
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )
    return sales_transform_soda
```
Example Row Helpers¶
get_example_row¶
Get one example row from a parquet file for lineage display.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `conn` | `DuckDBPyConnection` | DuckDB connection | *required* |
| `path` | `str` | Path to parquet file (can include globs) | *required* |
| `id_field` | `str \| None` | Optional field name to filter by (e.g., `"sale_id"`) | `None` |
| `id_value` | `int \| str \| None` | Optional value to match (e.g., `2`) | `None` |
Returns:

| Type | Description |
|---|---|
| `dict[str, str \| None]` | Dict mapping column names to formatted values. If `id_field`/`id_value` provided, returns that specific record; otherwise returns the first row. |
Example

```python
examples = get_example_row(conn, "data/sales.parquet", "sale_id", 2)
# {"sale_id": "2", "price": "$86.3M", "title": "Day Dream"}
```
Source code in cogapp_libs/dagster/lineage.py
Example:

```python
from cogapp_libs.dagster.lineage import get_example_row

with duckdb.get_connection() as conn:
    # Get a specific row by ID
    examples = get_example_row(conn, "data/sales.parquet", "sale_id", 2)
    # {"sale_id": "2", "sale_price_usd": "$86.3M", "title": "Day Dream"}

context.add_output_metadata({
    "lineage_examples": examples,
})
```
format_value¶
Format a value for display in lineage examples.
Formats numbers with appropriate units:

- >= 1M: `$1.2M`
- >= 1K: `$1.5K`
- floats: `$123.45`
- ints: `1,234`
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `val` | `Any` | Any value to format | *required* |
Returns:

| Type | Description |
|---|---|
| `str \| None` | Formatted string or None |
Source code in cogapp_libs/dagster/lineage.py
Formats values for display in lineage visualizations:

| Input | Output |
|---|---|
| `86300000` | `$86.3M` |
| `1500` | `$1.5K` |
| `-560000` | `-$560K` |
| `123.45` | `$123.45` |
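The rules above can be sketched in plain Python. This is a hypothetical re-implementation for illustration, not the library's own code (the real helper lives in `cogapp_libs/dagster/lineage.py`), and the precedence between the integer rule and the K/M thresholds is an assumption:

```python
def format_value_sketch(val):
    """Approximate the formatting rules described above (hypothetical sketch)."""
    if val is None:
        return None
    if isinstance(val, bool) or not isinstance(val, (int, float)):
        return str(val)
    sign = "-" if val < 0 else ""
    mag = abs(val)
    if mag >= 1_000_000:
        # Trim a trailing ".0" so 560000 renders as "$560K", not "$560.0K"
        body = f"{mag / 1_000_000:.1f}".rstrip("0").rstrip(".")
        return f"{sign}${body}M"
    if mag >= 1_000:
        body = f"{mag / 1_000:.1f}".rstrip("0").rstrip(".")
        return f"{sign}${body}K"
    if isinstance(val, float):
        return f"{sign}${mag:.2f}"
    return f"{sign}{mag:,}"

print(format_value_sketch(86_300_000))  # $86.3M
print(format_value_sketch(-560_000))    # -$560K
print(format_value_sketch(123.45))      # $123.45
```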
add_lineage_examples_to_dlt_results¶
Add lineage examples to dlt MaterializeResults.
Simplifies adding example row data to dlt harvest assets.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `results` | `list[MaterializeResult \| AssetMaterialization]` | List of MaterializeResult from `dlt.run()` | *required* |
| `harvest_dir` | `str` | Base directory for harvest parquet files | *required* |
| `config` | `dict[str, dict]` | Dict mapping asset names to config with keys: `path` (relative path/glob to parquet files), `id_field` (optional field name to filter by), `id_value` (optional value to match) | *required* |
Yields:

| Type | Description |
|---|---|
| `MaterializeResult \| AssetMaterialization` | MaterializeResult with `lineage_examples` metadata added |
Example

```python
ASSET_CONFIG = {
    "dlt_harvest_sales_raw": {
        "path": "raw/sales_raw/**/*.parquet",
        "id_field": "sale_id",
        "id_value": 2,
    },
}

@dg.multi_asset(specs=HARVEST_ASSET_SPECS)
def dlt_harvest_assets(context, dlt, paths, database):
    results = list(dlt.run(...))
    yield from add_lineage_examples_to_dlt_results(
        results, paths.harvest_dir, ASSET_CONFIG
    )
```
Source code in cogapp_libs/dagster/lineage.py
Example:

```python
from cogapp_libs.dagster.lineage import add_lineage_examples_to_dlt_results

ASSET_CONFIG = {
    "dlt_harvest_sales_raw": {
        "path": "raw/sales_raw/**/*.parquet",
        "id_field": "sale_id",
        "id_value": 2,
    },
}

@dg.multi_asset(specs=HARVEST_SPECS)
def dlt_harvest_assets(context, dlt, paths):
    results = list(dlt.run(...))
    yield from add_lineage_examples_to_dlt_results(
        results, paths.harvest_dir, ASSET_CONFIG
    )
```
DuckDB Asset Factories¶
Declarative factories for building DuckDB-based Dagster assets with minimal boilerplate.
duckdb_transform_asset¶
Create a DuckDB SQL transform asset.
The asset executes SQL and writes results to Parquet. Sources (harvest views and upstream transforms) are registered as DuckDB views, so SQL can reference them by name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Asset name (also used for output parquet filename) | *required* |
| `sql` | `str` | SQL query to execute (can reference views by name) | *required* |
| `harvest_views` | `list[str] \| None` | Harvest tables to register as views (e.g., `["sales", "artworks"]`) | `None` |
| `upstream` | `list[str] \| None` | Upstream transform assets to depend on (registered as views) | `None` |
| `lineage` | `TableColumnLineage \| None` | Column lineage definition for Dagster catalog | `None` |
| `example_id` | `tuple[str, int \| str] \| None` | `(id_field, id_value)` for example row in metadata | `None` |
| `stats_sql` | `str \| None` | Custom stats SQL (default: count + numeric sums from output) | `None` |
| `group_name` | `str` | Dagster asset group | `'transform'` |
| `kinds` | `set[str] \| None` | Asset kinds (default: `{"duckdb", "sql"}`) | `None` |
| `check_fn` | `Any \| None` | Optional asset check function | `None` |
Returns:

| Type | Description |
|---|---|
| `AssetsDefinition` | AssetsDefinition configured with ParquetPathIOManager |
Example

```python
# First transform: reads from harvest
sales_enriched = duckdb_transform_asset(
    name="sales_enriched",
    sql="SELECT s.*, a.title FROM sales s JOIN artworks a USING (artwork_id)",
    harvest_views=["sales", "artworks"],
    lineage=SALES_ENRICHED_LINEAGE,
    example_id=("sale_id", 2),
)

# Chained transform: reads from upstream
sales_filtered = duckdb_transform_asset(
    name="sales_filtered",
    sql="SELECT * FROM sales_enriched WHERE sale_price_usd > 1000",
    upstream=["sales_enriched"],
)

# Mixed sources: upstream + harvest
sales_with_artists = duckdb_transform_asset(
    name="sales_with_artists",
    sql="SELECT e.*, ar.name FROM sales_enriched e JOIN artists ar USING (artist_id)",
    upstream=["sales_enriched"],
    harvest_views=["artists"],
)
```
Source code in cogapp_libs/dagster/duckdb.py
Example:

```python
from cogapp_libs.dagster.duckdb import duckdb_transform_asset

sales_transform_soda = duckdb_transform_asset(
    name="sales_transform_soda",
    sql="""
        SELECT s.sale_id, s.sale_date, s.sale_price_usd,
               aw.title, ar.name.trim().upper() AS artist_name
        FROM sales s
        LEFT JOIN artworks aw USING (artwork_id)
        LEFT JOIN artists ar USING (artist_id)
        ORDER BY s.sale_date DESC
    """,
    harvest_views=["sales", "artworks", "artists"],
    lineage=SALES_TRANSFORM_LINEAGE,
    example_id=("sale_id", 2),
    group_name="transform_soda",
    kinds={"duckdb", "sql"},
)
```
duckdb_output_asset¶
Create a DuckDB output asset that writes to JSON.
Reads from an upstream transform's parquet and writes filtered/transformed output to JSON format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Asset name | *required* |
| `source` | `str` | Upstream transform asset name (provides parquet path) | *required* |
| `output_path_attr` | `str \| None` | Attribute name on OutputPathsResource for output path (e.g., `"sales_soda"` for `OutputPathsResource.sales_soda`) | `None` |
| `output_format` | `str` | Output format (`"json"` supported) | `'json'` |
| `where` | `str \| None` | Optional SQL WHERE clause for filtering | `None` |
| `select` | `str` | SQL SELECT clause (default `"*"`, use for renames/excludes) | `'*'` |
| `lineage` | `TableColumnLineage \| None` | Column lineage definition | `None` |
| `example_id` | `tuple[str, int \| str] \| None` | `(id_field, id_value)` for example row | `None` |
| `example_renames` | `dict[str, str] \| None` | Column renames to apply to example data | `None` |
| `group_name` | `str` | Dagster asset group | `'output'` |
| `kinds` | `set[str] \| None` | Asset kinds (default: `{"duckdb", "json"}`) | `None` |
| `deps` | `list[str] \| None` | Additional asset dependencies (e.g., for ordering) | `None` |
Returns:

| Type | Description |
|---|---|
| `AssetsDefinition` | AssetsDefinition configured with ParquetPathIOManager |
Example

```python
sales_output = duckdb_output_asset(
    name="sales_output_soda",
    source="sales_transform_soda",
    output_path_attr="sales_soda",
    where="sale_price_usd >= 50000",
    select="* EXCLUDE (price_diff), price_diff AS price_difference",
    lineage=SALES_OUTPUT_LINEAGE,
    example_id=("sale_id", 2),
    example_renames={"price_difference": "price_diff"},
    deps=["artworks_output_soda"],  # Ordering dependency
)
```
Source code in cogapp_libs/dagster/duckdb.py
Example:

```python
from cogapp_libs.dagster.duckdb import duckdb_output_asset

sales_output_soda = duckdb_output_asset(
    name="sales_output_soda",
    source="sales_transform_soda",
    output_path_attr="sales_soda",
    where="sale_price_usd >= 50_000",
    select="* EXCLUDE (price_diff), price_diff AS price_difference",
    lineage=SALES_OUTPUT_LINEAGE,
    example_id=("sale_id", 2),
    example_renames={"price_difference": "price_diff"},
    group_name="output_soda",
    kinds={"duckdb", "json"},
)
```
DuckDBContext¶
Context manager for DuckDB operations.
Source code in cogapp_libs/dagster/duckdb.py
execute ¶
sql ¶
fetchone ¶
fetchall ¶
write_parquet ¶
Execute SQL and write results to parquet.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sql` | `str` | SQL query to execute | *required* |
| `output_path` | `str \| Path` | Path to write parquet file | *required* |
| `compression` | `str` | Compression algorithm | `'zstd'` |
Returns:

| Type | Description |
|---|---|
| `str` | String path to written parquet file |
Source code in cogapp_libs/dagster/duckdb.py
write_json ¶
Execute SQL and write results to JSON array.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sql` | `str` | SQL query to execute | *required* |
| `output_path` | `str \| Path` | Path to write JSON file | *required* |
Returns:

| Type | Description |
|---|---|
| `str` | String path to written JSON file |
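A hypothetical sketch combining the two writer methods inside an asset (the resource names and paths are illustrative):

```python
from cogapp_libs.dagster.duckdb import DuckDBContext

@dg.asset
def sales_export(context, duckdb, paths) -> str:
    with DuckDBContext(duckdb, paths, context) as ctx:
        ctx.register_views(["sales"])
        # Write the transform result to parquet, then export it as a JSON array
        parquet_path = ctx.write_parquet(
            "SELECT * FROM sales", "data/transforms/sales.parquet", compression="zstd"
        )
        ctx.write_json(f"SELECT * FROM '{parquet_path}'", "data/output/sales.json")
    return parquet_path
```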
Source code in cogapp_libs/dagster/duckdb.py
get_example_row ¶
```python
get_example_row(path: str, id_field: str | None = None, id_value: int | str | None = None) -> dict[str, str | None]
```
Get formatted example row for lineage display.
Source code in cogapp_libs/dagster/duckdb.py
register_view ¶
For complex operations beyond what factories support:

```python
from cogapp_libs.dagster.duckdb import DuckDBContext

@dg.asset
def custom_transform(context, duckdb, paths) -> str:
    with DuckDBContext(duckdb, paths, context) as ctx:
        ctx.register_views(["sales", "artworks"])
        # Custom SQL operations
        ctx.conn.sql("CREATE TABLE temp AS SELECT ...").write_parquet(path)
        ctx.add_parquet_metadata(path, lineage=LINEAGE, example_id=("id", 1))
    return str(path)
```
DuckDB Friendly SQL¶
DuckDB provides SQL extensions that make queries more readable and concise.
Dot Notation for String Functions¶
Chain string functions using method syntax instead of nested function calls:
```sql
-- Traditional SQL
SELECT upper(trim(name)) AS artist_name FROM artists

-- DuckDB friendly
SELECT name.trim().upper() AS artist_name FROM artists
```
Common chainable functions: upper(), lower(), trim(), ltrim(), rtrim(), replace(), substring().
GROUP BY ALL¶
Automatically group by all non-aggregated columns:
```sql
-- Traditional SQL
SELECT artwork_id, count(*) AS sale_count, sum(sale_price_usd) AS total
FROM sales
GROUP BY artwork_id

-- DuckDB friendly
SELECT artwork_id, count(*) AS sale_count, sum(sale_price_usd) AS total
FROM sales
GROUP BY ALL
```
SELECT * EXCLUDE / REPLACE¶
Select all columns except specific ones, or replace column definitions:
```sql
-- Exclude columns
SELECT * EXCLUDE (internal_id, created_at) FROM sales

-- Replace column values
SELECT * REPLACE (sale_price_usd / 100 AS sale_price_usd) FROM sales

-- Combine with rename
SELECT * EXCLUDE (price_diff), price_diff AS price_difference FROM sales
```
Numeric Underscores¶
Improve readability of large numbers:
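A short sketch of the syntax, reusing the sales columns from the examples above:

```sql
-- Hard to scan
SELECT * FROM sales WHERE sale_price_usd >= 1000000

-- With underscores
SELECT * FROM sales WHERE sale_price_usd >= 1_000_000
```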
Aggregate with ORDER BY¶
Order values within aggregations:
```sql
-- Ordered list aggregation
SELECT artwork_id, list(filename ORDER BY sort_order) AS media_files
FROM media
GROUP BY ALL

-- String aggregation with order
SELECT artist_id, string_agg(title, ', ' ORDER BY year DESC) AS works
FROM artworks
GROUP BY ALL
```
Conditional Aggregates¶
Count or aggregate based on conditions:
```sql
-- Count matching rows
SELECT
    count(*) AS total_sales,
    count_if(sale_price_usd > 1_000_000) AS million_dollar_sales
FROM sales

-- Filter within aggregate
SELECT sum(sale_price_usd) FILTER (WHERE buyer_country = 'United States') AS us_sales
FROM sales
```
ifnull / coalesce¶
Handle NULL values concisely:
```sql
-- Two-argument null replacement
SELECT ifnull(sale_count, 0) AS sale_count FROM artworks

-- Multiple fallback values
SELECT coalesce(preferred_name, display_name, 'Unknown') AS name FROM artists
```
For more DuckDB SQL features, see the DuckDB documentation at duckdb.org/docs.
Polars vs DuckDB Comparison¶
Common data transformations shown in both Polars and DuckDB SQL.
String Operations¶
| Operation | Polars | DuckDB |
|---|---|---|
| Strip whitespace | `col("name").str.strip_chars()` | `name.trim()` |
| Uppercase | `col("name").str.to_uppercase()` | `name.upper()` |
| Lowercase | `col("name").str.to_lowercase()` | `name.lower()` |
| Replace | `col("name").str.replace("old", "new")` | `name.replace('old', 'new')` |
| Contains (bool) | `col("name").str.contains("pattern")` | `name.contains('pattern')` |
| Concatenate | `pl.concat_str([col("a"), col("b")], separator=" ")` | `concat_ws(' ', a, b)` |
| Extract first char | `col("name").str.slice(0, 1)` | `name[1]` or `left(name, 1)` |
Example: Clean artist name
```python
# Polars
df.with_columns(
    pl.col("name").str.strip_chars().str.to_uppercase().alias("artist_name")
)
```

```sql
-- DuckDB
SELECT name.trim().upper() AS artist_name FROM artists
```
Null Handling¶
| Operation | Polars | DuckDB |
|---|---|---|
| Fill with constant | `col("x").fill_null(0)` | `coalesce(x, 0)` or `ifnull(x, 0)` |
| Fill from column | `col("x").fill_null(col("y"))` | `coalesce(x, y)` |
| Check if null | `col("x").is_null()` | `x IS NULL` |
| Drop nulls | `df.drop_nulls("x")` | `WHERE x IS NOT NULL` |
Example: Fill missing prices
```python
# Polars
df.with_columns(
    pl.col("sale_price").fill_null(pl.col("list_price")).alias("final_price")
)
```

```sql
-- DuckDB
SELECT coalesce(sale_price, list_price) AS final_price FROM sales
```
Filtering (Drop Rows)¶
| Operation | Polars | DuckDB |
|---|---|---|
| Equals | `df.filter(col("status") == "active")` | `WHERE status = 'active'` |
| Not equals | `df.filter(col("status") != "deleted")` | `WHERE status != 'deleted'` |
| Greater than | `df.filter(col("price") > 1000)` | `WHERE price > 1000` |
| Multiple conditions | `df.filter((col("a") > 1) & (col("b") < 10))` | `WHERE a > 1 AND b < 10` |
Example: Filter high-value sales
```python
# Polars
df.filter(pl.col("sale_price_usd") >= 50_000)
```

```sql
-- DuckDB
SELECT * FROM sales WHERE sale_price_usd >= 50_000
```
Conditional Replace¶
| Operation | Polars | DuckDB |
|---|---|---|
| When/then | `pl.when(cond).then(val).otherwise(col("x"))` | `CASE WHEN cond THEN val ELSE x END` |
| Simple if | `pl.when(cond).then(val).otherwise(col("x"))` | `if(cond, val, x)` |
Example: Price tier categorization
```python
# Polars
df.with_columns(
    pl.when(pl.col("price") < 1000).then(pl.lit("budget"))
    .when(pl.col("price") < 10000).then(pl.lit("mid"))
    .otherwise(pl.lit("premium"))
    .alias("price_tier")
)
```

```sql
-- DuckDB
SELECT CASE
    WHEN price < 1000 THEN 'budget'
    WHEN price < 10_000 THEN 'mid'
    ELSE 'premium'
END AS price_tier
FROM artworks
```
List/Array Operations¶
| Operation | Polars | DuckDB |
|---|---|---|
| Get first | `col("items").list.first()` | `items[1]` or `list_extract(items, 1)` |
| Explode to rows | `df.explode("items")` | `UNNEST(items)` |
| Aggregate to list | `col("x").implode()` in `group_by` | `list(x)` or `array_agg(x)` |
| List length | `col("items").list.len()` | `len(items)` or `array_length(items)` |
| Remove nulls | `col("items").list.drop_nulls()` | `list_filter(items, x -> x IS NOT NULL)` |
Example: Aggregate media files
```python
# Polars
df.group_by("artwork_id").agg(
    pl.col("filename").alias("media_files")
)
```

```sql
-- DuckDB
SELECT artwork_id, list(filename ORDER BY sort_order) AS media_files
FROM media
GROUP BY ALL
```
Type Casting¶
| Operation | Polars | DuckDB |
|---|---|---|
| To string | `col("x").cast(pl.Utf8)` | `x::VARCHAR` or `CAST(x AS VARCHAR)` |
| To integer | `col("x").cast(pl.Int64)` | `x::INTEGER` or `CAST(x AS INTEGER)` |
| To float | `col("x").cast(pl.Float64)` | `x::DOUBLE` or `CAST(x AS DOUBLE)` |
| To date | `col("x").str.to_date("%Y-%m-%d")` | `x::DATE` or `strptime(x, '%Y-%m-%d')` |
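A small aside on the date row: DuckDB's `strptime` uses the same format-specifier convention as Python's `datetime.strptime`, so format strings like `%Y-%m-%d` transfer directly between the two. A runnable stdlib check:

```python
from datetime import datetime

# Same format string as strptime(x, '%Y-%m-%d') in DuckDB
d = datetime.strptime("2024-03-15", "%Y-%m-%d").date()
```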
Column Operations¶
| Operation | Polars | DuckDB |
|---|---|---|
| Rename | `df.rename({"old": "new"})` | `SELECT old AS new` |
| Select columns | `df.select(["a", "b"])` | `SELECT a, b` |
| Drop columns | `df.drop(["x", "y"])` | `SELECT * EXCLUDE (x, y)` |
| Add constant | `df.with_columns(pl.lit("value").alias("new"))` | `SELECT *, 'value' AS new` |
| Copy column | `df.with_columns(pl.col("a").alias("b"))` | `SELECT *, a AS b` |
Honeysuckle Processor Equivalents
See the Processor Equivalents Guide for detailed examples showing how the top 15 most-used Honeysuckle processors map to native Polars and DuckDB operations.
Exceptions¶
PipelineError¶
MissingTableError¶
Bases: DataValidationError
Raised when a required database table doesn't exist.
Automatically lists available tables to help debugging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `asset_name` | `str` | Name of the asset where error occurred | required |
| `table_name` | `str` | Name of the missing table | required |
| `available_tables` | `list[str]` | List of tables that exist in the database | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| `table_name` | | Name of the missing table |
| `available_tables` | | List of available tables |
Example
```python
raise MissingTableError(
    "sales_transform",
    "raw.sales",
    ["raw.artworks", "raw.artists"],
)
```
Source code in cogapp_libs/dagster/exceptions.py
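As a rough sketch only (the actual implementation lives in `cogapp_libs/dagster/exceptions.py` and may differ), the documented constructor and attributes imply a shape like this:

```python
# Hypothetical sketch of the error classes documented in this section;
# not the real implementation.
class DataValidationError(Exception):
    """Base class: prefixes messages with the asset name."""

    def __init__(self, asset_name: str, message: str) -> None:
        self.asset_name = asset_name
        super().__init__(f"[{asset_name}] {message}")


class MissingTableError(DataValidationError):
    """Missing table: the message lists the tables that do exist."""

    def __init__(self, asset_name: str, table_name: str, available_tables: list) -> None:
        self.table_name = table_name
        self.available_tables = available_tables
        super().__init__(
            asset_name,
            f"table '{table_name}' not found; available: {', '.join(available_tables)}",
        )
```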
MissingColumnError¶
Bases: DataValidationError
Raised when required columns are missing from a DataFrame.
Automatically lists available columns to help debugging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `asset_name` | `str` | Name of the asset where error occurred | required |
| `missing_columns` | `set[str]` | Set of column names that are missing | required |
| `available_columns` | `list[str]` | List of columns that exist in the DataFrame | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| `missing_columns` | | Set of missing column names |
| `available_columns` | | List of available column names |
Example
```python
raise MissingColumnError(
    "sales_transform",
    {"sale_price", "buyer_id"},
    ["sale_id", "artwork_id", "sale_date"],
)
```
Source code in cogapp_libs/dagster/exceptions.py
DataValidationError¶
Bases: PipelineError
Base exception for data validation errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `asset_name` | `str` | Name of the asset where error occurred | required |
| `message` | `str` | Detailed error message | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| `asset_name` | | Name of the asset (included in error message) |