Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies = [
"pandas >= 1.3.0",
"rich >= 13.0.0",
"wfdb >= 4.0.0",
"pyarrow >= 15.0.0",
]

# Optional dependencies, installed via `pip install '.[test]'`
Expand Down
81 changes: 81 additions & 0 deletions src/croissant_maker/handlers/parquet_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Parquet file handler for tabular event streams (e.g., MEDS)."""

from pathlib import Path
from typing import Dict

from croissant_maker.handlers.base_handler import FileTypeHandler
from croissant_maker.handlers.utils import compute_file_hash
from pyarrow.parquet import ParquetFile
import pyarrow.types as patypes


class ParquetHandler(FileTypeHandler):
    """
    Handler for Parquet files (.parquet) with schema-based type inference.

    - Uses pyarrow to read schema and row count without loading full data
    - Emits Croissant-compatible column types
    - Computes SHA256 for reproducibility
    - Keeps memory usage minimal (schema-only) and releases the file handle
      as soon as the schema has been inspected
    """

    def can_handle(self, file_path: Path) -> bool:
        """Return True when ``file_path`` has a ``.parquet`` suffix (any case)."""
        return file_path.suffix.lower() == ".parquet"

    def extract_metadata(self, file_path: Path) -> dict:
        """Extract metadata from a Parquet file via pyarrow schema inspection.

        Args:
            file_path: Location of the Parquet file to inspect.

        Returns:
            A dict with the keys ``file_path``, ``file_name``, ``file_size``,
            ``sha256``, ``encoding_format``, ``column_types``, ``num_rows``,
            ``num_columns`` and ``columns``.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If anything fails while reading or hashing the file;
                the original exception is chained as the cause.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"Parquet file not found: {file_path}")

        try:
            # Open the file in a ``with`` block so the underlying handle is
            # closed deterministically instead of lingering until garbage
            # collection (which matters when many files are scanned in a row).
            with ParquetFile(str(file_path)) as parquet_file:
                schema = parquet_file.schema_arrow
                file_meta = parquet_file.metadata
                num_rows = file_meta.num_rows if file_meta is not None else 0

                columns = [field.name for field in schema]
                column_types: Dict[str, str] = {
                    field.name: self._map_arrow_type_to_croissant(
                        field.type, patypes
                    )
                    for field in schema
                }

            return {
                "file_path": str(file_path),
                "file_name": file_path.name,
                "file_size": file_path.stat().st_size,
                "sha256": compute_file_hash(file_path),
                "encoding_format": "application/x-parquet",
                "column_types": column_types,
                "num_rows": num_rows,
                "num_columns": len(columns),
                "columns": columns,
            }
        except Exception as e:
            raise ValueError(f"Failed to process Parquet file {file_path}: {e}") from e

    @staticmethod
    def _map_arrow_type_to_croissant(arrow_type, patypes) -> str:
        """Map pyarrow types to Croissant schema.org data types.

        Args:
            arrow_type: The pyarrow data type of a single column.
            patypes: Module (or compatible object) exposing the ``is_*``
                type predicates used below; callers pass ``pyarrow.types``.

        Returns:
            One of ``"sc:Integer"``, ``"sc:Float"``, ``"sc:Boolean"``,
            ``"sc:Date"`` or ``"sc:Text"``. Any type that matches no
            predicate, or for which a predicate raises, maps to ``"sc:Text"``.
        """
        try:
            if patypes.is_integer(arrow_type):
                return "sc:Integer"
            if patypes.is_floating(arrow_type) or patypes.is_decimal(arrow_type):
                return "sc:Float"
            if patypes.is_boolean(arrow_type):
                return "sc:Boolean"
            # Timestamps and calendar dates share the same Croissant type.
            if patypes.is_timestamp(arrow_type) or patypes.is_date(arrow_type):
                return "sc:Date"
            # String and binary columns (regular and large variants) are text.
            if patypes.is_string(arrow_type) or patypes.is_large_string(arrow_type):
                return "sc:Text"
            if patypes.is_binary(arrow_type) or patypes.is_large_binary(arrow_type):
                return "sc:Text"
        except Exception:
            # Deliberate best-effort: exotic or extension types must not abort
            # metadata extraction, so they fall through to the text default.
            pass
        return "sc:Text"
2 changes: 2 additions & 0 deletions src/croissant_maker/handlers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ def register_all_handlers() -> None:
# Import and register all handlers here
from croissant_maker.handlers.csv_handler import CSVHandler
from croissant_maker.handlers.wfdb_handler import WFDBHandler
from croissant_maker.handlers.parquet_handler import ParquetHandler

register_handler(CSVHandler())
register_handler(WFDBHandler())
register_handler(ParquetHandler())

# Future handlers go here. Example:
# from croissant_maker.handlers.<format>_handler import <Format>Handler
Expand Down
Loading