Skip to content

Commit f22d46d

Browse files
committed
support rollback and set current snapshot operations
1 parent a5ce407 commit f22d46d

File tree

1 file changed

+53
-0
lines changed

1 file changed

+53
-0
lines changed

pyiceberg/table/__init__.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1975,6 +1975,10 @@ def _commit(self) -> UpdatesAndRequirements:
19751975
"""Apply the pending changes and commit."""
19761976
return self._updates, self._requirements
19771977

1978+
def _commit_if_ref_updates_exist(self) -> None:
    """Flush ref updates queued on this builder, if there are any.

    When updates or requirements are pending, they are committed through
    ``self.commit()`` and both queues are reset to empty tuples so that the
    operation that follows starts from a clean slate. When nothing is
    pending this is a no-op, which avoids issuing an empty commit.
    """
    # Nothing queued: skip the commit entirely, as the method name promises.
    if not self._updates and not self._requirements:
        return
    self.commit()
    self._updates, self._requirements = (), ()
1981+
19781982
def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
19791983
"""
19801984
Create a new tag pointing to the given snapshot id.
@@ -2029,6 +2033,55 @@ def create_branch(
20292033
self._requirements += requirement
20302034
return self
20312035

2036+
def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots:
    """Point the ``main`` branch back at an earlier snapshot.

    Pending ref updates are flushed first. The target must be a snapshot the
    table knows about and must be among the ancestors of the table's current
    snapshot.

    Args:
        snapshot_id: Id of the snapshot to roll back to.

    Returns:
        This builder, so calls can be chained.

    Raises:
        ValidationError: If the snapshot id is unknown, or is not an ancestor
            of the current table state.
    """
    self._commit_if_ref_updates_exist()

    table = self._transaction._table
    if table.snapshot_by_id(snapshot_id) is None:
        raise ValidationError(f"Cannot roll back to unknown snapshot id: {snapshot_id}")

    # Collect the ids of every snapshot in the current snapshot's lineage.
    lineage = ancestors_of(table.current_snapshot(), self._transaction.table_metadata)
    ancestor_ids = {ancestor.snapshot_id for ancestor in lineage}
    if snapshot_id not in ancestor_ids:
        raise ValidationError(f"Cannot roll back to snapshot, not an ancestor of the current state: {snapshot_id}")

    ref_update, ref_requirement = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot_id, ref_name="main", type="branch"
    )
    self._updates += ref_update
    self._requirements += ref_requirement
    return self
2051+
2052+
def rollback_to_timestamp(self, timestamp: int) -> ManageSnapshots:
    """Point the ``main`` branch at the snapshot preceding ``timestamp``.

    Pending ref updates are flushed first. The target snapshot is looked up
    with ``snapshot_as_of_timestamp(timestamp, inclusive=False)``.

    Args:
        timestamp: Cut-off timestamp (presumably epoch milliseconds; confirm
            against ``snapshot_as_of_timestamp``).

    Returns:
        This builder, so calls can be chained.

    Raises:
        ValidationError: If no snapshot is found for the given timestamp.
    """
    self._commit_if_ref_updates_exist()

    target = self._transaction._table.snapshot_as_of_timestamp(timestamp, inclusive=False)
    if target is None:
        raise ValidationError(f"Cannot roll back, no valid snapshot older than: {timestamp}")

    ref_update, ref_requirement = self._transaction._set_ref_snapshot(
        snapshot_id=target.snapshot_id,
        ref_name="main",
        type="branch",
    )
    self._updates += ref_update
    self._requirements += ref_requirement
    return self
2064+
2065+
def set_current_snapshot(self, snapshot_id: Optional[int] = None, ref_name: Optional[str] = None) -> ManageSnapshots:
    """Point the ``main`` branch at a snapshot given by id or by ref name.

    Exactly one of ``snapshot_id`` and ``ref_name`` must be supplied. Pending
    ref updates are flushed before the arguments are validated.

    Args:
        snapshot_id: Id of the snapshot to make current.
        ref_name: Name of a branch or tag whose snapshot should become current.

    Returns:
        This builder, so calls can be chained.

    Raises:
        ValidationError: If neither or both arguments are given, if
            ``ref_name`` is not a known ref, or if the resolved snapshot id
            does not exist in the table.
    """
    self._commit_if_ref_updates_exist()

    # Compare against None instead of relying on truthiness: a snapshot id of
    # 0 is a real value and must not be mistaken for "not provided".
    if (snapshot_id is None) == (ref_name is None):
        raise ValidationError("Either snapshot_id or ref_name must be provided, not both")

    if snapshot_id is None:
        # Unknown refs are reported as a validation failure, not a KeyError.
        ref = self._transaction.table_metadata.refs.get(ref_name)  # type: ignore
        if ref is None:
            raise ValidationError(f"Cannot set snapshot current with snapshot id: {snapshot_id} or ref_name: {ref_name}")
        target_snapshot_id = ref.snapshot_id
    else:
        target_snapshot_id = snapshot_id

    if (snapshot := self._transaction._table.snapshot_by_id(target_snapshot_id)) is None:
        raise ValidationError(f"Cannot set snapshot current with snapshot id: {snapshot_id} or ref_name: {ref_name}")

    update, requirement = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot.snapshot_id, ref_name="main", type="branch"
    )
    self._updates += update
    self._requirements += requirement
    return self
2084+
20322085

20332086
class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
20342087
_schema: Schema

0 commit comments

Comments
 (0)