IO Managers¶
Custom IO managers for various storage backends.
External Documentation
- Dagster IO Managers: docs.dagster.io/concepts/io-management
- Elasticsearch: elastic.co/guide | Python Client
- OpenSearch: opensearch.org/docs | Python Client
Elasticsearch¶
ElasticsearchIOManager¶
Bases: IOManager
IO Manager for writing DataFrames to Elasticsearch 8/9 and reading them back.
Provides integration with Elasticsearch for storing and querying pipeline outputs. Supports both Polars and Pandas DataFrames with efficient bulk indexing.
Features:

- Bulk indexing for high performance (configurable batch size)
- Automatic index creation with dynamic or custom mappings
- Support for both Elasticsearch 8 and 9
- Connection pooling and retry logic
- Metadata tracking (document count, index name, response time)
- Handles both write (dump) and read (load) operations
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `hosts` | `list[str]` | Elasticsearch host(s), e.g. `["http://localhost:9200"]` | *required* |
| `index_prefix` | `str` | Prefix for index names, e.g. `"dagster_"` | `'dagster_'` |
| `api_key` | `str \| None` | Elasticsearch API key (recommended for security) | `None` |
| `basic_auth` | `tuple[str, str] \| None` | Tuple of (username, password) if not using an API key | `None` |
| `verify_certs` | `bool` | Whether to verify SSL certificates | `True` |
| `bulk_size` | `int` | Number of documents per bulk request | `500` |
| `custom_mappings` | `dict \| None` | Optional dict of `{asset_name: mapping_dict}` | `None` |
Environment Variables (recommended for credentials):

- `ELASTICSEARCH_HOST`: Comma-separated hosts
- `ELASTICSEARCH_API_KEY`: API key for authentication
- `ELASTICSEARCH_USER`: Username for basic auth
- `ELASTICSEARCH_PASSWORD`: Password for basic auth
Example
```python
# In definitions.py:
import os

defs = dg.Definitions(
    assets=[sales_output, artworks_output],
    resources={
        "elasticsearch_io_manager": ElasticsearchIOManager(
            hosts=[os.getenv("ELASTICSEARCH_HOST", "http://localhost:9200")],
            index_prefix="honey_duck_",
            api_key=os.getenv("ELASTICSEARCH_API_KEY"),
            bulk_size=1000,
        ),
    },
)

# In asset:
@dg.asset(io_manager_key="elasticsearch_io_manager")
def sales_output(sales_transform: pl.DataFrame) -> pl.DataFrame:
    # DataFrame automatically indexed to Elasticsearch
    return sales_transform
```
Index Naming
Indices are named `{index_prefix}{asset_name}_{timestamp}`, e.g. `"honey_duck_sales_output_20240115"`.
Custom Mappings Example
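As a rough illustration, a `custom_mappings` entry maps an asset name to a standard Elasticsearch mapping body; the asset name, field names, and field types below are illustrative only, not taken from the source:

```python
# Hypothetical mapping for an asset named "sales_output".
custom_mappings = {
    "sales_output": {
        "properties": {
            "title": {"type": "text"},
            "price": {"type": "float"},
            "sold_at": {"type": "date"},
        }
    }
}

io_manager = ElasticsearchIOManager(
    hosts=["http://localhost:9200"],
    custom_mappings=custom_mappings,
)
```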
Source code in cogapp_libs/dagster/io_managers.py
handle_output ¶
Write DataFrame to Elasticsearch using bulk API.
Uses generator-based streaming for memory efficiency - never materializes the full DataFrame as a list of dicts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `OutputContext` | Dagster output context | *required* |
| `obj` | `pd.DataFrame \| pl.DataFrame` | DataFrame to index (pandas or polars) | *required* |
Source code in cogapp_libs/dagster/io_managers.py
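For reference, a minimal sketch of what generator-based bulk indexing looks like with the `elasticsearch` Python client; index naming, field handling, and error handling are simplified and not taken from the actual implementation:

```python
from elasticsearch import Elasticsearch, helpers
import polars as pl


def index_dataframe(es: Elasticsearch, index: str, df: pl.DataFrame, bulk_size: int = 500) -> int:
    """Stream rows into Elasticsearch without materialising a list of dicts."""

    def actions():
        # Generator: yields one action dict per row, so memory stays flat.
        for row in df.iter_rows(named=True):
            yield {"_index": index, "_source": row}

    success, _ = helpers.bulk(es, actions(), chunk_size=bulk_size)
    return success
```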
load_input ¶
Load DataFrame from Elasticsearch index.
Retrieves all documents from the index and returns as Polars DataFrame. For large indices, consider using scroll API or filtering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `InputContext` | Dagster input context | *required* |

Returns:

| Type | Description |
|---|---|
| `pl.DataFrame` | Polars DataFrame with all documents from the index |
Source code in cogapp_libs/dagster/io_managers.py
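As a rough illustration of the read path, `elasticsearch.helpers.scan` can page through an index via the scroll API and the hits can be collected into a Polars DataFrame; this is a sketch, not the packaged `load_input` implementation:

```python
from elasticsearch import Elasticsearch, helpers
import polars as pl


def read_index(es: Elasticsearch, index: str) -> pl.DataFrame:
    """Scroll through all documents in an index and return them as a DataFrame."""
    docs = (hit["_source"] for hit in helpers.scan(es, index=index, query={"query": {"match_all": {}}}))
    return pl.DataFrame(list(docs))
```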
Example:
```python
import os

import dagster as dg
import polars as pl

from cogapp_libs.dagster import ElasticsearchIOManager

defs = dg.Definitions(
    resources={
        "elasticsearch_io_manager": ElasticsearchIOManager(
            hosts=["http://localhost:9200"],
            index_prefix="dagster_",
            api_key=os.getenv("ELASTICSEARCH_API_KEY"),
            bulk_size=1000,
        ),
    },
)


@dg.asset(io_manager_key="elasticsearch_io_manager")
def sales_searchable(sales_transform: pl.DataFrame) -> pl.DataFrame:
    return sales_transform  # Automatically indexed
```
OpenSearch¶
OpenSearchIOManager¶
Bases: IOManager
IO Manager for writing DataFrames to OpenSearch (AWS fork of Elasticsearch).
OpenSearch is AWS's fork of Elasticsearch 7.x. This IO Manager provides the same functionality as ElasticsearchIOManager but uses the opensearch-py client.
Features:

- Bulk indexing for high performance (configurable batch size)
- Automatic index creation with dynamic or custom mappings
- Support for OpenSearch 1.x and 2.x
- Connection pooling and retry logic
- Metadata tracking (document count, index name, response time)
- Handles both write (dump) and read (load) operations
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `hosts` | `list[str]` | OpenSearch host(s), e.g. `["https://search-domain.us-east-1.es.amazonaws.com"]` | *required* |
| `index_prefix` | `str` | Prefix for index names, e.g. `"dagster_"` | `'dagster_'` |
| `http_auth` | `tuple[str, str] \| None` | Tuple of (username, password) for basic auth | `None` |
| `use_ssl` | `bool` | Whether to use SSL | `True` |
| `verify_certs` | `bool` | Whether to verify SSL certificates | `True` |
| `bulk_size` | `int` | Number of documents per bulk request | `500` |
| `custom_mappings` | `dict \| None` | Optional dict of `{asset_name: mapping_dict}` | `None` |
| `aws_auth` | `bool` | Whether to use AWS SigV4 auth (requires boto3) | `False` |
| `region` | `str \| None` | AWS region (required if `aws_auth=True`) | `None` |
Environment Variables
- `OPENSEARCH_HOST`: OpenSearch endpoint URL
- `OPENSEARCH_USER`: Username for basic auth
- `OPENSEARCH_PASSWORD`: Password for basic auth
- `AWS_REGION`: AWS region for SigV4 auth
Example (Basic Auth):
```python
defs = dg.Definitions(
    resources={
        "opensearch_io_manager": OpenSearchIOManager(
            hosts=["https://localhost:9200"],
            http_auth=("admin", "admin"),
            verify_certs=False,  # For local dev
        ),
    },
)
```
Example (AWS SigV4 Auth):
```python
defs = dg.Definitions(
    resources={
        "opensearch_io_manager": OpenSearchIOManager(
            hosts=["https://search-domain.us-east-1.es.amazonaws.com"],
            aws_auth=True,
            region="us-east-1",
        ),
    },
)
```
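For context, wiring up SigV4 signing with `opensearch-py` and `boto3` typically looks like the sketch below; whether `aws_auth=True` does exactly this internally is an assumption, and the host name is illustrative:

```python
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Sign requests with the credentials from the ambient AWS session.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, "us-east-1", "es")

client = OpenSearch(
    hosts=[{"host": "search-domain.us-east-1.es.amazonaws.com", "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
```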
Example (Asset):
@dg.asset(io_manager_key="opensearch_io_manager")
def sales_output(sales_transform: pl.DataFrame) -> pl.DataFrame:
return sales_transform
Source code in cogapp_libs/dagster/io_managers.py
handle_output ¶
Write DataFrame to OpenSearch using bulk API.
Uses generator-based streaming for memory efficiency - never materializes the full DataFrame as a list of dicts.
Source code in cogapp_libs/dagster/io_managers.py
load_input ¶
Load DataFrame from OpenSearch index.
Source code in cogapp_libs/dagster/io_managers.py
Example (AWS):
```python
from cogapp_libs.dagster import OpenSearchIOManager

opensearch_io_manager = OpenSearchIOManager(
    hosts=["https://search-domain.us-east-1.es.amazonaws.com"],
    aws_auth=True,
    region="us-east-1",
)
```
JSON¶
JSONIOManager¶
Bases: UPathIOManager
IO Manager that writes DataFrames to JSON files using DuckDB's native export.
Integrates JSON output writing into Dagster's IO system rather than as a side effect. Supports both Polars and Pandas DataFrames.
Features:

- Uses DuckDB's native COPY command for performance
- Handles both pandas and polars DataFrames
- Automatically creates parent directories
- Provides standard metadata (record count, preview, path)
Attributes:
| Name | Type | Description |
|---|---|---|
| `extension` | `str` | File extension (default: `".json"`) |
Example
```python
# In definitions.py:
defs = dg.Definitions(
    assets=[...],
    resources={
        "json_io_manager": JSONIOManager(base_path="data/output/json"),
    },
)

# In asset:
@dg.asset(io_manager_key="json_io_manager")
def sales_output(sales_transform: pl.DataFrame) -> pl.DataFrame:
    # JSON is written automatically by IO manager
    return sales_transform.filter(pl.col("price") > 1000)
```
dump_to_path ¶
Write DataFrame to JSON file using DuckDB's native COPY command.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `OutputContext` | Dagster output context | *required* |
| `obj` | `pd.DataFrame \| pl.DataFrame` | DataFrame to write (pandas or polars) | *required* |
| `path` | `UPath` | UPath to write the JSON file to | *required* |
Source code in cogapp_libs/dagster/io_managers.py
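A minimal sketch of a DuckDB-based JSON export, assuming the DataFrame is registered as a view and written with `COPY ... (FORMAT JSON, ARRAY true)`; the registration details are illustrative rather than copied from the implementation:

```python
import duckdb
import polars as pl


def write_json(df: pl.DataFrame, path: str) -> None:
    """Export a DataFrame to a JSON array file using DuckDB's COPY command."""
    conn = duckdb.connect()
    # register() accepts pandas/polars/Arrow tables in recent DuckDB versions.
    conn.register("df_view", df)
    conn.execute(f"COPY (SELECT * FROM df_view) TO '{path}' (FORMAT JSON, ARRAY true)")
    conn.close()
```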
load_from_path ¶
Load DataFrame from JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `InputContext` | Dagster input context | *required* |
| `path` | `UPath` | UPath to read the JSON file from | *required* |

Returns:

| Type | Description |
|---|---|
| `pl.DataFrame` | Polars DataFrame (for downstream compatibility) |
Source code in cogapp_libs/dagster/io_managers.py
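Reading back is straightforward with Polars; a sketch assuming the file holds a JSON array of records, as DuckDB's `ARRAY true` export produces:

```python
import polars as pl


def read_json(path: str) -> pl.DataFrame:
    # Parse a JSON array of objects into a Polars DataFrame.
    return pl.read_json(path)
```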
Example:
```python
import dagster as dg
import polars as pl

from cogapp_libs.dagster import JSONIOManager

defs = dg.Definitions(
    resources={
        "json_io_manager": JSONIOManager(base_path="data/output/json"),
    },
)


@dg.asset(io_manager_key="json_io_manager")
def my_asset() -> pl.DataFrame:
    return pl.DataFrame({"id": [1, 2, 3]})
```
Parquet Path¶
ParquetPathIOManager¶
Bases: IOManager
IO Manager that passes parquet file paths between assets.
For DuckDB-native pipelines where assets write parquet directly using COPY ... TO and downstream assets read via read_parquet(). Data never materializes as DataFrames in Python memory.
Flow
Asset writes parquet → returns path string → IO manager stores path → downstream asset receives path → uses read_parquet(path) in SQL
The path is stored in a small .path file alongside the parquet file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `base_dir` | `str` | Base directory for storing path reference files | `'.dagster/parquet_paths'` |
Example
```python
# In definitions.py:
defs = dg.Definitions(
    resources={
        "parquet_path_io_manager": ParquetPathIOManager(
            base_dir="data/transforms"
        ),
    },
)

# In asset - write parquet directly, return path:
@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_transform(duckdb: DuckDBResource, paths: PathsResource) -> str:
    output = paths.transforms_dir / "sales.parquet"
    with duckdb.get_connection() as conn:
        conn.execute(f"COPY (...) TO '{output}' (FORMAT PARQUET)")
    return str(output)

# Downstream asset receives path string:
@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_output(duckdb: DuckDBResource, sales_transform: str) -> str:
    with duckdb.get_connection() as conn:
        conn.execute(f'''
            COPY (SELECT * FROM read_parquet('{sales_transform}') WHERE ...)
            TO 'output.parquet' (FORMAT PARQUET)
        ''')
    return "output.parquet"
```
Source code in cogapp_libs/dagster/io_managers.py
handle_output ¶
Store the parquet path for downstream assets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `OutputContext` | Dagster output context | *required* |
| `obj` | `str` | Path to the parquet file (string) | *required* |
Source code in cogapp_libs/dagster/io_managers.py
load_input ¶
Load the parquet path for use in downstream SQL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `InputContext` | Dagster input context | *required* |

Returns:

| Type | Description |
|---|---|
| `str` | Path string for use with `read_parquet()` |
Source code in cogapp_libs/dagster/io_managers.py
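To make the path-passing mechanics concrete, here is a minimal sketch of an IO manager that stores the returned path in a `.path` file keyed by asset name and reads it back for downstream assets; it illustrates the pattern described above, not the packaged implementation:

```python
from pathlib import Path

import dagster as dg


class PathFileIOManager(dg.IOManager):
    """Pass parquet file paths between assets via small .path reference files."""

    def __init__(self, base_dir: str = ".dagster/parquet_paths"):
        self.base_dir = Path(base_dir)

    def _ref_file(self, asset_key: dg.AssetKey) -> Path:
        return self.base_dir / f"{'_'.join(asset_key.path)}.path"

    def handle_output(self, context: dg.OutputContext, obj: str) -> None:
        ref = self._ref_file(context.asset_key)
        ref.parent.mkdir(parents=True, exist_ok=True)
        ref.write_text(obj)  # store the parquet path, not the data

    def load_input(self, context: dg.InputContext) -> str:
        return self._ref_file(context.asset_key).read_text()
```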
For DuckDB-native pipelines where data stays in parquet format and never materializes as Python DataFrames. Assets write parquet directly and pass file paths to downstream assets.
Example:
```python
import dagster as dg

from cogapp_libs.dagster import ParquetPathIOManager
from dagster_duckdb import DuckDBResource

defs = dg.Definitions(
    resources={
        "parquet_path_io_manager": ParquetPathIOManager(
            base_dir="data/transforms"
        ),
        "duckdb": DuckDBResource(database=":memory:"),
    },
)


@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_transform(duckdb: DuckDBResource, paths: PathsResource) -> str:
    """Write parquet directly, return path."""
    output_path = paths.transforms_dir / "sales.parquet"
    with duckdb.get_connection() as conn:
        conn.sql("SELECT * FROM ...").write_parquet(str(output_path))
    return str(output_path)  # Return path, not DataFrame


@dg.asset(io_manager_key="parquet_path_io_manager")
def sales_output(duckdb: DuckDBResource, sales_transform: str) -> str:
    """Receive path string, query parquet directly."""
    with duckdb.get_connection() as conn:
        # Use path in SQL query
        result = conn.sql(f"SELECT * FROM '{sales_transform}' WHERE ...")
        result.write_parquet("output.parquet")
    return "output.parquet"
```
Benefits:

- Memory efficient: Data stays in DuckDB/Parquet, never loads into Python
- SQL-native: Use `FROM 'path.parquet'` directly in queries
- Streaming: DuckDB processes in batches automatically
- Soda compatible: Validation checks receive paths to parquet files
See Soda Validation for using this pattern with data quality checks.