Skip to content

Commit e1627c4

Browse files
committed
### Features & Enhancements
- **Duplicate File Remediation #2130** - Added `deduplicate_data_files` to the `MaintenanceTable` class. - Enables detection and removal of duplicate data files, improving table hygiene and storage efficiency. - **Support `retainLast` and `setMinSnapshotsToKeep` Snapshot Retention Policies #2150** - Added new snapshot retention methods to `MaintenanceTable` for feature parity with the Java API: - `retain_last_n_snapshots(n)`: Retain only the last N snapshots. - `expire_snapshots_older_than_with_retention(timestamp_ms, retain_last_n=None, min_snapshots_to_keep=None)`: Expire snapshots older than a timestamp, with additional retention constraints. - `expire_snapshots_with_retention_policy(timestamp_ms=None, retain_last_n=None, min_snapshots_to_keep=None)`: Unified retention policy supporting time-based and count-based constraints. - All retention logic respects protected snapshots (branches/tags) and includes guardrails to prevent over-aggressive expiration. ### Bug Fixes & Cleanups - **Remove unrelated instance variable from the `ManageSnapshots` class #2151** - Removed an errant member variable from the `ManageSnapshots` class, aligning the implementation with the intended design and the Java reference. ### Testing & Documentation - Consolidated all snapshot expiration and retention tests into a single file (`test_retention_strategies.py`), covering: - Basic expiration by ID and timestamp. - Protection of branch/tag snapshots. - Retention guardrails and combined policies. - Deduplication of data files. - Added and updated documentation to describe all new retention strategies, deduplication, and API parity improvements.
1 parent 42e55c9 commit e1627c4

File tree

5 files changed

+133
-143
lines changed

5 files changed

+133
-143
lines changed

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@
116116
update_table_metadata,
117117
)
118118
from pyiceberg.table.update.schema import UpdateSchema
119-
from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles
119+
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
120120
from pyiceberg.table.update.spec import UpdateSpec
121121
from pyiceberg.table.update.statistics import UpdateStatistics
122122
from pyiceberg.transforms import IdentityTransform

pyiceberg/table/maintenance.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from __future__ import annotations
1818

1919
import logging
20-
from concurrent.futures import ThreadPoolExecutor
2120
from typing import TYPE_CHECKING, List, Optional, Set, Union
2221

2322
from pyiceberg.manifest import DataFile

test_retention_strategies.py

Lines changed: 0 additions & 116 deletions
This file was deleted.

tests/table/test_maintenance_table.py

Whitespace-only changes.

tests/table/test_expire_snapshots.py renamed to tests/table/test_retention_strategies.py

Lines changed: 132 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,138 @@
1-
# Licensed to the Apache Software Foundation (ASF) under one
2-
# or more contributor license agreements. See the NOTICE file
3-
# distributed with this work for additional information
4-
# regarding copyright ownership. The ASF licenses this file
5-
# to you under the Apache License, Version 2.0 (the
6-
# "License"); you may not use this file except in compliance
7-
# with the License. You may obtain a copy of the License at
8-
#
9-
# http://www.apache.org/licenses/LICENSE-2.0
10-
#
11-
# Unless required by applicable law or agreed to in writing,
12-
# software distributed under the License is distributed on an
13-
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
# KIND, either express or implied. See the License for the
15-
# specific language governing permissions and limitations
16-
# under the License.
17-
from unittest.mock import MagicMock
18-
from uuid import uuid4
1+
#!/usr/bin/env python3
2+
"""
3+
Test script to validate the retention strategies implementation in MaintenanceTable.
4+
5+
This script demonstrates the new retention features:
6+
1. retain_last_n_snapshots() - Keep only the last N snapshots
7+
2. expire_snapshots_older_than_with_retention() - Time-based expiration with retention constraints
8+
3. expire_snapshots_with_retention_policy() - Comprehensive retention policy
9+
"""
10+
11+
# Example usage (commented out since we don't have an actual table)
12+
"""
13+
from pyiceberg.table.maintenance import MaintenanceTable
14+
from pyiceberg.table import Table
15+
16+
# Assume we have a table instance
17+
table = Table(...) # Initialize your table
18+
maintenance = MaintenanceTable(table)
19+
20+
# Example 1: Keep only the last 5 snapshots regardless of age
21+
# This is helpful when regular snapshot creation occurs and users always want
22+
# to keep the last few for rollback
23+
maintenance.retain_last_n_snapshots(5)
24+
25+
# Example 2: Expire snapshots older than a timestamp but keep at least 3 total
26+
# This acts as a guardrail to prevent aggressive expiration logic from removing too many snapshots
27+
import time
28+
one_week_ago = int((time.time() - 7 * 24 * 60 * 60) * 1000) # 7 days ago in milliseconds
29+
maintenance.expire_snapshots_older_than_with_retention(
30+
timestamp_ms=one_week_ago,
31+
min_snapshots_to_keep=3
32+
)
33+
34+
# Example 3: Combined policy - expire old snapshots but keep last 10 and at least 5 total
35+
# This provides comprehensive control combining both strategies
36+
maintenance.expire_snapshots_with_retention_policy(
37+
timestamp_ms=one_week_ago,
38+
retain_last_n=10,
39+
min_snapshots_to_keep=5
40+
)
41+
42+
# Example 4: Just keep the last 20 snapshots (no time constraint)
43+
expired_ids = maintenance.expire_snapshots_with_retention_policy(retain_last_n=20)
44+
print(f"Expired {len(expired_ids)} snapshots")
45+
"""
1946

2047
import pytest
21-
48+
from unittest.mock import MagicMock
49+
from uuid import uuid4
50+
from types import SimpleNamespace
2251
from pyiceberg.table import CommitTableResponse
2352

53+
def _make_snapshots(ids_and_timestamps):
54+
return [SimpleNamespace(snapshot_id=sid, timestamp_ms=ts, parent_snapshot_id=None) for sid, ts in ids_and_timestamps]
55+
56+
def test_retain_last_n_snapshots(table_v2):
57+
# Setup: 5 snapshots, keep last 3
58+
ids_and_ts = [
59+
(1, 1000),
60+
(2, 2000),
61+
(3, 3000),
62+
(4, 4000),
63+
(5, 5000),
64+
]
65+
snapshots = _make_snapshots(ids_and_ts)
66+
table_v2.metadata = table_v2.metadata.model_copy(update={"snapshots": snapshots, "refs": {}})
67+
table_v2.catalog = MagicMock()
68+
# Simulate commit response with only last 3 snapshots
69+
keep_ids = [3, 4, 5]
70+
mock_response = CommitTableResponse(
71+
metadata=table_v2.metadata.model_copy(update={"snapshots": [s for s in snapshots if s.snapshot_id in keep_ids]}),
72+
metadata_location="mock://metadata/location",
73+
uuid=uuid4(),
74+
)
75+
table_v2.catalog.commit_table.return_value = mock_response
76+
table_v2.maintenance.retain_last_n_snapshots(3)
77+
table_v2.catalog.commit_table.assert_called_once()
78+
# Update metadata to reflect commit
79+
table_v2.metadata = mock_response.metadata
80+
remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
81+
assert remaining_ids == set(keep_ids)
82+
83+
def test_min_snapshots_to_keep(table_v2):
84+
# Setup: 5 snapshots, expire all older than 4500, but keep at least 3
85+
ids_and_ts = [
86+
(1, 1000),
87+
(2, 2000),
88+
(3, 3000),
89+
(4, 4000),
90+
(5, 5000),
91+
]
92+
snapshots = _make_snapshots(ids_and_ts)
93+
table_v2.metadata = table_v2.metadata.model_copy(update={"snapshots": snapshots, "refs": {}})
94+
table_v2.catalog = MagicMock()
95+
# Only 1,2 should be expired (to keep 3 total)
96+
keep_ids = [3, 4, 5]
97+
mock_response = CommitTableResponse(
98+
metadata=table_v2.metadata.model_copy(update={"snapshots": [s for s in snapshots if s.snapshot_id in keep_ids]}),
99+
metadata_location="mock://metadata/location",
100+
uuid=uuid4(),
101+
)
102+
table_v2.catalog.commit_table.return_value = mock_response
103+
table_v2.maintenance.expire_snapshots_older_than_with_retention(timestamp_ms=4500, min_snapshots_to_keep=3)
104+
table_v2.catalog.commit_table.assert_called_once()
105+
table_v2.metadata = mock_response.metadata
106+
remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
107+
assert remaining_ids == set(keep_ids)
108+
109+
def test_combined_constraints(table_v2):
110+
# Setup: 5 snapshots, expire all older than 3500, keep last 2, min 4 total
111+
ids_and_ts = [
112+
(1, 1000),
113+
(2, 2000),
114+
(3, 3000),
115+
(4, 4000),
116+
(5, 5000),
117+
]
118+
snapshots = _make_snapshots(ids_and_ts)
119+
table_v2.metadata = table_v2.metadata.model_copy(update={"snapshots": snapshots, "refs": {}})
120+
table_v2.catalog = MagicMock()
121+
# Only 1 should be expired (to keep last 2 and min 4 total)
122+
keep_ids = [2, 3, 4, 5]
123+
mock_response = CommitTableResponse(
124+
metadata=table_v2.metadata.model_copy(update={"snapshots": [s for s in snapshots if s.snapshot_id in keep_ids]}),
125+
metadata_location="mock://metadata/location",
126+
uuid=uuid4(),
127+
)
128+
table_v2.catalog.commit_table.return_value = mock_response
129+
table_v2.maintenance.expire_snapshots_with_retention_policy(
130+
timestamp_ms=3500, retain_last_n=2, min_snapshots_to_keep=4
131+
)
132+
table_v2.catalog.commit_table.assert_called_once()
133+
table_v2.metadata = mock_response.metadata
134+
remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
135+
assert remaining_ids == set(keep_ids)
24136

25137
def test_cannot_expire_protected_head_snapshot(table_v2) -> None:
26138
"""Test that a HEAD (branch) snapshot cannot be expired."""
@@ -47,7 +159,6 @@ def test_cannot_expire_protected_head_snapshot(table_v2) -> None:
47159

48160
table_v2.catalog.commit_table.assert_not_called()
49161

50-
51162
def test_cannot_expire_tagged_snapshot(table_v2) -> None:
52163
"""Test that a tagged snapshot cannot be expired."""
53164
TAGGED_SNAPSHOT = 3051729675574597004
@@ -70,7 +181,6 @@ def test_cannot_expire_tagged_snapshot(table_v2) -> None:
70181

71182
table_v2.catalog.commit_table.assert_not_called()
72183

73-
74184
def test_expire_unprotected_snapshot(table_v2) -> None:
75185
"""Test that an unprotected snapshot can be expired."""
76186
EXPIRE_SNAPSHOT = 3051729675574597004
@@ -105,7 +215,6 @@ def test_expire_unprotected_snapshot(table_v2) -> None:
105215
assert EXPIRE_SNAPSHOT not in remaining_snapshots
106216
assert len(table_v2.metadata.snapshots) == 1
107217

108-
109218
def test_expire_nonexistent_snapshot_raises(table_v2) -> None:
110219
"""Test that trying to expire a non-existent snapshot raises an error."""
111220
NONEXISTENT_SNAPSHOT = 9999999999999999999
@@ -118,7 +227,6 @@ def test_expire_nonexistent_snapshot_raises(table_v2) -> None:
118227

119228
table_v2.catalog.commit_table.assert_not_called()
120229

121-
122230
def test_expire_snapshots_by_timestamp_skips_protected(table_v2) -> None:
123231
# Setup: two snapshots; both are old, but one is head/tag protected
124232
HEAD_SNAPSHOT = 3051729675574597004
@@ -169,7 +277,6 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2) -> None:
169277
assert remove_update is not None
170278
assert remove_update.snapshot_ids == []
171279

172-
173280
def test_expire_snapshots_by_ids(table_v2) -> None:
174281
"""Test that multiple unprotected snapshots can be expired by IDs."""
175282
EXPIRE_SNAPSHOT_1 = 3051729675574597004
@@ -221,4 +328,4 @@ def test_expire_snapshots_by_ids(table_v2) -> None:
221328
remaining_snapshots = table_v2.metadata.snapshots
222329
assert EXPIRE_SNAPSHOT_1 not in remaining_snapshots
223330
assert EXPIRE_SNAPSHOT_2 not in remaining_snapshots
224-
assert len(table_v2.metadata.snapshots) == 1
331+
assert len(table_v2.metadata.snapshots) == 1

0 commit comments

Comments
 (0)