
Soda Data Quality Validation

Validate data quality using Soda Core v4 with DuckDB. Parquet files are validated directly via SQL; no data is loaded into Python memory.

Why Soda?

Soda provides:

  • Contract-based validation: Define expected schema and quality rules in YAML
  • SQL-native execution: Validation runs as DuckDB queries, not Python loops
  • Blocking checks: Prevent downstream assets from running if validation fails
  • Rich metadata: Check results appear in Dagster UI

Installation

Soda v4 is distributed via a private PyPI index:

pip install -i https://pypi.cloud.soda.io/simple "soda-duckdb>=4"

DuckDB Version Compatibility

The standard soda-core-duckdb package requires duckdb<1.1.0. Use the v4 package from pypi.cloud.soda.io for compatibility with modern DuckDB versions.
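If a single install needs packages from both the Soda index and standard PyPI, pip's extra index flag keeps both available (same package spec as above):

pip install --extra-index-url https://pypi.cloud.soda.io/simple "soda-duckdb>=4"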

Quick Start

1. Define a Contract

Create a YAML contract defining expected schema and quality rules:

# contracts/sales_transform.yml
dataset: duckdb_check/sales_transform

columns:
  - name: sale_id
  - name: sale_price_usd
    checks:
      - missing:
          must_be: 0
      - invalid:
          must_be: 0
          valid_min: 1
  - name: artist_name
    checks:
      - missing:
          must_be: 0

checks:
  - row_count:
      must_be_greater_than: 0
  - duplicate:
      columns: [sale_id]
      must_be: 0

2. Create an Asset Check

import dagster as dg
from .assets import sales_transform_soda

@dg.asset_check(asset=sales_transform_soda, blocking=True)
def check_sales_transform_soda(
    context: dg.AssetCheckExecutionContext,
    sales_transform_soda: str,  # Receives path from ParquetPathIOManager
) -> dg.AssetCheckResult:
    """Validate parquet against Soda contract.

    Blocking: If this fails, downstream assets won't materialize.
    """
    import tempfile

    from soda_core.contracts import verify_contract_locally

    # Write a temp data source config pointing at the parquet file;
    # verify_contract_locally expects file paths, not YAML strings
    ds_config = f"""
name: duckdb_check
type: duckdb
connection:
  database: "{sales_transform_soda}"
"""
    with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as f:
        f.write(ds_config)

    result = verify_contract_locally(
        data_source_file_path=f.name,
        contract_file_path="contracts/sales_transform.yml",
        publish=False,
    )

    return dg.AssetCheckResult(
        passed=result.is_passed,
        metadata={
            "checks_passed": result.number_of_checks_passed,
            "checks_failed": result.number_of_checks_failed,
        },
    )
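For the check to run, register it alongside the asset (resources, including the IO manager from the next step, go in the same Definitions):

import dagster as dg

defs = dg.Definitions(
    assets=[sales_transform_soda],
    asset_checks=[check_sales_transform_soda],
)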

3. Use ParquetPathIOManager

The DuckDB+Soda pipeline uses ParquetPathIOManager to pass file paths (not DataFrames) between assets:

import dagster as dg
from cogapp_libs.dagster import ParquetPathIOManager

defs = dg.Definitions(
    resources={
        "parquet_path_io_manager": ParquetPathIOManager(
            base_dir="data/transforms"
        ),
    },
)
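ParquetPathIOManager ships with cogapp_libs. If you want the same pattern without that dependency, a path-passing IO manager is only a few lines. Below is a minimal sketch; the class name and path layout are illustrative, not the cogapp_libs implementation:

from pathlib import Path
import dagster as dg

class PathOnlyIOManager(dg.ConfigurableIOManager):
    """Stores and returns parquet paths; never loads data into Python."""
    base_dir: str

    def handle_output(self, context: dg.OutputContext, obj: str) -> None:
        # The asset already wrote the file; just surface the path as metadata
        context.add_output_metadata({"path": obj})

    def load_input(self, context: dg.InputContext) -> str:
        # Downstream assets receive a path string derived from the asset key
        return str(Path(self.base_dir) / f"{context.asset_key.path[-1]}.parquet")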

Contract Syntax

Column Checks

columns:
  - name: price
    checks:
      - missing:
          must_be: 0
      - invalid:
          must_be: 0
          valid_min: 0
          valid_max: 1000000

Row-Level Checks

checks:
  - row_count:
      must_be_greater_than: 0
  - duplicate:
      columns: [id]
      must_be: 0

Freshness Checks

checks:
  - freshness:
      column: updated_at
      fail_when_missing: true
      freshness_limit: P1D  # ISO 8601 duration

Complete Pipeline Example

Here's the full DuckDB+Soda pipeline pattern used in honey-duck:

import time
from pathlib import Path

import dagster as dg
from dagster_duckdb import DuckDBResource
from cogapp_libs.dagster.lineage import (
    build_lineage,
    collect_parquet_metadata,
    register_harvest_views,
)

# HARVEST_DEPS, TRANSFORM_SQL, and the PathsResource used below are
# defined elsewhere in the project

# Define column lineage using the DSL
SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")

SALES_TRANSFORM_LINEAGE = build_lineage(
    passthrough={
        "sale_id": SALES_RAW,
        "artwork_id": SALES_RAW,
        "title": ARTWORKS_RAW,
    },
    rename={
        "artwork_year": (ARTWORKS_RAW, "year"),
        "list_price_usd": (ARTWORKS_RAW, "price_usd"),
    },
    computed={
        "price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")],
    },
)

STATS_SQL = """
SELECT count(*) AS record_count, sum(sale_price_usd) AS total_value
FROM '{path}'
"""

@dg.asset(
    kinds={"duckdb", "sql", "soda"},
    deps=HARVEST_DEPS,
    group_name="transform_soda",
    io_manager_key="parquet_path_io_manager",
)
def sales_transform_soda(
    context: dg.AssetExecutionContext,
    duckdb: DuckDBResource,
    paths: PathsResource,
) -> str:
    """Transform sales data with column lineage."""
    start = time.perf_counter()
    output_path = Path(paths.transforms_dir) / "sales.parquet"

    with duckdb.get_connection() as conn:
        # Register source views (single line replaces manual CREATE VIEW statements)
        register_harvest_views(conn, paths.harvest_dir, ["sales", "artworks", "artists"])

        # Transform and write directly to parquet
        conn.sql(TRANSFORM_SQL).write_parquet(str(output_path), compression="zstd")

        # Collect all metadata in one call
        collect_parquet_metadata(
            context, conn, str(output_path),
            lineage=SALES_TRANSFORM_LINEAGE,
            stats_sql=STATS_SQL.format(path=output_path),
            example_id=("sale_id", 2),
            elapsed_ms=(time.perf_counter() - start) * 1000,
        )

    return str(output_path)  # Return path, not DataFrame
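Downstream assets consume the returned path the same way. A sketch (the asset name and its SELECT are illustrative):

@dg.asset(
    io_manager_key="parquet_path_io_manager",
    group_name="transform_soda",
)
def sales_output_soda(
    context: dg.AssetExecutionContext,
    duckdb: DuckDBResource,
    sales_transform_soda: str,  # upstream path, not a DataFrame
) -> str:
    output_path = "data/transforms/sales_output.parquet"
    with duckdb.get_connection() as conn:
        conn.sql(
            f"SELECT * FROM '{sales_transform_soda}' WHERE sale_price_usd >= 1"
        ).write_parquet(output_path, compression="zstd")
    return output_path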

Memory Efficiency

The ParquetPathIOManager pattern keeps data in DuckDB:

Traditional Pattern:
  Asset 1 → DataFrame in memory → IO Manager → Parquet
  Asset 2 ← DataFrame in memory ← IO Manager ← Parquet

Path-based Pattern (DuckDB+Soda):
  Asset 1 → DuckDB writes parquet → returns path string
  Asset 2 ← receives path string → DuckDB reads parquet

Benefits:

  • Zero Python memory: Data stays in DuckDB/Parquet
  • Streaming: DuckDB processes in batches
  • Direct SQL: Use FROM 'path' in queries
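Because only a path crosses the asset boundary, downstream queries read the parquet directly and only small results enter Python. A sketch with an illustrative aggregate:

import duckdb

conn = duckdb.connect()
top_artists = conn.sql("""
    SELECT artist_name, sum(sale_price_usd) AS total_usd
    FROM 'data/transforms/sales.parquet'
    GROUP BY artist_name
    ORDER BY total_usd DESC
    LIMIT 5
""").fetchall()  # only the five-row result is materialized in Python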

Column Lineage

The pipeline tracks column-level data flow using the lineage DSL helpers:

from cogapp_libs.dagster.lineage import build_lineage, passthrough_lineage
import dagster as dg

SALES_RAW = dg.AssetKey("dlt_harvest_sales_raw")
ARTWORKS_RAW = dg.AssetKey("dlt_harvest_artworks_raw")
ARTISTS_RAW = dg.AssetKey("dlt_harvest_artists_raw")
SALES_TRANSFORM = dg.AssetKey("sales_transform_soda")

# Transform asset: joins multiple sources
SALES_TRANSFORM_LINEAGE = build_lineage(
    passthrough={
        "sale_id": SALES_RAW,
        "title": ARTWORKS_RAW,
    },
    rename={
        "artist_name": (ARTISTS_RAW, "name"),
    },
    computed={
        "price_diff": [(SALES_RAW, "sale_price_usd"), (ARTWORKS_RAW, "price_usd")],
    },
)

# Output asset: columns pass through from single source
SALES_OUTPUT_LINEAGE = passthrough_lineage(
    source=SALES_TRANSFORM,
    columns=["sale_id", "title", "artist_name", "pct_change"],
    renames={"price_difference": "price_diff"},  # output_col: source_col
)

The DSL is more compact than manual TableColumnLineage construction and groups columns by their lineage type (passthrough, rename, computed).
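Assuming build_lineage returns a standard Dagster TableColumnLineage (the DSL is described as a shorthand for constructing one), it attaches via Dagster's reserved metadata key:

context.add_output_metadata(
    {"dagster/column_lineage": SALES_TRANSFORM_LINEAGE}
)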

Example Values

Each asset emits lineage_examples metadata showing actual values:

from cogapp_libs.dagster.lineage import get_example_row

# Fetch one row formatted for display
examples = get_example_row(conn, parquet_path, "sale_id", 2)
# {"sale_id": "2", "sale_price_usd": "$86.3M", "title": "Day Dream"}

context.add_output_metadata({
    "lineage_examples": examples,
})
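If you need the same behavior without the helper, a rough DuckDB equivalent is a single-row query (continuing the snippet above; display formatting like "$86.3M" would be extra):

rel = conn.sql(f"SELECT * FROM '{parquet_path}' WHERE sale_id = 2 LIMIT 1")
examples = {col: str(val) for col, val in zip(rel.columns, rel.fetchone())}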

Threshold Checks

Soda contracts support pipeline thresholds natively — no Python code needed. Use these to catch broken sources or corrupt data before downstream assets run.

Minimum Record Count

Block the pipeline if a source delivers fewer rows than expected:

# contracts/sales_transform.yml
dataset: sales_transform

checks:
  # Minimum record count — catches empty or truncated harvests
  - row_count:
      must_be_greater_than: 800

Maximum Failure Rate (Per Column)

Block on excessive missing or invalid values. Soda expresses this per column rather than across all checks:

columns:
  - name: sale_price_usd
    checks:
      - invalid_percent:
          must_be_less_than: 5  # No more than 5% invalid prices
          valid_min: 1
      - missing_percent:
          must_be_less_than: 2  # No more than 2% nulls

  - name: artist_name
    checks:
      - missing_count:
          must_be_less_than: 50  # Absolute count threshold

Combined Example

A contract with both record count and quality thresholds:

# contracts/famsf_artworks.yml
dataset: famsf_artworks

checks:
  - row_count:
      must_be_greater_than: 7000
  - duplicate:
      columns: [artwork_id]
      must_be: 0

columns:
  - name: title
    checks:
      - missing_percent:
          must_be_less_than: 1
  - name: artist_name
    checks:
      - missing_percent:
          must_be_less_than: 5
  - name: list_price_usd
    checks:
      - invalid_percent:
          must_be_less_than: 5
          valid_min: 1

Pandera equivalent

For in-memory Polars validation with percentage-based thresholds, see Pandera Validation: Pipeline Thresholds.

Referential Integrity

Check that foreign keys in one parquet file reference valid records in another. Soda doesn't have built-in cross-file checks, but DuckDB SQL handles this directly:

-- Find artwork_ids in sales that don't exist in artworks
SELECT DISTINCT s.artwork_id
FROM 'data/storage/sales_transform.parquet' s
WHERE s.artwork_id NOT IN (
    SELECT artwork_id FROM 'data/harvest/raw/artworks_raw/**/*.parquet'
)

As a Dagster Asset Check

import dagster as dg
from dagster_duckdb import DuckDBResource

@dg.asset_check(asset=sales_transform_soda)
def check_artwork_ids_soda(
    context: dg.AssetCheckExecutionContext,
    sales_transform_soda: str,
    duckdb: DuckDBResource,
) -> dg.AssetCheckResult:
    """SQL-based referential integrity check via DuckDB."""
    with duckdb.get_connection() as conn:
        orphans = conn.sql(f"""
            SELECT DISTINCT artwork_id
            FROM '{sales_transform_soda}'
            WHERE artwork_id NOT IN (
                SELECT artwork_id FROM 'data/harvest/raw/artworks_raw/**/*.parquet'
            )
        """).fetchall()

    return dg.AssetCheckResult(
        passed=len(orphans) == 0,
        metadata={
            "orphan_count": len(orphans),
            "orphan_sample": [r[0] for r in orphans[:10]] if orphans else None,
        },
    )

Polars equivalent

For in-memory referential integrity checks using anti-joins, see Pandera Validation: Cross-File Referential Integrity.

Troubleshooting

Soda Not Installed

To let checks pass with a warning when Soda is not installed, guard the import at the top of the asset check body:

try:
    from soda_core.contracts import verify_contract_locally
except ImportError:
    return dg.AssetCheckResult(
        passed=True,
        metadata={"warning": "Soda not installed - check skipped"},
    )

Contract File Not Found

Ensure contract paths are relative to your project root or use absolute paths:

from pathlib import Path

CONTRACTS_DIR = Path(__file__).parent / "contracts"
contract_path = CONTRACTS_DIR / "sales_transform.yml"

Resources