Skip to content

Commit 39d187d

Browse files
ion-elgrecoLiam Brannigan
authored andcommitted
fix: advance state after conflict check
Signed-off-by: Ion Koutsouris <[email protected]> Signed-off-by: Liam Brannigan <[email protected]>
1 parent dcf41fa commit 39d187d

File tree

1 file changed

+16
-11
lines changed
  • crates/core/src/operations/transaction

1 file changed

+16
-11
lines changed

crates/core/src/operations/transaction/mod.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
//! └───────────────────────────────┘
7575
//!</pre>
7676
use std::collections::HashMap;
77+
use std::future::Future;
7778
use std::sync::Arc;
7879

7980
use bytes::Bytes;
@@ -607,7 +608,7 @@ impl PreparedCommit<'_> {
607608
}
608609

609610
impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
610-
type Output = DeltaResult<PostCommit<'a>>;
611+
type Output = DeltaResult<PostCommit>;
611612
type IntoFuture = BoxFuture<'a, Self::Output>;
612613

613614
fn into_future(self) -> Self::IntoFuture {
@@ -626,14 +627,14 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
626627
create_checkpoint: false,
627628
cleanup_expired_logs: None,
628629
log_store: this.log_store,
629-
table_data: this.table_data,
630+
table_data: None,
630631
custom_execute_handler: this.post_commit_hook_handler,
631632
metrics: CommitMetrics { num_retries: 0 },
632633
});
633634
}
634635

635636
// unwrap() is safe here due to the above check
636-
let read_snapshot = this.table_data.unwrap().eager_snapshot();
637+
let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();
637638

638639
let mut attempt_number = 1;
639640
let total_retries = this.max_retries + 1;
@@ -664,7 +665,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
664665
)
665666
.await?;
666667
let transaction_info = TransactionInfo::try_new(
667-
read_snapshot,
668+
&read_snapshot,
668669
this.data.operation.read_predicate(),
669670
&this.data.actions,
670671
this.data.operation.read_whole_table(),
@@ -683,6 +684,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
683684
}
684685
steps -= 1;
685686
}
687+
// Update snapshot to latest version after succesful conflict check
688+
read_snapshot
689+
.update(this.log_store.clone(), Some(latest_version))
690+
.await?;
686691
}
687692
let version: i64 = latest_version + 1;
688693

@@ -704,7 +709,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
704709
.map(|v| v.cleanup_expired_logs)
705710
.unwrap_or_default(),
706711
log_store: this.log_store,
707-
table_data: this.table_data,
712+
table_data: Some(Box::new(read_snapshot)),
708713
custom_execute_handler: this.post_commit_hook_handler,
709714
metrics: CommitMetrics {
710715
num_retries: attempt_number as u64 - 1,
@@ -732,23 +737,23 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
732737
}
733738

734739
/// Represents items for the post commit hook
735-
pub struct PostCommit<'a> {
740+
pub struct PostCommit {
736741
/// The winning version number of the commit
737742
pub version: i64,
738743
/// The data that was committed to the log store
739744
pub data: CommitData,
740745
create_checkpoint: bool,
741746
cleanup_expired_logs: Option<bool>,
742747
log_store: LogStoreRef,
743-
table_data: Option<&'a dyn TableReference>,
748+
table_data: Option<Box<dyn TableReference>>,
744749
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
745750
metrics: CommitMetrics,
746751
}
747752

748-
impl PostCommit<'_> {
753+
impl PostCommit {
749754
/// Runs the post commit activities
750755
async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
751-
if let Some(table) = self.table_data {
756+
if let Some(table) = &self.table_data {
752757
let post_commit_operation_id = Uuid::new_v4();
753758
let mut snapshot = table.eager_snapshot().clone();
754759
if self.version - snapshot.version() > 1 {
@@ -886,9 +891,9 @@ impl FinalizedCommit {
886891
}
887892
}
888893

889-
impl<'a> std::future::IntoFuture for PostCommit<'a> {
894+
impl std::future::IntoFuture for PostCommit {
890895
type Output = DeltaResult<FinalizedCommit>;
891-
type IntoFuture = BoxFuture<'a, Self::Output>;
896+
type IntoFuture = BoxFuture<'static, Self::Output>;
892897

893898
fn into_future(self) -> Self::IntoFuture {
894899
let this = self;

0 commit comments

Comments
 (0)