|
113 | 113 | SnapshotLogEntry,
|
114 | 114 | SnapshotSummaryCollector,
|
115 | 115 | Summary,
|
| 116 | + ancestors_of, |
116 | 117 | update_snapshot_summaries,
|
117 | 118 | )
|
118 | 119 | from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
|
@@ -1956,6 +1957,10 @@ def _commit(self) -> UpdatesAndRequirements:
|
1956 | 1957 | """Apply the pending changes and commit."""
|
1957 | 1958 | return self._updates, self._requirements
|
1958 | 1959 |
|
| 1960 | + def _commit_if_ref_updates_exist(self) -> None: |
| 1961 | + self.commit() |
| 1962 | + self._updates, self._requirements = (), () |
| 1963 | + |
1959 | 1964 | def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
|
1960 | 1965 | """
|
1961 | 1966 | Create a new tag pointing to the given snapshot id.
|
@@ -2010,6 +2015,55 @@ def create_branch(
|
2010 | 2015 | self._requirements += requirement
|
2011 | 2016 | return self
|
2012 | 2017 |
|
| 2018 | + def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots: |
| 2019 | + """Rollback the table to the given snapshot id, whose snapshot needs to be an ancestor of the current table state.""" |
| 2020 | + self._commit_if_ref_updates_exist() |
| 2021 | + if self._transaction._table.snapshot_by_id(snapshot_id) is None: |
| 2022 | + raise ValidationError(f"Cannot roll back to unknown snapshot id: {snapshot_id}") |
| 2023 | + if snapshot_id not in { |
| 2024 | + ancestor.snapshot_id |
| 2025 | + for ancestor in ancestors_of(self._transaction._table.current_snapshot(), self._transaction.table_metadata) |
| 2026 | + }: |
| 2027 | + raise ValidationError(f"Cannot roll back to snapshot, not an ancestor of the current state: {snapshot_id}") |
| 2028 | + |
| 2029 | + update, requirement = self._transaction._set_ref_snapshot(snapshot_id=snapshot_id, ref_name="main", type="branch") |
| 2030 | + self._updates += update |
| 2031 | + self._requirements += requirement |
| 2032 | + return self |
| 2033 | + |
| 2034 | + def rollback_to_timestamp(self, timestamp: int) -> ManageSnapshots: |
| 2035 | + """Rollback the table to the snapshot right before the given timestamp.""" |
| 2036 | + self._commit_if_ref_updates_exist() |
| 2037 | + if (snapshot := self._transaction._table.snapshot_as_of_timestamp(timestamp, inclusive=False)) is None: |
| 2038 | + raise ValidationError(f"Cannot roll back, no valid snapshot older than: {timestamp}") |
| 2039 | + |
| 2040 | + update, requirement = self._transaction._set_ref_snapshot( |
| 2041 | + snapshot_id=snapshot.snapshot_id, ref_name="main", type="branch" |
| 2042 | + ) |
| 2043 | + self._updates += update |
| 2044 | + self._requirements += requirement |
| 2045 | + return self |
| 2046 | + |
| 2047 | + def set_current_snapshot(self, snapshot_id: Optional[int] = None, ref_name: Optional[str] = None) -> ManageSnapshots: |
| 2048 | + """Set the table to a specific snapshot identified either by its id or the branch/tag its on, not both.""" |
| 2049 | + self._commit_if_ref_updates_exist() |
| 2050 | + if (not snapshot_id or ref_name) and (snapshot_id or not ref_name): |
| 2051 | + raise ValidationError("Either snapshot_id or ref must be provided") |
| 2052 | + else: |
| 2053 | + if snapshot_id is None: |
| 2054 | + target_snapshot_id = self._transaction.table_metadata.refs[ref_name].snapshot_id # type:ignore |
| 2055 | + else: |
| 2056 | + target_snapshot_id = snapshot_id |
| 2057 | + if (snapshot := self._transaction._table.snapshot_by_id(target_snapshot_id)) is None: |
| 2058 | + raise ValidationError(f"Cannot set snapshot current with snapshot id: {snapshot_id} or ref_name: {ref_name}") |
| 2059 | + |
| 2060 | + update, requirement = self._transaction._set_ref_snapshot( |
| 2061 | + snapshot_id=snapshot.snapshot_id, ref_name="main", type="branch" |
| 2062 | + ) |
| 2063 | + self._updates += update |
| 2064 | + self._requirements += requirement |
| 2065 | + return self |
| 2066 | + |
2013 | 2067 |
|
2014 | 2068 | class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
|
2015 | 2069 | _schema: Schema
|
|
0 commit comments