Skip to content

Commit 16317ea

Browse files
HawaiianSporkLiam Brannigan
authored and committed
feat: return metrics about the commit
PostCommit now contains the number of retries it took to do the commit, and FinalizedCommit now contains the number of retries, whether a checkpoint was created, and the number of log files cleaned up. Signed-off-by: Michael Maletich <[email protected]>
1 parent 3f2e4b5 commit 16317ea

File tree

1 file changed

+66
-12
lines changed
  • crates/core/src/operations/transaction

1 file changed

+66
-12
lines changed

crates/core/src/operations/transaction/mod.rs

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ use crate::table::config::TableConfig;
9797
use crate::table::state::DeltaTableState;
9898
use crate::{crate_version, DeltaResult};
9999
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};
100-
100+
use serde::{Deserialize, Serialize};
101101
pub use self::conflict_checker::CommitConflictError;
102102
pub use self::protocol::INSTANCE as PROTOCOL;
103103

@@ -113,6 +113,36 @@ mod state;
113113
const DELTA_LOG_FOLDER: &str = "_delta_log";
114114
pub(crate) const DEFAULT_RETRIES: usize = 15;
115115

116+
/// Metrics describing the commit attempt itself. Stored on `PostCommit` and
/// copied into [`Metrics`] when the commit is finalized. Field names are
/// serialized in camelCase.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
117+
#[serde(rename_all = "camelCase")]
118+
pub struct CommitMetrics {
119+
/// Number of retries before a successful commit
120+
pub num_retries: u64,
121+
}
122+
123+
/// Metrics returned by the post-commit hook (`run_post_commit_hook`) alongside
/// the table state: whether a checkpoint was written and how many log files
/// were cleaned up. Field names are serialized in camelCase.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
124+
#[serde(rename_all = "camelCase")]
125+
pub struct PostCommitMetrics {
126+
/// Whether a new checkpoint was created as part of this commit
127+
pub new_checkpoint_created: bool,
128+
129+
/// Number of log files cleaned up
130+
pub num_log_files_cleaned_up: u64,
131+
}
132+
133+
/// Combined metrics exposed on `FinalizedCommit`: the retry count taken from
/// `CommitMetrics` plus the checkpoint and log-cleanup results taken from
/// `PostCommitMetrics`. Field names are serialized in camelCase.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
134+
#[serde(rename_all = "camelCase")]
135+
pub struct Metrics {
136+
/// Number of retries before a successful commit
137+
pub num_retries: u64,
138+
139+
/// Whether a new checkpoint was created as part of this commit
140+
pub new_checkpoint_created: bool,
141+
142+
/// Number of log files cleaned up
143+
pub num_log_files_cleaned_up: u64,
144+
}
145+
116146
/// Error raised while committing transaction
117147
#[derive(thiserror::Error, Debug)]
118148
pub enum TransactionError {
@@ -598,6 +628,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
598628
log_store: this.log_store,
599629
table_data: this.table_data,
600630
custom_execute_handler: this.post_commit_hook_handler,
631+
metrics: CommitMetrics { num_retries: 0 },
601632
});
602633
}
603634

@@ -675,6 +706,9 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
675706
log_store: this.log_store,
676707
table_data: this.table_data,
677708
custom_execute_handler: this.post_commit_hook_handler,
709+
metrics: CommitMetrics {
710+
num_retries: attempt_number as u64 - 1,
711+
},
678712
});
679713
}
680714
Err(TransactionError::VersionAlreadyExists(version)) => {
@@ -708,11 +742,12 @@ pub struct PostCommit<'a> {
708742
log_store: LogStoreRef,
709743
table_data: Option<&'a dyn TableReference>,
710744
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
745+
metrics: CommitMetrics,
711746
}
712747

713748
impl PostCommit<'_> {
714749
/// Runs the post commit activities
715-
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
750+
async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
716751
if let Some(table) = self.table_data {
717752
let post_commit_operation_id = Uuid::new_v4();
718753
let mut snapshot = table.eager_snapshot().clone();
@@ -745,26 +780,29 @@ impl PostCommit<'_> {
745780
.await?
746781
}
747782

783+
let mut new_checkpoint_created = false;
748784
if self.create_checkpoint {
749785
// Execute create checkpoint hook
750-
self.create_checkpoint(
786+
new_checkpoint_created = self.create_checkpoint(
751787
&state,
752788
&self.log_store,
753789
self.version,
754790
post_commit_operation_id,
755791
)
756792
.await?;
757793
}
794+
795+
let mut num_log_files_cleaned_up : u64 = 0;
758796
if cleanup_logs {
759797
// Execute clean up logs hook
760-
cleanup_expired_logs_for(
798+
num_log_files_cleaned_up = cleanup_expired_logs_for(
761799
self.version,
762800
self.log_store.as_ref(),
763801
Utc::now().timestamp_millis()
764802
- state.table_config().log_retention_duration().as_millis() as i64,
765803
Some(post_commit_operation_id),
766804
)
767-
.await?;
805+
.await? as u64;
768806
}
769807

770808
// Run arbitrary after_post_commit_hook code
@@ -777,7 +815,10 @@ impl PostCommit<'_> {
777815
)
778816
.await?
779817
}
780-
Ok(state)
818+
Ok((state, PostCommitMetrics {
819+
new_checkpoint_created,
820+
num_log_files_cleaned_up,
821+
}))
781822
} else {
782823
let state = DeltaTableState::try_new(
783824
&Path::default(),
@@ -786,7 +827,10 @@ impl PostCommit<'_> {
786827
Some(self.version),
787828
)
788829
.await?;
789-
Ok(state)
830+
Ok((state, PostCommitMetrics {
831+
new_checkpoint_created: false,
832+
num_log_files_cleaned_up: 0,
833+
}))
790834
}
791835
}
792836
async fn create_checkpoint(
@@ -795,18 +839,20 @@ impl PostCommit<'_> {
795839
log_store: &LogStoreRef,
796840
version: i64,
797841
operation_id: Uuid,
798-
) -> DeltaResult<()> {
842+
) -> DeltaResult<bool> {
799843
if !table_state.load_config().require_files {
800844
warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files.");
801-
return Ok(());
845+
return Ok(false);
802846
}
803847

804848
let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
805849
if ((version + 1) % checkpoint_interval) == 0 {
806850
create_checkpoint_for(version, table_state, log_store.as_ref(), Some(operation_id))
807-
.await?
851+
.await?;
852+
Ok(true)
853+
} else {
854+
Ok(false)
808855
}
809-
Ok(())
810856
}
811857
}
812858

@@ -817,6 +863,9 @@ pub struct FinalizedCommit {
817863

818864
/// Version of the finalized commit
819865
pub version: i64,
866+
867+
/// Metrics associated with the commit operation
868+
pub metrics: Metrics,
820869
}
821870

822871
impl FinalizedCommit {
@@ -839,9 +888,14 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> {
839888

840889
Box::pin(async move {
841890
match this.run_post_commit_hook().await {
842-
Ok(snapshot) => Ok(FinalizedCommit {
891+
Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
843892
snapshot,
844893
version: this.version,
894+
metrics: Metrics {
895+
num_retries: this.metrics.num_retries,
896+
new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
897+
num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
898+
},
845899
}),
846900
Err(err) => Err(err),
847901
}

0 commit comments

Comments
 (0)