Processors
Reusable DataFrame transformation classes for data pipelines.
External Documentation
- DuckDB: duckdb.org/docs | SQL Reference
- Polars: pola.rs/docs | API Reference
DuckDB Processors
DuckDBQueryProcessor
Query tables in the configured DuckDB database.
Executes SQL against persistent database tables. Call configure() before using it. Use this processor to extract data from the database when there is no input DataFrame.
Example
from cogapp_libs.processors.duckdb import DuckDBQueryProcessor, configure
configure(db_path="warehouse.duckdb", read_only=True)
processor = DuckDBQueryProcessor(sql="""
SELECT s.*, a.title
FROM raw.sales s
JOIN raw.artworks a ON s.artwork_id = a.artwork_id
WHERE s.sale_date > '2024-01-01'
""")
result = processor.process()
For lazy output (enables downstream streaming):
processor = DuckDBQueryProcessor(sql="SELECT * FROM raw.sales", lazy=True)
lazy_result = processor.process() # Returns LazyFrame
Initialize query processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql | str | SQL query to execute against configured database. | required |
| lazy | bool | If True, return LazyFrame for downstream streaming. | False |
Source code in cogapp_libs/processors/duckdb/sql.py
process
Execute the SQL against the configured database and return the result.
Returns:
| Type | Description |
|---|---|
| DataFrame \| LazyFrame | Polars DataFrame or LazyFrame (if lazy=True). |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If database is not configured or connection fails. |
Source code in cogapp_libs/processors/duckdb/sql.py
DuckDBSQLProcessor
Transform DataFrames using SQL queries.
Registers the input DataFrame as the "_input" table and executes the SQL in memory. Use it to transform DataFrames with complex SQL logic.
Example
For multi-table operations, pass additional DataFrames via the tables argument; each dict key becomes a table name in the SQL:
processor = DuckDBSQLProcessor(sql="""
SELECT a.*, b.category_name
FROM _input a
LEFT JOIN categories b ON a.category_id = b.id
""")
result = processor.process(products_df, tables={"categories": categories_df})
For lazy output (enables downstream streaming):
processor = DuckDBSQLProcessor(sql="SELECT * FROM _input", lazy=True)
lazy_result = processor.process(input_df) # Returns LazyFrame
Initialize SQL processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql | str | SQL query to execute. Use "_input" to reference input DataFrame. | required |
| lazy | bool | If True, return LazyFrame for downstream streaming. | False |
Source code in cogapp_libs/processors/duckdb/sql.py
process
process(df: DataFrame | LazyFrame, tables: dict[str, DataFrame | LazyFrame] | None = None) -> pl.DataFrame | pl.LazyFrame
Execute the SQL and return the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame \| LazyFrame | Input DataFrame/LazyFrame, registered as "_input" table. | required |
| tables | dict[str, DataFrame \| LazyFrame] \| None | Additional DataFrames to register as named tables for JOINs. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame \| LazyFrame | Polars DataFrame or LazyFrame (if lazy=True). |
Source code in cogapp_libs/processors/duckdb/sql.py
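Because process() registers df under the fixed name "_input" and every entry of tables under its dict key, the SQL can reference all of them as ordinary tables. A minimal sketch of that registration step, assuming non-empty inputs: sqlite3 stands in for DuckDB, lists of dicts stand in for DataFrames, and run_sql_on_frames is a hypothetical helper rather than the library's API.

```python
from __future__ import annotations

import sqlite3
from typing import Any

Rows = list[dict[str, Any]]  # stand-in for a DataFrame: one dict per row


def _register(conn: sqlite3.Connection, name: str, rows: Rows) -> None:
    """Create a table called `name` and load `rows` into it (rows must be non-empty)."""
    cols = list(rows[0])
    col_list = ", ".join(f'"{c}"' for c in cols)
    conn.execute(f'CREATE TABLE "{name}" ({col_list})')
    placeholders = ", ".join("?" for _ in cols)
    conn.executemany(
        f'INSERT INTO "{name}" VALUES ({placeholders})',
        [tuple(row[c] for c in cols) for row in rows],
    )


def run_sql_on_frames(sql: str, df: Rows, tables: dict[str, Rows] | None = None) -> Rows:
    """Register df as _input plus any named tables, then run sql in memory."""
    conn = sqlite3.connect(":memory:")
    try:
        _register(conn, "_input", df)
        for name, rows in (tables or {}).items():
            _register(conn, name, rows)
        cur = conn.execute(sql)
        names = [d[0] for d in cur.description]
        return [dict(zip(names, row)) for row in cur.fetchall()]
    finally:
        conn.close()
```

Using a fresh in-memory connection per call keeps registrations from leaking between calls, which matches the description of the processor executing SQL in memory.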
DuckDBWindowProcessor
Execute window functions and return DataFrame with added columns.
Takes an input DataFrame, registers it as a table, applies window functions, and returns the result with new columns added.
Initialize window processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exprs | dict[str, str] | Dict of {output_col: window_expression}. Each expression should be a valid SQL window function. | required |
Source code in cogapp_libs/processors/duckdb/window.py
process
Apply window functions to DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input DataFrame. | required |
| conn | duckdb.DuckDBPyConnection \| None | Optional DuckDB connection. If not provided, uses in-memory connection. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | DataFrame with window columns added. |
Source code in cogapp_libs/processors/duckdb/window.py
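Each entry in exprs becomes one extra column computed over the registered input, which amounts to a SELECT *, <expr> AS <col> query. A minimal sketch of that SQL generation, assuming the expressions are appended to SELECT * in dict order; build_window_sql is hypothetical, and sqlite3 (which supports window functions from SQLite 3.25) stands in for DuckDB to run the result.

```python
import sqlite3


def build_window_sql(exprs: dict[str, str], table: str = "_input") -> str:
    """Keep every input column and add one aliased column per window expression."""
    added = ", ".join(f'{expr} AS "{col}"' for col, expr in exprs.items())
    return f"SELECT *, {added} FROM {table}"


sql = build_window_sql({
    "running_total": "SUM(price) OVER (ORDER BY sale_date)",
    "price_rank": "RANK() OVER (ORDER BY price DESC)",
})

# Run the generated SQL against a tiny table registered as _input.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE _input (sale_date TEXT, price INTEGER)")
conn.executemany(
    "INSERT INTO _input VALUES (?, ?)",
    [("2024-01-01", 100), ("2024-01-02", 300), ("2024-01-03", 200)],
)
rows = conn.execute(sql + " ORDER BY sale_date").fetchall()
conn.close()
```

The expressions are inserted verbatim, so they must be trusted SQL; only the output column names are quoted.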
DuckDBAggregateProcessor
Execute group-by aggregations and return DataFrame.
Takes an input DataFrame, registers it as a table, applies aggregations, and returns the grouped result.
Initialize aggregate processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| group_cols | list[str] | Columns to group by. | required |
| agg_exprs | dict[str, str] | Dict of {output_col: sql_aggregation_expression}. | required |
Source code in cogapp_libs/processors/duckdb/aggregate.py
process
Apply aggregations to DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input DataFrame. | required |
| conn | duckdb.DuckDBPyConnection \| None | Optional DuckDB connection. If not provided, uses in-memory connection. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | Aggregated DataFrame. |
Source code in cogapp_libs/processors/duckdb/aggregate.py
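The two constructor arguments map directly onto a GROUP BY query: group_cols become both the leading SELECT columns and the GROUP BY list, and each agg_exprs entry becomes an aliased aggregate. A minimal sketch, assuming at least one group column; build_aggregate_sql is hypothetical, and sqlite3 stands in for DuckDB to execute the generated SQL.

```python
import sqlite3


def build_aggregate_sql(
    group_cols: list[str], agg_exprs: dict[str, str], table: str = "_input"
) -> str:
    """Select the grouping columns plus one aliased aggregate per entry."""
    keys = ", ".join(f'"{c}"' for c in group_cols)
    aggs = ", ".join(f'{expr} AS "{col}"' for col, expr in agg_exprs.items())
    return f"SELECT {keys}, {aggs} FROM {table} GROUP BY {keys}"


sql = build_aggregate_sql(["artist"], {"total": "SUM(price)", "n_sales": "COUNT(*)"})

# Execute against a small _input table to show the grouped result.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE _input (artist TEXT, price INTEGER)")
conn.executemany(
    "INSERT INTO _input VALUES (?, ?)",
    [("Hokusai", 100), ("Hokusai", 50), ("Turner", 200)],
)
grouped = conn.execute(sql + ' ORDER BY "artist"').fetchall()
conn.close()
```

Unlike the window sketch, the output has one row per group, so only grouping columns and aggregates can appear in the SELECT list.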
DuckDBJoinProcessor
Execute multi-table joins and return DataFrame.
Deprecated: use DuckDBQueryProcessor (for database queries) or DuckDBSQLProcessor (for DataFrame transforms) with explicit SQL instead. Explicit SQL is easier to read and is familiar to anyone who knows SQL, with no alias conventions to memorise.
Instead of:
DuckDBJoinProcessor(
    base_table="sales",
    joins=[("artworks", "a.artwork_id", "artwork_id")],
    select_cols=["a.*", "b.title"],
).process()
Use:
DuckDBQueryProcessor(sql='''
    SELECT s.*, a.title
    FROM sales s
    LEFT JOIN artworks a ON s.artwork_id = a.artwork_id
''').process()
Alias convention (if you must use this class):
- Base table: aliased as 'a'
- First join: aliased as 'b'
- Second join: aliased as 'c'
- And so on.
Initialize join processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| joins | list[tuple[str, str, str]] | List of (table, left_key, right_key) tuples. Tables are aliased as 'b', 'c', 'd', etc. If left_key contains '.', it is used as-is; otherwise it is prefixed with 'a.'. | required |
| select_cols | list[str] \| None | Columns to select. Defaults to ["*"]. | None |
| join_type | str | Type of join (LEFT, INNER, etc.). Defaults to LEFT. | 'LEFT' |
| base_table | str \| None | Name of base table (aliased as 'a'). If None, expects DataFrame input in process(). | None |
Source code in cogapp_libs/processors/duckdb/join.py
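The alias rules above are enough to render the SQL this class generates, which also shows why the hand-written query in the "Use:" example is the clearer option. A minimal sketch, assuming that right_key is qualified with the joined table's own alias (the parameter table does not say so explicitly) and that a missing base_table falls back to "_input" as described for process(); build_join_sql is hypothetical, not the library's code.

```python
from __future__ import annotations

import string


def build_join_sql(
    joins: list[tuple[str, str, str]],
    select_cols: list[str] | None = None,
    join_type: str = "LEFT",
    base_table: str | None = None,
) -> str:
    """Render the documented alias rules as a SQL string."""
    aliases = string.ascii_lowercase  # base table is 'a', joins get 'b', 'c', 'd', ...
    cols = ", ".join(select_cols or ["*"])
    base = base_table if base_table is not None else "_input"
    parts = [f"SELECT {cols}", f"FROM {base} a"]
    for i, (table, left_key, right_key) in enumerate(joins, start=1):
        alias = aliases[i]
        # Qualified left keys are kept; bare ones are prefixed with the base alias.
        left = left_key if "." in left_key else f"a.{left_key}"
        # Assumption: the right key belongs to the table being joined.
        parts.append(f"{join_type} JOIN {table} {alias} ON {left} = {alias}.{right_key}")
    return "\n".join(parts)


sql = build_join_sql(
    joins=[("artworks", "a.artwork_id", "artwork_id")],
    select_cols=["a.*", "b.title"],
    base_table="sales",
)
```

For the deprecated example, this yields a query equivalent to the explicit one, only with the generic a/b aliases instead of s/a.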
process
process(df: DataFrame | None = None, conn: 'duckdb.DuckDBPyConnection | None' = None) -> pd.DataFrame
Execute the join and return the result as a pandas DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame \| None | Input DataFrame to use as base table. If provided, it's registered as "_input" and used as the base. Required if base_table not set. | None |
| conn | duckdb.DuckDBPyConnection \| None | Optional DuckDB connection. If not provided, uses configured connection. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | DataFrame with joined data. |
Source code in cogapp_libs/processors/duckdb/join.py
Polars Processors
PolarsFilterProcessor
High-performance filter processor using Polars.
Supports lazy evaluation when used in a Chain for query optimization.
Source code in cogapp_libs/processors/polars/filter.py
process
Apply the filter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame \| pl.DataFrame \| pl.LazyFrame | Input DataFrame (pandas, polars DataFrame, or polars LazyFrame). | required |
Returns:
| Type | Description |
|---|---|
| pl.DataFrame \| pl.LazyFrame | Filtered DataFrame/LazyFrame (same type as input for polars types). |
Source code in cogapp_libs/processors/polars/filter.py
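The constructor is not documented on this page, but the Chain example's PolarsFilterProcessor("price", 1000, ">=") suggests (column, value, operator) arguments. Assuming that, the sketch below shows one way to turn an operator string into a comparison and reject unknown operators early, using Python's operator module on a list of dicts; in Polars the same comparison would be an expression such as pl.col("price") >= 1000. OPS and filter_rows are hypothetical names.

```python
import operator
from typing import Any, Callable

# Hypothetical mapping from operator strings to comparison functions.
OPS: dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def filter_rows(
    rows: list[dict[str, Any]], column: str, value: Any, op: str
) -> list[dict[str, Any]]:
    """Keep rows where `row[column] <op> value` holds; unknown operators fail early."""
    try:
        compare = OPS[op]
    except KeyError:
        raise ValueError(f"Unsupported operator {op!r}; expected one of {sorted(OPS)}") from None
    return [row for row in rows if compare(row[column], value)]
```

Validating the operator before touching any data means a typo such as "=>" fails immediately rather than after a long lazy query has been built.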
PolarsStringProcessor
High-performance string processor using Polars.
Significantly faster than pandas for string operations on large datasets. Automatically handles pandas <-> polars conversion. Supports lazy evaluation when used in a Chain.
Example
from cogapp_libs.processors import Chain
from cogapp_libs.processors.polars import PolarsFilterProcessor, PolarsStringProcessor

processor = PolarsStringProcessor("artist_name", "upper")
result_df = processor.process(df)  # accepts pandas or polars
# Chained with optimization:
chain = Chain([
    PolarsStringProcessor("name", "upper"),
    PolarsFilterProcessor("price", 1000, ">="),
])
result = chain.process(df)  # single optimized query
Source code in cogapp_libs/processors/polars/string_transform.py
process
Apply the string transformation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame \| pl.DataFrame \| pl.LazyFrame | Input DataFrame (pandas, polars DataFrame, or polars LazyFrame). | required |
Returns:
| Type | Description |
|---|---|
| pl.DataFrame \| pl.LazyFrame | Transformed DataFrame/LazyFrame (same type as input for polars types). |
Source code in cogapp_libs/processors/polars/string_transform.py
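The example passes a column name and an operation name ("upper"). A minimal sketch of how such a name could be dispatched to a per-value function, assuming a lookup table; only "upper" is confirmed by the example, the other entries are illustrative, and STRING_OPS and transform_column are hypothetical. In Polars, "upper" would correspond to the expression pl.col("artist_name").str.to_uppercase(), which, like this sketch, leaves nulls as nulls.

```python
from typing import Any, Callable

# Hypothetical operation table; only "upper" appears in the documented example.
STRING_OPS: dict[str, Callable[[str], str]] = {
    "upper": str.upper,
    "lower": str.lower,
    "strip": str.strip,
}


def transform_column(
    rows: list[dict[str, Any]], column: str, operation: str
) -> list[dict[str, Any]]:
    """Return new rows with `operation` applied to `column`, leaving nulls untouched."""
    func = STRING_OPS[operation]  # KeyError for unknown operation names
    return [
        {**row, column: func(row[column]) if row[column] is not None else None}
        for row in rows
    ]
```

Building new row dicts rather than mutating the input mirrors how Polars expressions return new columns instead of modifying data in place.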
Chaining
Chain
Chain multiple Polars processors with lazy evaluation optimization.
All processors in the chain must implement an _apply(LazyFrame) -> LazyFrame method.
The entire chain is executed as a single optimized Polars query.
Example
from cogapp_libs.processors import Chain
from cogapp_libs.processors.polars import PolarsStringProcessor, PolarsFilterProcessor

chain = Chain([
    PolarsStringProcessor("name", "upper"),
    PolarsFilterProcessor("price", 1000, ">="),
])
result = chain.process(df)  # single optimized query
Source code in cogapp_libs/processors/chain.py
process
Execute the processor chain with lazy optimization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame \| pl.DataFrame \| pl.LazyFrame | Input DataFrame (pandas, polars DataFrame, or polars LazyFrame). | required |
Returns:
| Type | Description |
|---|---|
| DataFrame \| LazyFrame | Processed DataFrame/LazyFrame (same type as input for polars types). |
Source code in cogapp_libs/processors/chain.py
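The contract above (each processor exposes _apply(LazyFrame) -> LazyFrame, and the chain runs once at the end) can be illustrated without Polars. The sketch below assumes a toy plan object standing in for pl.LazyFrame: _apply only records a step, and collect() executes the whole plan in one pass. ToyLazyFrame, Upper, MinFilter and ToyChain are hypothetical. The toy captures only the deferral; the real gain comes from Polars' query optimizer (for example, predicate pushdown) when the recorded LazyFrame is collected, which this sketch does not model.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Protocol

Row = dict[str, Any]
Step = Callable[[list[Row]], list[Row]]


@dataclass(frozen=True)
class ToyLazyFrame:
    """Stand-in for pl.LazyFrame: records steps and runs them only on collect()."""

    source: list[Row]
    steps: tuple[Step, ...] = ()

    def with_step(self, step: Step) -> ToyLazyFrame:
        # Return a new plan; nothing is executed yet.
        return ToyLazyFrame(self.source, self.steps + (step,))

    def collect(self) -> list[Row]:
        # Execute the whole recorded plan in one pass.
        rows = self.source
        for step in self.steps:
            rows = step(rows)
        return rows


class LazyProcessor(Protocol):
    def _apply(self, lf: ToyLazyFrame) -> ToyLazyFrame: ...


class Upper:
    """Hypothetical string processor: upper-cases one column."""

    def __init__(self, column: str) -> None:
        self.column = column

    def _apply(self, lf: ToyLazyFrame) -> ToyLazyFrame:
        col = self.column
        return lf.with_step(lambda rows: [{**r, col: r[col].upper()} for r in rows])


class MinFilter:
    """Hypothetical filter processor: keeps rows where column >= minimum."""

    def __init__(self, column: str, minimum: float) -> None:
        self.column = column
        self.minimum = minimum

    def _apply(self, lf: ToyLazyFrame) -> ToyLazyFrame:
        col, minimum = self.column, self.minimum
        return lf.with_step(lambda rows: [r for r in rows if r[col] >= minimum])


class ToyChain:
    """Composes each processor's _apply, then collects once."""

    def __init__(self, processors: list[LazyProcessor]) -> None:
        self.processors = processors

    def process(self, rows: list[Row]) -> list[Row]:
        lf = ToyLazyFrame(rows)
        for processor in self.processors:
            lf = processor._apply(lf)  # builds the plan only
        return lf.collect()  # single execution at the end
```

With real Polars the equivalent shape is df.lazy(), a sequence of LazyFrame transformations, and one LazyFrame.collect() call.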