
Commit 172f9c0

Cleanup
1 parent e474fda commit 172f9c0

File tree

2 files changed: +163 −44 lines changed

pyiceberg/table/__init__.py

Lines changed: 59 additions & 37 deletions
@@ -2967,6 +2967,13 @@ def __init__(
         super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
         self._predicate = AlwaysFalse()

+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
     def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
         schema = self._transaction.table_metadata.schema()
         spec = self._transaction.table_metadata.specs()[spec_id]
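
Taken together with the `files_affected` property added further down in this diff, the `_commit` override above makes a no-match delete a true no-op: instead of committing an empty DELETE snapshot, the producer returns empty updates and requirements. A minimal, self-contained sketch of that short-circuit follows; every name in it is an illustrative stand-in, not the pyiceberg API.

    from typing import List, Tuple

    # Stand-in for pyiceberg's UpdatesAndRequirements alias: (updates, requirements)
    UpdatesAndRequirements = Tuple[Tuple[str, ...], Tuple[str, ...]]

    class DeleteProducerSketch:
        def __init__(self, deleted_entries: List[str]) -> None:
            self._deleted_entries = deleted_entries

        @property
        def files_affected(self) -> bool:
            # Mirrors the property added below: is there anything to drop?
            return len(self._deleted_entries) > 0

        def _commit(self) -> UpdatesAndRequirements:
            # Only produce a commit when there is something to delete
            if self.files_affected:
                return ("add-snapshot",), ("assert-table-uuid",)  # stand-ins for real updates
            return (), ()  # no-op: no empty DELETE snapshot is created

    print(DeleteProducerSketch([])._commit())         # ((), ())
    print(DeleteProducerSketch(["entry"])._commit())  # (('add-snapshot',), ('assert-table-uuid',))
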
@@ -2996,6 +3003,13 @@ def delete_by_predicate(self, predicate: BooleanExpression) -> None:

     @cached_property
     def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
+        """Compute all the delete operations and cache the result when nothing changes.
+
+        Returns:
+            - List of existing manifests that are not affected by the delete operation.
+            - The manifest-entries that are deleted based on the metadata.
+            - Flag indicating whether rewrites of data-files are needed.
+        """
         schema = self._transaction.table_metadata.schema()

         def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
@@ -3016,44 +3030,47 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
         partial_rewrites_needed = False
         if snapshot := self._transaction.table_metadata.current_snapshot():
             for manifest_file in snapshot.manifests(io=self._io):
-                if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
-                    # If the manifest isn't relevant, we can just keep it in the manifest-list
-                    existing_manifests.append(manifest_file)
-                else:
-                    # It is relevant, let's check out the content
-                    deleted_entries = []
-                    existing_entries = []
-                    for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
-                        if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
-                            deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
-                        elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
-                            existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
-                        else:
-                            # Based on the metadata, it is unsure to say if the file can be deleted
-                            partial_rewrites_needed = True
-
-                    if len(deleted_entries) > 0:
-                        total_deleted_entries += deleted_entries
-
-                        # Rewrite the manifest
-                        if len(existing_entries) > 0:
-                            output_file_location = _new_manifest_path(
-                                location=self._transaction.table_metadata.location,
-                                num=next(self._manifest_counter),
-                                commit_uuid=self.commit_uuid,
-                            )
-                            with write_manifest(
-                                format_version=self._transaction.table_metadata.format_version,
-                                spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
-                                schema=self._transaction.table_metadata.schema(),
-                                output_file=self._io.new_output(output_file_location),
-                                snapshot_id=self._snapshot_id,
-                            ) as writer:
-                                for existing_entry in existing_entries:
-                                    writer.add_entry(existing_entry)
-                            existing_manifests.append(writer.to_manifest_file())
-                    else:
+                if manifest_file.content == ManifestContent.DATA:
+                    if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+                        # If the manifest isn't relevant, we can just keep it in the manifest-list
                         existing_manifests.append(manifest_file)
+                    else:
+                        # It is relevant, let's check out the content
+                        deleted_entries = []
+                        existing_entries = []
+                        for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+                            if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
+                                deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
+                            elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
+                                existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
+                            else:
+                                # Based on the metadata, it is unsure to say if the file can be deleted
+                                partial_rewrites_needed = True
+
+                        if len(deleted_entries) > 0:
+                            total_deleted_entries += deleted_entries
+
+                            # Rewrite the manifest
+                            if len(existing_entries) > 0:
+                                output_file_location = _new_manifest_path(
+                                    location=self._transaction.table_metadata.location,
+                                    num=next(self._manifest_counter),
+                                    commit_uuid=self.commit_uuid,
+                                )
+                                with write_manifest(
+                                    format_version=self._transaction.table_metadata.format_version,
+                                    spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+                                    schema=self._transaction.table_metadata.schema(),
+                                    output_file=self._io.new_output(output_file_location),
+                                    snapshot_id=self._snapshot_id,
+                                ) as writer:
+                                    for existing_entry in existing_entries:
+                                        writer.add_entry(existing_entry)
+                                existing_manifests.append(writer.to_manifest_file())
+                        else:
+                            existing_manifests.append(manifest_file)
+                else:
+                    existing_manifests.append(manifest_file)

         return existing_manifests, total_deleted_entries, partial_rewrites_needed
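
The rewritten loop above classifies each data file with two metrics evaluators: if the strict evaluator proves every row matches the delete predicate, the whole file is dropped; if the inclusive evaluator proves no row can match, the file is kept as EXISTING; anything in between flags a partial rewrite. A toy sketch of that three-way split follows; the constants and results are stand-ins with the same meaning as in the code, not the pyiceberg evaluator objects.

    # Three-way classification, mirroring the _compute_deletes loop above.
    ROWS_MUST_MATCH = "ROWS_MUST_MATCH"      # strict evaluator: every row matches
    ROWS_CANNOT_MATCH = "ROWS_CANNOT_MATCH"  # inclusive evaluator: no row can match
    ROWS_MIGHT_MATCH = "ROWS_MIGHT_MATCH"    # undecidable from metadata alone

    def classify(strict_result: str, inclusive_result: str) -> str:
        if strict_result == ROWS_MUST_MATCH:
            return "delete whole file"
        if inclusive_result == ROWS_CANNOT_MATCH:
            return "keep file as EXISTING"
        return "partial rewrite needed"

    assert classify(ROWS_MUST_MATCH, ROWS_MIGHT_MATCH) == "delete whole file"
    assert classify(ROWS_MIGHT_MATCH, ROWS_CANNOT_MATCH) == "keep file as EXISTING"
    assert classify(ROWS_MIGHT_MATCH, ROWS_MIGHT_MATCH) == "partial rewrite needed"
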

@@ -3068,6 +3085,11 @@ def rewrites_needed(self) -> bool:
         """Indicate if data files need to be rewritten."""
         return self._compute_deletes[2]

+    @property
+    def files_affected(self) -> bool:
+        """Indicate if any manifest-entries can be dropped."""
+        return len(self._deleted_entries()) > 0
+

 class FastAppendFiles(_MergingSnapshotProducer):
     def _existing_manifests(self) -> List[ManifestFile]:

tests/integration/test_deletes.py

Lines changed: 104 additions & 7 deletions
@@ -22,13 +22,15 @@

 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.expressions import EqualTo
+from pyiceberg.table.snapshots import Operation, Summary


 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
     for sql in sqls:
         spark.sql(sql)


+@pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
     identifier = 'default.table_partitioned_delete'
@@ -63,6 +65,7 @@ def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]}


+@pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
     identifier = 'default.table_partitioned_delete'
@@ -92,10 +95,12 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat
     tbl = session_catalog.load_table(identifier)
     tbl.delete(EqualTo("number", 20))

-    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'delete', 'overwrite']
+    # We don't delete a whole partition, so there is only an overwrite
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'overwrite']
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 10], 'number': [30, 30]}


+@pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
     identifier = 'default.table_partitioned_delete'
@@ -123,10 +128,11 @@ def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCa
     tbl.delete(EqualTo("number_partitioned", 22))  # Does not affect any data

     # Open for discussion, do we want to create a new snapshot?
-    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'delete']
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append']
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10, 10], 'number': [20, 30]}


+@pytest.mark.integration
 def test_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
     identifier = 'default.table_partitioned_delete'
@@ -160,14 +166,105 @@ def test_partitioned_table_positional_deletes(spark: SparkSession, session_catal

     tbl = session_catalog.load_table(identifier)

-    # Assert that there is just a single Parquet file
-    assert len(list(tbl.scan().plan_files())) == 1
+    # Assert that there is just a single Parquet file, which has one merge-on-read delete file
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 1
+    assert len(files[0].delete_files) == 1

     # Will rewrite a data file with a positional delete
     tbl.delete(EqualTo("number", 40))

-    # Yet another wrong status by Spark
     # One positional delete has been added, but an OVERWRITE status is set
-    # Related issue https://github.com/apache/iceberg/issues/9995
-    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'overwrite', 'delete', 'overwrite']
+    # https://github.com/apache/iceberg/issues/10122
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'overwrite', 'overwrite']
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10], 'number': [20]}
+
+
+@pytest.mark.integration
+def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = 'default.table_partitioned_delete_sequence_number'
+
+    # This test case is a bit more complex. Here we run a MoR delete on a file, make sure that
+    # the manifest gets rewritten (but not the data file, thanks to MoR), and check that the delete
+    # is still there to ensure that the sequence numbers are maintained
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 100), (10, 101), (20, 200), (20, 201), (20, 202)
+            """,
+            # Generate a positional delete
+            f"""
+            DELETE FROM {identifier} WHERE number = 101
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 2
+
+    # Will rewrite a data file with a positional delete
+    tbl.delete(EqualTo("number", 201))
+
+    # One positional delete has been added, but an OVERWRITE status is set
+    # https://github.com/apache/iceberg/issues/10122
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 4
+
+    # Snapshots produced by Spark
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ['append', 'overwrite']
+
+    # Snapshots produced by PyIceberg
+    # This is a no-op since nothing has been added or deleted (because the predicate cannot drop a whole file)
+    assert tbl.snapshots()[2].summary == Summary(
+        Operation.DELETE,
+        **{
+            'total-data-files': '2',
+            'total-delete-files': '1',
+            'total-records': '5',
+            'total-files-size': tbl.snapshots()[2].summary['total-files-size'],
+            'total-position-deletes': '1',
+            'total-equality-deletes': '0',
+        },
+    )
+    # Will rewrite one Parquet file
+    assert tbl.snapshots()[3].summary == Summary(
+        Operation.OVERWRITE,
+        **{
+            'added-files-size': '1145',
+            'added-data-files': '1',
+            'added-records': '2',
+            'changed-partition-count': '1',
+            'total-files-size': tbl.snapshots()[3].summary['total-files-size'],
+            'total-delete-files': '0',
+            'total-data-files': '1',
+            'total-position-deletes': '0',
+            'total-records': '2',
+            'total-equality-deletes': '0',
+            'deleted-data-files': '2',
+            'removed-delete-files': '1',
+            'deleted-records': '5',
+            'removed-files-size': '3088',
+            'removed-position-deletes': '1',
+        },
+    )
+
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [20, 20, 10], 'number': [200, 202, 100]}
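
To reproduce the no-op behavior from these tests by hand, a before/after check of the snapshot log is enough: a delete that matches nothing should leave it unchanged. A sketch follows, assuming the integration environment is running; the catalog name, REST endpoint, and table identifier are assumptions taken from the test setup, not part of this commit.

    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import EqualTo

    # Assumed local REST catalog endpoint for the integration tests
    catalog = load_catalog("local", uri="http://localhost:8181")
    tbl = catalog.load_table("default.table_partitioned_delete")

    before = len(tbl.snapshots())
    tbl.delete(EqualTo("number_partitioned", 22))  # matches no rows
    tbl = catalog.load_table("default.table_partitioned_delete")
    assert len(tbl.snapshots()) == before  # no empty DELETE snapshot was created
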
