Skip to content

XML Harvesting

Parse XML files into Polars DataFrames, with nested elements stored as JSON columns.

Quick Start

import dagster as dg
import polars as pl
from pathlib import Path
from cogapp_libs.dagster import add_dataframe_metadata, parse_xml_file, track_timing

@dg.asset(group_name="harvest", kinds={"xml"})
def plant_catalog(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """Harvest plant catalog from XML."""
    with track_timing(context, "XML parsing"):
        records = parse_xml_file(
            file_path=Path("examples/data/plant_catalog.xml"),
            record_path="./PLANT",  # Direct children of root
            fields={
                "plant_id": "./@id",
                "common_name": "./COMMON/text()",
                "botanical_name": "./BOTANICAL/text()",
                "zone": "./ZONE/text()",
                "light": "./LIGHT/text()",
                "price": "./PRICE/text()",
            },
            nested_fields={
                "images": "./IMAGES/IMAGE",
                "companions": "./COMPANIONS/PLANT",
            },
        )
        df = pl.DataFrame(records)

    # Rich metadata: record count, columns, markdown preview table
    add_dataframe_metadata(context, df, unique_zones=df["zone"].n_unique())
    return df

Output:

shape: (5, 8)
┌──────────┬─────────────┬─────────────────┬────────┬──────────────┬───────┬─────────────────────┬─────────────────────┐
│ plant_id ┆ common_name ┆ botanical_name  ┆ zone   ┆ light        ┆ price ┆ images              ┆ companions          │
╞══════════╪═════════════╪═════════════════╪════════╪══════════════╪═══════╪═════════════════════╪═════════════════════╡
│ P001     ┆ Bloodroot   ┆ Sanguinaria ... ┆ 4      ┆ Mostly Shady ┆ $2.44 ┆ [{"type": "thumb... ┆ [{"value": "Tril... │
│ P002     ┆ Columbine   ┆ Aquilegia ...   ┆ 3      ┆ Mostly Shady ┆ $9.37 ┆ [{"type": "thumb... ┆ [{"value": "Fern"}] │
│ ...      ┆ ...         ┆ ...             ┆ ...    ┆ ...          ┆ ...   ┆ ...                 ┆ ...                 │
└──────────┴─────────────┴─────────────────┴────────┴──────────────┴───────┴─────────────────────┴─────────────────────┘


Field Mapping

The fields parameter maps output column names to XPath expressions that extract values from each record element.

Example XML

<?xml version="1.0" encoding="UTF-8"?>
<CATALOG>
  <PLANT id="P001">
    <COMMON>Bloodroot</COMMON>
    <BOTANICAL>Sanguinaria canadensis</BOTANICAL>
    <ZONE>4</ZONE>
    <LIGHT>Mostly Shady</LIGHT>
    <PRICE>$2.44</PRICE>
    <IMAGES>
      <IMAGE type="thumbnail">bloodroot_thumb.jpg</IMAGE>
      <IMAGE type="full">bloodroot_full.jpg</IMAGE>
    </IMAGES>
    <COMPANIONS>
      <PLANT>Trillium</PLANT>
      <PLANT>Hepatica</PLANT>
    </COMPANIONS>
  </PLANT>
</CATALOG>

Mapping This XML

records = parse_xml_file(
    file_path=Path("plant_catalog.xml"),

    # record_path: Which elements are the "rows"?
    # Use "./PLANT" for direct children of root (not "//PLANT" which matches nested ones too)
    record_path="./PLANT",

    # fields: Simple values - one value per record
    fields={
        "plant_id": "./@id",              # Attribute on PLANT element
        "common_name": "./COMMON/text()", # Text inside <COMMON>
        "botanical_name": "./BOTANICAL/text()",
        "zone": "./ZONE/text()",
        "light": "./LIGHT/text()",
        "price": "./PRICE/text()",
    },

    # nested_fields: Repeated elements → JSON array
    nested_fields={
        "images": "./IMAGES/IMAGE",       # All <IMAGE> elements → JSON
        "companions": "./COMPANIONS/PLANT", # All nested <PLANT> elements → JSON
    },
)

XPath Patterns

Pattern What it extracts Example
./@attr — attribute on the current element, e.g. ./@id → "P001"
./ELEM/text() — text content of a child element, e.g. ./COMMON/text() → "Bloodroot"
./PARENT/CHILD/text() — text of a nested element, e.g. ./PRICING/RETAIL/text()
./PARENT/@attr — attribute on a child element, e.g. ./IMAGE/@type → "thumbnail"

Nested Fields → JSON

Elements matched by nested_fields are serialized as JSON arrays:

nested_fields={
    "images": "./IMAGES/IMAGE",
}

Each <IMAGE> element becomes a JSON object containing all of its attributes (e.g., type="thumbnail") plus its text content stored under the key "value" (e.g., "bloodroot_thumb.jpg").

Result:

[
  {"type": "thumbnail", "value": "bloodroot_thumb.jpg"},
  {"type": "full", "value": "bloodroot_full.jpg"}
]


S3 Files

from cogapp_libs.dagster import parse_xml_s3

@dg.asset(group_name="harvest", kinds={"xml", "s3"})
def s3_xml_harvest(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """Harvest XML from S3 bucket."""
    records = parse_xml_s3(
        bucket="my-data-bucket",
        key="catalogs/plants.xml",
        record_path="./PLANT",
        fields={
            "plant_id": "./@id",
            "common_name": "./COMMON/text()",
        },
        # For LocalStack:
        # endpoint_url="http://localhost:4566",
    )

    return pl.DataFrame(records)

HTTP/RSS Feeds

from cogapp_libs.dagster import parse_xml_http

@dg.asset(group_name="harvest", kinds={"xml", "http"})
def rss_feed_harvest(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """Harvest articles from RSS feed."""
    records = parse_xml_http(
        url="https://example.com/feed.rss",
        record_path=".//item",
        fields={
            "title": "./title/text()",
            "link": "./link/text()",
            "pub_date": "./pubDate/text()",
        },
    )

    return pl.DataFrame(records)

Large File Streaming

For files >500MB, use streaming to avoid memory issues:

from cogapp_libs.dagster import parse_xml_streaming

@dg.asset(group_name="harvest", kinds={"xml"})
def large_xml_harvest(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """Stream large XML file with constant memory."""
    records = list(parse_xml_streaming(
        file_path=Path("data/large_catalog.xml"),
        record_tag="PLANT",  # Tag name, NOT XPath
        fields={
            "plant_id": "./@id",
            "common_name": "./COMMON/text()",
        },
    ))

    return pl.DataFrame(records)

record_tag vs record_path

Streaming uses record_tag (just the tag name like "PLANT") instead of record_path (XPath like "./PLANT"). This is required for incremental parsing.


Namespace Handling

For namespaced XML (Atom, SOAP, OAI-PMH):

records = parse_xml_file(
    file_path=Path("feed.xml"),
    record_path=".//atom:entry",
    fields={
        "id": "./atom:id/text()",
        "title": "./atom:title/text()",
        "author": "./atom:author/atom:name/text()",
    },
    namespaces={
        "atom": "http://www.w3.org/2005/Atom",
    },
)

Available Functions

Function Use Case
parse_xml_file Local XML files
parse_xml_string XML content as string/bytes
parse_xml_s3 S3 buckets (with optional LocalStack)
parse_xml_http HTTP endpoints, RSS, APIs
parse_xml_streaming Large files (>500MB)

All functions accept: fields (simple value extraction, one value per record), nested_fields (repeated elements serialized as JSON arrays), and namespaces (XML namespace prefix mappings).


Rich Metadata

Use the helper functions to add useful metadata to the Dagster UI:

from cogapp_libs.dagster import (
    add_dataframe_metadata,
    table_preview_to_metadata,
    track_timing,
)

@dg.asset(group_name="harvest", kinds={"xml"})
def plant_catalog(context: dg.AssetExecutionContext) -> pl.DataFrame:
    # track_timing adds processing_time_ms + logs completion
    with track_timing(context, "XML parsing"):
        records = parse_xml_file(...)
        df = pl.DataFrame(records)

    # add_dataframe_metadata adds record_count, columns, preview table
    add_dataframe_metadata(
        context,
        df,
        # Custom extras appear in metadata panel
        unique_zones=df["zone"].n_unique(),
        source_file="plant_catalog.xml",
    )

    # Add additional preview tables
    context.add_output_metadata(
        table_preview_to_metadata(
            df.select("common_name", "price"),
            title="price_list",
            header="Price List",
        )
    )
    return df
Helper Adds to Metadata
track_timing(context, "operation") processing_time_ms
add_dataframe_metadata(context, df) record_count, columns, preview (markdown table)
table_preview_to_metadata(df, "title") Custom markdown table preview

Troubleshooting

No records found

Your record_path doesn't match any elements. Debug with:

import xml.etree.ElementTree as ET
tree = ET.parse("data.xml")
root = tree.getroot()
print(f"Root tag: {root.tag}")
print(f"Children: {[child.tag for child in root]}")

Getting duplicate/nested records

If using //ELEM, it matches ALL elements with that tag, including nested ones. Use ./ELEM to match only direct children of the root.

# BAD: Matches <PLANT> inside <COMPANIONS> too
record_path="//PLANT"

# GOOD: Only top-level <PLANT> elements
record_path="./PLANT"

Namespace prefix not defined

Add namespace mappings:

namespaces = {"ns": "http://example.com/schema"}
record_path = ".//ns:record"
fields = {"id": "./ns:id/text()"}

API Reference

XML parsing utilities for harvesting XML data into DataFrames.

Simple, dlt-free XML parsing with XPath field mappings. Nested elements are serialized as JSON strings for storage in Parquet.

Example
from cogapp_libs.dagster.xml_sources import parse_xml_file

records = parse_xml_file(
    file_path="data.xml",
    record_path="//record",
    fields={
        "id": "./@id",
        "name": "./name/text()",
    },
)
df = pl.DataFrame(records)

parse_xml_file

parse_xml_file(file_path: str | Path, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None) -> list[dict[str, Any]]

Parse XML file using XPath field mappings.

Parameters:

Name Type Description Default
file_path str | Path

Path to XML file

required
record_path str

XPath to locate record elements (e.g., "//record", ".//item")

required
fields dict[str, str]

Mapping of field names to XPath expressions relative to record. Example: {"id": "./@id", "name": "./name/text()"}

required
nested_fields dict[str, str] | None

Mapping of field names to XPath for nested elements. These are serialized as JSON strings. Example: {"tags": "./tags/tag", "media": "./media/item"}

None
namespaces dict[str, str] | None

Optional namespace mappings for XPath. Example: {"ns": "http://example.com/schema"}

None

Returns:

Type Description
list[dict[str, Any]]

List of dictionaries, one per record.

Example
# XML structure:
# <catalog>
#   <artwork id="1">
#     <title>Starry Night</title>
#     <tags><tag>landscape</tag><tag>night</tag></tags>
#   </artwork>
# </catalog>

records = parse_xml_file(
    file_path="catalog.xml",
    record_path="//artwork",
    fields={
        "artwork_id": "./@id",
        "title": "./title/text()",
    },
    nested_fields={
        "tags": "./tags/tag",
    },
)
# Returns: [{"artwork_id": "1", "title": "Starry Night", "tags": '[...]'}]
Source code in cogapp_libs/dagster/xml_sources.py
def parse_xml_file(
    file_path: str | Path,
    record_path: str,
    fields: dict[str, str],
    nested_fields: dict[str, str] | None = None,
    namespaces: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
    """Parse XML file using XPath field mappings.

    Args:
        file_path: Path to XML file
        record_path: XPath to locate record elements (e.g., "//record", ".//item")
        fields: Mapping of field names to XPath expressions relative to record.
            Example: {"id": "./@id", "name": "./name/text()"}
        nested_fields: Mapping of field names to XPath for nested elements.
            These are serialized as JSON strings.
            Example: {"tags": "./tags/tag", "media": "./media/item"}
        namespaces: Optional namespace mappings for XPath.
            Example: {"ns": "http://example.com/schema"}

    Returns:
        List of dictionaries, one per record.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.

    Example:
        ```python
        # XML structure:
        # <catalog>
        #   <artwork id="1">
        #     <title>Starry Night</title>
        #     <tags><tag>landscape</tag><tag>night</tag></tags>
        #   </artwork>
        # </catalog>

        records = parse_xml_file(
            file_path="catalog.xml",
            record_path="//artwork",
            fields={
                "artwork_id": "./@id",
                "title": "./title/text()",
            },
            nested_fields={
                "tags": "./tags/tag",
            },
        )
        # Returns: [{"artwork_id": "1", "title": "Starry Night", "tags": '[...]'}]
        ```
    """
    file_path = Path(file_path)

    if not file_path.exists():
        raise FileNotFoundError(f"XML file not found: {file_path}")

    # Delegate to parse_xml_string so the record_path anchoring and field
    # extraction logic live in exactly one place. Reading bytes (not text)
    # lets ElementTree honor the XML declaration's encoding. For very large
    # files prefer parse_xml_streaming, which never loads the whole document.
    return parse_xml_string(
        xml_content=file_path.read_bytes(),
        record_path=record_path,
        fields=fields,
        nested_fields=nested_fields,
        namespaces=namespaces,
    )

parse_xml_string

parse_xml_string(xml_content: str | bytes, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None) -> list[dict[str, Any]]

Parse XML string using XPath field mappings.

Same as parse_xml_file but accepts XML content directly.

Parameters:

Name Type Description Default
xml_content str | bytes

XML content as string or bytes

required
record_path str

XPath to locate record elements

required
fields dict[str, str]

Mapping of field names to XPath expressions

required
nested_fields dict[str, str] | None

Mapping of field names to XPath for nested elements

None
namespaces dict[str, str] | None

Optional namespace mappings

None

Returns:

Type Description
list[dict[str, Any]]

List of dictionaries, one per record.

Source code in cogapp_libs/dagster/xml_sources.py
def parse_xml_string(
    xml_content: str | bytes,
    record_path: str,
    fields: dict[str, str],
    nested_fields: dict[str, str] | None = None,
    namespaces: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
    """Parse XML content (string or bytes) using XPath field mappings.

    Same contract as parse_xml_file, but takes the XML document directly
    rather than a file path.

    Args:
        xml_content: XML document as a string or bytes
        record_path: XPath locating the record elements
        fields: Maps output field names to XPath expressions
            (relative to each record element)
        nested_fields: Maps field names to XPath for repeated child
            elements; matches are serialized as JSON strings
        namespaces: Optional prefix -> URI namespace mappings

    Returns:
        List of dictionaries, one per record.
    """
    root = ET.fromstring(xml_content)
    ns_map = namespaces or {}
    nested_map = nested_fields or {}

    # element.findall() rejects a leading "//", so anchor an absolute-style
    # record_path with "." to keep it valid relative to the root element.
    search_path = f".{record_path}" if record_path.startswith("//") else record_path

    rows: list[dict[str, Any]] = []
    for record_elem in root.findall(search_path, ns_map):
        row = _extract_fields(record_elem, fields, ns_map)
        row.update(_extract_nested_fields(record_elem, nested_map, ns_map))
        rows.append(row)

    return rows

parse_xml_streaming

parse_xml_streaming(file_path: str | Path, record_tag: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None) -> Iterator[dict[str, Any]]

Stream large XML files using iterparse for memory efficiency.

Uses incremental parsing to handle XML files larger than available RAM.

Parameters:

Name Type Description Default
file_path str | Path

Path to XML file

required
record_tag str

Tag name for record elements (e.g., "record", "item"). Note: This is a tag name, NOT an XPath expression.

required
fields dict[str, str]

Field mappings (XPath expressions relative to record element)

required
nested_fields dict[str, str] | None

Nested field mappings (serialized as JSON)

None
namespaces dict[str, str] | None

Optional namespace mappings

None

Yields:

Type Description
dict[str, Any]

Dictionary for each record.

Example
# For large XML files (>500MB)
for record in parse_xml_streaming(
    file_path="large_catalog.xml",
    record_tag="artwork",
    fields={"id": "./@id", "title": "./title/text()"},
):
    process(record)
Source code in cogapp_libs/dagster/xml_sources.py
def parse_xml_streaming(
    file_path: str | Path,
    record_tag: str,
    fields: dict[str, str],
    nested_fields: dict[str, str] | None = None,
    namespaces: dict[str, str] | None = None,
) -> Iterator[dict[str, Any]]:
    """Stream large XML files using iterparse for memory efficiency.

    Uses incremental parsing to handle XML files larger than available RAM.

    Args:
        file_path: Path to XML file
        record_tag: Tag name for record elements (e.g., "record", "item").
            Note: This is a tag name, NOT an XPath expression.
        fields: Field mappings (XPath expressions relative to record element)
        nested_fields: Nested field mappings (serialized as JSON)
        namespaces: Optional namespace mappings

    Yields:
        Dictionary for each record.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.

    Example:
        ```python
        # For large XML files (>500MB)
        for record in parse_xml_streaming(
            file_path="large_catalog.xml",
            record_tag="artwork",
            fields={"id": "./@id", "title": "./title/text()"},
        ):
            process(record)
        ```
    """
    file_path = Path(file_path)

    if not file_path.exists():
        raise FileNotFoundError(f"XML file not found: {file_path}")

    ns = namespaces or {}
    nested = nested_fields or {}

    # iterparse reports tags in Clark notation ("{uri}tag"), so qualify
    # record_tag with the mapped namespace unless the caller already did.
    # With no "" key, the first mapping is used as the default namespace.
    tag_to_match = record_tag
    if ns and not record_tag.startswith("{"):
        default_ns = ns.get("", next(iter(ns.values()), ""))
        if default_ns:
            tag_to_match = f"{{{default_ns}}}{record_tag}"

    # Also subscribe to "start" events so we can capture the document root:
    # elem.clear() alone is not enough for constant memory, because cleared
    # record elements stay attached as (empty) children of the root and the
    # in-memory tree still grows with record count. Clearing the root after
    # each yielded record drops those references.
    context = ET.iterparse(file_path, events=("start", "end"))
    root: ET.Element | None = None

    for event, elem in context:
        if event == "start":
            if root is None:
                root = elem
            continue

        if elem.tag == tag_to_match or elem.tag == record_tag:
            row = _extract_fields(elem, fields, ns)
            row.update(_extract_nested_fields(elem, nested, ns))
            yield row

            # Free the record itself, then detach processed records from
            # the root so memory stays bounded regardless of file size.
            elem.clear()
            if root is not None and root is not elem:
                root.clear()

parse_xml_http

parse_xml_http(url: str, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> list[dict[str, Any]]

Fetch and parse XML from HTTP endpoint.

Parameters:

Name Type Description Default
url str

HTTP/HTTPS URL to fetch XML from

required
record_path str

XPath to record elements

required
fields dict[str, str]

Field mappings

required
nested_fields dict[str, str] | None

Nested field mappings (serialized as JSON)

None
namespaces dict[str, str] | None

Optional namespace mappings

None
headers dict[str, str] | None

Optional HTTP headers (auth, content-type, etc.)

None
timeout int

Request timeout in seconds

30

Returns:

Type Description
list[dict[str, Any]]

List of dictionaries, one per record.

Example
# RSS feed
records = parse_xml_http(
    url="https://example.com/feed.rss",
    record_path="//item",
    fields={
        "title": "./title/text()",
        "link": "./link/text()",
        "pub_date": "./pubDate/text()",
    },
)

# API with auth
records = parse_xml_http(
    url="https://api.example.com/data.xml",
    record_path="//record",
    fields={"id": "./@id", "value": "./value/text()"},
    headers={"Authorization": "Bearer token"},
)
Source code in cogapp_libs/dagster/xml_sources.py
def parse_xml_http(
    url: str,
    record_path: str,
    fields: dict[str, str],
    nested_fields: dict[str, str] | None = None,
    namespaces: dict[str, str] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> list[dict[str, Any]]:
    """Fetch and parse XML from HTTP endpoint.

    Args:
        url: HTTP/HTTPS URL to fetch XML from
        record_path: XPath to record elements
        fields: Field mappings
        nested_fields: Nested field mappings (serialized as JSON)
        namespaces: Optional namespace mappings
        headers: Optional HTTP headers (auth, content-type, etc.)
        timeout: Request timeout in seconds

    Returns:
        List of dictionaries, one per record.

    Raises:
        ImportError: If the optional ``requests`` dependency is missing.
        requests.HTTPError: If the server returns an error status.

    Example:
        ```python
        # RSS feed
        records = parse_xml_http(
            url="https://example.com/feed.rss",
            record_path="//item",
            fields={
                "title": "./title/text()",
                "link": "./link/text()",
                "pub_date": "./pubDate/text()",
            },
        )

        # API with auth
        records = parse_xml_http(
            url="https://api.example.com/data.xml",
            record_path="//record",
            fields={"id": "./@id", "value": "./value/text()"},
            headers={"Authorization": "Bearer token"},
        )
        ```
    """
    try:
        import requests
    except ImportError as err:
        # Chain the original error so the real import failure (e.g. a broken
        # install rather than a missing package) stays visible in tracebacks.
        raise ImportError(
            "requests package required for HTTP sources. Install with: pip install requests"
        ) from err

    response = requests.get(url, headers=headers or {}, timeout=timeout)
    response.raise_for_status()

    # response.content (bytes) lets ElementTree honor the XML declaration's
    # encoding instead of trusting the HTTP charset guess.
    return parse_xml_string(
        xml_content=response.content,
        record_path=record_path,
        fields=fields,
        nested_fields=nested_fields,
        namespaces=namespaces,
    )

parse_xml_s3

parse_xml_s3(bucket: str, key: str, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None, s3_client: Any | None = None, endpoint_url: str | None = None) -> list[dict[str, Any]]

Download and parse XML from S3.

Parameters:

Name Type Description Default
bucket str

S3 bucket name

required
key str

S3 object key (path to XML file)

required
record_path str

XPath to record elements

required
fields dict[str, str]

Field mappings

required
nested_fields dict[str, str] | None

Nested field mappings (serialized as JSON)

None
namespaces dict[str, str] | None

Optional namespace mappings

None
s3_client Any | None

Optional pre-configured boto3 S3 client

None
endpoint_url str | None

Optional S3 endpoint (for LocalStack/MinIO)

None

Returns:

Type Description
list[dict[str, Any]]

List of dictionaries, one per record.

Example
records = parse_xml_s3(
    bucket="my-bucket",
    key="data/catalog.xml",
    record_path="//artwork",
    fields={"id": "./@id", "title": "./title/text()"},
)

# With LocalStack
records = parse_xml_s3(
    bucket="test-bucket",
    key="catalog.xml",
    record_path="//artwork",
    fields={...},
    endpoint_url="http://localhost:4566",
)
Source code in cogapp_libs/dagster/xml_sources.py
def parse_xml_s3(
    bucket: str,
    key: str,
    record_path: str,
    fields: dict[str, str],
    nested_fields: dict[str, str] | None = None,
    namespaces: dict[str, str] | None = None,
    s3_client: Any | None = None,
    endpoint_url: str | None = None,
) -> list[dict[str, Any]]:
    """Download and parse XML from S3.

    Args:
        bucket: S3 bucket name
        key: S3 object key (path to XML file)
        record_path: XPath to record elements
        fields: Field mappings
        nested_fields: Nested field mappings (serialized as JSON)
        namespaces: Optional namespace mappings
        s3_client: Optional pre-configured boto3 S3 client
        endpoint_url: Optional S3 endpoint (for LocalStack/MinIO)

    Returns:
        List of dictionaries, one per record.

    Raises:
        ImportError: If the optional ``boto3`` dependency is missing.

    Example:
        ```python
        records = parse_xml_s3(
            bucket="my-bucket",
            key="data/catalog.xml",
            record_path="//artwork",
            fields={"id": "./@id", "title": "./title/text()"},
        )

        # With LocalStack
        records = parse_xml_s3(
            bucket="test-bucket",
            key="catalog.xml",
            record_path="//artwork",
            fields={...},
            endpoint_url="http://localhost:4566",
        )
        ```
    """
    try:
        import boto3
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError("boto3 package required for S3 sources. Install with: pip install boto3") from err

    if s3_client is None:
        config = {"endpoint_url": endpoint_url} if endpoint_url else {}
        s3_client = boto3.client("s3", **config)

    response = s3_client.get_object(Bucket=bucket, Key=key)
    xml_content = response["Body"].read()

    return parse_xml_string(
        xml_content=xml_content,
        record_path=record_path,
        fields=fields,
        nested_fields=nested_fields,
        namespaces=namespaces,
    )