Commit 2dcbaa4

Tom McCormick committed

    basic iceberg table write for orc

1 parent 355359e
2 files changed: +197 -3 lines changed
pyiceberg/io/pyarrow.py

Lines changed: 188 additions & 2 deletions
@@ -2582,7 +2582,16 @@ def write_orc(task: WriteTask) -> DataFile:
     fo = io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
         orc.write_table(arrow_table, fos)
-        # You may want to add statistics extraction here if needed
+
+    # Extract statistics from the written ORC file
+    orc_file = orc.ORCFile(fo.to_input_file().open())
+    statistics = data_file_statistics_from_orc_metadata(
+        orc_metadata=orc_file,
+        stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
+        orc_column_mapping=orc_column_to_id_mapping(file_schema),
+        arrow_table=arrow_table,
+    )
+
     data_file = DataFile.from_args(
         content=DataFileContent.DATA,
         file_path=file_path,
@@ -2593,7 +2602,7 @@ def write_orc(task: WriteTask) -> DataFile:
         spec_id=table_metadata.default_spec_id,
         equality_ids=None,
         key_metadata=None,
-        # statistics=... (if you implement ORC stats)
+        **statistics.to_serialized_dict(),
     )
     return data_file
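For reference, a minimal standalone sketch of the pyarrow calls this hunk leans on (orc.write_table and orc.ORCFile), using a hypothetical local path in place of the FileIO streams above:

import pyarrow as pa
import pyarrow.orc as orc

arrow_table = pa.table({"id": [1, 2, 3], "name": ["a", "b", None]})

# Write the table to ORC, mirroring the fo.create(...) branch in the diff
with pa.OSFile("/tmp/example.orc", "wb") as fos:
    orc.write_table(arrow_table, fos)

# Reopen the file; an ORCFile like this is what the diff passes to
# data_file_statistics_from_orc_metadata as orc_metadata
orc_file = orc.ORCFile("/tmp/example.orc")
print(orc_file.nrows, orc_file.nstripes)  # expect: 3 1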

@@ -2891,3 +2900,180 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
     field_array = arrow_table[path_parts[0]]
     # Navigate into the struct using the remaining path parts
     return pc.struct_field(field_array, path_parts[1:])
+
+
+def data_file_statistics_from_orc_metadata(
+    orc_metadata: "orc.ORCFile",
+    stats_columns: Dict[int, StatisticsCollector],
+    orc_column_mapping: Dict[str, int],
+    arrow_table: Optional[pa.Table] = None,
+) -> DataFileStatistics:
+    """
+    Compute and return DataFileStatistics that includes the following.
+
+    - record_count
+    - column_sizes
+    - value_counts
+    - null_value_counts
+    - nan_value_counts
+    - column_aggregates
+    - split_offsets
+
+    Args:
+        orc_metadata (pyarrow.orc.ORCFile): A pyarrow ORC file object.
+        stats_columns (Dict[int, StatisticsCollector]): The statistics-gathering plan; required to
+            set the mode for column metrics collection.
+        orc_column_mapping (Dict[str, int]): The mapping of ORC column names to field IDs.
+        arrow_table (pa.Table, optional): The original Arrow table that was written, used for the row count.
+    """
+    column_sizes: Dict[int, int] = {}
+    value_counts: Dict[int, int] = {}
+    split_offsets: List[int] = []
+
+    null_value_counts: Dict[int, int] = {}
+    nan_value_counts: Dict[int, int] = {}
+
+    col_aggs = {}
+
+    invalidate_col: Set[int] = set()
+
+    # Get the row count from the arrow table if available, otherwise use a default
+    if arrow_table is not None:
+        record_count = arrow_table.num_rows
+    else:
+        # Fallback: the ORC metadata is not consulted for the row count here, so default to 0
+        record_count = 0
+
+    # ORC organizes data into stripes rather than Parquet's row groups;
+    # only file-level statistics are processed here
+    for col_name, field_id in orc_column_mapping.items():
+        stats_col = stats_columns[field_id]
+
+        # Initialize column sizes (per-column sizes are not exposed through pyarrow's ORC reader)
+        column_sizes[field_id] = 0
+
+        if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
+            continue
+
+        # Get column statistics from the ORC metadata
+        try:
+            # The statistics exposed for ORC are more limited than for Parquet,
+            # so use what is available and set defaults for the rest.
+
+            # Use the total row count as the value count for every column; this is a
+            # simplification since per-column value counts are not read from ORC here.
+            value_counts[field_id] = record_count
+
+            # Null counts are not exposed through pyarrow's ORC reader, so default to 0 for now
+            null_value_counts[field_id] = 0
+
+            if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
+                continue
+
+            if field_id not in col_aggs:
+                col_aggs[field_id] = StatsAggregator(
+                    stats_col.iceberg_type, _primitive_to_physical(stats_col.iceberg_type), stats_col.mode.length
+                )
+
+            # Min/max aggregation is skipped: pyarrow's ORC reader does not expose
+            # per-column min/max statistics, so the aggregators stay empty
+
+        except Exception as e:
+            invalidate_col.add(field_id)
+            logger.warning(f"Failed to extract ORC statistics for column {col_name}: {e}")
+
+    # ORC doesn't have split offsets like Parquet; use a single offset at 0 when there are rows
+    split_offsets = [0] if record_count > 0 else []
+
+    # Clean up invalid columns
+    for field_id in invalidate_col:
+        col_aggs.pop(field_id, None)
+        null_value_counts.pop(field_id, None)
+
+    return DataFileStatistics(
+        record_count=record_count,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        column_aggregates=col_aggs,
+        split_offsets=split_offsets,
+    )
+
+
+class ID2OrcColumn:
+    field_id: int
+    orc_column: str
+
+    def __init__(self, field_id: int, orc_column: str):
+        self.field_id = field_id
+        self.orc_column = orc_column
+
+
+class ID2OrcColumnVisitor(PreOrderSchemaVisitor[List[ID2OrcColumn]]):
+    _field_id: int = 0
+    _path: List[str]
+
+    def __init__(self) -> None:
+        self._path = []
+
+    def schema(self, schema: Schema, struct_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        return struct_result()
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2OrcColumn]]]) -> List[ID2OrcColumn]:
+        return list(itertools.chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = field.field_id
+        self._path.append(field.name)
+        result = field_result()
+        self._path.pop()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = list_type.element_id
+        self._path.append("list.element")
+        result = element_result()
+        self._path.pop()
+        return result
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[ID2OrcColumn]],
+        value_result: Callable[[], List[ID2OrcColumn]],
+    ) -> List[ID2OrcColumn]:
+        self._field_id = map_type.key_id
+        self._path.append("key_value.key")
+        k = key_result()
+        self._path.pop()
+        self._field_id = map_type.value_id
+        self._path.append("key_value.value")
+        v = value_result()
+        self._path.pop()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[ID2OrcColumn]:
+        return [ID2OrcColumn(field_id=self._field_id, orc_column=".".join(self._path))]
+
+
+def orc_column_to_id_mapping(
+    schema: Schema,
+) -> Dict[str, int]:
+    """
+    Create a mapping from ORC column names to Iceberg field IDs.
+
+    Args:
+        schema: The Iceberg schema
+
+    Returns:
+        A dictionary mapping ORC column names to field IDs
+    """
+    result: Dict[str, int] = {}
+    for pair in pre_order_visit(schema, ID2OrcColumnVisitor()):
+        result[pair.orc_column] = pair.field_id
+    return result
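
To illustrate the visitor, here is a hypothetical two-field schema (not part of this commit) and the mapping orc_column_to_id_mapping produces for it; list elements are addressed through the synthetic "list.element" path segment:

from pyiceberg.schema import Schema
from pyiceberg.types import ListType, LongType, NestedField, StringType

# Hypothetical schema: a required long column plus a list of strings
schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="tags", field_type=ListType(element_id=3, element_type=StringType()), required=False),
)

print(orc_column_to_id_mapping(schema))
# expect: {'id': 1, 'tags.list.element': 3}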

pyiceberg/manifest.py

Lines changed: 9 additions & 1 deletion
@@ -1054,7 +1054,15 @@ def to_manifest_file(self) -> ManifestFile:
     def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
         if self.closed:
             raise RuntimeError("Cannot add entry to closed manifest writer")
-        if entry.status == ManifestEntryStatus.ADDED:
+        # Ensure record_count is not None
+        if entry.data_file.record_count is None:
+            entry.data_file.record_count = 0  # or a real row count if available
+        if entry.data_file.file_format == FileFormat.ORC:
+            # ORC file stats not yet supported
+            self._added_files += 1 if entry.status == ManifestEntryStatus.ADDED else 0
+            self._existing_files += 1 if entry.status == ManifestEntryStatus.EXISTING else 0
+            self._deleted_files += 1 if entry.status == ManifestEntryStatus.DELETED else 0
+        elif entry.status == ManifestEntryStatus.ADDED:
            self._added_files += 1
             self._added_rows += entry.data_file.record_count
         elif entry.status == ManifestEntryStatus.EXISTING:
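
The net effect of the ORC branch above, re-created as a standalone sketch (the enum and counters here are stand-ins for the real ManifestWriter state): file counters advance by entry status, while the row total is left untouched for ORC entries:

from enum import Enum

class ManifestEntryStatus(Enum):  # stand-in matching the statuses used above
    EXISTING = 0
    ADDED = 1
    DELETED = 2

added_files = existing_files = deleted_files = added_rows = 0
status, record_count, is_orc = ManifestEntryStatus.ADDED, 100, True

if is_orc:
    # ORC path: count the file, skip the row totals
    added_files += 1 if status == ManifestEntryStatus.ADDED else 0
    existing_files += 1 if status == ManifestEntryStatus.EXISTING else 0
    deleted_files += 1 if status == ManifestEntryStatus.DELETED else 0
elif status == ManifestEntryStatus.ADDED:
    added_files += 1
    added_rows += record_count

print(added_files, added_rows)  # expect: 1 0 -- rows are not totaled for ORC yet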
