Skip to content

Commit b44cfa9

Browse files
ForeverAngryFokkokevinjqliu
authored andcommitted
refactor: consolidate snapshot expiration into MaintenanceTable (apache#2143)
<!-- Thanks for opening a pull request! --> <!-- Closes apache#2150 --> # Rationale for this change - Consolidates snapshot expiration functionality from the standalone `ExpireSnapshots` class into the `MaintenanceTable` class for a unified maintenance API. - Resolves planned work left over from apache#1880, and closes apache#2142 - Achieves feature and API parity with the Java implementation for snapshot retention and table maintenance. # Features & Enhancements - Introduces `table.maintenance.expire_snapshots()` as the unified entry point for snapshot expiration and future maintenance operations. - Retains the existing `ExpireSnapshots` implementation internally. The `expire_snapshots()` method on `MaintenanceTable` now returns an `ExpireSnapshots` object, preserving transaction semantics and supporting context manager usage: ```python with table.maintenance.expire_snapshots() as expire_snapshots: expire_snapshots.by_id(1) expire_snapshots.by_id(2) ``` - Focuses this PR on refactoring and documentation improvements, while maintaining compatibility with the prior `ExpireSnapshots` interface. - Sets a foundation for future expansion of the `MaintenanceTable` abstraction to encapsulate additional maintenance operations. # Bug Fixes & Cleanups - **ManageSnapshots Cleanup ([apache#2151](apache#2151 - Removes an unrelated instance variable from the `ManageSnapshots` class, aligning with the Java reference implementation. # Testing & Documentation - **Testing:** - Tested the new API interface including: - Expiration by ID - Protection of branch/tag snapshots - **Documentation:** - Added and updated documentation to describe: - API usage examples Preview: <img width="1686" height="1015" alt="Screenshot 2025-08-11 at 1 37 04 PM" src="https://github.com/user-attachments/assets/f469f3fc-b4b1-4ec9-b1ca-b9185e22643e" /> # Are these changes tested? Yes. All changes are tested.~, with this PR predicated on the final changes from apache#1200.~ This work builds on the framework introduced by @jayceslesar in apache#1200 for the `MaintenanceTable`. # Are there any user-facing changes? --- **Closes:** - Closes apache#2151 - Closes apache#2142 --------- Co-authored-by: Fokko Driesprong <[email protected]> Co-authored-by: Kevin Liu <[email protected]>
1 parent 5956b24 commit b44cfa9

File tree

5 files changed

+108
-17
lines changed

5 files changed

+108
-17
lines changed

mkdocs/docs/api.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1287,6 +1287,47 @@ with table.manage_snapshots() as ms:
12871287
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
12881288
```
12891289

1290+
## Table Maintenance
1291+
1292+
PyIceberg provides table maintenance operations through the `table.maintenance` API. This provides a clean interface for performing maintenance tasks like snapshot expiration.
1293+
1294+
### Snapshot Expiration
1295+
1296+
Expire old snapshots to clean up table metadata and reduce storage costs:
1297+
1298+
```python
1299+
# Basic usage - expire a specific snapshot by ID
1300+
table.maintenance.expire_snapshots().by_id(12345).commit()
1301+
1302+
# Context manager usage (recommended for multiple operations)
1303+
with table.maintenance.expire_snapshots() as expire:
1304+
expire.by_id(12345)
1305+
expire.by_id(67890)
1306+
# Automatically commits when exiting the context
1307+
1308+
# Method chaining
1309+
table.maintenance.expire_snapshots().by_id(12345).commit()
1310+
```
1311+
1312+
#### Real-world Example
1313+
1314+
```python
1315+
def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]):
1316+
"""Remove specific snapshots from a table."""
1317+
catalog = load_catalog("production")
1318+
table = catalog.load_table(table_name)
1319+
1320+
# Use context manager for safe transaction handling
1321+
with table.maintenance.expire_snapshots() as expire:
1322+
for snapshot_id in snapshot_ids:
1323+
expire.by_id(snapshot_id)
1324+
1325+
print(f"Expired {len(snapshot_ids)} snapshots from {table_name}")
1326+
1327+
# Usage
1328+
cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111])
1329+
```
1330+
12901331
## Views
12911332

12921333
PyIceberg supports view operations.

pyiceberg/table/__init__.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
from pyiceberg.schema import Schema
8181
from pyiceberg.table.inspect import InspectTable
8282
from pyiceberg.table.locations import LocationProvider, load_location_provider
83+
from pyiceberg.table.maintenance import MaintenanceTable
8384
from pyiceberg.table.metadata import (
8485
INITIAL_SEQUENCE_NUMBER,
8586
TableMetadata,
@@ -115,7 +116,7 @@
115116
update_table_metadata,
116117
)
117118
from pyiceberg.table.update.schema import UpdateSchema
118-
from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles
119+
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
119120
from pyiceberg.table.update.spec import UpdateSpec
120121
from pyiceberg.table.update.statistics import UpdateStatistics
121122
from pyiceberg.transforms import IdentityTransform
@@ -1086,6 +1087,15 @@ def inspect(self) -> InspectTable:
10861087
"""
10871088
return InspectTable(self)
10881089

1090+
@property
1091+
def maintenance(self) -> MaintenanceTable:
1092+
"""Return the MaintenanceTable object for maintenance.
1093+
1094+
Returns:
1095+
MaintenanceTable object based on this Table.
1096+
"""
1097+
return MaintenanceTable(self)
1098+
10891099
def refresh(self) -> Table:
10901100
"""Refresh the current table metadata.
10911101
@@ -1258,10 +1268,6 @@ def manage_snapshots(self) -> ManageSnapshots:
12581268
"""
12591269
return ManageSnapshots(transaction=Transaction(self, autocommit=True))
12601270

1261-
def expire_snapshots(self) -> ExpireSnapshots:
1262-
"""Shorthand to run expire snapshots by id or by a timestamp."""
1263-
return ExpireSnapshots(transaction=Transaction(self, autocommit=True))
1264-
12651271
def update_statistics(self) -> UpdateStatistics:
12661272
"""
12671273
Shorthand to run statistics management operations like add statistics and remove statistics.

pyiceberg/table/maintenance.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
import logging
20+
from typing import TYPE_CHECKING
21+
22+
logger = logging.getLogger(__name__)
23+
24+
25+
if TYPE_CHECKING:
26+
from pyiceberg.table import Table
27+
from pyiceberg.table.update.snapshot import ExpireSnapshots
28+
29+
30+
class MaintenanceTable:
31+
tbl: Table
32+
33+
def __init__(self, tbl: Table) -> None:
34+
self.tbl = tbl
35+
36+
def expire_snapshots(self) -> ExpireSnapshots:
37+
"""Return an ExpireSnapshots builder for snapshot expiration operations.
38+
39+
Returns:
40+
ExpireSnapshots builder for configuring and executing snapshot expiration.
41+
"""
42+
from pyiceberg.table import Transaction
43+
from pyiceberg.table.update.snapshot import ExpireSnapshots
44+
45+
return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))

pyiceberg/table/update/snapshot.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -907,8 +907,7 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots:
907907

908908

909909
class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
910-
"""
911-
Expire snapshots by ID.
910+
"""Expire snapshots by ID.
912911
913912
Use table.expire_snapshots().<operation>().commit() to run a specific operation.
914913
Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
@@ -953,7 +952,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]:
953952

954953
return protected_ids
955954

956-
def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots:
955+
def by_id(self, snapshot_id: int) -> ExpireSnapshots:
957956
"""
958957
Expire a snapshot by its ID.
959958
@@ -974,7 +973,7 @@ def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots:
974973

975974
return self
976975

977-
def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
976+
def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
978977
"""
979978
Expire multiple snapshots by their IDs.
980979
@@ -986,10 +985,10 @@ def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
986985
This for method chaining.
987986
"""
988987
for snapshot_id in snapshot_ids:
989-
self.expire_snapshot_by_id(snapshot_id)
988+
self.by_id(snapshot_id)
990989
return self
991990

992-
def expire_snapshots_older_than(self, timestamp_ms: int) -> "ExpireSnapshots":
991+
def older_than(self, timestamp_ms: int) -> "ExpireSnapshots":
993992
"""
994993
Expire all unprotected snapshots with a timestamp older than a given value.
995994

tests/table/test_expire_snapshots.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
4343

4444
# Attempt to expire the HEAD snapshot and expect a ValueError
4545
with pytest.raises(ValueError, match=f"Snapshot with ID {HEAD_SNAPSHOT} is protected and cannot be expired."):
46-
table_v2.expire_snapshots().expire_snapshot_by_id(HEAD_SNAPSHOT).commit()
46+
table_v2.maintenance.expire_snapshots().by_id(HEAD_SNAPSHOT).commit()
4747

4848
table_v2.catalog.commit_table.assert_not_called()
4949

@@ -66,7 +66,7 @@ def test_cannot_expire_tagged_snapshot(table_v2: Table) -> None:
6666
assert any(ref.snapshot_id == TAGGED_SNAPSHOT for ref in table_v2.metadata.refs.values())
6767

6868
with pytest.raises(ValueError, match=f"Snapshot with ID {TAGGED_SNAPSHOT} is protected and cannot be expired."):
69-
table_v2.expire_snapshots().expire_snapshot_by_id(TAGGED_SNAPSHOT).commit()
69+
table_v2.maintenance.expire_snapshots().by_id(TAGGED_SNAPSHOT).commit()
7070

7171
table_v2.catalog.commit_table.assert_not_called()
7272

@@ -98,7 +98,7 @@ def test_expire_unprotected_snapshot(table_v2: Table) -> None:
9898
assert all(ref.snapshot_id != EXPIRE_SNAPSHOT for ref in table_v2.metadata.refs.values())
9999

100100
# Expire the snapshot
101-
table_v2.expire_snapshots().expire_snapshot_by_id(EXPIRE_SNAPSHOT).commit()
101+
table_v2.maintenance.expire_snapshots().by_id(EXPIRE_SNAPSHOT).commit()
102102

103103
table_v2.catalog.commit_table.assert_called_once()
104104
remaining_snapshots = table_v2.metadata.snapshots
@@ -114,7 +114,7 @@ def test_expire_nonexistent_snapshot_raises(table_v2: Table) -> None:
114114
table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}})
115115

116116
with pytest.raises(ValueError, match=f"Snapshot with ID {NONEXISTENT_SNAPSHOT} does not exist."):
117-
table_v2.expire_snapshots().expire_snapshot_by_id(NONEXISTENT_SNAPSHOT).commit()
117+
table_v2.maintenance.expire_snapshots().by_id(NONEXISTENT_SNAPSHOT).commit()
118118

119119
table_v2.catalog.commit_table.assert_not_called()
120120

@@ -152,7 +152,7 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None:
152152
)
153153
table_v2.catalog.commit_table.return_value = mock_response
154154

155-
table_v2.expire_snapshots().expire_snapshots_older_than(future_timestamp).commit()
155+
table_v2.maintenance.expire_snapshots().older_than(future_timestamp).commit()
156156
# Update metadata to reflect the commit (as in other tests)
157157
table_v2.metadata = mock_response.metadata
158158

@@ -215,7 +215,7 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None:
215215
assert all(ref.snapshot_id not in (EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2) for ref in table_v2.metadata.refs.values())
216216

217217
# Expire the snapshots
218-
table_v2.expire_snapshots().expire_snapshots_by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit()
218+
table_v2.maintenance.expire_snapshots().by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit()
219219

220220
table_v2.catalog.commit_table.assert_called_once()
221221
remaining_snapshots = table_v2.metadata.snapshots

0 commit comments

Comments
 (0)