-
Notifications
You must be signed in to change notification settings - Fork 344
Create rollback and set snapshot APIs #758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 12 commits
f22d46d
45c25db
ca63831
f7e192a
6859fa4
ea0e645
dc4028b
7fba98b
1f4a404
59f1626
7c7907b
e563b7e
386496f
5adccb9
8885f78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -106,13 +106,14 @@ | |||||||||||||||||||||||||||||
NameMapping, | ||||||||||||||||||||||||||||||
update_mapping, | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef | ||||||||||||||||||||||||||||||
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType | ||||||||||||||||||||||||||||||
from pyiceberg.table.snapshots import ( | ||||||||||||||||||||||||||||||
Operation, | ||||||||||||||||||||||||||||||
Snapshot, | ||||||||||||||||||||||||||||||
SnapshotLogEntry, | ||||||||||||||||||||||||||||||
SnapshotSummaryCollector, | ||||||||||||||||||||||||||||||
Summary, | ||||||||||||||||||||||||||||||
ancestor_right_before_timestamp, | ||||||||||||||||||||||||||||||
ancestors_of, | ||||||||||||||||||||||||||||||
update_snapshot_summaries, | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
|
@@ -299,7 +300,12 @@ def __exit__(self, _: Any, value: Any, traceback: Any) -> None: | |||||||||||||||||||||||||||||
"""Close and commit the transaction.""" | ||||||||||||||||||||||||||||||
self.commit_transaction() | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction: | ||||||||||||||||||||||||||||||
def _apply( | ||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||
updates: Tuple[TableUpdate, ...], | ||||||||||||||||||||||||||||||
requirements: Tuple[TableRequirement, ...] = (), | ||||||||||||||||||||||||||||||
commit_transaction_now: bool = True, | ||||||||||||||||||||||||||||||
) -> Transaction: | ||||||||||||||||||||||||||||||
"""Check if the requirements are met, and applies the updates to the metadata.""" | ||||||||||||||||||||||||||||||
for requirement in requirements: | ||||||||||||||||||||||||||||||
requirement.validate(self.table_metadata) | ||||||||||||||||||||||||||||||
|
@@ -309,7 +315,7 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ | |||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
self.table_metadata = update_table_metadata(self.table_metadata, updates) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
if self._autocommit: | ||||||||||||||||||||||||||||||
if self._autocommit and commit_transaction_now: | ||||||||||||||||||||||||||||||
self.commit_transaction() | ||||||||||||||||||||||||||||||
self._updates = () | ||||||||||||||||||||||||||||||
self._requirements = () | ||||||||||||||||||||||||||||||
|
@@ -402,39 +408,6 @@ def set_ref_snapshot( | |||||||||||||||||||||||||||||
requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),) | ||||||||||||||||||||||||||||||
return self._apply(updates, requirements) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def _set_ref_snapshot( | ||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||
snapshot_id: int, | ||||||||||||||||||||||||||||||
ref_name: str, | ||||||||||||||||||||||||||||||
type: str, | ||||||||||||||||||||||||||||||
max_ref_age_ms: Optional[int] = None, | ||||||||||||||||||||||||||||||
max_snapshot_age_ms: Optional[int] = None, | ||||||||||||||||||||||||||||||
min_snapshots_to_keep: Optional[int] = None, | ||||||||||||||||||||||||||||||
) -> UpdatesAndRequirements: | ||||||||||||||||||||||||||||||
"""Update a ref to a snapshot. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
The updates and requirements for the set-snapshot-ref staged | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
updates = ( | ||||||||||||||||||||||||||||||
SetSnapshotRefUpdate( | ||||||||||||||||||||||||||||||
snapshot_id=snapshot_id, | ||||||||||||||||||||||||||||||
ref_name=ref_name, | ||||||||||||||||||||||||||||||
type=type, | ||||||||||||||||||||||||||||||
max_ref_age_ms=max_ref_age_ms, | ||||||||||||||||||||||||||||||
max_snapshot_age_ms=max_snapshot_age_ms, | ||||||||||||||||||||||||||||||
min_snapshots_to_keep=min_snapshots_to_keep, | ||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
requirements = ( | ||||||||||||||||||||||||||||||
AssertRefSnapshotId( | ||||||||||||||||||||||||||||||
snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None, | ||||||||||||||||||||||||||||||
ref=ref_name, | ||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
return updates, requirements | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: | ||||||||||||||||||||||||||||||
"""Create a new UpdateSchema to alter the columns of this table. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
|
@@ -1975,6 +1948,52 @@ def _commit(self) -> UpdatesAndRequirements: | |||||||||||||||||||||||||||||
"""Apply the pending changes and commit.""" | ||||||||||||||||||||||||||||||
return self._updates, self._requirements | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def _commit_if_ref_updates_exist(self) -> None: | ||||||||||||||||||||||||||||||
self._transaction._apply(*self._commit(), commit_transaction_now=False) | ||||||||||||||||||||||||||||||
self._updates, self._requirements = (), () | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to Java implementation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only issue here is that iceberg-python/pyiceberg/table/__init__.py Lines 1508 to 1521 in 2252e71
where autocommit is set to true.
One possible way to fix this is that we can add additional parameters in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated! Now there's an extra parameter There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, I'm re-opening this resolved conversation, since I don't think adding the additional parameter is enough. Say, in the future, we have more APIs like: branch_name, min_snapshots_to_keep = "test_branch_min_snapshots_to_keep", 2
with tbl.manage_snapshots() as ms:
ms.create_branch(branch_name=branch_name, snapshot_id=snapshot_id)
ms.set_min_snapshots_to_keep(branch_name=branch_name, min_snapshots_to_keep=min_snapshots_to_keep) The updates and requirements would be :
The 2nd requirement will fail with a In To fix this, we might consider one of the following solutions:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @chinmay-bhat Thank you so much for digging into this issue! I think you've made a great point. I am thinking of a similar solution like your first point: to derive a list of requirements when we commit the transaction: https://github.com/apache/iceberg/blob/d69ba0568a2e07dfb5af233350ad5668d9aef134/core/src/main/java/org/apache/iceberg/UpdateRequirements.java#L50-L58 This will save us from manually specifying requirements for every Let me research more on this and get back to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @HonahX, should I make a new issue for this? Since changing how we specify requirements is not strictly in the scope of this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @chinmay-bhat. Sorry for the long wait🙏. I was distracted by other stuff and some blocking issues for 0.7.0 release. Yes, please feel free to create an issue to further discuss it. I can reply to that when I get something. |
||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def _stage_main_branch_snapshot_ref(self, snapshot_id: int) -> None: | ||||||||||||||||||||||||||||||
update, requirement = self._set_ref_snapshot( | ||||||||||||||||||||||||||||||
snapshot_id=snapshot_id, ref_name=MAIN_BRANCH, type=str(SnapshotRefType.BRANCH) | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
self._updates += update | ||||||||||||||||||||||||||||||
self._requirements += requirement | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def _set_ref_snapshot( | ||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||
snapshot_id: int, | ||||||||||||||||||||||||||||||
ref_name: str, | ||||||||||||||||||||||||||||||
type: str, | ||||||||||||||||||||||||||||||
max_ref_age_ms: Optional[int] = None, | ||||||||||||||||||||||||||||||
max_snapshot_age_ms: Optional[int] = None, | ||||||||||||||||||||||||||||||
min_snapshots_to_keep: Optional[int] = None, | ||||||||||||||||||||||||||||||
) -> UpdatesAndRequirements: | ||||||||||||||||||||||||||||||
"""Update a ref to a snapshot. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
The updates and requirements for the set-snapshot-ref staged | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
updates = ( | ||||||||||||||||||||||||||||||
SetSnapshotRefUpdate( | ||||||||||||||||||||||||||||||
snapshot_id=snapshot_id, | ||||||||||||||||||||||||||||||
ref_name=ref_name, | ||||||||||||||||||||||||||||||
type=type, | ||||||||||||||||||||||||||||||
max_ref_age_ms=max_ref_age_ms, | ||||||||||||||||||||||||||||||
max_snapshot_age_ms=max_snapshot_age_ms, | ||||||||||||||||||||||||||||||
min_snapshots_to_keep=min_snapshots_to_keep, | ||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
requirements = ( | ||||||||||||||||||||||||||||||
AssertRefSnapshotId( | ||||||||||||||||||||||||||||||
snapshot_id=self._transaction.table_metadata.refs[ref_name].snapshot_id | ||||||||||||||||||||||||||||||
if ref_name in self._transaction.table_metadata.refs | ||||||||||||||||||||||||||||||
else None, | ||||||||||||||||||||||||||||||
ref=ref_name, | ||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
return updates, requirements | ||||||||||||||||||||||||||||||
chinmay-bhat marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots: | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
Create a new tag pointing to the given snapshot id. | ||||||||||||||||||||||||||||||
|
@@ -1987,7 +2006,7 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i | |||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
This for method chaining | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
update, requirement = self._transaction._set_ref_snapshot( | ||||||||||||||||||||||||||||||
update, requirement = self._set_ref_snapshot( | ||||||||||||||||||||||||||||||
snapshot_id=snapshot_id, | ||||||||||||||||||||||||||||||
ref_name=tag_name, | ||||||||||||||||||||||||||||||
type="tag", | ||||||||||||||||||||||||||||||
|
@@ -2017,7 +2036,7 @@ def create_branch( | |||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
This for method chaining | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
update, requirement = self._transaction._set_ref_snapshot( | ||||||||||||||||||||||||||||||
update, requirement = self._set_ref_snapshot( | ||||||||||||||||||||||||||||||
snapshot_id=snapshot_id, | ||||||||||||||||||||||||||||||
ref_name=branch_name, | ||||||||||||||||||||||||||||||
type="branch", | ||||||||||||||||||||||||||||||
|
@@ -2029,6 +2048,74 @@ def create_branch( | |||||||||||||||||||||||||||||
self._requirements += requirement | ||||||||||||||||||||||||||||||
return self | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots: | ||||||||||||||||||||||||||||||
"""Rollback the table to the given snapshot id. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
The snapshot needs to be an ancestor of the current table state. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||||||
snapshot_id (int): rollback to this snapshot_id that used to be current. | ||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
This for method chaining | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
self._commit_if_ref_updates_exist() | ||||||||||||||||||||||||||||||
if self._transaction._table.snapshot_by_id(snapshot_id) is None: | ||||||||||||||||||||||||||||||
raise ValidationError(f"Cannot roll back to unknown snapshot id: {snapshot_id}") | ||||||||||||||||||||||||||||||
if snapshot_id not in { | ||||||||||||||||||||||||||||||
ancestor.snapshot_id | ||||||||||||||||||||||||||||||
for ancestor in ancestors_of(self._transaction._table.current_snapshot(), self._transaction.table_metadata) | ||||||||||||||||||||||||||||||
}: | ||||||||||||||||||||||||||||||
raise ValidationError(f"Cannot roll back to snapshot, not an ancestor of the current state: {snapshot_id}") | ||||||||||||||||||||||||||||||
self._stage_main_branch_snapshot_ref(snapshot_id=snapshot_id) | ||||||||||||||||||||||||||||||
return self | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def rollback_to_timestamp(self, timestamp: int) -> ManageSnapshots: | ||||||||||||||||||||||||||||||
"""Rollback the table to the snapshot right before the given timestamp. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
The snapshot needs to be an ancestor of the current table state. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||||||
timestamp (int): rollback to the snapshot that used to be current right before this timestamp. | ||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
This for method chaining | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
self._commit_if_ref_updates_exist() | ||||||||||||||||||||||||||||||
if ( | ||||||||||||||||||||||||||||||
snapshot := ancestor_right_before_timestamp( | ||||||||||||||||||||||||||||||
self._transaction._table.current_snapshot(), self._transaction.table_metadata, timestamp | ||||||||||||||||||||||||||||||
chinmay-bhat marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
) is None: | ||||||||||||||||||||||||||||||
raise ValidationError(f"Cannot roll back, no valid snapshot older than: {timestamp}") | ||||||||||||||||||||||||||||||
self._stage_main_branch_snapshot_ref(snapshot_id=snapshot.snapshot_id) | ||||||||||||||||||||||||||||||
return self | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def set_current_snapshot(self, snapshot_id: Optional[int] = None, ref_name: Optional[str] = None) -> ManageSnapshots: | ||||||||||||||||||||||||||||||
"""Set the table to a specific snapshot identified either by its id or the branch/tag its on, not both. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
The snapshot is not required to be an ancestor of the current table state. | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||||||
snapshot_id (Optional[int]): id of the snapshot to be set as current | ||||||||||||||||||||||||||||||
ref_name (Optional[str]): branch/tag where the snapshot to be set as current exists. | ||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||
This for method chaining | ||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||
self._commit_if_ref_updates_exist() | ||||||||||||||||||||||||||||||
if (not snapshot_id or ref_name) and (snapshot_id or not ref_name): | ||||||||||||||||||||||||||||||
raise ValidationError("Either snapshot_id or ref must be provided") | ||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||
if snapshot_id is None: | ||||||||||||||||||||||||||||||
if ref_name not in self._transaction.table_metadata.refs: | ||||||||||||||||||||||||||||||
raise ValidationError(f"Cannot set snapshot current to unknown ref {ref_name}") | ||||||||||||||||||||||||||||||
target_snapshot_id = self._transaction.table_metadata.refs[ref_name].snapshot_id | ||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||
target_snapshot_id = snapshot_id | ||||||||||||||||||||||||||||||
if (snapshot := self._transaction._table.snapshot_by_id(target_snapshot_id)) is None: | ||||||||||||||||||||||||||||||
raise ValidationError(f"Cannot set snapshot current with snapshot id: {snapshot_id} or ref_name: {ref_name}") | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
self._stage_main_branch_snapshot_ref(snapshot_id=snapshot.snapshot_id) | ||||||||||||||||||||||||||||||
return self | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
class UpdateSchema(UpdateTableMetadata["UpdateSchema"]): | ||||||||||||||||||||||||||||||
_schema: Schema | ||||||||||||||||||||||||||||||
|
Uh oh!
There was an error while loading. Please reload this page.