Skip to content

Fix: use new snapshot id in deleted manifest entry unless is existing entry #2266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo
def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
return ManifestEntry.from_args(
status=status,
snapshot_id=entry.snapshot_id,
# When a file is replaced or deleted from the dataset, its manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted).
snapshot_id=self.snapshot_id if status == ManifestEntryStatus.DELETED else entry.snapshot_id,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
Expand Down
54 changes: 53 additions & 1 deletion tests/integration/test_deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import AlwaysTrue, EqualTo
from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual
from pyiceberg.manifest import ManifestEntryStatus
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
Expand Down Expand Up @@ -923,3 +923,55 @@ def test_delete_on_empty_table(spark: SparkSession, session_catalog: RestCatalog

# Assert that no new snapshot was created because no rows were deleted
assert len(tbl.snapshots()) == 0


@pytest.mark.integration
def test_manifest_entry_after_deletes(session_catalog: RestCatalog) -> None:
identifier = "default.test_manifest_entry_after_deletes"
try:
session_catalog.drop_table(identifier)
except NoSuchTableError:
pass

schema = pa.schema(
[
("id", pa.int32()),
("name", pa.string()),
]
)

table = session_catalog.create_table(identifier, schema)
data = pa.Table.from_pylist(
[
{"id": 1, "name": "foo"},
{"id": 2, "name": "bar"},
{"id": 3, "name": "bar"},
{"id": 4, "name": "bar"},
],
schema=schema,
)
table.append(data)

def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapshot_id: int) -> None:
current_snapshot = table.refresh().current_snapshot()
assert current_snapshot is not None

manifest_files = current_snapshot.manifests(table.io)
assert len(manifest_files) == 1

entries = manifest_files[0].fetch_manifest_entry(table.io, discard_deleted=False)
assert len(entries) == 1
entry = entries[0]
assert entry.status == expected_status
assert entry.snapshot_id == expected_snapshot_id

before_delete_snapshot = table.current_snapshot()
assert before_delete_snapshot is not None

assert_manifest_entry(ManifestEntryStatus.ADDED, before_delete_snapshot.snapshot_id)

table.delete(LessThanOrEqual("id", 4))
after_delete_snapshot = table.refresh().current_snapshot()
assert after_delete_snapshot is not None

assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)
Loading