diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3ffb275ded..e6ca4104a1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -405,7 +405,8 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: return ManifestEntry.from_args( status=status, - snapshot_id=entry.snapshot_id, + # When a file is replaced or deleted from the dataset, its manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted). + snapshot_id=self.snapshot_id if status == ManifestEntryStatus.DELETED else entry.snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index abf8502ac7..21c3d12999 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -24,7 +24,7 @@ from pyiceberg.catalog.rest import RestCatalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.expressions import AlwaysTrue, EqualTo +from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual from pyiceberg.manifest import ManifestEntryStatus from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -923,3 +923,55 @@ def test_delete_on_empty_table(spark: SparkSession, session_catalog: RestCatalog # Assert that no new snapshot was created because no rows were deleted assert len(tbl.snapshots()) == 0 + + +@pytest.mark.integration +def test_manifest_entry_after_deletes(session_catalog: RestCatalog) -> None: + identifier = "default.test_manifest_entry_after_deletes" + try: + session_catalog.drop_table(identifier) + except NoSuchTableError: + pass + + schema = pa.schema( + [ + ("id", pa.int32()), + ("name", pa.string()), + ] + ) + + table = session_catalog.create_table(identifier, schema) + data = pa.Table.from_pylist( + [ + {"id": 1, "name": "foo"}, + {"id": 2, "name": "bar"}, + {"id": 3, "name": "bar"}, + {"id": 4, "name": "bar"}, + ], + schema=schema, + ) + table.append(data) + + def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapshot_id: int) -> None: + current_snapshot = table.refresh().current_snapshot() + assert current_snapshot is not None + + manifest_files = current_snapshot.manifests(table.io) + assert len(manifest_files) == 1 + + entries = manifest_files[0].fetch_manifest_entry(table.io, discard_deleted=False) + assert len(entries) == 1 + entry = entries[0] + assert entry.status == expected_status + assert entry.snapshot_id == expected_snapshot_id + + before_delete_snapshot = table.current_snapshot() + assert before_delete_snapshot is not None + + assert_manifest_entry(ManifestEntryStatus.ADDED, before_delete_snapshot.snapshot_id) + + table.delete(LessThanOrEqual("id", 4)) + after_delete_snapshot = table.refresh().current_snapshot() + assert after_delete_snapshot is not None + + assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)