Commit 179fa27

1 parent 3c98eef commit 179fa27

2 files changed: +154 -54 lines changed

pyiceberg/table/__init__.py

Lines changed: 71 additions & 53 deletions
@@ -64,12 +64,13 @@
     ROWS_MUST_MATCH,
     _InclusiveMetricsEvaluator,
     _StrictMetricsEvaluator,
+    bind,
     expression_evaluator,
     inclusive_projection,
     manifest_evaluator,
 )
 from pyiceberg.io import FileIO, load_file_io
-from pyiceberg.io.pyarrow import _dataframe_to_data_files, project_table
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_pyarrow, project_table
 from pyiceberg.manifest import (
     POSITIONAL_DELETE_SCHEMA,
     DataFile,
@@ -310,8 +311,6 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
         for new_requirement in requirements:
             if type(new_requirement) not in existing_requirements:
                 self._requirements = self._requirements + requirements
-            else:
-                warnings.warn(f"Dropped duplicate requirement: {new_requirement}")
 
         self.table_metadata = update_table_metadata(self.table_metadata, updates)
 
@@ -430,7 +429,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                 update_snapshot.append_data_file(data_file)
 
     def overwrite(
-        self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
+        self,
+        df: pa.Table,
+        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         """
         Shorthand for adding a table overwrite with a PyArrow table to the transaction.
@@ -458,8 +460,7 @@ def overwrite(
         if table_arrow_schema != df.schema:
             df = df.cast(table_arrow_schema)
 
-        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
-            delete_snapshot.delete_by_predicate(overwrite_filter)
+        self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
 
         with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot:
             # skip writing data files if the dataframe is empty
@@ -470,53 +471,73 @@ def overwrite(
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
 
-    def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         if (
             self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
             == TableProperties.DELETE_MODE_MERGE_ON_READ
         ):
             raise NotImplementedError("Merge on read is not yet supported")
 
+        if isinstance(delete_filter, str):
+            delete_filter = _parse_row_filter(delete_filter)
+
         with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
             delete_snapshot.delete_by_predicate(delete_filter)
 
         # Check if there are any files that require an actual rewrite of a data file
         if delete_snapshot.rewrites_needed is True:
-            # When we want to filter out certain rows, we want to invert the expression
-            # delete id = 22 means that we want to look for that value, and then remove
-            # if from the Parquet file
-            delete_row_filter = Not(delete_filter)
-            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
-                # Potential optimization is where we check if the files actually contain relevant data.
-                files = self._scan(row_filter=delete_filter).plan_files()
-
-                counter = itertools.count(0)
-
-                # This will load the Parquet file into memory, including:
-                # - Filter out the rows based on the delete filter
-                # - Projecting it to the current schema
-                # - Applying the positional deletes if they are there
-                # When writing
-                # - Apply the latest partition-spec
-                # - And sort order when added
-                for original_file in files:
-                    df = project_table(
-                        tasks=[original_file],
-                        table_metadata=self._table.metadata,
-                        io=self._table.io,
-                        row_filter=delete_row_filter,
-                        projected_schema=self.table_metadata.schema(),
-                    )
-                    for data_file in _dataframe_to_data_files(
-                        io=self._table.io,
-                        df=df,
-                        table_metadata=self._table.metadata,
-                        write_uuid=overwrite_snapshot.commit_uuid,
-                        counter=counter,
-                    ):
-                        overwrite_snapshot.append_data_file(data_file)
+            bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True)
+            preserve_row_filter = expression_to_pyarrow(Not(bound_delete_filter))
+            commit_uuid = uuid.uuid4()
+
+            files = self._scan(row_filter=delete_filter).plan_files()
+
+            counter = itertools.count(0)
+
+            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
+            # This will load the Parquet file into memory, including:
+            # - Filter out the rows based on the delete filter
+            # - Projecting it to the current schema
+            # - Applying the positional deletes if they are there
+            # When writing
+            # - Apply the latest partition-spec
+            # - And sort order when added
+            for original_file in files:
+                df = project_table(
+                    tasks=[original_file],
+                    table_metadata=self._table.metadata,
+                    io=self._table.io,
+                    row_filter=AlwaysTrue(),
+                    projected_schema=self.table_metadata.schema(),
+                )
+                filtered_df = df.filter(preserve_row_filter)
+
+                # Only rewrite if there are records being deleted
+                if len(df) != len(filtered_df):
+                    replaced_files.append((
+                        original_file.file,
+                        list(
+                            _dataframe_to_data_files(
+                                io=self._table.io,
+                                df=filtered_df,
+                                table_metadata=self._table.metadata,
+                                write_uuid=commit_uuid,
+                                counter=counter,
+                            )
+                        ),
+                    ))
 
-                    overwrite_snapshot.delete_data_file(original_file.file)
+            if len(replaced_files) > 0:
+                with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
+                    commit_uuid=commit_uuid
+                ) as overwrite_snapshot:
+                    for original_data_file, replaced_data_files in replaced_files:
+                        overwrite_snapshot.delete_data_file(original_data_file)
+                        for replaced_data_file in replaced_data_files:
+                            overwrite_snapshot.append_data_file(replaced_data_file)
+
+        if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
+            warnings.warn("Delete operation did not match any records")
 
     def add_files(self, file_paths: List[str]) -> None:
         """
@@ -1405,7 +1426,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         tx.append(df=df, snapshot_properties=snapshot_properties)
 
     def overwrite(
-        self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
+        self,
+        df: pa.Table,
+        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         """
         Shorthand for overwriting the table with a PyArrow table.
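
Note: with the widened signatures, the public overwrite (and delete, below) accepts either a BooleanExpression or a row-filter string. A short usage sketch, assuming an existing catalog named "default" and a table "default.example" with an int column "ints" (all names are hypothetical):

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("default")            # hypothetical catalog
tbl = catalog.load_table("default.example")  # hypothetical table

# Replace only the rows matching the filter; other files are rewritten
# without the matching rows, then the new data is written.
new_rows = pa.table({"ints": pa.array([3, 4], type=pa.int32())})
tbl.overwrite(new_rows, overwrite_filter="ints == 2")

# The expression form and the string form of a delete are equivalent:
tbl.delete(EqualTo("ints", 22))
tbl.delete("ints == 22")
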
@@ -1419,7 +1443,9 @@ def overwrite(
         with self.transaction() as tx:
             tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)
 
-    def delete(self, delete_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def delete(
+        self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
+    ) -> None:
         """
         Shorthand for deleting rows from the table.
 
@@ -3011,15 +3037,6 @@ def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bo
         spec = self._transaction.table_metadata.specs()[spec_id]
         return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        schema = self._transaction.table_metadata.schema()
-        spec = self._transaction.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(schema)
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
-
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(data_file.partition)
-
     def delete_by_predicate(self, predicate: BooleanExpression) -> None:
         self._predicate = Or(self._predicate, predicate)
 
@@ -3240,8 +3257,9 @@ def fast_append(self) -> FastAppendFiles:
             operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
         )
 
-    def overwrite(self) -> OverwriteFiles:
+    def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
         return OverwriteFiles(
+            commit_uuid=commit_uuid,
             operation=Operation.OVERWRITE
             if self._transaction.table_metadata.current_snapshot() is not None
             else Operation.APPEND,
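
Note: the new commit_uuid parameter exists because Transaction.delete() writes the rewritten data files first (write_uuid=commit_uuid in _dataframe_to_data_files) and only then opens the overwrite snapshot, so the snapshot producer has to reuse that UUID instead of minting its own. A minimal sketch of the reuse-or-mint pattern, with a hypothetical helper name:

import uuid
from typing import Optional

def resolve_commit_uuid(commit_uuid: Optional[uuid.UUID] = None) -> uuid.UUID:
    # Reuse the caller's UUID when given (the delete() rewrite path),
    # otherwise generate a fresh one (a plain overwrite).
    return commit_uuid or uuid.uuid4()

shared = uuid.uuid4()
assert resolve_commit_uuid(shared) == shared  # data files and snapshot share one UUID
assert resolve_commit_uuid() != shared        # an independent call gets its own UUID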

tests/integration/test_deletes.py

Lines changed: 83 additions & 1 deletion
@@ -17,12 +17,16 @@
 # pylint:disable=redefined-outer-name
 from typing import List
 
+import pyarrow as pa
 import pytest
 from pyspark.sql import SparkSession
 
 from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import EqualTo
+from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Summary
+from pyiceberg.types import IntegerType, NestedField
 
 
 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@@ -127,7 +131,6 @@ def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCa
     tbl = session_catalog.load_table(identifier)
     tbl.delete(EqualTo("number_partitioned", 22))  # Does not affect any data
 
-    # Open for discussion, do we want to create a new snapshot?
     assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append']
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10, 10], 'number': [20, 30]}
 
@@ -255,3 +258,82 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
     )
 
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [20, 20, 10], 'number': [200, 202, 100]}
+
+
+@pytest.mark.integration
+def test_delete_no_match(session_catalog: RestCatalog) -> None:
+    arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {
+                'ints': 1,
+            },
+            {'ints': 3},
+        ],
+        schema=arrow_schema,
+    )
+
+    iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+    tbl_identifier = "default.test_delete_no_match"
+
+    try:
+        session_catalog.drop_table(tbl_identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+    tbl.append(arrow_tbl)
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
+
+    tbl.delete('ints == 2')  # Only 1 and 3 in the file, but is between the lower and upper bound
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
+
+
+@pytest.mark.integration
+def test_delete_overwrite(session_catalog: RestCatalog) -> None:
+    arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {
+                'ints': 1,
+            },
+            {'ints': 2},
+        ],
+        schema=arrow_schema,
+    )
+
+    iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+    tbl_identifier = "default.test_delete_overwrite"
+
+    try:
+        session_catalog.drop_table(tbl_identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+    tbl.append(arrow_tbl)
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]
+
+    arrow_tbl_overwrite = pa.Table.from_pylist(
+        [
+            {
+                'ints': 3,
+            },
+            {'ints': 4},
+        ],
+        schema=arrow_schema,
+    )
+    tbl.overwrite(arrow_tbl_overwrite, 'ints == 2')  # Should rewrite one file
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
+        Operation.APPEND,
+        Operation.OVERWRITE,
+        Operation.OVERWRITE,
+    ]
+
+    assert tbl.scan().to_arrow()['ints'] == [[3, 4], [1]]
