
IO Managers

Custom IO managers for various storage backends.

External Documentation

Elasticsearch

ElasticsearchIOManager

Bases: IOManager

IO Manager for writing DataFrames to Elasticsearch 8/9 and reading them back.

Provides integration with Elasticsearch for storing and querying pipeline outputs. Supports both Polars and Pandas DataFrames with efficient bulk indexing.

Features:

  • Bulk indexing for high performance (configurable batch size)
  • Automatic index creation with dynamic or custom mappings
  • Support for both Elasticsearch 8 and 9
  • Connection pooling and retry logic
  • Metadata tracking (document count, index name, response time)
  • Handles both write (dump) and read (load) operations

Parameters:

    hosts (list[str], required):
        Elasticsearch host(s) (e.g., ["http://localhost:9200"])

    index_prefix (str, default "dagster_"):
        Prefix for index names (e.g., "dagster_")

    api_key (str | None, default None):
        Elasticsearch API key (recommended for security)

    basic_auth (tuple[str, str] | None, default None):
        Tuple of (username, password) if not using an API key

    verify_certs (bool, default True):
        Whether to verify SSL certificates

    bulk_size (int, default 500):
        Number of documents per bulk request

    custom_mappings (dict | None, default None):
        Optional dict of {asset_name: mapping_dict}

Environment Variables (recommended for credentials):

    ELASTICSEARCH_HOST: Comma-separated list of hosts
    ELASTICSEARCH_API_KEY: API key for authentication
    ELASTICSEARCH_USER: Username for basic auth
    ELASTICSEARCH_PASSWORD: Password for basic auth
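
The examples below pass these variables in explicitly; the manager does not appear to read them on its own. A minimal wiring sketch (the comma-splitting and the basic-auth fallback are illustrative, not documented library behaviour):

import os

from cogapp_libs.dagster import ElasticsearchIOManager

api_key = os.getenv("ELASTICSEARCH_API_KEY")
io_manager = ElasticsearchIOManager(
    hosts=os.getenv("ELASTICSEARCH_HOST", "http://localhost:9200").split(","),
    api_key=api_key,
    # Fall back to basic auth only when no API key is configured.
    basic_auth=(
        None
        if api_key
        else (os.environ["ELASTICSEARCH_USER"], os.environ["ELASTICSEARCH_PASSWORD"])
    ),
)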

Example
# In definitions.py:
import os

import dagster as dg
import polars as pl

from cogapp_libs.dagster import ElasticsearchIOManager

defs = dg.Definitions(
    assets=[sales_output, artworks_output],
    resources={
        "elasticsearch_io_manager": ElasticsearchIOManager(
            hosts=[os.getenv("ELASTICSEARCH_HOST", "http://localhost:9200")],
            index_prefix="honey_duck_",
            api_key=os.getenv("ELASTICSEARCH_API_KEY"),
            bulk_size=1000,
        ),
    },
)

# In asset:
@dg.asset(io_manager_key="elasticsearch_io_manager")
def sales_output(sales_transform: pl.DataFrame) -> pl.DataFrame:
    # DataFrame automatically indexed to Elasticsearch
    return sales_transform
Index Naming

Indices are named {index_prefix}{asset_name}_{timestamp}, for example
"honey_duck_sales_output_20240115".
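
The timestamp format is not documented beyond that example, but "20240115" suggests a %Y%m%d date stamp. A hypothetical reconstruction of the naming logic (the real _get_index_name() may differ, e.g. around time zones or partitions):

from datetime import datetime, timezone

def example_index_name(index_prefix: str, asset_name: str) -> str:
    # Sketch only: UTC date stamp matching the "20240115" example above.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d")
    return f"{index_prefix}{asset_name}_{timestamp}"

# example_index_name("honey_duck_", "sales_output")
# -> "honey_duck_sales_output_20240115"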

Custom Mappings Example
custom_mappings = {
    "sales_output": {
        "properties": {
            "sale_id": {"type": "keyword"},
            "sale_price_usd": {"type": "float"},
            "sale_date": {"type": "date"},
            "artwork_title": {"type": "text"},
        }
    }
}

ElasticsearchIOManager(
    hosts=["http://localhost:9200"],
    custom_mappings=custom_mappings,
)
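
handle_output applies these mappings via _create_index_if_not_exists, which is not shown on this page. A plausible sketch, assuming Elasticsearch's dynamic mapping is used when no custom mapping is registered for the asset:

def _create_index_if_not_exists(self, client, index_name, asset_name, context):
    # Sketch only: the real helper may differ in detail.
    if client.indices.exists(index=index_name):
        return
    mapping = self.custom_mappings.get(asset_name)
    if mapping is not None:
        # Apply the user-supplied mapping for this asset.
        client.indices.create(index=index_name, mappings=mapping)
    else:
        # No custom mapping: let Elasticsearch infer one dynamically.
        client.indices.create(index=index_name)
    context.log.info(f"Created index '{index_name}'")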
Source code in cogapp_libs/dagster/io_managers.py
def __init__(
    self,
    hosts: list[str],
    index_prefix: str = "dagster_",
    api_key: str | None = None,
    basic_auth: tuple[str, str] | None = None,
    verify_certs: bool = True,
    bulk_size: int = 500,
    custom_mappings: dict[str, dict] | None = None,
    **es_kwargs,
):
    self.hosts = hosts
    self.index_prefix = index_prefix
    self.api_key = api_key
    self.basic_auth = basic_auth
    self.verify_certs = verify_certs
    self.bulk_size = bulk_size
    self.custom_mappings = custom_mappings or {}
    self.es_kwargs = es_kwargs
    self._client = None

handle_output

handle_output(context: OutputContext, obj: pd.DataFrame | pl.DataFrame)

Write DataFrame to Elasticsearch using bulk API.

Uses generator-based streaming for memory efficiency - never materializes the full DataFrame as a list of dicts.

Parameters:

    context (OutputContext, required):
        Dagster output context

    obj (pd.DataFrame | pl.DataFrame, required):
        DataFrame to index (pandas or polars)
Source code in cogapp_libs/dagster/io_managers.py
def handle_output(self, context: OutputContext, obj: pd.DataFrame | pl.DataFrame):
    """Write DataFrame to Elasticsearch using bulk API.

    Uses generator-based streaming for memory efficiency - never materializes
    the full DataFrame as a list of dicts.

    Args:
        context: Dagster output context
        obj: DataFrame to index (pandas or polars)
    """
    import time

    import polars as pl

    client = self._get_client()
    index_name = self._get_index_name(context)
    asset_name = context.asset_key.path[-1]

    # Create index if needed
    self._create_index_if_not_exists(client, index_name, asset_name, context)

    # Get record count without materializing
    record_count = len(obj)
    context.log.info(f"Indexing {record_count:,} documents to '{index_name}'")

    # Bulk index using generator-based streaming
    from elasticsearch.helpers import bulk

    start_time = time.perf_counter()

    # Generate bulk actions using memory-efficient generators
    if isinstance(obj, pl.DataFrame):
        actions = _generate_actions_from_polars(obj, index_name)
    else:  # pandas
        actions = _generate_actions_from_pandas(obj, index_name)

    # Perform bulk indexing
    success_count, failed_items = bulk(
        client,
        actions,
        chunk_size=self.bulk_size,
        raise_on_error=False,
        raise_on_exception=False,
    )

    elapsed_ms = (time.perf_counter() - start_time) * 1000

    # Refresh index to make documents searchable
    client.indices.refresh(index=index_name)

    # Handle any errors
    if failed_items:
        error_sample = failed_items[:5]  # First 5 errors
        context.log.warning(
            f"Failed to index {len(failed_items)} documents. Sample errors: {error_sample}"
        )

    # Add metadata
    context.add_output_metadata(
        {
            "index_name": index_name,
            "document_count": success_count,
            "failed_count": len(failed_items) if failed_items else 0,
            "bulk_size": self.bulk_size,
            "index_time_ms": round(elapsed_ms, 2),
            "elasticsearch_host": self.hosts[0],
        }
    )

    context.log.info(
        f"Successfully indexed {success_count:,} documents to '{index_name}' "
        f"in {elapsed_ms:.0f}ms"
    )
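
The _generate_actions_from_polars and _generate_actions_from_pandas helpers referenced above are not shown on this page. A plausible sketch of the Polars variant, assuming each row becomes one bulk action (the real helpers may also assign document IDs):

from collections.abc import Iterator

import polars as pl

def _generate_actions_from_polars(df: pl.DataFrame, index_name: str) -> Iterator[dict]:
    # Stream rows one at a time so the full DataFrame is never
    # materialized as a list of dicts.
    for row in df.iter_rows(named=True):
        yield {"_index": index_name, "_source": row}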

load_input

load_input(context: InputContext) -> pl.DataFrame

Load DataFrame from Elasticsearch index.

Retrieves all documents from the index and returns as Polars DataFrame. For large indices, consider using scroll API or filtering.

Parameters:

    context (InputContext, required):
        Dagster input context

Returns:

    pl.DataFrame:
        Polars DataFrame with all documents from the index

Source code in cogapp_libs/dagster/io_managers.py
def load_input(self, context: InputContext) -> pl.DataFrame:
    """Load DataFrame from Elasticsearch index.

    Retrieves all documents from the index and returns as Polars DataFrame.
    For large indices, consider using scroll API or filtering.

    Args:
        context: Dagster input context

    Returns:
        Polars DataFrame with all documents from index
    """
    import polars as pl

    client = self._get_client()
    index_name = self._get_index_name(context)

    # Check if index exists
    if not client.indices.exists(index=index_name):
        raise ValueError(
            f"Index '{index_name}' does not exist. Materialize the upstream asset first."
        )

    # Get all documents (for large indices, use scan/scroll instead)
    # Get document count first
    count_response = client.count(index=index_name)
    doc_count = count_response["count"]

    context.log.info(f"Loading {doc_count:,} documents from '{index_name}'")

    # Fetch all documents
    # Note: For large datasets (>10k docs), use elasticsearch.helpers.scan
    if doc_count > 10000:
        context.log.warning(
            f"Large index ({doc_count:,} docs). Consider using scan API for better performance."
        )

    response = client.search(
        index=index_name,
        size=min(doc_count, 10000),  # Elasticsearch max size
        query={"match_all": {}},
    )

    # Extract _source from hits
    records = [hit["_source"] for hit in response["hits"]["hits"]]

    # Convert to Polars DataFrame
    if records:
        df = pl.DataFrame(records)
    else:
        df = pl.DataFrame()  # Empty DataFrame

    context.log.info(f"Loaded {len(df):,} records from '{index_name}'")

    return df
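
For indices above the 10,000-hit search cap, the docstring points at elasticsearch.helpers.scan. A standalone sketch of that approach, outside the IO manager (host and index name are illustrative):

import polars as pl
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

client = Elasticsearch("http://localhost:9200")

# scan() pages through the index with the scroll API, so it is not
# limited to 10,000 hits per request.
hits = scan(
    client,
    index="honey_duck_sales_output_20240115",
    query={"query": {"match_all": {}}},
)
df = pl.DataFrame([hit["_source"] for hit in hits])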

Example:

import os

import dagster as dg
import polars as pl

from cogapp_libs.dagster import ElasticsearchIOManager

defs = dg.Definitions(
    resources={
        "elasticsearch_io_manager": ElasticsearchIOManager(
            hosts=["http://localhost:9200"],
            index_prefix="dagster_",
            api_key=os.getenv("ELASTICSEARCH_API_KEY"),
            bulk_size=1000,
        ),
    },
)

@dg.asset(io_manager_key="elasticsearch_io_manager")
def sales_searchable(sales_transform: pl.DataFrame) -> pl.DataFrame:
    return sales_transform  # Automatically indexed

OpenSearch

OpenSearchIOManager

Bases: IOManager

IO Manager for writing DataFrames to OpenSearch (AWS fork of Elasticsearch).

OpenSearch is AWS's fork of Elasticsearch 7.x. This IO Manager provides the same functionality as ElasticsearchIOManager but uses the opensearch-py client.

Features:

  • Bulk indexing for high performance (configurable batch size)
  • Automatic index creation with dynamic or custom mappings
  • Support for OpenSearch 1.x and 2.x
  • Connection pooling and retry logic
  • Metadata tracking (document count, index name, response time)
  • Handles both write (dump) and read (load) operations

Parameters:

    hosts (list[str], required):
        OpenSearch host(s) (e.g., ["https://search-domain.us-east-1.es.amazonaws.com"])

    index_prefix (str, default "dagster_"):
        Prefix for index names (e.g., "dagster_")

    http_auth (tuple[str, str] | None, default None):
        Tuple of (username, password) for basic auth

    use_ssl (bool, default True):
        Whether to use SSL

    verify_certs (bool, default True):
        Whether to verify SSL certificates

    bulk_size (int, default 500):
        Number of documents per bulk request

    custom_mappings (dict | None, default None):
        Optional dict of {asset_name: mapping_dict}

    aws_auth (bool, default False):
        Whether to use AWS SigV4 auth (requires boto3)

    region (str | None, default None):
        AWS region (required if aws_auth=True)
Environment Variables

    OPENSEARCH_HOST: OpenSearch endpoint URL
    OPENSEARCH_USER: Username for basic auth
    OPENSEARCH_PASSWORD: Password for basic auth
    AWS_REGION: AWS region for SigV4 auth

Example (Basic Auth):

import dagster as dg

from cogapp_libs.dagster import OpenSearchIOManager

defs = dg.Definitions(
    resources={
        "opensearch_io_manager": OpenSearchIOManager(
            hosts=["https://localhost:9200"],
            http_auth=("admin", "admin"),
            verify_certs=False,  # For local dev
        ),
    },
)

Example (AWS SigV4 Auth):

defs = dg.Definitions(
    resources={
        "opensearch_io_manager": OpenSearchIOManager(
            hosts=["https://search-domain.us-east-1.es.amazonaws.com"],
            aws_auth=True,
            region="us-east-1",
        ),
    },
)
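
The manager's internal SigV4 wiring is not shown on this page; with opensearch-py it is typically set up along these lines (a sketch assuming the standard boto3 credential chain):

import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

region = "us-east-1"
credentials = boto3.Session().get_credentials()

client = OpenSearch(
    hosts=["https://search-domain.us-east-1.es.amazonaws.com"],
    http_auth=AWSV4SignerAuth(credentials, region),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)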

Example (Asset):

@dg.asset(io_manager_key="opensearch_io_manager")
def sales_output(sales_transform: pl.DataFrame) -> pl.DataFrame:
    return sales_transform

Source code in cogapp_libs/dagster/io_managers.py
def __init__(
    self,
    hosts: list[str],
    index_prefix: str = "dagster_",
    http_auth: tuple[str, str] | None = None,
    use_ssl: bool = True,
    verify_certs: bool = True,
    bulk_size: int = 500,
    custom_mappings: dict[str, dict] | None = None,
    aws_auth: bool = False,
    region: str | None = None,
    **opensearch_kwargs,
):
    self.hosts = hosts
    self.index_prefix = index_prefix
    self.http_auth = http_auth
    self.use_ssl = use_ssl
    self.verify_certs = verify_certs
    self.bulk_size = bulk_size
    self.custom_mappings = custom_mappings or {}
    self.aws_auth = aws_auth
    self.region = region
    self.opensearch_kwargs = opensearch_kwargs
    self._client = None

handle_output

handle_output(context: OutputContext, obj: pd.DataFrame | pl.DataFrame)

Write DataFrame to OpenSearch using bulk API.

Uses generator-based streaming for memory efficiency - never materializes the full DataFrame as a list of dicts.

Source code in cogapp_libs/dagster/io_managers.py
def handle_output(self, context: OutputContext, obj: pd.DataFrame | pl.DataFrame):
    """Write DataFrame to OpenSearch using bulk API.

    Uses generator-based streaming for memory efficiency - never materializes
    the full DataFrame as a list of dicts.
    """
    import time

    import polars as pl

    client = self._get_client()
    index_name = self._get_index_name(context)
    asset_name = context.asset_key.path[-1]

    self._create_index_if_not_exists(client, index_name, asset_name, context)

    # Get record count without materializing
    record_count = len(obj)
    context.log.info(f"Indexing {record_count:,} documents to '{index_name}'")

    # Bulk index using generator-based streaming
    from opensearchpy.helpers import bulk

    start_time = time.perf_counter()

    # Generate bulk actions using memory-efficient generators
    if isinstance(obj, pl.DataFrame):
        actions = _generate_actions_from_polars(obj, index_name)
    else:  # pandas
        actions = _generate_actions_from_pandas(obj, index_name)

    success_count, failed_items = bulk(
        client,
        actions,
        chunk_size=self.bulk_size,
        raise_on_error=False,
        raise_on_exception=False,
    )

    elapsed_ms = (time.perf_counter() - start_time) * 1000

    # Refresh index
    client.indices.refresh(index=index_name)

    if failed_items:
        error_sample = failed_items[:5]
        context.log.warning(
            f"Failed to index {len(failed_items)} documents. Sample errors: {error_sample}"
        )

    context.add_output_metadata(
        {
            "index_name": index_name,
            "document_count": success_count,
            "failed_count": len(failed_items) if failed_items else 0,
            "bulk_size": self.bulk_size,
            "index_time_ms": round(elapsed_ms, 2),
            "opensearch_host": self.hosts[0],
        }
    )

    context.log.info(
        f"Successfully indexed {success_count:,} documents to '{index_name}' "
        f"in {elapsed_ms:.0f}ms"
    )

load_input

load_input(context: InputContext) -> pl.DataFrame

Load DataFrame from OpenSearch index.

Source code in cogapp_libs/dagster/io_managers.py
def load_input(self, context: InputContext) -> pl.DataFrame:
    """Load DataFrame from OpenSearch index."""
    import polars as pl

    client = self._get_client()
    index_name = self._get_index_name(context)

    if not client.indices.exists(index=index_name):
        raise ValueError(
            f"Index '{index_name}' does not exist. Materialize the upstream asset first."
        )

    count_response = client.count(index=index_name)
    doc_count = count_response["count"]

    context.log.info(f"Loading {doc_count:,} documents from '{index_name}'")

    if doc_count > 10000:
        context.log.warning(
            f"Large index ({doc_count:,} docs). Consider using scan helper for better performance."
        )

    response = client.search(
        index=index_name,
        size=min(doc_count, 10000),
        body={"query": {"match_all": {}}},
    )

    records = [hit["_source"] for hit in response["hits"]["hits"]]

    if records:
        df = pl.DataFrame(records)
    else:
        df = pl.DataFrame()

    context.log.info(f"Loaded {len(df):,} records from '{index_name}'")

    return df

Example (AWS):

from cogapp_libs.dagster import OpenSearchIOManager

opensearch_io_manager = OpenSearchIOManager(
    hosts=["https://search-domain.us-east-1.es.amazonaws.com"],
    aws_auth=True,
    region="us-east-1",
)

JSON

JSONIOManager

Bases: UPathIOManager

IO Manager that writes DataFrames to JSON files using DuckDB's native export.

Integrates JSON output writing into Dagster's IO system rather than as a side effect. Supports both Polars and Pandas DataFrames.

Features:

  • Uses DuckDB's native COPY command for performance
  • Handles both pandas and polars DataFrames
  • Automatically creates parent directories
  • Provides standard metadata (record count, preview, path)

Attributes:

    extension (str):
        File extension (default: ".json")

Example
# In definitions.py:
import dagster as dg
import polars as pl

from cogapp_libs.dagster import JSONIOManager

defs = dg.Definitions(
    assets=[...],
    resources={
        "json_io_manager": JSONIOManager(base_path="data/output/json"),
    },
)

# In asset:
@dg.asset(io_manager_key="json_io_manager")
def sales_output(sales_transform: pl.DataFrame) -> pl.DataFrame:
    # JSON is written automatically by IO manager
    return sales_transform.filter(pl.col("price") > 1000)

dump_to_path

dump_to_path(context: OutputContext, obj: pd.DataFrame | pl.DataFrame, path: UPath)

Write DataFrame to JSON file using DuckDB's native COPY command.

Parameters:

    context (OutputContext, required):
        Dagster output context

    obj (pd.DataFrame | pl.DataFrame, required):
        DataFrame to write (pandas or polars)

    path (UPath, required):
        UPath to write the JSON file to
Source code in cogapp_libs/dagster/io_managers.py
def dump_to_path(self, context: OutputContext, obj: pd.DataFrame | pl.DataFrame, path: UPath):
    """Write DataFrame to JSON file using DuckDB's native COPY command.

    Args:
        context: Dagster output context
        obj: DataFrame to write (pandas or polars)
        path: UPath to write JSON file
    """
    # Ensure parent directory exists
    path.parent.mkdir(parents=True, exist_ok=True)

    # Convert UPath to string for DuckDB
    output_path_str = str(path)

    # Use DuckDB's native JSON export for performance
    conn = duckdb.connect(":memory:")
    conn.register("_df", obj)
    conn.execute(f"COPY (SELECT * FROM _df) TO '{output_path_str}' (FORMAT JSON, ARRAY true)")

    # Get preview as pandas for markdown rendering
    preview_df = conn.sql("SELECT * FROM _df LIMIT 10").df()
    result = conn.sql("SELECT COUNT(*) FROM _df").fetchone()
    record_count = result[0] if result else 0
    conn.close()

    # Add metadata
    context.add_output_metadata(
        {
            "record_count": record_count,
            "json_output": dg.MetadataValue.path(output_path_str),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )

    context.log.info(f"Wrote {record_count:,} records to {path}")

load_from_path

load_from_path(context: InputContext, path: UPath) -> pl.DataFrame

Load DataFrame from JSON file.

Parameters:

    context (InputContext, required):
        Dagster input context

    path (UPath, required):
        UPath to read the JSON file from

Returns:

    pl.DataFrame:
        Polars DataFrame (for downstream compatibility)

Source code in cogapp_libs/dagster/io_managers.py
def load_from_path(self, context: InputContext, path: UPath) -> pl.DataFrame:
    """Load DataFrame from JSON file.

    Args:
        context: Dagster input context
        path: UPath to read JSON file

    Returns:
        Polars DataFrame (for downstream compatibility)
    """
    import polars as pl

    # Load JSON file as Polars DataFrame (convert UPath to str for polars compatibility)
    df = pl.read_json(str(path))

    context.log.info(f"Loaded {len(df):,} records from {path}")

    return df

Example:

import dagster as dg
import polars as pl

from cogapp_libs.dagster import JSONIOManager

defs = dg.Definitions(
    resources={
        "json_io_manager": JSONIOManager(base_path="data/output/json"),
    },
)

@dg.asset(io_manager_key="json_io_manager")
def my_asset() -> pl.DataFrame:
    return pl.DataFrame({"id": [1, 2, 3]})

Parquet Path

ParquetPathIOManager

Bases: IOManager

IO Manager that passes parquet file paths between assets.

For DuckDB-native pipelines where assets write parquet directly using COPY ... TO and downstream assets read via read_parquet(). Data never materializes as DataFrames in Python memory.

Flow

Asset writes parquet → returns path string → IO manager stores path → downstream asset receives path → uses read_parquet(path) in SQL

The path is stored in a small .path reference file under base_dir.

Parameters:

    base_dir (str, default ".dagster/parquet_paths"):
        Base directory for storing path reference files
Example
# In definitions.py:
import dagster as dg

from dagster_duckdb import DuckDBResource

from cogapp_libs.dagster import ParquetPathIOManager

defs = dg.Definitions(
    resources={
        "parquet_path_io_manager": ParquetPathIOManager(
            base_dir="data/transforms"
        ),
    },
)

# In asset - write parquet directly, return path:
@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_transform(duckdb: DuckDBResource, paths: PathsResource) -> str:
    output = paths.transforms_dir / "sales.parquet"
    with duckdb.get_connection() as conn:
        conn.execute(f"COPY (...) TO '{output}' (FORMAT PARQUET)")
    return str(output)

# Downstream asset receives path string:
@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_output(duckdb: DuckDBResource, sales_transform: str) -> str:
    with duckdb.get_connection() as conn:
        conn.execute(f'''
            COPY (SELECT * FROM read_parquet('{sales_transform}') WHERE ...)
            TO 'output.parquet' (FORMAT PARQUET)
        ''')
    return "output.parquet"
Source code in cogapp_libs/dagster/io_managers.py
def __init__(self, base_dir: str = ".dagster/parquet_paths"):
    from pathlib import Path

    self.base_dir = Path(base_dir)
    self.base_dir.mkdir(parents=True, exist_ok=True)

handle_output

handle_output(context: OutputContext, obj: str) -> None

Store the parquet path for downstream assets.

Parameters:

    context (OutputContext, required):
        Dagster output context

    obj (str, required):
        Path to the parquet file (string)
Source code in cogapp_libs/dagster/io_managers.py
def handle_output(self, context: OutputContext, obj: str) -> None:
    """Store the parquet path for downstream assets.

    Args:
        context: Dagster output context
        obj: Path to the parquet file (string)
    """
    from pathlib import Path

    if not isinstance(obj, str):
        raise TypeError(
            f"ParquetPathIOManager expects a path string, got {type(obj).__name__}. "
            "Asset should write parquet directly and return the path."
        )

    parquet_path = Path(obj)
    if not parquet_path.exists():
        raise FileNotFoundError(
            f"Parquet file not found: {obj}. "
            "Asset must write the parquet file before returning the path."
        )

    # Store path reference
    path_file = self._get_path_file(context)
    path_file.write_text(obj)

    # Add metadata
    file_size_mb = parquet_path.stat().st_size / (1024 * 1024)
    context.add_output_metadata(
        {
            "parquet_path": dg.MetadataValue.path(obj),
            "file_size_mb": round(file_size_mb, 2),
        }
    )

    context.log.info(f"Stored parquet path: {obj} ({file_size_mb:.2f} MB)")

load_input

load_input(context: InputContext) -> str

Load the parquet path for use in downstream SQL.

Parameters:

    context (InputContext, required):
        Dagster input context

Returns:

    str:
        Path string for use with read_parquet()

Source code in cogapp_libs/dagster/io_managers.py
def load_input(self, context: InputContext) -> str:
    """Load the parquet path for use in downstream SQL.

    Args:
        context: Dagster input context

    Returns:
        Path string for use with read_parquet()
    """
    path_file = self._get_path_file(context)

    if not path_file.exists():
        raise FileNotFoundError(
            f"Path reference not found: {path_file}. Materialize the upstream asset first."
        )

    parquet_path = path_file.read_text().strip()

    context.log.info(f"Loaded parquet path: {parquet_path}")

    return parquet_path
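
Both methods delegate to _get_path_file, which is not shown on this page. A plausible reconstruction, assuming one .path file per asset under base_dir (the real helper may also encode partition keys):

from pathlib import Path

def _get_path_file(self, context) -> Path:
    # Hypothetical sketch: derive the reference file from the asset key.
    asset_name = context.asset_key.path[-1]
    return self.base_dir / f"{asset_name}.path"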

For DuckDB-native pipelines where data stays in parquet format and never materializes as Python DataFrames. Assets write parquet directly and pass file paths to downstream assets.

Example:

import dagster as dg
from dagster_duckdb import DuckDBResource

from cogapp_libs.dagster import ParquetPathIOManager

defs = dg.Definitions(
    resources={
        "parquet_path_io_manager": ParquetPathIOManager(
            base_dir="data/transforms"
        ),
        "duckdb": DuckDBResource(database=":memory:"),
    },
)

@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_transform(duckdb: DuckDBResource, paths: PathsResource) -> str:
    """Write parquet directly, return path."""
    output_path = paths.transforms_dir / "sales.parquet"

    with duckdb.get_connection() as conn:
        conn.sql("SELECT * FROM ...").write_parquet(str(output_path))

    return str(output_path)  # Return path, not DataFrame


@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_output(duckdb: DuckDBResource, sales_transform: str) -> str:
    """Receive path string, query parquet directly."""
    with duckdb.get_connection() as conn:
        # Use path in SQL query
        result = conn.sql(f"SELECT * FROM '{sales_transform}' WHERE ...")
        result.write_parquet("output.parquet")

    return "output.parquet"

Benefits:

  • Memory efficient: Data stays in DuckDB/Parquet, never loads into Python
  • SQL-native: Use FROM 'path.parquet' directly in queries
  • Streaming: DuckDB processes in batches automatically
  • Soda compatible: Validation checks receive paths to parquet files

See Soda Validation for using this pattern with data quality checks.