Skip to content

Commit d925438

Browse files
authored
feat(stats): polish wording for stats and add a flow change bucket (#426)
1 parent 51b7a3a commit d925438

File tree

2 files changed

+31
-17
lines changed

2 files changed

+31
-17
lines changed

src/execution/row_indexer.rs

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -100,7 +100,7 @@ impl SourceVersion {
100100
};
101101
if should_skip {
102102
if let Some(update_stats) = update_stats {
103-
update_stats.num_skipped.inc(1);
103+
update_stats.num_no_change.inc(1);
104104
}
105105
}
106106
should_skip
@@ -491,8 +491,7 @@ pub async fn update_source_row(
491491
pool,
492492
)
493493
.await?;
494-
let already_exists = existing_tracking_info.is_some();
495-
let memoization_info = match existing_tracking_info {
494+
let (memoization_info, existing_version) = match existing_tracking_info {
496495
Some(info) => {
497496
let existing_version = SourceVersion::from_stored(
498497
info.processed_source_ordinal,
@@ -502,7 +501,10 @@ pub async fn update_source_row(
502501
if existing_version.should_skip(source_version, Some(update_stats)) {
503502
return Ok(SkippedOr::Skipped(existing_version));
504503
}
505-
info.memoization_info.and_then(|info| info.0)
504+
(
505+
info.memoization_info.and_then(|info| info.0),
506+
Some(existing_version),
507+
)
506508
}
507509
None => Default::default(),
508510
};
@@ -592,9 +594,15 @@ pub async fn update_source_row(
592594
)
593595
.await?;
594596

595-
if already_exists {
597+
if let Some(existing_version) = existing_version {
596598
if output.is_some() {
597-
update_stats.num_repreocesses.inc(1);
599+
if source_version.ordinal.is_none()
600+
|| source_version.ordinal != existing_version.ordinal
601+
{
602+
update_stats.num_updates.inc(1);
603+
} else {
604+
update_stats.num_reprocesses.inc(1);
605+
}
598606
} else {
599607
update_stats.num_deletions.inc(1);
600608
}

src/execution/stats.rs

Lines changed: 17 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -52,29 +52,34 @@ impl std::fmt::Debug for Counter {
5252

5353
#[derive(Debug, Serialize, Default, Clone)]
5454
pub struct UpdateStats {
55-
pub num_skipped: Counter,
55+
pub num_no_change: Counter,
5656
pub num_insertions: Counter,
5757
pub num_deletions: Counter,
58-
pub num_repreocesses: Counter,
58+
/// Number of source rows that were updated.
59+
pub num_updates: Counter,
60+
/// Number of source rows that were reprocessed because of logic change.
61+
pub num_reprocesses: Counter,
5962
pub num_errors: Counter,
6063
}
6164

6265
impl UpdateStats {
6366
pub fn delta(&self, base: &Self) -> Self {
6467
UpdateStats {
65-
num_skipped: self.num_skipped.delta(&base.num_skipped),
68+
num_no_change: self.num_no_change.delta(&base.num_no_change),
6669
num_insertions: self.num_insertions.delta(&base.num_insertions),
6770
num_deletions: self.num_deletions.delta(&base.num_deletions),
68-
num_repreocesses: self.num_repreocesses.delta(&base.num_repreocesses),
71+
num_updates: self.num_updates.delta(&base.num_updates),
72+
num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
6973
num_errors: self.num_errors.delta(&base.num_errors),
7074
}
7175
}
7276

7377
pub fn is_zero(&self) -> bool {
74-
self.num_skipped.get() == 0
78+
self.num_no_change.get() == 0
7579
&& self.num_insertions.get() == 0
7680
&& self.num_deletions.get() == 0
77-
&& self.num_repreocesses.get() == 0
81+
&& self.num_updates.get() == 0
82+
&& self.num_reprocesses.get() == 0
7883
&& self.num_errors.get() == 0
7984
}
8085
}
@@ -87,18 +92,19 @@ impl std::fmt::Display for UpdateStats {
8792
messages.push(format!("{num_errors} source rows FAILED"));
8893
}
8994

90-
let num_skipped = self.num_skipped.get();
95+
let num_skipped = self.num_no_change.get();
9196
if num_skipped > 0 {
92-
messages.push(format!("{} source rows SKIPPED", num_skipped));
97+
messages.push(format!("{} source rows NO CHANGE", num_skipped));
9398
}
9499

95100
let num_insertions = self.num_insertions.get();
96101
let num_deletions = self.num_deletions.get();
97-
let num_reprocesses = self.num_repreocesses.get();
98-
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
102+
let num_updates = self.num_updates.get();
103+
let num_reprocesses = self.num_reprocesses.get();
104+
let num_source_rows = num_insertions + num_deletions + num_updates + num_reprocesses;
99105
if num_source_rows > 0 {
100106
messages.push(format!(
101-
"{num_source_rows} source rows processed: {num_insertions} ADDED, {num_deletions} REMOVED, {num_reprocesses} REPROCESSED",
107+
"{num_source_rows} source rows processed ({num_insertions} ADDED, {num_deletions} REMOVED, {num_updates} UPDATED, {num_reprocesses} REPROCESSED on flow change)",
102108
));
103109
}
104110

0 commit comments

Comments
 (0)