Testing Guide

Comprehensive guide to testing Dagster pipelines in honey-duck.

Running Tests

# Run all tests
uv run pytest

# Run with verbose output
uv run pytest -v

# Run specific test file
uv run pytest tests/test_validation.py

# Run excluding slow tests
uv run pytest -m "not slow"

# Run only integration tests
uv run pytest -m integration

Test Categories

Unit Tests (test_validation.py, test_processors.py)

Test individual functions in isolation:

import polars as pl

def test_validate_dataframe():
    df = pl.DataFrame({"a": [1], "b": [2]})
    # Should not raise when all expected columns are present
    validate_dataframe(df, ["a", "b"], "test_asset")

Smoke Tests (test_smoke.py)

Verify imports and basic configuration work:

def test_definitions_imports():
    from honey_duck.defs.definitions import defs
    assert defs is not None

Integration Tests (test_integration.py)

Test full pipeline execution with real data:

import pytest

from honey_duck.defs.definitions import defs

@pytest.mark.slow
def test_polars_ops_pipeline_executes():
    job = defs.get_job_def("polars_ops_pipeline")
    result = job.execute_in_process()
    assert result.success

Linting Tests (test_linting.py)

Enforce best practices without running pipelines:

def test_all_assets_have_group_names(asset_graph):
    for key in asset_graph.get_all_asset_keys():
        node = asset_graph.get(key)
        if node.is_materializable:
            assert node.group_name is not None

Writing Asset Tests

Testing Assets Directly

Call asset functions with mock inputs:

import dagster as dg
import polars as pl

from honey_duck.defs.polars.assets import sales_joined_polars

def test_sales_joined_polars(smoke_paths):
    # Assets can be called as regular functions
    context = dg.build_asset_context()
    result = sales_joined_polars(context, paths=smoke_paths)
    assert isinstance(result, pl.LazyFrame)

Testing with dg.materialize()

For assets with dependencies, use Dagster's materialize:

import dagster as dg

def test_asset_chain():
    result = dg.materialize(
        assets=[upstream_asset, downstream_asset],
        resources={"paths": mock_paths},
    )
    assert result.success

    # Check output values
    output = result.output_for_node("downstream_asset")
    assert len(output) > 0

Testing with Context

For assets requiring AssetExecutionContext:

def test_asset_with_context():
    context = dg.build_asset_context()
    result = my_asset(context, paths=mock_paths)
    assert result is not None

Test Fixtures

Common fixtures are defined in tests/conftest.py:

Fixture              Purpose
temp_harvest_dir     Temporary directory with empty Parquet schema files
smoke_paths          PathsResource pointing to temp directories
smoke_output_paths   OutputPathsResource for temp output files
noop_io_manager      IO manager that accepts but doesn't persist data
mock_resources       Complete resource dict for materialize()
sample_sales_df      Sample sales DataFrame for unit tests
sample_artworks_df   Sample artworks DataFrame for unit tests

Using Fixtures

def test_with_fixtures(temp_harvest_dir, smoke_paths):
    result = read_harvest_table_lazy(
        temp_harvest_dir,
        "sales_raw",
        asset_name="test",
    )
    assert isinstance(result, pl.LazyFrame)

Pytest Markers

Tests can be marked for selective execution:

@pytest.mark.slow
def test_full_pipeline():
    """This test takes a while to run."""
    ...

@pytest.mark.integration
def test_database_connection():
    """This test requires external services."""
    ...

Run specific markers:

# Skip slow tests during development
uv run pytest -m "not slow"

# Run only integration tests
uv run pytest -m integration
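If the slow and integration markers are not registered, pytest emits PytestUnknownMarkWarning. One way to register them (a sketch; the project may already do this in its pytest configuration) is a pytest_configure hook in tests/conftest.py, with illustrative descriptions:

```python
def pytest_configure(config):
    # Register custom markers so `pytest -m ...` selection works
    # without PytestUnknownMarkWarning. Descriptions are illustrative.
    config.addinivalue_line("markers", "slow: test takes a while to run")
    config.addinivalue_line(
        "markers", "integration: test requires external services"
    )
```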

Best Practices

1. Separate Calculations from IO

Test pure transformation logic separately from IO:

# Good: Test the transformation logic
def test_calculate_price_tier():
    assert calculate_price_tier(500) == "budget"
    assert calculate_price_tier(5000) == "mid"
    assert calculate_price_tier(50000) == "premium"

# Separate test for asset that uses it
def test_asset_applies_price_tiers(mock_resources):
    result = dg.materialize([my_asset], resources=mock_resources)
    assert result.success
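The threshold assertions above can also be made table-driven with pytest.mark.parametrize. This sketch assumes a hypothetical calculate_price_tier with cutoffs consistent with the sample values (under 1,000 is budget, under 10,000 is mid, otherwise premium); the project's actual boundaries may differ:

```python
import pytest

def calculate_price_tier(price: float) -> str:
    # Hypothetical implementation, consistent with the sample values
    # above; the project's actual cutoffs may differ.
    if price < 1_000:
        return "budget"
    if price < 10_000:
        return "mid"
    return "premium"

@pytest.mark.parametrize(
    ("price", "tier"),
    [(500, "budget"), (5_000, "mid"), (50_000, "premium")],
)
def test_calculate_price_tier(price, tier):
    assert calculate_price_tier(price) == tier
```

Parametrization keeps each threshold a separate test case, so a failing boundary is reported individually instead of stopping at the first failed assert.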

2. Use NoOpIOManager for Smoke Tests

Avoid writing to disk during quick tests:

import polars as pl
from dagster import InputContext, IOManager, OutputContext

class NoOpIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj) -> None:
        context.log.info(f"NoOp: Would persist {len(obj)} records")

    def load_input(self, context: InputContext) -> pl.LazyFrame:
        return pl.DataFrame().lazy()

3. Test Error Conditions

Verify assets fail gracefully:

def test_raises_on_missing_table(temp_harvest_dir):
    with pytest.raises(dg.Failure) as exc_info:
        read_harvest_table_lazy(
            temp_harvest_dir / "raw",
            "nonexistent_table",
            asset_name="test",
        )
    assert "nonexistent_table" in str(exc_info.value)

4. Test Asset Graph Structure

Verify dependencies are correct:

def test_transform_depends_on_harvest():
    asset_graph = defs.resolve_asset_graph()
    node = asset_graph.get(dg.AssetKey("sales_transform_polars"))

    assert any("harvest" in str(p) for p in node.parent_keys)
