Skip to content

Commit 9c72aa6

Browse files
committed
feat: improve stats display with indicatif
1 parent 7a06c88 commit 9c72aa6

File tree

3 files changed

+70
-11
lines changed

3 files changed

+70
-11
lines changed

Cargo.lock

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ blake2 = "0.10.6"
6565
pgvector = { version = "0.4.1", features = ["sqlx", "halfvec"] }
6666
phf = { version = "0.12.1", features = ["macros"] }
6767
indenter = "0.3.4"
68+
indicatif = "0.17.9"
6869
itertools = "0.14.0"
6970
derivative = "2.2.0"
7071
hex = "0.4.3"

src/execution/live_updater.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88

99
use super::stats;
1010
use futures::future::try_join_all;
11+
use indicatif::ProgressBar;
1112
use sqlx::PgPool;
1213
use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior};
1314

@@ -331,34 +332,41 @@ impl SourceUpdateTask {
331332
let update_stats = Arc::new(stats::UpdateStats::default());
332333

333334
// Spawn periodic stats reporting task if print_stats is enabled
334-
let reporting_handle = if self.options.print_stats {
335+
let (reporting_handle, progress_bar) = if self.options.print_stats {
335336
let update_stats_clone = update_stats.clone();
336337
let update_title_owned = update_title.to_string();
337338
let flow_name = self.flow.flow_instance.name.clone();
338339
let import_op_name = self.import_op().name.clone();
339340

341+
// Create a progress bar that will overwrite the same line
342+
let pb = ProgressBar::new_spinner();
343+
pb.set_style(
344+
indicatif::ProgressStyle::default_spinner()
345+
.template("{msg}")
346+
.unwrap()
347+
);
348+
let pb_clone = pb.clone();
349+
340350
let report_task = async move {
341351
let mut interval = tokio::time::interval(REPORT_INTERVAL);
342-
let mut last_stats = update_stats_clone.as_ref().clone();
343352
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
344353
interval.tick().await; // Skip first tick
354+
345355
loop {
346356
interval.tick().await;
347357
let current_stats = update_stats_clone.as_ref().clone();
348-
let delta = current_stats.delta(&last_stats);
349-
if delta.has_any_change() {
350-
// Print periodic progress (do NOT merge here, final report_stats will merge)
351-
println!(
358+
if current_stats.has_any_change() {
359+
// Show cumulative stats (always show latest total, not delta)
360+
pb_clone.set_message(format!(
352361
"{}.{} ({update_title_owned}): {}",
353-
flow_name, import_op_name, delta
354-
);
355-
last_stats = current_stats;
362+
flow_name, import_op_name, current_stats
363+
));
356364
}
357365
}
358366
};
359-
Some(tokio::spawn(report_task))
367+
(Some(tokio::spawn(report_task)), Some(pb))
360368
} else {
361-
None
369+
(None, None)
362370
};
363371

364372
// Run the actual update
@@ -378,6 +386,11 @@ impl SourceUpdateTask {
378386
handle.abort();
379387
}
380388

389+
// Clear the progress bar to ensure final stats appear on a new line
390+
if let Some(pb) = progress_bar {
391+
pb.finish_and_clear();
392+
}
393+
381394
// Check update result
382395
update_result?;
383396

0 commit comments

Comments
 (0)