diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 1ae99444e..b5bdb762f 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -47,12 +47,24 @@ def setup(delete_legacy_flows): @cli.command() @click.argument("flow_name", type=str, required=False) -def update(flow_name: str | None): +@click.option( + "-L", "--live", is_flag=True, show_default=True, default=False, + help="If true, it will continuously watch changes from data sources and apply to the target index.") +def update(flow_name: str | None, live: bool): """ - Update the index defined by the flow. + Update the index to reflect the latest data from data sources. """ - stats = asyncio.run(_flow_by_name(flow_name).update()) - print(stats) + async def _update_all(): + async def _update_flow(fl: flow.Flow): + updater = flow.FlowLiveUpdater(fl, flow.FlowLiveUpdaterOptions(live_mode=live)) + await updater.wait() + print(f"Updated index for {fl.name}:") + for line in str(updater.update_stats()).split("\n"): + if line := line.strip(): + print(f" {line}") + print() + await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name))) + asyncio.run(_update_all()) @cli.command() @click.argument("flow_name", type=str, required=False) @@ -112,3 +124,9 @@ def _flow_name(name: str | None) -> str: def _flow_by_name(name: str | None) -> flow.Flow: return flow.flow_by_name(_flow_name(name)) + +def _flows_by_name(name: str | None) -> list[flow.Flow]: + if name is None: + return flow.flows() + else: + return [flow.flow_by_name(name)] diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 890e7d711..5b461c242 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -344,6 +344,43 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli ), name ) + +@dataclass +class FlowLiveUpdaterOptions: + """ + Options for live updating a flow. + """ + live_mode: bool = False + +class FlowLiveUpdater: + """ + A live updater for a flow. + """ + _engine_live_updater: _engine.FlowLiveUpdater + + def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions): + self._engine_live_updater = _engine.FlowLiveUpdater( + fl._lazy_engine_flow(), _dump_engine_object(options)) + + async def wait(self): + """ + Wait for the live updater to finish. + """ + return await self._engine_live_updater.wait() + + def abort(self): + """ + Abort the live updater. + """ + self._engine_live_updater.abort() + + def update_stats(self) -> _engine.IndexUpdateInfo: + """ + Get the index update info. + """ + return self._engine_live_updater.index_update_info() + + @dataclass class EvaluateAndDumpOptions: """ @@ -388,7 +425,9 @@ async def update(self): Update the index defined by the flow. Once the function returns, the indice is fresh up to the moment when the function is called. """ - return await self._lazy_engine_flow().update() + updater = FlowLiveUpdater(self, FlowLiveUpdaterOptions(live_mode=False)) + await updater.wait() + return updater.update_stats() def evaluate_and_dump(self, options: EvaluateAndDumpOptions): """ @@ -441,6 +480,13 @@ def flow_names() -> list[str]: with _flows_lock: return list(_flows.keys()) +def flows() -> list[Flow]: + """ + Get all flows. + """ + with _flows_lock: + return list(_flows.values()) + def flow_by_name(name: str) -> Flow: """ Get a flow by name. diff --git a/src/execution/synchronizer.rs b/src/execution/live_updater.rs similarity index 77% rename from src/execution/synchronizer.rs rename to src/execution/live_updater.rs index ef85418dc..4b4eb020a 100644 --- a/src/execution/synchronizer.rs +++ b/src/execution/live_updater.rs @@ -6,23 +6,26 @@ use super::stats; use sqlx::PgPool; use tokio::task::JoinSet; -pub struct FlowSynchronizer { +pub struct FlowLiveUpdater { flow_ctx: Arc, tasks: JoinSet>, sources_update_stats: Vec>, } -pub struct FlowSynchronizerOptions { - pub keep_refreshed: bool, +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct FlowLiveUpdaterOptions { + /// If true, the updater will keep refreshing the index. + /// Otherwise, it will only apply changes from the source up to the current time. + pub live_mode: bool, } -async fn sync_source( +async fn update_source( flow_ctx: Arc, plan: Arc, source_update_stats: Arc, source_idx: usize, pool: PgPool, - keep_refreshed: bool, + live_mode: bool, ) -> Result<()> { let source_context = flow_ctx .get_source_indexing_context(source_idx, &pool) @@ -32,8 +35,7 @@ async fn sync_source( source_context.update(&pool, &source_update_stats).await?; let import_op = &plan.import_ops[source_idx]; - if let (true, Some(refresh_interval)) = - (keep_refreshed, import_op.refresh_options.refresh_interval) + if let (true, Some(refresh_interval)) = (live_mode, import_op.refresh_options.refresh_interval) { loop { let elapsed = update_start.elapsed(); @@ -47,11 +49,11 @@ async fn sync_source( Ok(()) } -impl FlowSynchronizer { +impl FlowLiveUpdater { pub async fn start( flow_ctx: Arc, pool: &PgPool, - options: &FlowSynchronizerOptions, + options: FlowLiveUpdaterOptions, ) -> Result { let plan = flow_ctx.flow.get_execution_plan().await?; @@ -59,13 +61,13 @@ impl FlowSynchronizer { let sources_update_stats = (0..plan.import_ops.len()) .map(|source_idx| { let source_update_stats = Arc::new(stats::UpdateStats::default()); - tasks.spawn(sync_source( + tasks.spawn(update_source( flow_ctx.clone(), plan.clone(), source_update_stats.clone(), source_idx, pool.clone(), - options.keep_refreshed, + options.live_mode, )); source_update_stats }) @@ -77,10 +79,10 @@ impl FlowSynchronizer { }) } - pub async fn join(&mut self) -> Result<()> { + pub async fn wait(&mut self) -> Result<()> { while let Some(result) = self.tasks.join_next().await { if let Err(e) = (|| anyhow::Ok(result??))() { - error!("{:?}", e.context("Error in synchronizing a source")); + error!("{:?}", e.context("Error in applying changes from a source")); } } Ok(()) diff --git a/src/execution/mod.rs b/src/execution/mod.rs index c3cf7986d..a4703e4cf 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -7,7 +7,7 @@ pub(crate) mod row_indexer; pub(crate) mod source_indexer; pub(crate) mod stats; -mod synchronizer; -pub(crate) use synchronizer::*; +mod live_updater; +pub(crate) use live_updater::*; mod db_tracking; diff --git a/src/py/mod.rs b/src/py/mod.rs index e6f3c25ad..0b48f4037 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -10,6 +10,7 @@ use crate::server::{self, ServerSettings}; use crate::settings::Settings; use crate::setup; use pyo3::{exceptions::PyException, prelude::*}; +use pyo3_async_runtimes::tokio::future_into_py; use std::collections::btree_map; mod convert; @@ -82,29 +83,51 @@ impl IndexUpdateInfo { pub struct Flow(pub Arc); #[pyclass] -pub struct FlowSynchronizer(pub Arc>); +pub struct FlowLiveUpdater(pub Arc>); #[pymethods] -impl FlowSynchronizer { - pub fn join<'py>(&self, py: Python<'py>) -> PyResult> { - let synchronizer = self.0.clone(); - pyo3_async_runtimes::tokio::future_into_py(py, async move { - let mut synchronizer = synchronizer.write().await; - synchronizer.join().await.into_py_result() +impl FlowLiveUpdater { + #[new] + pub fn new( + py: Python<'_>, + flow: &Flow, + options: Pythonized, + ) -> PyResult { + py.allow_threads(|| { + let live_updater = get_runtime() + .block_on(async { + let live_updater = execution::FlowLiveUpdater::start( + flow.0.clone(), + &get_lib_context()?.pool, + options.into_inner(), + ) + .await?; + anyhow::Ok(live_updater) + }) + .into_py_result()?; + Ok(Self(Arc::new(tokio::sync::RwLock::new(live_updater)))) + }) + } + + pub fn wait<'py>(&self, py: Python<'py>) -> PyResult> { + let live_updater = self.0.clone(); + future_into_py(py, async move { + let mut live_updater = live_updater.write().await; + live_updater.wait().await.into_py_result() }) } pub fn abort(&self, py: Python<'_>) { py.allow_threads(|| { - let mut synchronizer = self.0.blocking_write(); - synchronizer.abort(); + let mut live_updater = self.0.blocking_write(); + live_updater.abort(); }) } pub fn index_update_info(&self, py: Python<'_>) -> IndexUpdateInfo { py.allow_threads(|| { - let synchronizer = self.0.blocking_read(); - IndexUpdateInfo(synchronizer.index_update_info()) + let live_updater = self.0.blocking_read(); + IndexUpdateInfo(live_updater.index_update_info()) }) } } @@ -123,47 +146,6 @@ impl Flow { &self.0.flow.flow_instance.name } - pub fn update<'py>(&self, py: Python<'py>) -> PyResult> { - let flow_ctx = self.0.clone(); - pyo3_async_runtimes::tokio::future_into_py(py, async move { - let update_info = { - let mut synchronizer = execution::FlowSynchronizer::start( - flow_ctx, - &get_lib_context().into_py_result()?.pool, - &execution::FlowSynchronizerOptions { - keep_refreshed: false, - }, - ) - .await - .into_py_result()?; - synchronizer.join().await.into_py_result()?; - synchronizer.index_update_info() - }; - Ok(IndexUpdateInfo(update_info)) - }) - } - - pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult { - py.allow_threads(|| { - let synchronizer = get_runtime() - .block_on(async { - let synchronizer = execution::FlowSynchronizer::start( - self.0.clone(), - &get_lib_context()?.pool, - &execution::FlowSynchronizerOptions { - keep_refreshed: false, - }, - ) - .await?; - anyhow::Ok(synchronizer) - }) - .into_py_result()?; - Ok(FlowSynchronizer(Arc::new(tokio::sync::RwLock::new( - synchronizer, - )))) - }) - } - pub fn evaluate_and_dump( &self, py: Python<'_>, @@ -340,7 +322,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/service/flows.rs b/src/service/flows.rs index 020026a2a..1179c582f 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -171,14 +171,12 @@ pub async fn update( State(lib_context): State>, ) -> Result, ApiError> { let flow_ctx = lib_context.get_flow_context(&flow_name)?; - let mut synchronizer = execution::FlowSynchronizer::start( + let mut live_updater = execution::FlowLiveUpdater::start( flow_ctx.clone(), &lib_context.pool, - &execution::FlowSynchronizerOptions { - keep_refreshed: false, - }, + execution::FlowLiveUpdaterOptions { live_mode: false }, ) .await?; - synchronizer.join().await?; - Ok(Json(synchronizer.index_update_info())) + live_updater.wait().await?; + Ok(Json(live_updater.index_update_info())) }