Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::stats;
use futures::future::try_join_all;
use indicatif::ProgressBar;
use sqlx::PgPool;
use std::fmt::Write;
use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior};
use tokio_util::task::AbortOnDropHandle;

Expand Down Expand Up @@ -251,7 +252,7 @@ impl SourceUpdateTask {
let curr_change_stream_stats = change_stream_stats.as_ref().clone();
let delta = curr_change_stream_stats.delta(&last_change_stream_stats);
if delta.has_any_change() {
task.report_stats(&delta, "change stream");
task.report_stats(&delta, "change stream", None);
last_change_stream_stats = curr_change_stream_stats;
}
}
Expand Down Expand Up @@ -303,22 +304,30 @@ impl SourceUpdateTask {
Ok(())
}

fn report_stats(&self, stats: &stats::UpdateStats, update_title: &str) {
fn report_stats(
&self,
stats: &stats::UpdateStats,
update_title: &str,
start_time: Option<std::time::Instant>,
) {
self.source_update_stats.merge(stats);
let mut message = format!(
"{}.{} ({update_title}): {stats}",
self.flow.flow_instance.name,
self.import_op().name
);
if let Some(start_time) = start_time {
write!(
&mut message,
" [processing time: {:.3}s]",
start_time.elapsed().as_secs_f64()
)
.expect("Failed to write to message");
}
if self.options.print_stats {
println!(
"{}.{} ({update_title}): {}",
self.flow.flow_instance.name,
self.import_op().name,
stats
);
println!("{message}");
} else {
trace!(
"{}.{} ({update_title}): {}",
self.flow.flow_instance.name,
self.import_op().name,
stats
);
trace!("{message}");
}
}

Expand All @@ -328,6 +337,7 @@ impl SourceUpdateTask {
update_title: &str,
update_options: super::source_indexer::UpdateOptions,
) -> Result<()> {
let now = std::time::Instant::now();
let update_stats = Arc::new(stats::UpdateStats::default());

// Spawn periodic stats reporting task if print_stats is enabled
Expand Down Expand Up @@ -403,7 +413,7 @@ impl SourceUpdateTask {
}

// Report final stats
self.report_stats(&update_stats, update_title);
self.report_stats(&update_stats, update_title, Some(now));
Ok(())
}

Expand Down
Loading