
Commit 3f574d3

Fokko, HonahX, and sungwy authored
Support partial deletes (#569)
* Add option to delete datafiles. This is done through the Iceberg metadata, resulting in efficient deletes if the data is partitioned correctly
* Pull in main
* WIP
* Change DataScan to accept Metadata and io. For the partial deletes I want to do a scan on in-memory metadata; changing this API allows this.
* fix name-mapping issue
* WIP
* WIP
* Moar tests
* Oops
* Cleanup
* WIP
* WIP
* Fix summary generation
* Last few bits
* Fix the requirement
* Make ruff happy
* Comments, thanks Kevin!
* Comments
* Append rather than truncate
* Fix merge conflicts
* Make the tests pass
* Add another test
* Conflicts
* Add docs (#33)
* docs
* docs
* Add a partitioned overwrite test
* Fix comment
* Skip empty manifests

Co-authored-by: HonahX <[email protected]>
Co-authored-by: Sung Yun <[email protected]>
1 parent cdc3e54 commit 3f574d3

File tree

14 files changed (+1025, -139 lines)


mkdocs/docs/api.md

Lines changed: 17 additions & 4 deletions
@@ -331,12 +331,25 @@ df = pa.Table.from_pylist(
 table.append(df)
 ```
 
-<!-- prettier-ignore-start -->
+You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.
+
+```python
+tbl.delete(delete_filter="city == 'Paris'")
+```
 
-!!! example "Under development"
-    Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.
+In the above example, any records where the city field value equals to `Paris` will be deleted.
+Running `tbl.scan().to_arrow()` will now yield:
 
-<!-- prettier-ignore-end -->
+```
+pyarrow.Table
+city: string
+lat: double
+long: double
+----
+city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
+lat: [[52.371807,37.773972,53.11254],[53.21917]]
+long: [[4.896029,-122.431297,6.0989],[6.56667]]
+```
 
 ## Inspecting tables
 
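The documentation added above shows the new `tbl.delete(delete_filter=...)` call in isolation. As a minimal end-to-end sketch of how it might be exercised (the catalog name `default` and the table identifier `default.cities` are hypothetical, not part of this commit):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table names, used purely for illustration.
catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")

# Delete every row matching the filter. Per the commit message, the delete is
# resolved through the Iceberg metadata, so whole data files can be dropped
# when the partitioning lines up with the filter; files that only partially
# match are expected to be rewritten without the deleted rows.
tbl.delete(delete_filter="city == 'Paris'")

# The remaining rows stay readable through a normal scan.
print(tbl.scan().to_arrow())
```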
pyiceberg/io/pyarrow.py

Lines changed: 62 additions & 2 deletions
@@ -31,6 +31,7 @@
 import logging
 import os
 import re
+import uuid
 from abc import ABC, abstractmethod
 from concurrent.futures import Future
 from copy import copy
@@ -126,7 +127,6 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
@@ -159,7 +159,7 @@
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
 
 if TYPE_CHECKING:
-    from pyiceberg.table import FileScanTask
+    from pyiceberg.table import FileScanTask, WriteTask
 
 logger = logging.getLogger(__name__)
 
@@ -1563,6 +1563,8 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
     _default_mode: str
 
     def __init__(self, schema: Schema, properties: Dict[str, str]):
+        from pyiceberg.table import TableProperties
+
         self._schema = schema
         self._properties = properties
         self._default_mode = self._properties.get(
@@ -1598,6 +1600,8 @@ def map(
         return k + v
 
     def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        from pyiceberg.table import TableProperties
+
         column_name = self._schema.find_column_name(self._field_id)
         if column_name is None:
             return []
@@ -1895,6 +1899,8 @@ def data_file_statistics_from_parquet_metadata(
 
 
 def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+    from pyiceberg.table import PropertyUtil, TableProperties
+
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
@@ -2005,6 +2011,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
 
 
 def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    from pyiceberg.table import PropertyUtil, TableProperties
+
     for key_pattern in [
         TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         TableProperties.PARQUET_PAGE_ROW_LIMIT,
@@ -2042,3 +2050,55 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
             default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
         ),
     }
+
+
+def _dataframe_to_data_files(
+    table_metadata: TableMetadata,
+    df: pa.Table,
+    io: FileIO,
+    write_uuid: Optional[uuid.UUID] = None,
+    counter: Optional[itertools.count[int]] = None,
+) -> Iterable[DataFile]:
+    """Convert a PyArrow table into a DataFile.
+
+    Returns:
+        An iterable that supplies datafiles that represent the table.
+    """
+    from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+
+    counter = counter or itertools.count(0)
+    write_uuid = write_uuid or uuid.uuid4()
+    target_file_size: int = PropertyUtil.property_as_int(  # type: ignore  # The property is set with non-None value.
+        properties=table_metadata.properties,
+        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+    )
+
+    if table_metadata.spec().is_unpartitioned():
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=iter([
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
+                for batches in bin_pack_arrow_table(df, target_file_size)
+            ]),
+        )
+    else:
+        from pyiceberg.table import _determine_partitions
+
+        partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=iter([
+                WriteTask(
+                    write_uuid=write_uuid,
+                    task_id=next(counter),
+                    record_batches=batches,
+                    partition_key=partition.partition_key,
+                    schema=table_metadata.schema(),
+                )
+                for partition in partitions
+                for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
+            ]),
+        )
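The bulk of the new pyarrow code is `_dataframe_to_data_files`, which bin-packs an Arrow table into Parquet data files and, for partitioned specs, fans the work out per partition. A rough sketch of how a caller might drive this internal helper (the `tbl` variable and its `metadata`/`io` attributes, plus the catalog and table names, are assumptions for illustration, not code from this commit; the underscore prefix signals that the write paths in `pyiceberg.table` are the intended callers):

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import _dataframe_to_data_files

# Hypothetical catalog/table, reused from the docs example above.
tbl = load_catalog("default").load_table("default.cities")

# Arrow table to be written; its schema is assumed to match the Iceberg table.
df = pa.Table.from_pylist([
    {"city": "Utrecht", "lat": 52.0907, "long": 5.1214},
])

# The helper yields DataFile entries lazily; each wraps a Parquet file that was
# bin-packed up to the table's target file size, split per partition when the
# table's spec is partitioned.
data_files = list(
    _dataframe_to_data_files(
        table_metadata=tbl.metadata,  # supplies the spec, schema, and write properties
        df=df,
        io=tbl.io,                    # FileIO used to create the Parquet files
    )
)

# Each DataFile can then be attached to a snapshot update (append/overwrite).
```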

pyiceberg/manifest.py

Lines changed: 1 addition & 1 deletion
@@ -341,7 +341,7 @@ class DataFile(Record):
     split_offsets: Optional[List[int]]
     equality_ids: Optional[List[int]]
     sort_order_id: Optional[int]
-    spec_id: Optional[int]
+    spec_id: int
 
     def __setattr__(self, name: str, value: Any) -> None:
        """Assign a key/value to a DataFile."""

0 commit comments
