XML Harvesting¶
Parse XML files into Polars DataFrames, with nested elements stored as JSON columns.
Quick Start¶
import dagster as dg
import polars as pl
from pathlib import Path
from cogapp_libs.dagster import add_dataframe_metadata, parse_xml_file, track_timing
@dg.asset(group_name="harvest", kinds={"xml"})
def plant_catalog(context: dg.AssetExecutionContext) -> pl.DataFrame:
"""Harvest plant catalog from XML."""
with track_timing(context, "XML parsing"):
records = parse_xml_file(
file_path=Path("examples/data/plant_catalog.xml"),
record_path="./PLANT", # Direct children of root
fields={
"plant_id": "./@id",
"common_name": "./COMMON/text()",
"botanical_name": "./BOTANICAL/text()",
"zone": "./ZONE/text()",
"light": "./LIGHT/text()",
"price": "./PRICE/text()",
},
nested_fields={
"images": "./IMAGES/IMAGE",
"companions": "./COMPANIONS/PLANT",
},
)
df = pl.DataFrame(records)
# Rich metadata: record count, columns, markdown preview table
add_dataframe_metadata(context, df, unique_zones=df["zone"].n_unique())
return df
Output:
shape: (5, 8)
┌──────────┬─────────────┬─────────────────┬────────┬──────────────┬───────┬─────────────────────┬─────────────────────┐
│ plant_id ┆ common_name ┆ botanical_name ┆ zone ┆ light ┆ price ┆ images ┆ companions │
╞══════════╪═════════════╪═════════════════╪════════╪══════════════╪═══════╪═════════════════════╪═════════════════════╡
│ P001 ┆ Bloodroot ┆ Sanguinaria ... ┆ 4 ┆ Mostly Shady ┆ $2.44 ┆ [{"type": "thumb... ┆ [{"value": "Tril... │
│ P002 ┆ Columbine ┆ Aquilegia ... ┆ 3 ┆ Mostly Shady ┆ $9.37 ┆ [{"type": "thumb... ┆ [{"value": "Fern"}] │
│ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... │
└──────────┴─────────────┴─────────────────┴────────┴──────────────┴───────┴─────────────────────┴─────────────────────┘
Field Mapping¶
The fields parameter maps output column names to XPath expressions that extract values from each record element.
Example XML¶
<?xml version="1.0" encoding="UTF-8"?>
<CATALOG>
<PLANT id="P001">
<COMMON>Bloodroot</COMMON>
<BOTANICAL>Sanguinaria canadensis</BOTANICAL>
<ZONE>4</ZONE>
<LIGHT>Mostly Shady</LIGHT>
<PRICE>$2.44</PRICE>
<IMAGES>
<IMAGE type="thumbnail">bloodroot_thumb.jpg</IMAGE>
<IMAGE type="full">bloodroot_full.jpg</IMAGE>
</IMAGES>
<COMPANIONS>
<PLANT>Trillium</PLANT>
<PLANT>Hepatica</PLANT>
</COMPANIONS>
</PLANT>
</CATALOG>
Mapping This XML¶
records = parse_xml_file(
file_path=Path("plant_catalog.xml"),
# record_path: Which elements are the "rows"?
# Use "./PLANT" for direct children of root (not "//PLANT" which matches nested ones too)
record_path="./PLANT",
# fields: Simple values - one value per record
fields={
"plant_id": "./@id", # Attribute on PLANT element
"common_name": "./COMMON/text()", # Text inside <COMMON>
"botanical_name": "./BOTANICAL/text()",
"zone": "./ZONE/text()",
"light": "./LIGHT/text()",
"price": "./PRICE/text()",
},
# nested_fields: Repeated elements → JSON array
nested_fields={
"images": "./IMAGES/IMAGE", # All <IMAGE> elements → JSON
"companions": "./COMPANIONS/PLANT", # All nested <PLANT> elements → JSON
},
)
XPath Patterns¶
| Pattern | What it extracts | Example |
|---|---|---|
| `./@attr` | Attribute on current element | `./@id` → `"P001"` |
| `./ELEM/text()` | Text content of child element | `./COMMON/text()` → `"Bloodroot"` |
| `./PARENT/CHILD/text()` | Nested element text | `./PRICING/RETAIL/text()` |
| `./PARENT/@attr` | Attribute on child element | `./IMAGE/@type` → `"thumbnail"` |
Nested Fields → JSON¶
Elements matched by nested_fields are serialized as JSON arrays:
Each <IMAGE> element becomes a JSON object with:
- All attributes (e.g., type="thumbnail")
- Text content as value (e.g., "bloodroot_thumb.jpg")
Result:
[
{"type": "thumbnail", "value": "bloodroot_thumb.jpg"},
{"type": "full", "value": "bloodroot_full.jpg"}
]
S3 Files¶
from cogapp_libs.dagster import parse_xml_s3
@dg.asset(group_name="harvest", kinds={"xml", "s3"})
def s3_xml_harvest(context: dg.AssetExecutionContext) -> pl.DataFrame:
"""Harvest XML from S3 bucket."""
records = parse_xml_s3(
bucket="my-data-bucket",
key="catalogs/plants.xml",
record_path="./PLANT",
fields={
"plant_id": "./@id",
"common_name": "./COMMON/text()",
},
# For LocalStack:
# endpoint_url="http://localhost:4566",
)
return pl.DataFrame(records)
HTTP/RSS Feeds¶
from cogapp_libs.dagster import parse_xml_http
@dg.asset(group_name="harvest", kinds={"xml", "http"})
def rss_feed_harvest(context: dg.AssetExecutionContext) -> pl.DataFrame:
"""Harvest articles from RSS feed."""
records = parse_xml_http(
url="https://example.com/feed.rss",
record_path=".//item",
fields={
"title": "./title/text()",
"link": "./link/text()",
"pub_date": "./pubDate/text()",
},
)
return pl.DataFrame(records)
Large File Streaming¶
For files >500MB, use streaming to avoid memory issues:
from cogapp_libs.dagster import parse_xml_streaming
@dg.asset(group_name="harvest", kinds={"xml"})
def large_xml_harvest(context: dg.AssetExecutionContext) -> pl.DataFrame:
"""Stream large XML file with constant memory."""
records = list(parse_xml_streaming(
file_path=Path("data/large_catalog.xml"),
record_tag="PLANT", # Tag name, NOT XPath
fields={
"plant_id": "./@id",
"common_name": "./COMMON/text()",
},
))
return pl.DataFrame(records)
record_tag vs record_path
Streaming uses record_tag (just the tag name like "PLANT") instead of
record_path (XPath like "./PLANT"). This is required for incremental parsing.
Namespace Handling¶
For namespaced XML (Atom, SOAP, OAI-PMH):
records = parse_xml_file(
file_path=Path("feed.xml"),
record_path=".//atom:entry",
fields={
"id": "./atom:id/text()",
"title": "./atom:title/text()",
"author": "./atom:author/atom:name/text()",
},
namespaces={
"atom": "http://www.w3.org/2005/Atom",
},
)
Available Functions¶
| Function | Use Case |
|---|---|
| `parse_xml_file` | Local XML files |
| `parse_xml_string` | XML content as string/bytes |
| `parse_xml_s3` | S3 buckets (with optional LocalStack) |
| `parse_xml_http` | HTTP endpoints, RSS, APIs |
| `parse_xml_streaming` | Large files (>500MB) |
All functions accept:
- fields: Simple value extraction (one value per record)
- nested_fields: Repeated elements → JSON arrays
- namespaces: XML namespace prefix mappings
Rich Metadata¶
Use the helper functions to add useful metadata to the Dagster UI:
from cogapp_libs.dagster import (
add_dataframe_metadata,
table_preview_to_metadata,
track_timing,
)
@dg.asset(group_name="harvest", kinds={"xml"})
def plant_catalog(context: dg.AssetExecutionContext) -> pl.DataFrame:
# track_timing adds processing_time_ms + logs completion
with track_timing(context, "XML parsing"):
records = parse_xml_file(...)
df = pl.DataFrame(records)
# add_dataframe_metadata adds record_count, columns, preview table
add_dataframe_metadata(
context,
df,
# Custom extras appear in metadata panel
unique_zones=df["zone"].n_unique(),
source_file="plant_catalog.xml",
)
# Add additional preview tables
context.add_output_metadata(
table_preview_to_metadata(
df.select("common_name", "price"),
title="price_list",
header="Price List",
)
)
return df
| Helper | Adds to Metadata |
|---|---|
| `track_timing(context, "operation")` | `processing_time_ms` |
| `add_dataframe_metadata(context, df)` | `record_count`, `columns`, `preview` (markdown table) |
| `table_preview_to_metadata(df, "title")` | Custom markdown table preview |
Troubleshooting¶
No records found¶
Your record_path doesn't match any elements. Debug with:
import xml.etree.ElementTree as ET
tree = ET.parse("data.xml")
root = tree.getroot()
print(f"Root tag: {root.tag}")
print(f"Children: {[child.tag for child in root]}")
Getting duplicate/nested records¶
If using //ELEM, it matches ALL elements with that tag, including nested ones.
Use ./ELEM to match only direct children of the root.
# BAD: Matches <PLANT> inside <COMPANIONS> too
record_path="//PLANT"
# GOOD: Only top-level <PLANT> elements
record_path="./PLANT"
Namespace prefix not defined¶
Add namespace mappings:
namespaces = {"ns": "http://example.com/schema"}
record_path = ".//ns:record"
fields = {"id": "./ns:id/text()"}
API Reference¶
XML parsing utilities for harvesting XML data into DataFrames.
Simple, dlt-free XML parsing with XPath field mappings. Nested elements are serialized as JSON strings for storage in Parquet.
Example
parse_xml_file ¶
parse_xml_file(file_path: str | Path, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None) -> list[dict[str, Any]]
Parse XML file using XPath field mappings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `file_path` | `str \| Path` | Path to XML file | required |
| `record_path` | `str` | XPath to locate record elements (e.g., `"//record"`, `".//item"`) | required |
| `fields` | `dict[str, str]` | Mapping of field names to XPath expressions relative to record. Example: `{"id": "./@id", "name": "./name/text()"}` | required |
| `nested_fields` | `dict[str, str] \| None` | Mapping of field names to XPath for nested elements. These are serialized as JSON strings. Example: `{"tags": "./tags/tag", "media": "./media/item"}` | `None` |
| `namespaces` | `dict[str, str] \| None` | Optional namespace mappings for XPath. Example: `{"ns": "http://example.com/schema"}` | `None` |

Returns:

| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of dictionaries, one per record. |
Example
# XML structure:
# <catalog>
# <artwork id="1">
# <title>Starry Night</title>
# <tags><tag>landscape</tag><tag>night</tag></tags>
# </artwork>
# </catalog>
records = parse_xml_file(
file_path="catalog.xml",
record_path="//artwork",
fields={
"artwork_id": "./@id",
"title": "./title/text()",
},
nested_fields={
"tags": "./tags/tag",
},
)
# Returns: [{"artwork_id": "1", "title": "Starry Night", "tags": '[...]'}]
Source code in cogapp_libs/dagster/xml_sources.py
parse_xml_string ¶
parse_xml_string(xml_content: str | bytes, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None) -> list[dict[str, Any]]
Parse XML string using XPath field mappings.
Same as parse_xml_file but accepts XML content directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `xml_content` | `str \| bytes` | XML content as string or bytes | required |
| `record_path` | `str` | XPath to locate record elements | required |
| `fields` | `dict[str, str]` | Mapping of field names to XPath expressions | required |
| `nested_fields` | `dict[str, str] \| None` | Mapping of field names to XPath for nested elements | `None` |
| `namespaces` | `dict[str, str] \| None` | Optional namespace mappings | `None` |

Returns:

| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of dictionaries, one per record. |
Source code in cogapp_libs/dagster/xml_sources.py
parse_xml_streaming ¶
parse_xml_streaming(file_path: str | Path, record_tag: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None) -> Iterator[dict[str, Any]]
Stream large XML files using iterparse for memory efficiency.
Uses incremental parsing to handle XML files larger than available RAM.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `file_path` | `str \| Path` | Path to XML file | required |
| `record_tag` | `str` | Tag name for record elements (e.g., `"record"`, `"item"`). Note: This is a tag name, NOT an XPath expression. | required |
| `fields` | `dict[str, str]` | Field mappings (XPath expressions relative to record element) | required |
| `nested_fields` | `dict[str, str] \| None` | Nested field mappings (serialized as JSON) | `None` |
| `namespaces` | `dict[str, str] \| None` | Optional namespace mappings | `None` |

Yields:

| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary for each record. |
Example
Source code in cogapp_libs/dagster/xml_sources.py
parse_xml_http ¶
parse_xml_http(url: str, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None, headers: dict[str, str] | None = None, timeout: int = 30) -> list[dict[str, Any]]
Fetch and parse XML from HTTP endpoint.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `url` | `str` | HTTP/HTTPS URL to fetch XML from | required |
| `record_path` | `str` | XPath to record elements | required |
| `fields` | `dict[str, str]` | Field mappings | required |
| `nested_fields` | `dict[str, str] \| None` | Nested field mappings (serialized as JSON) | `None` |
| `namespaces` | `dict[str, str] \| None` | Optional namespace mappings | `None` |
| `headers` | `dict[str, str] \| None` | Optional HTTP headers (auth, content-type, etc.) | `None` |
| `timeout` | `int` | Request timeout in seconds | `30` |

Returns:

| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of dictionaries, one per record. |
Example
# RSS feed
records = parse_xml_http(
url="https://example.com/feed.rss",
record_path="//item",
fields={
"title": "./title/text()",
"link": "./link/text()",
"pub_date": "./pubDate/text()",
},
)
# API with auth
records = parse_xml_http(
url="https://api.example.com/data.xml",
record_path="//record",
fields={"id": "./@id", "value": "./value/text()"},
headers={"Authorization": "Bearer token"},
)
Source code in cogapp_libs/dagster/xml_sources.py
parse_xml_s3 ¶
parse_xml_s3(bucket: str, key: str, record_path: str, fields: dict[str, str], nested_fields: dict[str, str] | None = None, namespaces: dict[str, str] | None = None, s3_client: Any | None = None, endpoint_url: str | None = None) -> list[dict[str, Any]]
Download and parse XML from S3.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket` | `str` | S3 bucket name | required |
| `key` | `str` | S3 object key (path to XML file) | required |
| `record_path` | `str` | XPath to record elements | required |
| `fields` | `dict[str, str]` | Field mappings | required |
| `nested_fields` | `dict[str, str] \| None` | Nested field mappings (serialized as JSON) | `None` |
| `namespaces` | `dict[str, str] \| None` | Optional namespace mappings | `None` |
| `s3_client` | `Any \| None` | Optional pre-configured boto3 S3 client | `None` |
| `endpoint_url` | `str \| None` | Optional S3 endpoint (for LocalStack/MinIO) | `None` |

Returns:

| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of dictionaries, one per record. |
Example
records = parse_xml_s3(
bucket="my-bucket",
key="data/catalog.xml",
record_path="//artwork",
fields={"id": "./@id", "title": "./title/text()"},
)
# With LocalStack
records = parse_xml_s3(
bucket="test-bucket",
key="catalog.xml",
record_path="//artwork",
fields={...},
endpoint_url="http://localhost:4566",
)