Processors

Reusable DataFrame transformation classes for data pipelines.

DuckDB Processors

DuckDBQueryProcessor

Query tables in the configured DuckDB database.

Executes SQL against persistent database tables; configure() must be called first. Use this to extract data from the database when there is no DataFrame input.

Example
from cogapp_libs.processors.duckdb import configure
configure(db_path="warehouse.duckdb", read_only=True)

processor = DuckDBQueryProcessor(sql="""
    SELECT s.*, a.title
    FROM raw.sales s
    JOIN raw.artworks a ON s.artwork_id = a.artwork_id
    WHERE s.sale_date > '2024-01-01'
""")
result = processor.process()

For lazy output (enables downstream streaming):

processor = DuckDBQueryProcessor(sql="SELECT * FROM raw.sales", lazy=True)
lazy_result = processor.process()  # Returns LazyFrame

Initialize query processor.

Parameters:

- sql (str, required): SQL query to execute against the configured database.
- lazy (bool, default False): If True, return a LazyFrame for downstream streaming.
Source code in cogapp_libs/processors/duckdb/sql.py
def __init__(self, sql: str, lazy: bool = False):
    """Initialize query processor.

    Args:
        sql: SQL query to execute against configured database.
        lazy: If True, return LazyFrame for downstream streaming.
    """
    self.sql = sql
    self.lazy = lazy

process

process() -> pl.DataFrame | pl.LazyFrame

Execute the SQL against configured database and return result.

Returns:

- pl.DataFrame | pl.LazyFrame: Polars DataFrame, or LazyFrame if lazy=True.

Raises:

- RuntimeError: If the database is not configured or the connection fails.

Source code in cogapp_libs/processors/duckdb/sql.py
def process(self) -> pl.DataFrame | pl.LazyFrame:
    """Execute the SQL against configured database and return result.

    Returns:
        Polars DataFrame or LazyFrame (if lazy=True).

    Raises:
        RuntimeError: If database is not configured or connection fails.
    """
    from . import get_connection

    conn = get_connection()
    try:
        result = conn.sql(self.sql).pl()
        return result.lazy() if self.lazy else result
    finally:
        conn.close()

DuckDBSQLProcessor

Transform DataFrames using SQL queries.

Registers the input DataFrame as an "_input" table and executes the SQL against an in-memory DuckDB connection. Use for transforming DataFrames with complex SQL logic.

Example
processor = DuckDBSQLProcessor(sql="""
    SELECT *,
        price * 1.1 AS price_with_tax
    FROM _input
    WHERE status = 'active'
""")
result = processor.process(input_df)

For multi-table operations, pass additional DataFrames via tables:

processor = DuckDBSQLProcessor(sql="""
    SELECT a.*, b.category_name
    FROM _input a
    LEFT JOIN categories b ON a.category_id = b.id
""")
result = processor.process(products_df, tables={"categories": categories_df})

For lazy output (enables downstream streaming):

processor = DuckDBSQLProcessor(sql="SELECT * FROM _input", lazy=True)
lazy_result = processor.process(input_df)  # Returns LazyFrame

Initialize SQL processor.

Parameters:

- sql (str, required): SQL query to execute. Use "_input" to reference the input DataFrame.
- lazy (bool, default False): If True, return a LazyFrame for downstream streaming.
Source code in cogapp_libs/processors/duckdb/sql.py
def __init__(self, sql: str, lazy: bool = False):
    """Initialize SQL processor.

    Args:
        sql: SQL query to execute. Use "_input" to reference input DataFrame.
        lazy: If True, return LazyFrame for downstream streaming.
    """
    self.sql = sql
    self.lazy = lazy

process

process(df: DataFrame | LazyFrame, tables: dict[str, DataFrame | LazyFrame] | None = None) -> pl.DataFrame | pl.LazyFrame

Execute the SQL and return result.

Parameters:

- df (DataFrame | LazyFrame, required): Input DataFrame/LazyFrame, registered as the "_input" table.
- tables (dict[str, DataFrame | LazyFrame] | None, default None): Additional DataFrames to register as named tables for JOINs.

Returns:

- pl.DataFrame | pl.LazyFrame: Polars DataFrame, or LazyFrame if lazy=True.

Source code in cogapp_libs/processors/duckdb/sql.py
def process(
    self,
    df: pl.DataFrame | pl.LazyFrame,
    tables: dict[str, pl.DataFrame | pl.LazyFrame] | None = None,
) -> pl.DataFrame | pl.LazyFrame:
    """Execute the SQL and return result.

    Args:
        df: Input DataFrame/LazyFrame, registered as "_input" table.
        tables: Additional DataFrames to register as named tables for JOINs.

    Returns:
        Polars DataFrame or LazyFrame (if lazy=True).
    """
    import duckdb as ddb

    conn = ddb.connect(":memory:")
    try:
        # Collect LazyFrames for DuckDB registration
        input_df = df.collect() if isinstance(df, pl.LazyFrame) else df
        conn.register("_input", input_df)
        if tables:
            for name, table_df in tables.items():
                resolved = (
                    table_df.collect() if isinstance(table_df, pl.LazyFrame) else table_df
                )
                conn.register(name, resolved)
        result = conn.sql(self.sql).pl()
        return result.lazy() if self.lazy else result
    finally:
        conn.close()

DuckDBWindowProcessor

Execute window functions and return DataFrame with added columns.

Takes an input DataFrame, registers it as a table, applies window functions, and returns the result with new columns added.

Example
processor = DuckDBWindowProcessor(
    exprs={
        "rank_in_artist": "ROW_NUMBER() OVER (PARTITION BY artist ORDER BY price DESC)",
        "artist_total": "SUM(price) OVER (PARTITION BY artist)",
    }
)
df_with_windows = processor.process(input_df)

Initialize window processor.

Parameters:

- exprs (dict[str, str], required): Dict of {output_col: window_expression}. Each expression must be a valid SQL window function.
Source code in cogapp_libs/processors/duckdb/window.py
def __init__(self, exprs: dict[str, str]):
    """Initialize window processor.

    Args:
        exprs: Dict of {output_col: window_expression}.
               Each expression should be a valid SQL window function.
    """
    self.exprs = exprs
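The private `_generate_sql` helper used by `process()` is not shown in this excerpt. A plausible pure-Python sketch of the SQL it would need to produce, assuming each expression is simply appended as an aliased column over the registered `_input` table (the function name and layout here are illustrative, not the library's actual code):

```python
def generate_window_sql(exprs: dict[str, str]) -> str:
    """Build a SELECT that keeps all columns and appends each window expression."""
    window_cols = ", ".join(f"{expr} AS {col}" for col, expr in exprs.items())
    return f"SELECT *, {window_cols} FROM _input"

sql = generate_window_sql(
    {"rank_in_artist": "ROW_NUMBER() OVER (PARTITION BY artist ORDER BY price DESC)"}
)
# -> SELECT *, ROW_NUMBER() OVER (PARTITION BY artist ORDER BY price DESC) AS rank_in_artist FROM _input
```

Because the generated query selects `*` plus the aliased expressions, every input column survives and only new columns are added, which matches the documented behavior.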

process

process(df: DataFrame, conn: 'duckdb.DuckDBPyConnection | None' = None) -> pd.DataFrame

Apply window functions to DataFrame.

Parameters:

- df (DataFrame, required): Input DataFrame.
- conn (duckdb.DuckDBPyConnection | None, default None): Optional DuckDB connection. If not provided, an in-memory connection is used.

Returns:

- pd.DataFrame: DataFrame with window columns added.

Source code in cogapp_libs/processors/duckdb/window.py
def process(
    self,
    df: pd.DataFrame,
    conn: "duckdb.DuckDBPyConnection | None" = None,
) -> pd.DataFrame:
    """Apply window functions to DataFrame.

    Args:
        df: Input DataFrame.
        conn: Optional DuckDB connection. If not provided, uses in-memory connection.

    Returns:
        DataFrame with window columns added.
    """
    import duckdb as ddb

    # For window operations, we always use in-memory since we're working
    # with a DataFrame that's passed in (not reading from persistent tables)
    should_close = conn is None
    conn = conn or ddb.connect(":memory:")

    try:
        conn.register("_input", df)
        sql = self._generate_sql()
        return conn.sql(sql).df()
    finally:
        if should_close:
            conn.close()

DuckDBAggregateProcessor

Execute group-by aggregations and return DataFrame.

Takes an input DataFrame, registers it as a table, applies aggregations, and returns the grouped result.

Example
processor = DuckDBAggregateProcessor(
    group_cols=["artwork_id"],
    agg_exprs={
        "total_count": "COUNT(*)",
        "total_value": "SUM(sale_price_usd)",
        "avg_price": "ROUND(AVG(sale_price_usd), 0)",
    },
)
aggregated_df = processor.process(sales_df)

Initialize aggregate processor.

Parameters:

- group_cols (list[str], required): Columns to group by.
- agg_exprs (dict[str, str], required): Dict of {output_col: sql_aggregation_expression}.
Source code in cogapp_libs/processors/duckdb/aggregate.py
def __init__(
    self,
    group_cols: list[str],
    agg_exprs: dict[str, str],  # {output_col: sql_expr}
):
    """Initialize aggregate processor.

    Args:
        group_cols: Columns to group by.
        agg_exprs: Dict of {output_col: sql_aggregation_expression}.
    """
    self.group_cols = group_cols
    self.agg_exprs = agg_exprs
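As with the window processor, the `_generate_sql` helper is not included in this excerpt. A minimal sketch of the GROUP BY query it would plausibly build from `group_cols` and `agg_exprs` (an illustration under those assumptions, not the library's actual implementation):

```python
def generate_aggregate_sql(group_cols: list[str], agg_exprs: dict[str, str]) -> str:
    """Build a GROUP BY query: group columns first, then aliased aggregates."""
    select_parts = group_cols + [f"{expr} AS {col}" for col, expr in agg_exprs.items()]
    return (
        f"SELECT {', '.join(select_parts)} FROM _input "
        f"GROUP BY {', '.join(group_cols)}"
    )

sql = generate_aggregate_sql(["artwork_id"], {"total_count": "COUNT(*)"})
# -> SELECT artwork_id, COUNT(*) AS total_count FROM _input GROUP BY artwork_id
```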

process

process(df: DataFrame, conn: 'duckdb.DuckDBPyConnection | None' = None) -> pd.DataFrame

Apply aggregations to DataFrame.

Parameters:

- df (DataFrame, required): Input DataFrame.
- conn (duckdb.DuckDBPyConnection | None, default None): Optional DuckDB connection. If not provided, an in-memory connection is used.

Returns:

- pd.DataFrame: Aggregated DataFrame.

Source code in cogapp_libs/processors/duckdb/aggregate.py
def process(
    self,
    df: pd.DataFrame,
    conn: "duckdb.DuckDBPyConnection | None" = None,
) -> pd.DataFrame:
    """Apply aggregations to DataFrame.

    Args:
        df: Input DataFrame.
        conn: Optional DuckDB connection. If not provided, uses in-memory connection.

    Returns:
        Aggregated DataFrame.
    """
    import duckdb as ddb

    # For aggregations, we always use in-memory since we're working
    # with a DataFrame that's passed in (not reading from persistent tables)
    should_close = conn is None
    conn = conn or ddb.connect(":memory:")

    try:
        conn.register("_input", df)
        sql = self._generate_sql()
        return conn.sql(sql).df()
    finally:
        if should_close:
            conn.close()

DuckDBJoinProcessor

Execute multi-table joins and return DataFrame.

Deprecated: Use DuckDBQueryProcessor (for database queries) or DuckDBSQLProcessor (for DataFrame transforms) with explicit SQL instead. Explicit SQL is more readable and universally understood.

Instead of:
    DuckDBJoinProcessor(
        base_table="sales",
        joins=[("artworks", "a.artwork_id", "artwork_id")],
        select_cols=["a.*", "b.title"],
    ).process()

Use:
    DuckDBQueryProcessor(sql='''
        SELECT s.*, a.title
        FROM sales s
        LEFT JOIN artworks a ON s.artwork_id = a.artwork_id
    ''').process()

Alias convention (if you must use this class):

- Base table: aliased as 'a'
- First join: aliased as 'b'
- Second join: aliased as 'c'
- And so on.

Initialize join processor.

Parameters:

- joins (list[tuple[str, str, str]], required): List of (table, left_key, right_key) tuples. Tables are aliased as 'b', 'c', 'd', etc. If left_key contains '.', it is used as-is; otherwise it is prefixed with 'a.'.
- select_cols (list[str] | None, default None): Columns to select. Defaults to ["*"].
- join_type (str, default 'LEFT'): Type of join (LEFT, INNER, etc.).
- base_table (str | None, default None): Name of the base table (aliased as 'a'). If None, a DataFrame input is expected in process().
Source code in cogapp_libs/processors/duckdb/join.py
def __init__(
    self,
    joins: list[tuple[str, str, str]],  # (table, left_key, right_key)
    select_cols: list[str] | None = None,
    join_type: str = "LEFT",
    base_table: str | None = None,
):
    """Initialize join processor.

    Args:
        joins: List of (table, left_key, right_key) tuples.
               Tables are aliased as 'b', 'c', 'd', etc.
               If left_key contains '.', used as-is; otherwise prefixed with 'a.'.
        select_cols: Columns to select. Defaults to ["*"].
        join_type: Type of join (LEFT, INNER, etc.). Defaults to LEFT.
        base_table: Name of base table (aliased as 'a'). If None, expects
                   DataFrame input in process().
    """
    warnings.warn(
        "DuckDBJoinProcessor is deprecated. Use DuckDBQueryProcessor or "
        "DuckDBSQLProcessor with explicit SQL instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.base_table = base_table
    self.joins = joins
    self.select_cols = select_cols or ["*"]
    self.join_type = join_type.upper()
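The `_generate_sql(base)` helper called in `process()` is not shown here. A sketch of how the documented alias convention ('a' for the base, then 'b', 'c', ... per join) and the left_key prefixing rule would plausibly translate into SQL (illustrative only, not the library's actual code):

```python
from string import ascii_lowercase

def generate_join_sql(base, joins, select_cols=None, join_type="LEFT"):
    """Base table is aliased 'a'; joined tables get 'b', 'c', ... in order."""
    select = ", ".join(select_cols or ["*"])
    sql = f"SELECT {select} FROM {base} a"
    for i, (table, left_key, right_key) in enumerate(joins):
        alias = ascii_lowercase[i + 1]  # 'b' for the first join, 'c' for the second...
        # A left_key containing '.' is already qualified; otherwise prefix with 'a.'
        left = left_key if "." in left_key else f"a.{left_key}"
        sql += f" {join_type} JOIN {table} {alias} ON {left} = {alias}.{right_key}"
    return sql

sql = generate_join_sql("sales", [("artworks", "artwork_id", "artwork_id")])
# -> SELECT * FROM sales a LEFT JOIN artworks b ON a.artwork_id = b.artwork_id
```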

process

process(df: DataFrame | None = None, conn: 'duckdb.DuckDBPyConnection | None' = None) -> pd.DataFrame

Execute the join and return result as DataFrame.

Parameters:

- df (DataFrame | None, default None): Input DataFrame to use as the base table. If provided, it is registered as "_input" and used as the base. Required if base_table is not set.
- conn (duckdb.DuckDBPyConnection | None, default None): Optional DuckDB connection. If not provided, the configured connection is used.

Returns:

- pd.DataFrame: DataFrame with joined data.

Source code in cogapp_libs/processors/duckdb/join.py
def process(
    self,
    df: pd.DataFrame | None = None,
    conn: "duckdb.DuckDBPyConnection | None" = None,
) -> pd.DataFrame:
    """Execute the join and return result as DataFrame.

    Args:
        df: Input DataFrame to use as base table. If provided, it's registered
            as "_input" and used as the base. Required if base_table not set.
        conn: Optional DuckDB connection. If not provided, uses configured connection.

    Returns:
        DataFrame with joined data.
    """
    from . import get_connection

    # Determine base table
    if df is not None:
        base = "_input"
    elif self.base_table is not None:
        base = self.base_table
    else:
        raise ValueError("Either provide df argument or set base_table in __init__")

    should_close = conn is None
    conn = conn or get_connection()

    try:
        if df is not None:
            conn.register("_input", df)

        sql = self._generate_sql(base)
        return conn.sql(sql).df()
    finally:
        if should_close:
            conn.close()

Polars Processors

PolarsFilterProcessor

High-performance filter processor using Polars.

Supports lazy evaluation when used in a Chain for query optimization.

Example
processor = PolarsFilterProcessor("price", 1000, ">=")
filtered_df = processor.process(df)

# Chained with optimization:
chain = Chain([
    PolarsStringProcessor("name", "upper"),
    PolarsFilterProcessor("price", 1000, ">="),
])
result = chain.process(df)  # single optimized query
Source code in cogapp_libs/processors/polars/filter.py
def __init__(self, column: str, threshold: float, operator: str = ">="):
    if not POLARS_AVAILABLE:
        raise ImportError(
            "polars is required for PolarsFilterProcessor. Install with: pip install polars"
        )
    self.column = column
    self.threshold = threshold
    self.operator = operator

process

process(df: pl.LazyFrame) -> pl.LazyFrame
process(df: pl.DataFrame) -> pl.DataFrame
process(df: pd.DataFrame) -> pl.DataFrame
process(df: pd.DataFrame | pl.DataFrame | pl.LazyFrame) -> pl.DataFrame | pl.LazyFrame

Apply the filter.

Parameters:

- df (pd.DataFrame | pl.DataFrame | pl.LazyFrame, required): Input DataFrame (pandas, Polars DataFrame, or Polars LazyFrame).

Returns:

- pl.DataFrame | pl.LazyFrame: Filtered DataFrame/LazyFrame (same type as the input for Polars types).

Source code in cogapp_libs/processors/polars/filter.py
def process(
    self, df: pd.DataFrame | "pl.DataFrame" | "pl.LazyFrame"
) -> "pl.DataFrame | pl.LazyFrame":
    """Apply the filter.

    Args:
        df: Input DataFrame (pandas, polars DataFrame, or polars LazyFrame)

    Returns:
        Filtered DataFrame/LazyFrame (same type as input for polars types)
    """
    # LazyFrame in -> LazyFrame out (for streaming)
    if isinstance(df, pl.LazyFrame):
        return self._apply(df)

    # Convert pandas to polars if needed
    if isinstance(df, pd.DataFrame):
        pl_df = pl.from_pandas(df)
    else:
        pl_df = df

    # DataFrame in -> DataFrame out
    return self._apply(pl_df.lazy()).collect()
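The `_apply` helper that turns the `operator` string into a comparison is not shown in this excerpt. One plausible way to dispatch on the operator string, sketched row-wise with the stdlib `operator` module (the library's actual `_apply` would build a Polars expression instead; names here are illustrative):

```python
import operator

# Hypothetical mapping from operator strings to comparison functions.
OPERATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}

def passes(value: float, threshold: float, op: str = ">=") -> bool:
    """Row-level view of the filter: keep rows where the comparison holds."""
    return OPERATORS[op](value, threshold)
```

Under this sketch, `PolarsFilterProcessor("price", 1000, ">=")` keeps exactly the rows for which `passes(row_price, 1000, ">=")` is true.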

PolarsStringProcessor

High-performance string processor using Polars.

Significantly faster than pandas for string operations on large datasets. Automatically handles pandas <-> polars conversion. Supports lazy evaluation when used in a Chain.

Example
processor = PolarsStringProcessor("artist_name", "upper")
result_df = processor.process(df)  # accepts pandas or polars

# Chained with optimization:
chain = Chain([
    PolarsStringProcessor("name", "upper"),
    PolarsFilterProcessor("price", 1000, ">="),
])
result = chain.process(df)  # single optimized query
Source code in cogapp_libs/processors/polars/string_transform.py
def __init__(self, column: str, transform: TransformType = "upper"):
    if not POLARS_AVAILABLE:
        raise ImportError(
            "polars is required for PolarsStringProcessor. Install with: pip install polars"
        )
    self.column = column
    self.transform = transform

process

process(df: pl.LazyFrame) -> pl.LazyFrame
process(df: pl.DataFrame) -> pl.DataFrame
process(df: pd.DataFrame) -> pl.DataFrame
process(df: pd.DataFrame | pl.DataFrame | pl.LazyFrame) -> pl.DataFrame | pl.LazyFrame

Apply the string transformation.

Parameters:

- df (pd.DataFrame | pl.DataFrame | pl.LazyFrame, required): Input DataFrame (pandas, Polars DataFrame, or Polars LazyFrame).

Returns:

- pl.DataFrame | pl.LazyFrame: Transformed DataFrame/LazyFrame (same type as the input for Polars types).

Source code in cogapp_libs/processors/polars/string_transform.py
def process(
    self, df: pd.DataFrame | "pl.DataFrame" | "pl.LazyFrame"
) -> "pl.DataFrame | pl.LazyFrame":
    """Apply the string transformation.

    Args:
        df: Input DataFrame (pandas, polars DataFrame, or polars LazyFrame)

    Returns:
        Transformed DataFrame/LazyFrame (same type as input for polars types)
    """
    # LazyFrame in -> LazyFrame out (for streaming)
    if isinstance(df, pl.LazyFrame):
        return self._apply(df)

    # Convert pandas to polars if needed
    if isinstance(df, pd.DataFrame):
        pl_df = pl.from_pandas(df)
    else:
        pl_df = df

    # DataFrame in -> DataFrame out
    return self._apply(pl_df.lazy()).collect()

Chaining

Chain

Chain multiple Polars processors with lazy evaluation optimization.

All processors in the chain must have a _apply(LazyFrame) -> LazyFrame method. The entire chain is executed as a single optimized Polars query.

Example

from cogapp_libs.processors import Chain
from cogapp_libs.processors.polars import PolarsStringProcessor, PolarsFilterProcessor

chain = Chain([
    PolarsStringProcessor("name", "upper"),
    PolarsFilterProcessor("price", 1000, ">="),
])
result = chain.process(df)  # single optimized query

Source code in cogapp_libs/processors/chain.py
def __init__(self, processors: list):
    if not processors:
        raise ValueError("Chain requires at least one processor")

    # Validate all processors support lazy evaluation
    for p in processors:
        if not hasattr(p, "_apply"):
            raise TypeError(
                f"{type(p).__name__} does not support chaining. "
                "Polars processors must have an _apply(LazyFrame) method."
            )

    self.processors = processors
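Chain's only requirement on its members is duck-typed: an `_apply` method taking and returning a LazyFrame. A minimal skeleton of that contract, with the validation check mirroring the `hasattr` test above (class and function names here are illustrative, not part of the library):

```python
class IdentityProcessor:
    """Minimal chain-compatible processor: _apply takes and returns a LazyFrame."""

    def _apply(self, lf):
        # A real processor would return a transformed LazyFrame here,
        # e.g. lf.with_columns(...) or lf.filter(...).
        return lf

def supports_chaining(processor) -> bool:
    """Mirror of Chain's validation: any object with _apply qualifies."""
    return hasattr(processor, "_apply")
```

Because the check is structural rather than inheritance-based, custom processors can participate in a Chain without subclassing anything from the library.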

process

process(df: pl.LazyFrame) -> pl.LazyFrame
process(df: pl.DataFrame) -> pl.DataFrame
process(df: pd.DataFrame) -> pl.DataFrame
process(df: pd.DataFrame | pl.DataFrame | pl.LazyFrame) -> pl.DataFrame | pl.LazyFrame

Execute the processor chain with lazy optimization.

Parameters:

- df (pd.DataFrame | pl.DataFrame | pl.LazyFrame, required): Input DataFrame (pandas, Polars DataFrame, or Polars LazyFrame).

Returns:

- pl.DataFrame | pl.LazyFrame: Processed DataFrame/LazyFrame (same type as the input for Polars types).

Source code in cogapp_libs/processors/chain.py
def process(
    self, df: pd.DataFrame | pl.DataFrame | pl.LazyFrame
) -> pl.DataFrame | pl.LazyFrame:
    """Execute the processor chain with lazy optimization.

    Args:
        df: Input DataFrame (pandas, polars DataFrame, or polars LazyFrame)

    Returns:
        Processed DataFrame/LazyFrame (same type as input for polars types)
    """
    # Track if input was lazy (for return type)
    input_is_lazy = isinstance(df, pl.LazyFrame)

    # Convert to LazyFrame for processing
    if isinstance(df, pd.DataFrame):
        lf = pl.from_pandas(df).lazy()
    elif isinstance(df, pl.LazyFrame):
        lf = df
    else:
        lf = df.lazy()

    # Apply all processors as lazy operations
    for processor in self.processors:
        lf = processor._apply(lf)

    # LazyFrame in -> LazyFrame out (for streaming)
    if input_is_lazy:
        return lf

    # DataFrame in -> DataFrame out
    return lf.collect()

Process multiple transformations in sequence with lazy optimization for Polars.

from cogapp_libs.processors import Chain
from cogapp_libs.processors.polars import PolarsFilterProcessor, PolarsStringProcessor

chain = Chain([
    PolarsStringProcessor("name", "upper"),
    PolarsFilterProcessor("price", 1000, ">="),
])

result = chain.process(df)  # Single optimized query