Skip to content

Commit cf987c6

Browse files
lliangyu-linLeon LinFokko
authored
Fix: use new snapshot id in deleted manifest entry unless is existing entry (#2266)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Based on iceberg spec, when a manifest entry is marked as deleted, the snapshot id for when the entry was deleted should be used. https://iceberg.apache.org/spec/?h=deletes#manifest-entry-fields ``` When a file is replaced or deleted from the dataset, its manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted). The file may be deleted from the file system when the snapshot in which it was deleted is garbage collected, assuming that older snapshots have also been garbage collected [1]. ``` https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestWriter.java#L178-L179 Incorrect snapshot id could lead to data being deleted during garbage collection when not supposed to. # Are these changes tested? Added in `test_deletes.py` # Are there any user-facing changes? <!-- In the case of user-facing changes, please add the changelog label. --> No --------- Co-authored-by: Leon Lin <[email protected]> Co-authored-by: Fokko Driesprong <[email protected]>
1 parent 8b43eb8 commit cf987c6

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

pyiceberg/table/update/snapshot.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,8 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo
407407
def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
408408
return ManifestEntry.from_args(
409409
status=status,
410-
snapshot_id=entry.snapshot_id,
410+
# When a file is replaced or deleted from the dataset, its manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted).
411+
snapshot_id=self.snapshot_id if status == ManifestEntryStatus.DELETED else entry.snapshot_id,
411412
sequence_number=entry.sequence_number,
412413
file_sequence_number=entry.file_sequence_number,
413414
data_file=entry.data_file,

tests/integration/test_deletes.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
from pyiceberg.catalog.rest import RestCatalog
2626
from pyiceberg.exceptions import NoSuchTableError
27-
from pyiceberg.expressions import AlwaysTrue, EqualTo
27+
from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual
2828
from pyiceberg.manifest import ManifestEntryStatus
2929
from pyiceberg.partitioning import PartitionField, PartitionSpec
3030
from pyiceberg.schema import Schema
@@ -923,3 +923,55 @@ def test_delete_on_empty_table(spark: SparkSession, session_catalog: RestCatalog
923923

924924
# Assert that no new snapshot was created because no rows were deleted
925925
assert len(tbl.snapshots()) == 0
926+
927+
928+
@pytest.mark.integration
929+
def test_manifest_entry_after_deletes(session_catalog: RestCatalog) -> None:
930+
identifier = "default.test_manifest_entry_after_deletes"
931+
try:
932+
session_catalog.drop_table(identifier)
933+
except NoSuchTableError:
934+
pass
935+
936+
schema = pa.schema(
937+
[
938+
("id", pa.int32()),
939+
("name", pa.string()),
940+
]
941+
)
942+
943+
table = session_catalog.create_table(identifier, schema)
944+
data = pa.Table.from_pylist(
945+
[
946+
{"id": 1, "name": "foo"},
947+
{"id": 2, "name": "bar"},
948+
{"id": 3, "name": "bar"},
949+
{"id": 4, "name": "bar"},
950+
],
951+
schema=schema,
952+
)
953+
table.append(data)
954+
955+
def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapshot_id: int) -> None:
956+
current_snapshot = table.refresh().current_snapshot()
957+
assert current_snapshot is not None
958+
959+
manifest_files = current_snapshot.manifests(table.io)
960+
assert len(manifest_files) == 1
961+
962+
entries = manifest_files[0].fetch_manifest_entry(table.io, discard_deleted=False)
963+
assert len(entries) == 1
964+
entry = entries[0]
965+
assert entry.status == expected_status
966+
assert entry.snapshot_id == expected_snapshot_id
967+
968+
before_delete_snapshot = table.current_snapshot()
969+
assert before_delete_snapshot is not None
970+
971+
assert_manifest_entry(ManifestEntryStatus.ADDED, before_delete_snapshot.snapshot_id)
972+
973+
table.delete(LessThanOrEqual("id", 4))
974+
after_delete_snapshot = table.refresh().current_snapshot()
975+
assert after_delete_snapshot is not None
976+
977+
assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)

0 commit comments

Comments
 (0)