Skip to content

Commit 81f4980

Browse files
committed
Show stats after each round of updates by default.
1 parent 7196cb2 commit 81f4980

File tree

6 files changed

+118
-40
lines changed

6 files changed

+118
-40
lines changed

python/cocoindex/cli.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ def setup(delete_legacy_flows):
4949
@click.argument("flow_name", type=str, required=False)
5050
@click.option(
5151
"-L", "--live", is_flag=True, show_default=True, default=False,
52-
help="If true, it will continuously watch changes from data sources and apply to the target index.")
53-
def update(flow_name: str | None, live: bool):
52+
help="Continuously watch changes from data sources and apply to the target index.")
53+
@click.option(
54+
"-q", "--quiet", is_flag=True, show_default=True, default=False,
55+
help="Avoid printing anything to the output, e.g. statistics.")
56+
def update(flow_name: str | None, live: bool, quiet: bool):
5457
"""
5558
Update the index to reflect the latest data from data sources.
5659
"""
5760
async def _update_all():
5861
async def _update_flow(fl: flow.Flow):
59-
updater = flow.FlowLiveUpdater(fl, flow.FlowLiveUpdaterOptions(live_mode=live))
62+
updater = flow.FlowLiveUpdater(
63+
fl,
64+
flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet))
6065
await updater.wait()
61-
print(f"Updated index for {fl.name}:")
62-
for line in str(updater.update_stats()).split("\n"):
63-
if line := line.strip():
64-
print(f" {line}")
65-
print()
6666
await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name)))
6767
asyncio.run(_update_all())
6868

python/cocoindex/flow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ class FlowLiveUpdaterOptions:
370370
Options for live updating a flow.
371371
"""
372372
live_mode: bool = False
373+
print_stats: bool = False
373374

374375
class FlowLiveUpdater:
375376
"""

src/execution/live_updater.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ pub struct FlowLiveUpdaterOptions {
1717
/// If true, the updater will keep refreshing the index.
1818
/// Otherwise, it will only apply changes from the source up to the current time.
1919
pub live_mode: bool,
20+
21+
/// If true, stats will be printed to the console; otherwise they are only logged at trace level.
22+
pub print_stats: bool,
2023
}
2124

2225
async fn update_source(
@@ -25,25 +28,49 @@ async fn update_source(
2528
source_update_stats: Arc<stats::UpdateStats>,
2629
source_idx: usize,
2730
pool: PgPool,
28-
live_mode: bool,
31+
options: FlowLiveUpdaterOptions,
2932
) -> Result<()> {
3033
let source_context = flow_ctx
3134
.get_source_indexing_context(source_idx, &pool)
3235
.await?;
3336

37+
let import_op = &plan.import_ops[source_idx];
38+
let maybe_print_stats = |stats: &stats::UpdateStats| {
39+
if options.print_stats {
40+
println!(
41+
"{}.{}: {}",
42+
flow_ctx.flow.flow_instance.name, import_op.name, stats
43+
);
44+
} else {
45+
trace!(
46+
"{}.{}: {}",
47+
flow_ctx.flow.flow_instance.name,
48+
import_op.name,
49+
stats
50+
);
51+
}
52+
};
53+
3454
let mut update_start = Instant::now();
3555
source_context.update(&pool, &source_update_stats).await?;
56+
maybe_print_stats(&source_update_stats);
3657

37-
let import_op = &plan.import_ops[source_idx];
38-
if let (true, Some(refresh_interval)) = (live_mode, import_op.refresh_options.refresh_interval)
39-
{
58+
if let (true, Some(refresh_interval)) = (
59+
options.live_mode,
60+
import_op.refresh_options.refresh_interval,
61+
) {
62+
let mut last_stats = source_update_stats.as_ref().clone();
4063
loop {
4164
let elapsed = update_start.elapsed();
4265
if elapsed < refresh_interval {
4366
tokio::time::sleep(refresh_interval - elapsed).await;
4467
}
4568
update_start = Instant::now();
4669
source_context.update(&pool, &source_update_stats).await?;
70+
71+
let this_stats = source_update_stats.as_ref().clone();
72+
maybe_print_stats(&this_stats.delta(&last_stats));
73+
last_stats = this_stats;
4774
}
4875
}
4976
Ok(())
@@ -67,7 +94,7 @@ impl FlowLiveUpdater {
6794
source_update_stats.clone(),
6895
source_idx,
6996
pool.clone(),
70-
options.live_mode,
97+
options.clone(),
7198
));
7299
source_update_stats
73100
})

src/execution/row_indexer.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::prelude::*;
33
use futures::future::try_join_all;
44
use sqlx::PgPool;
55
use std::collections::{HashMap, HashSet};
6-
use std::sync::atomic::Ordering::Relaxed;
76

87
use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
98
use super::db_tracking_setup;
@@ -101,7 +100,7 @@ impl SourceVersion {
101100
};
102101
if should_skip {
103102
if let Some(update_stats) = update_stats {
104-
update_stats.num_skipped.fetch_add(1, Relaxed);
103+
update_stats.num_skipped.inc(1);
105104
}
106105
}
107106
should_skip
@@ -585,12 +584,12 @@ pub async fn update_source_row(
585584

586585
if already_exists {
587586
if output.is_some() {
588-
update_stats.num_repreocesses.fetch_add(1, Relaxed);
587+
update_stats.num_repreocesses.inc(1);
589588
} else {
590-
update_stats.num_deletions.fetch_add(1, Relaxed);
589+
update_stats.num_deletions.inc(1);
591590
}
592591
} else if output.is_some() {
593-
update_stats.num_insertions.fetch_add(1, Relaxed);
592+
update_stats.num_insertions.inc(1);
594593
}
595594

596595
Ok(SkippedOr::Normal(()))

src/execution/stats.rs

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,94 @@
11
use crate::prelude::*;
22

3-
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
3+
use std::{
4+
ops::AddAssign,
5+
sync::atomic::{AtomicI64, Ordering::Relaxed},
6+
};
47

5-
#[derive(Debug, Serialize, Default)]
6-
pub struct UpdateStats {
7-
pub num_skipped: AtomicUsize,
8-
pub num_insertions: AtomicUsize,
9-
pub num_deletions: AtomicUsize,
10-
pub num_repreocesses: AtomicUsize,
11-
pub num_errors: AtomicUsize,
8+
#[derive(Default, Serialize)]
9+
pub struct Counter(pub AtomicI64);
10+
11+
impl Counter {
12+
pub fn inc(&self, by: i64) {
13+
self.0.fetch_add(by, Relaxed);
14+
}
15+
16+
pub fn get(&self) -> i64 {
17+
self.0.load(Relaxed)
18+
}
19+
20+
pub fn delta(&self, base: &Self) -> Counter {
21+
Counter(AtomicI64::new(self.get() - base.get()))
22+
}
23+
24+
pub fn into_inner(self) -> i64 {
25+
self.0.into_inner()
26+
}
27+
}
28+
29+
impl AddAssign for Counter {
30+
fn add_assign(&mut self, rhs: Self) {
31+
self.0.fetch_add(rhs.into_inner(), Relaxed);
32+
}
1233
}
1334

14-
impl Clone for UpdateStats {
35+
impl Clone for Counter {
1536
fn clone(&self) -> Self {
16-
Self {
17-
num_skipped: self.num_skipped.load(Relaxed).into(),
18-
num_insertions: self.num_insertions.load(Relaxed).into(),
19-
num_deletions: self.num_deletions.load(Relaxed).into(),
20-
num_repreocesses: self.num_repreocesses.load(Relaxed).into(),
21-
num_errors: self.num_errors.load(Relaxed).into(),
37+
Self(AtomicI64::new(self.get()))
38+
}
39+
}
40+
41+
impl std::fmt::Display for Counter {
42+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43+
write!(f, "{}", self.get())
44+
}
45+
}
46+
47+
impl std::fmt::Debug for Counter {
48+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49+
write!(f, "{}", self.get())
50+
}
51+
}
52+
53+
#[derive(Debug, Serialize, Default, Clone)]
54+
pub struct UpdateStats {
55+
pub num_skipped: Counter,
56+
pub num_insertions: Counter,
57+
pub num_deletions: Counter,
58+
pub num_repreocesses: Counter,
59+
pub num_errors: Counter,
60+
}
61+
62+
impl UpdateStats {
63+
pub fn delta(&self, base: &Self) -> Self {
64+
UpdateStats {
65+
num_skipped: self.num_skipped.delta(&base.num_skipped),
66+
num_insertions: self.num_insertions.delta(&base.num_insertions),
67+
num_deletions: self.num_deletions.delta(&base.num_deletions),
68+
num_repreocesses: self.num_repreocesses.delta(&base.num_repreocesses),
69+
num_errors: self.num_errors.delta(&base.num_errors),
2270
}
2371
}
2472
}
2573

2674
impl std::fmt::Display for UpdateStats {
2775
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28-
let num_skipped = self.num_skipped.load(Relaxed);
76+
let num_skipped = self.num_skipped.get();
2977
if num_skipped > 0 {
3078
write!(f, "{} rows skipped", num_skipped)?;
3179
}
3280

33-
let num_insertions = self.num_insertions.load(Relaxed);
34-
let num_deletions = self.num_deletions.load(Relaxed);
35-
let num_reprocesses = self.num_repreocesses.load(Relaxed);
81+
let num_insertions = self.num_insertions.get();
82+
let num_deletions = self.num_deletions.get();
83+
let num_reprocesses = self.num_repreocesses.get();
3684
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
3785
if num_source_rows > 0 {
3886
if num_skipped > 0 {
39-
write!(f, ", ")?;
87+
write!(f, "; ")?;
4088
}
4189
write!(f, "{num_source_rows} source rows processed",)?;
4290

43-
let num_errors = self.num_errors.load(Relaxed);
91+
let num_errors = self.num_errors.get();
4492
if num_errors > 0 {
4593
write!(f, " with {num_errors} ERRORS",)?;
4694
}

src/service/flows.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,10 @@ pub async fn update(
174174
let mut live_updater = execution::FlowLiveUpdater::start(
175175
flow_ctx.clone(),
176176
&lib_context.pool,
177-
execution::FlowLiveUpdaterOptions { live_mode: false },
177+
execution::FlowLiveUpdaterOptions {
178+
live_mode: false,
179+
..Default::default()
180+
},
178181
)
179182
.await?;
180183
live_updater.wait().await?;

0 commit comments

Comments
 (0)