diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index b5bdb762f..3bff88d22 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -49,20 +49,20 @@ def setup(delete_legacy_flows): @click.argument("flow_name", type=str, required=False) @click.option( "-L", "--live", is_flag=True, show_default=True, default=False, - help="If true, it will continuously watch changes from data sources and apply to the target index.") -def update(flow_name: str | None, live: bool): + help="Continuously watch changes from data sources and apply to the target index.") +@click.option( + "-q", "--quiet", is_flag=True, show_default=True, default=False, + help="Avoid printing anything to the output, e.g. statistics.") +def update(flow_name: str | None, live: bool, quiet: bool): """ Update the index to reflect the latest data from data sources. """ async def _update_all(): async def _update_flow(fl: flow.Flow): - updater = flow.FlowLiveUpdater(fl, flow.FlowLiveUpdaterOptions(live_mode=live)) + updater = flow.FlowLiveUpdater( + fl, + flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)) await updater.wait() - print(f"Updated index for {fl.name}:") - for line in str(updater.update_stats()).split("\n"): - if line := line.strip(): - print(f" {line}") - print() await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name))) asyncio.run(_update_all()) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index ca1dddb68..8f4ee45fd 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -370,6 +370,7 @@ class FlowLiveUpdaterOptions: Options for live updating a flow. """ live_mode: bool = False + print_stats: bool = False class FlowLiveUpdater: """ diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 4b4eb020a..f54603eba 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -17,6 +17,9 @@ pub struct FlowLiveUpdaterOptions { /// If true, the updater will keep refreshing the index. /// Otherwise, it will only apply changes from the source up to the current time. pub live_mode: bool, + + /// If true, stats will be printed to the console. + pub print_stats: bool, } async fn update_source( @@ -25,18 +28,38 @@ async fn update_source( source_update_stats: Arc, source_idx: usize, pool: PgPool, - live_mode: bool, + options: FlowLiveUpdaterOptions, ) -> Result<()> { let source_context = flow_ctx .get_source_indexing_context(source_idx, &pool) .await?; + let import_op = &plan.import_ops[source_idx]; + let maybe_print_stats = |stats: &stats::UpdateStats| { + if options.print_stats { + println!( + "{}.{}: {}", + flow_ctx.flow.flow_instance.name, import_op.name, stats + ); + } else { + trace!( + "{}.{}: {}", + flow_ctx.flow.flow_instance.name, + import_op.name, + stats + ); + } + }; + let mut update_start = Instant::now(); source_context.update(&pool, &source_update_stats).await?; + maybe_print_stats(&source_update_stats); - let import_op = &plan.import_ops[source_idx]; - if let (true, Some(refresh_interval)) = (live_mode, import_op.refresh_options.refresh_interval) - { + if let (true, Some(refresh_interval)) = ( + options.live_mode, + import_op.refresh_options.refresh_interval, + ) { + let mut last_stats = source_update_stats.as_ref().clone(); loop { let elapsed = update_start.elapsed(); if elapsed < refresh_interval { @@ -44,6 +67,10 @@ async fn update_source( } update_start = Instant::now(); source_context.update(&pool, &source_update_stats).await?; + + let this_stats = source_update_stats.as_ref().clone(); + maybe_print_stats(&this_stats.delta(&last_stats)); + last_stats = this_stats; } } Ok(()) @@ -67,7 +94,7 @@ impl FlowLiveUpdater { source_update_stats.clone(), source_idx, pool.clone(), - options.live_mode, + options.clone(), )); source_update_stats }) diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index d5c48f262..d3827cade 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -3,7 +3,6 @@ use crate::prelude::*; use futures::future::try_join_all; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; -use std::sync::atomic::Ordering::Relaxed; use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey}; use super::db_tracking_setup; @@ -101,7 +100,7 @@ impl SourceVersion { }; if should_skip { if let Some(update_stats) = update_stats { - update_stats.num_skipped.fetch_add(1, Relaxed); + update_stats.num_skipped.inc(1); } } should_skip @@ -585,12 +584,12 @@ pub async fn update_source_row( if already_exists { if output.is_some() { - update_stats.num_repreocesses.fetch_add(1, Relaxed); + update_stats.num_repreocesses.inc(1); } else { - update_stats.num_deletions.fetch_add(1, Relaxed); + update_stats.num_deletions.inc(1); } } else if output.is_some() { - update_stats.num_insertions.fetch_add(1, Relaxed); + update_stats.num_insertions.inc(1); } Ok(SkippedOr::Normal(())) diff --git a/src/execution/stats.rs b/src/execution/stats.rs index 8db36aed0..a2943068f 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -1,46 +1,94 @@ use crate::prelude::*; -use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; +use std::{ + ops::AddAssign, + sync::atomic::{AtomicI64, Ordering::Relaxed}, +}; -#[derive(Debug, Serialize, Default)] -pub struct UpdateStats { - pub num_skipped: AtomicUsize, - pub num_insertions: AtomicUsize, - pub num_deletions: AtomicUsize, - pub num_repreocesses: AtomicUsize, - pub num_errors: AtomicUsize, +#[derive(Default, Serialize)] +pub struct Counter(pub AtomicI64); + +impl Counter { + pub fn inc(&self, by: i64) { + self.0.fetch_add(by, Relaxed); + } + + pub fn get(&self) -> i64 { + self.0.load(Relaxed) + } + + pub fn delta(&self, base: &Self) -> Counter { + Counter(AtomicI64::new(self.get() - base.get())) + } + + pub fn into_inner(self) -> i64 { + self.0.into_inner() + } +} + +impl AddAssign for Counter { + fn add_assign(&mut self, rhs: Self) { + self.0.fetch_add(rhs.into_inner(), Relaxed); + } } -impl Clone for UpdateStats { +impl Clone for Counter { fn clone(&self) -> Self { - Self { - num_skipped: self.num_skipped.load(Relaxed).into(), - num_insertions: self.num_insertions.load(Relaxed).into(), - num_deletions: self.num_deletions.load(Relaxed).into(), - num_repreocesses: self.num_repreocesses.load(Relaxed).into(), - num_errors: self.num_errors.load(Relaxed).into(), + Self(AtomicI64::new(self.get())) + } +} + +impl std::fmt::Display for Counter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.get()) + } +} + +impl std::fmt::Debug for Counter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.get()) + } +} + +#[derive(Debug, Serialize, Default, Clone)] +pub struct UpdateStats { + pub num_skipped: Counter, + pub num_insertions: Counter, + pub num_deletions: Counter, + pub num_repreocesses: Counter, + pub num_errors: Counter, +} + +impl UpdateStats { + pub fn delta(&self, base: &Self) -> Self { + UpdateStats { + num_skipped: self.num_skipped.delta(&base.num_skipped), + num_insertions: self.num_insertions.delta(&base.num_insertions), + num_deletions: self.num_deletions.delta(&base.num_deletions), + num_repreocesses: self.num_repreocesses.delta(&base.num_repreocesses), + num_errors: self.num_errors.delta(&base.num_errors), } } } impl std::fmt::Display for UpdateStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let num_skipped = self.num_skipped.load(Relaxed); + let num_skipped = self.num_skipped.get(); if num_skipped > 0 { write!(f, "{} rows skipped", num_skipped)?; } - let num_insertions = self.num_insertions.load(Relaxed); - let num_deletions = self.num_deletions.load(Relaxed); - let num_reprocesses = self.num_repreocesses.load(Relaxed); + let num_insertions = self.num_insertions.get(); + let num_deletions = self.num_deletions.get(); + let num_reprocesses = self.num_repreocesses.get(); let num_source_rows = num_insertions + num_deletions + num_reprocesses; if num_source_rows > 0 { if num_skipped > 0 { - write!(f, ", ")?; + write!(f, "; ")?; } write!(f, "{num_source_rows} source rows processed",)?; - let num_errors = self.num_errors.load(Relaxed); + let num_errors = self.num_errors.get(); if num_errors > 0 { write!(f, " with {num_errors} ERRORS",)?; } diff --git a/src/service/flows.rs b/src/service/flows.rs index 1179c582f..b65398af9 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -174,7 +174,10 @@ pub async fn update( let mut live_updater = execution::FlowLiveUpdater::start( flow_ctx.clone(), &lib_context.pool, - execution::FlowLiveUpdaterOptions { live_mode: false }, + execution::FlowLiveUpdaterOptions { + live_mode: false, + ..Default::default() + }, ) .await?; live_updater.wait().await?;