Skip to content

Commit a1d89ec

Browse files
authored
feat: report elapsed time for processing (#1231)
1 parent 4d6ea70 commit a1d89ec

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

src/execution/live_updater.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use super::stats;
1010
use futures::future::try_join_all;
1111
use indicatif::ProgressBar;
1212
use sqlx::PgPool;
13+
use std::fmt::Write;
1314
use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior};
1415
use tokio_util::task::AbortOnDropHandle;
1516

@@ -251,7 +252,7 @@ impl SourceUpdateTask {
251252
let curr_change_stream_stats = change_stream_stats.as_ref().clone();
252253
let delta = curr_change_stream_stats.delta(&last_change_stream_stats);
253254
if delta.has_any_change() {
254-
task.report_stats(&delta, "change stream");
255+
task.report_stats(&delta, "change stream", None);
255256
last_change_stream_stats = curr_change_stream_stats;
256257
}
257258
}
@@ -303,22 +304,30 @@ impl SourceUpdateTask {
303304
Ok(())
304305
}
305306

306-
fn report_stats(&self, stats: &stats::UpdateStats, update_title: &str) {
307+
fn report_stats(
308+
&self,
309+
stats: &stats::UpdateStats,
310+
update_title: &str,
311+
start_time: Option<std::time::Instant>,
312+
) {
307313
self.source_update_stats.merge(stats);
314+
let mut message = format!(
315+
"{}.{} ({update_title}): {stats}",
316+
self.flow.flow_instance.name,
317+
self.import_op().name
318+
);
319+
if let Some(start_time) = start_time {
320+
write!(
321+
&mut message,
322+
" [processing time: {:.3}s]",
323+
start_time.elapsed().as_secs_f64()
324+
)
325+
.expect("Failed to write to message");
326+
}
308327
if self.options.print_stats {
309-
println!(
310-
"{}.{} ({update_title}): {}",
311-
self.flow.flow_instance.name,
312-
self.import_op().name,
313-
stats
314-
);
328+
println!("{message}");
315329
} else {
316-
trace!(
317-
"{}.{} ({update_title}): {}",
318-
self.flow.flow_instance.name,
319-
self.import_op().name,
320-
stats
321-
);
330+
trace!("{message}");
322331
}
323332
}
324333

@@ -328,6 +337,7 @@ impl SourceUpdateTask {
328337
update_title: &str,
329338
update_options: super::source_indexer::UpdateOptions,
330339
) -> Result<()> {
340+
let now = std::time::Instant::now();
331341
let update_stats = Arc::new(stats::UpdateStats::default());
332342

333343
// Spawn periodic stats reporting task if print_stats is enabled
@@ -403,7 +413,7 @@ impl SourceUpdateTask {
403413
}
404414

405415
// Report final stats
406-
self.report_stats(&update_stats, update_title);
416+
self.report_stats(&update_stats, update_title, Some(now));
407417
Ok(())
408418
}
409419

0 commit comments

Comments
 (0)