Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ def setup(delete_legacy_flows):
@click.argument("flow_name", type=str, required=False)
@click.option(
"-L", "--live", is_flag=True, show_default=True, default=False,
help="If true, it will continuously watch changes from data sources and apply to the target index.")
def update(flow_name: str | None, live: bool):
help="Continuously watch changes from data sources and apply to the target index.")
@click.option(
"-q", "--quiet", is_flag=True, show_default=True, default=False,
help="Avoid printing anything to the output, e.g. statistics.")
def update(flow_name: str | None, live: bool, quiet: bool):
"""
Update the index to reflect the latest data from data sources.
"""
async def _update_all():
async def _update_flow(fl: flow.Flow):
updater = flow.FlowLiveUpdater(fl, flow.FlowLiveUpdaterOptions(live_mode=live))
updater = flow.FlowLiveUpdater(
fl,
flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet))
await updater.wait()
print(f"Updated index for {fl.name}:")
for line in str(updater.update_stats()).split("\n"):
if line := line.strip():
print(f" {line}")
print()
await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name)))
asyncio.run(_update_all())

Expand Down
1 change: 1 addition & 0 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class FlowLiveUpdaterOptions:
Options for live updating a flow.
"""
live_mode: bool = False
print_stats: bool = False

class FlowLiveUpdater:
"""
Expand Down
37 changes: 32 additions & 5 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct FlowLiveUpdaterOptions {
/// If true, the updater will keep refreshing the index.
/// Otherwise, it will only apply changes from the source up to the current time.
pub live_mode: bool,

/// If true, stats will be printed to the console.
pub print_stats: bool,
}

async fn update_source(
Expand All @@ -25,25 +28,49 @@ async fn update_source(
source_update_stats: Arc<stats::UpdateStats>,
source_idx: usize,
pool: PgPool,
live_mode: bool,
options: FlowLiveUpdaterOptions,
) -> Result<()> {
let source_context = flow_ctx
.get_source_indexing_context(source_idx, &pool)
.await?;

let import_op = &plan.import_ops[source_idx];
let maybe_print_stats = |stats: &stats::UpdateStats| {
if options.print_stats {
println!(
"{}.{}: {}",
flow_ctx.flow.flow_instance.name, import_op.name, stats
);
} else {
trace!(
"{}.{}: {}",
flow_ctx.flow.flow_instance.name,
import_op.name,
stats
);
}
};

let mut update_start = Instant::now();
source_context.update(&pool, &source_update_stats).await?;
maybe_print_stats(&source_update_stats);

let import_op = &plan.import_ops[source_idx];
if let (true, Some(refresh_interval)) = (live_mode, import_op.refresh_options.refresh_interval)
{
if let (true, Some(refresh_interval)) = (
options.live_mode,
import_op.refresh_options.refresh_interval,
) {
let mut last_stats = source_update_stats.as_ref().clone();
loop {
let elapsed = update_start.elapsed();
if elapsed < refresh_interval {
tokio::time::sleep(refresh_interval - elapsed).await;
}
update_start = Instant::now();
source_context.update(&pool, &source_update_stats).await?;

let this_stats = source_update_stats.as_ref().clone();
maybe_print_stats(&this_stats.delta(&last_stats));
last_stats = this_stats;
}
}
Ok(())
Expand All @@ -67,7 +94,7 @@ impl FlowLiveUpdater {
source_update_stats.clone(),
source_idx,
pool.clone(),
options.live_mode,
options.clone(),
));
source_update_stats
})
Expand Down
9 changes: 4 additions & 5 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::prelude::*;
use futures::future::try_join_all;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering::Relaxed;

use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
use super::db_tracking_setup;
Expand Down Expand Up @@ -101,7 +100,7 @@ impl SourceVersion {
};
if should_skip {
if let Some(update_stats) = update_stats {
update_stats.num_skipped.fetch_add(1, Relaxed);
update_stats.num_skipped.inc(1);
}
}
should_skip
Expand Down Expand Up @@ -585,12 +584,12 @@ pub async fn update_source_row(

if already_exists {
if output.is_some() {
update_stats.num_repreocesses.fetch_add(1, Relaxed);
update_stats.num_repreocesses.inc(1);
} else {
update_stats.num_deletions.fetch_add(1, Relaxed);
update_stats.num_deletions.inc(1);
}
} else if output.is_some() {
update_stats.num_insertions.fetch_add(1, Relaxed);
update_stats.num_insertions.inc(1);
}

Ok(SkippedOr::Normal(()))
Expand Down
90 changes: 69 additions & 21 deletions src/execution/stats.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,94 @@
use crate::prelude::*;

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::{
ops::AddAssign,
sync::atomic::{AtomicI64, Ordering::Relaxed},
};

#[derive(Debug, Serialize, Default)]
pub struct UpdateStats {
pub num_skipped: AtomicUsize,
pub num_insertions: AtomicUsize,
pub num_deletions: AtomicUsize,
pub num_repreocesses: AtomicUsize,
pub num_errors: AtomicUsize,
/// A signed 64-bit counter stored in an [`AtomicI64`].
///
/// The atomic storage lets the value be incremented and read through a shared
/// reference (see `inc` / `get`), and the signed type lets a difference
/// between two counters (see `delta`) be represented even when it is negative.
#[derive(Default, Serialize)]
pub struct Counter(pub AtomicI64);

impl Counter {
pub fn inc(&self, by: i64) {
self.0.fetch_add(by, Relaxed);
}

pub fn get(&self) -> i64 {
self.0.load(Relaxed)
}

pub fn delta(&self, base: &Self) -> Counter {
Counter(AtomicI64::new(self.get() - base.get()))
}

pub fn into_inner(self) -> i64 {
self.0.into_inner()
}
}

impl AddAssign for Counter {
    /// Adds the value held by `rhs` to this counter, consuming `rhs`.
    ///
    /// `&mut self` already guarantees exclusive access, so the value is updated
    /// through `get_mut` instead of an atomic read-modify-write. `wrapping_add`
    /// keeps the wrap-around-on-overflow result that `fetch_add` produces.
    fn add_assign(&mut self, rhs: Self) {
        let addend = rhs.into_inner();
        let current = self.0.get_mut();
        *current = current.wrapping_add(addend);
    }
}

impl Clone for UpdateStats {
impl Clone for Counter {
fn clone(&self) -> Self {
Self {
num_skipped: self.num_skipped.load(Relaxed).into(),
num_insertions: self.num_insertions.load(Relaxed).into(),
num_deletions: self.num_deletions.load(Relaxed).into(),
num_repreocesses: self.num_repreocesses.load(Relaxed).into(),
num_errors: self.num_errors.load(Relaxed).into(),
Self(AtomicI64::new(self.get()))
}
}

impl std::fmt::Display for Counter {
    /// Writes the counter's current value as a plain decimal integer.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current = self.get();
        f.write_fmt(format_args!("{}", current))
    }
}

impl std::fmt::Debug for Counter {
    /// Debug output is just the current numeric value, with no wrapper-type
    /// name around it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current = self.get();
        f.write_fmt(format_args!("{}", current))
    }
}

/// Per-source update statistics, one [`Counter`] per outcome category.
///
/// The `Display` impl reports `num_skipped` as "rows skipped", the sum of
/// `num_insertions + num_deletions + num_repreocesses` as "source rows
/// processed", and `num_errors` as "ERRORS".
#[derive(Debug, Serialize, Default, Clone)]
pub struct UpdateStats {
/// Number of rows that were skipped.
pub num_skipped: Counter,
/// Number of rows counted as insertions.
pub num_insertions: Counter,
/// Number of rows counted as deletions.
pub num_deletions: Counter,
/// Number of rows counted as reprocessed.
/// NOTE(review): the field name is misspelt ("repreocesses"); it is public and
/// presumably part of the serialized output, so renaming it needs care.
pub num_repreocesses: Counter,
/// Number of errors; shown by `Display` only when source rows were processed.
pub num_errors: Counter,
}

impl UpdateStats {
pub fn delta(&self, base: &Self) -> Self {
UpdateStats {
num_skipped: self.num_skipped.delta(&base.num_skipped),
num_insertions: self.num_insertions.delta(&base.num_insertions),
num_deletions: self.num_deletions.delta(&base.num_deletions),
num_repreocesses: self.num_repreocesses.delta(&base.num_repreocesses),
num_errors: self.num_errors.delta(&base.num_errors),
}
}
}

impl std::fmt::Display for UpdateStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let num_skipped = self.num_skipped.load(Relaxed);
let num_skipped = self.num_skipped.get();
if num_skipped > 0 {
write!(f, "{} rows skipped", num_skipped)?;
}

let num_insertions = self.num_insertions.load(Relaxed);
let num_deletions = self.num_deletions.load(Relaxed);
let num_reprocesses = self.num_repreocesses.load(Relaxed);
let num_insertions = self.num_insertions.get();
let num_deletions = self.num_deletions.get();
let num_reprocesses = self.num_repreocesses.get();
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
if num_source_rows > 0 {
if num_skipped > 0 {
write!(f, ", ")?;
write!(f, "; ")?;
}
write!(f, "{num_source_rows} source rows processed",)?;

let num_errors = self.num_errors.load(Relaxed);
let num_errors = self.num_errors.get();
if num_errors > 0 {
write!(f, " with {num_errors} ERRORS",)?;
}
Expand Down
5 changes: 4 additions & 1 deletion src/service/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ pub async fn update(
let mut live_updater = execution::FlowLiveUpdater::start(
flow_ctx.clone(),
&lib_context.pool,
execution::FlowLiveUpdaterOptions { live_mode: false },
execution::FlowLiveUpdaterOptions {
live_mode: false,
..Default::default()
},
)
.await?;
live_updater.wait().await?;
Expand Down