
Commit 6989b92

add_files support partitioned tables (#531)
* add_files support partitioned tables
* docs
* more
* adopt review feedback
* split-offsets required
1 parent 69b9e39 commit 6989b92

File tree: 6 files changed (+388, -176 lines)

mkdocs/docs/api.md (2 additions, 3 deletions)

@@ -375,9 +375,8 @@ tbl.add_files(file_paths=file_paths)
 !!! note "Name Mapping"
     Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
 
-<!-- prettier-ignore-end -->
-
-<!-- prettier-ignore-start -->
+!!! note "Partitions"
+    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
 
 !!! warning "Maintenance Operations"
     Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.
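To make the documented behavior concrete, here is a minimal usage sketch for adding pre-written parquet files to a partitioned table. The catalog name, table identifier, and file paths are hypothetical placeholders, and each file is assumed to hold rows for a single partition value so the footer statistics can resolve it.

```python
# Sketch only: catalog, table, and paths below are placeholders, not from this commit.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.sales")  # e.g. partitioned by month(order_date)

# Each file should contain a single value per partition field; otherwise the
# footer min/max map to different partition values and the add fails.
file_paths = [
    "s3://warehouse/sales/order_date_month=2024-01/data-0.parquet",
    "s3://warehouse/sales/order_date_month=2024-02/data-0.parquet",
]
tbl.add_files(file_paths=file_paths)
```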

pyiceberg/io/pyarrow.py (98 additions, 50 deletions)

@@ -111,6 +111,7 @@
     DataFileContent,
     FileFormat,
 )
+from pyiceberg.partitioning import PartitionField, PartitionSpec, partition_record_value
 from pyiceberg.schema import (
     PartnerAccessor,
     PreOrderSchemaVisitor,
@@ -124,7 +125,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass(frozen=True)
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: List[int]
+
+    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
+        if partition_field.source_id not in self.column_aggregates:
+            return None
+
+        if not partition_field.transform.preserves_order:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
+            )
+
+        lower_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_min,
+            schema=schema,
+        )
+        upper_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_max,
+            schema=schema,
+        )
+        if lower_value != upper_value:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
+            )
+        return lower_value
+
+    def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
+        return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
+
+    def to_serialized_dict(self) -> Dict[str, Any]:
+        lower_bounds = {}
+        upper_bounds = {}
+
+        for k, agg in self.column_aggregates.items():
+            _min = agg.min_as_bytes()
+            if _min is not None:
+                lower_bounds[k] = _min
+            _max = agg.max_as_bytes()
+            if _max is not None:
+                upper_bounds[k] = _max
+        return {
+            "record_count": self.record_count,
+            "column_sizes": self.column_sizes,
+            "value_counts": self.value_counts,
+            "null_value_counts": self.null_value_counts,
+            "nan_value_counts": self.nan_value_counts,
+            "lower_bounds": lower_bounds,
+            "upper_bounds": upper_bounds,
+            "split_offsets": self.split_offsets,
+        }
+
+
+def data_file_statistics_from_parquet_metadata(
     parquet_metadata: pq.FileMetaData,
     stats_columns: Dict[int, StatisticsCollector],
     parquet_column_mapping: Dict[str, int],
-) -> None:
+) -> DataFileStatistics:
     """
-    Compute and fill the following fields of the DataFile object.
+    Compute and return DataFileStatistics that includes the following.
 
-    - file_format
+    - record_count
     - column_sizes
     - value_counts
     - null_value_counts
     - nan_value_counts
-    - lower_bounds
-    - upper_bounds
+    - column_aggregates
     - split_offsets
 
     Args:
-        data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
         parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
         stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
             set the mode for column metrics collection
+        parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
     """
     if parquet_metadata.num_columns != len(stats_columns):
         raise ValueError(
@@ -1695,30 +1755,19 @@ def fill_parquet_file_metadata(
 
     split_offsets.sort()
 
-    lower_bounds = {}
-    upper_bounds = {}
-
-    for k, agg in col_aggs.items():
-        _min = agg.min_as_bytes()
-        if _min is not None:
-            lower_bounds[k] = _min
-        _max = agg.max_as_bytes()
-        if _max is not None:
-            upper_bounds[k] = _max
-
     for field_id in invalidate_col:
-        del lower_bounds[field_id]
-        del upper_bounds[field_id]
+        del col_aggs[field_id]
         del null_value_counts[field_id]
 
-    data_file.record_count = parquet_metadata.num_rows
-    data_file.column_sizes = column_sizes
-    data_file.value_counts = value_counts
-    data_file.null_value_counts = null_value_counts
-    data_file.nan_value_counts = nan_value_counts
-    data_file.lower_bounds = lower_bounds
-    data_file.upper_bounds = upper_bounds
-    data_file.split_offsets = split_offsets
+    return DataFileStatistics(
+        record_count=parquet_metadata.num_rows,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        column_aggregates=col_aggs,
+        split_offsets=split_offsets,
+    )
 
 
 def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
@@ -1747,6 +1796,11 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
         writer.write_table(task.df, row_group_size=row_group_size)
 
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=writer.writer.metadata,
+        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(schema),
+    )
     data_file = DataFile(
         content=DataFileContent.DATA,
         file_path=file_path,
@@ -1761,47 +1815,41 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
         spec_id=table_metadata.default_spec_id,
         equality_ids=None,
         key_metadata=None,
+        **statistics.to_serialized_dict(),
     )
 
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(schema),
-    )
     return iter([data_file])
 
 
-def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
-    for task in tasks:
-        input_file = io.new_input(task.file_path)
+def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
+    for file_path in file_paths:
+        input_file = io.new_input(file_path)
         with input_file.open() as input_stream:
             parquet_metadata = pq.read_metadata(input_stream)
 
         if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
             raise NotImplementedError(
-                f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
+                f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
            )
-
        schema = table_metadata.schema()
+        statistics = data_file_statistics_from_parquet_metadata(
+            parquet_metadata=parquet_metadata,
+            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )
        data_file = DataFile(
            content=DataFileContent.DATA,
-            file_path=task.file_path,
+            file_path=file_path,
            file_format=FileFormat.PARQUET,
-            partition=task.partition_field_value,
-            record_count=parquet_metadata.num_rows,
+            partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
            file_size_in_bytes=len(input_file),
            sort_order_id=None,
            spec_id=table_metadata.default_spec_id,
            equality_ids=None,
            key_metadata=None,
+            **statistics.to_serialized_dict(),
        )
-        fill_parquet_file_metadata(
-            data_file=data_file,
-            parquet_metadata=parquet_metadata,
-            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
-        )
+
        yield data_file
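The core of the new partition inference in `DataFileStatistics._partition_value` is: take the source column's min and max from the parquet footer, apply the (order-preserving) transform to both, and accept the result only when both bounds map to the same value. The standalone sketch below mirrors that rule with plain Python values; the function and names are illustrative only, not part of PyIceberg.

```python
# Illustrative sketch of the min == max inference rule; not the PyIceberg implementation.
from typing import Any, Callable, Optional


def infer_partition_value(
    transform: Callable[[Any], Any],  # assumed order-preserving (preserves_order=True)
    column_min: Optional[Any],
    column_max: Optional[Any],
) -> Any:
    """Both footer bounds must map to a single partition value, else the file is rejected."""
    if column_min is None or column_max is None:
        # Missing footer statistics for the source column: partition value is unknown (None).
        return None
    lower, upper = transform(column_min), transform(column_max)
    if lower != upper:
        raise ValueError(f"File spans more than one partition value: {lower=}, {upper=}")
    return lower


def truncate4(value: str) -> str:
    """A truncate(4)-style transform over string keys."""
    return value[:4]


# Both bounds share the prefix "2024", so the file maps to one partition value.
print(infer_partition_value(truncate4, "2024-01-03", "2024-01-29"))  # prints: 2024
```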

pyiceberg/partitioning.py (21 additions, 4 deletions)

@@ -388,16 +388,33 @@ def partition(self) -> Record: # partition key transformed with iceberg interna
             if len(partition_fields) != 1:
                 raise ValueError("partition_fields must contain exactly one field.")
             partition_field = partition_fields[0]
-            iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
-            iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
-            transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
-            iceberg_typed_key_values[partition_field.name] = transformed_value
+            iceberg_typed_key_values[partition_field.name] = partition_record_value(
+                partition_field=partition_field,
+                value=raw_partition_field_value.value,
+                schema=self.schema,
+            )
         return Record(**iceberg_typed_key_values)
 
     def to_path(self) -> str:
         return self.partition_spec.partition_to_path(self.partition, self.schema)
 
 
+def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
+    """
+    Return the Partition Record representation of the value.
+
+    The value is first converted to internal partition representation.
+    For example, UUID is converted to bytes[16], DateType to days since epoch, etc.
+
+    Then the corresponding PartitionField's transform is applied to return
+    the final partition record value.
+    """
+    iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type
+    iceberg_typed_value = _to_partition_representation(iceberg_type, value)
+    transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
+    return transformed_value
+
+
 @singledispatch
 def _to_partition_representation(type: IcebergType, value: Any) -> Any:
     return TypeError(f"Unsupported partition field type: {type}")
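As a small illustration of the two-step conversion `partition_record_value` performs (internal representation first, then the field's transform), the sketch below builds a one-column schema and a month-partitioned field inline; the field IDs and names are arbitrary choices for the example.

```python
# Illustrative sketch (schema, IDs, and names are arbitrary): a raw date is first
# converted to days since epoch, then MonthTransform maps it to months since epoch.
from datetime import date

from pyiceberg.partitioning import PartitionField, partition_record_value
from pyiceberg.schema import Schema
from pyiceberg.transforms import MonthTransform
from pyiceberg.types import DateType, NestedField

schema = Schema(NestedField(field_id=1, name="order_date", field_type=DateType(), required=False))
field = PartitionField(source_id=1, field_id=1000, transform=MonthTransform(), name="order_date_month")

# Expected to print 648, i.e. the number of months between 1970-01 and 2024-01.
print(partition_record_value(partition_field=field, value=date(2024, 1, 15), schema=schema))
```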

pyiceberg/table/__init__.py (1 addition, 17 deletions)

@@ -33,7 +33,6 @@
     Dict,
     Generic,
     Iterable,
-    Iterator,
     List,
     Literal,
     Optional,
@@ -1179,9 +1178,6 @@ def add_files(self, file_paths: List[str]) -> None:
         Raises:
             FileNotFoundError: If the file does not exist.
         """
-        if len(self.spec().fields) > 0:
-            raise ValueError("Cannot add files to partitioned tables")
-
         with self.transaction() as tx:
             if self.name_mapping() is None:
                 tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
@@ -2524,17 +2520,6 @@ def _dataframe_to_data_files(
     yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))
 
 
-def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
-    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
-        raise ValueError("Cannot add files to partitioned tables")
-
-    for file_path in file_paths:
-        yield AddFileTask(
-            file_path=file_path,
-            partition_field_value=Record(),
-        )
-
-
 def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
     """Convert a list files into DataFiles.
 
@@ -2543,8 +2528,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
     """
     from pyiceberg.io.pyarrow import parquet_files_to_data_files
 
-    tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
-    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)
+    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
 
 
 class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
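For context on the simplified call path, a minimal driver of the reworked `parquet_files_to_data_files` signature might look like the sketch below. The catalog name, table identifier, and file path are placeholders, and it assumes the table's FileIO can read the listed file; this is an illustration of the new signature, not a recommended entry point (`tbl.add_files` remains the public API).

```python
# Sketch of the reworked call path (placeholders throughout): plain file paths now
# flow straight to the pyarrow layer, which infers the partition value per file
# instead of receiving pre-built AddFileTask records.
from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import parquet_files_to_data_files

catalog = load_catalog("default")
tbl = catalog.load_table("default.sales")  # hypothetical partitioned table

data_files = parquet_files_to_data_files(
    io=tbl.io,
    table_metadata=tbl.metadata,
    file_paths=iter(["s3://warehouse/sales/data-0.parquet"]),  # hypothetical path
)
for data_file in data_files:
    print(data_file.file_path, data_file.partition, data_file.record_count)
```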
