From 536b2ae453dbb75fe3703512d2b5150b065ca0fc Mon Sep 17 00:00:00 2001 From: LJ Date: Mon, 31 Mar 2025 09:16:14 -0700 Subject: [PATCH 1/2] Add `refresh_options` to `ImportOpSpec`. --- src/base/spec.rs | 13 ++++++++++--- src/builder/analyzer.rs | 1 + src/builder/flow_builder.rs | 4 +++- src/builder/plan.rs | 13 +++++-------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/base/spec.rs b/src/base/spec.rs index da5301d6e..1c1ec154a 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -1,8 +1,7 @@ -use std::ops::Deref; - -use serde::{Deserialize, Serialize}; +use crate::prelude::*; use super::schema::{EnrichedValueType, FieldSchema}; +use std::ops::Deref; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind")] @@ -163,9 +162,17 @@ pub struct OpSpec { pub spec: serde_json::Map, } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct SourceRefreshOptions { + pub refresh_interval: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ImportOpSpec { pub source: OpSpec, + + #[serde(default)] + pub refresh_options: SourceRefreshOptions, } /// Transform data using a given operator. diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index a97776239..27f971b8f 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -668,6 +668,7 @@ impl AnalyzerContext<'_> { .typ .clone(), name: op_name, + refresh_options: import_op.spec.refresh_options, }) }; Ok(result_fut) diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index d7dd2f5cb..2812fc961 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -380,13 +380,14 @@ impl FlowBuilder { self.root_data_scope_ref.clone() } - #[pyo3(signature = (kind, op_spec, target_scope, name))] + #[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))] pub fn add_source( &mut self, kind: String, op_spec: py::Pythonized>, target_scope: Option, name: String, + refresh_options: Option>, ) -> PyResult { if let Some(target_scope) = target_scope { if !Arc::ptr_eq(&target_scope.0, &self.root_data_scope_ref.0) { @@ -402,6 +403,7 @@ impl FlowBuilder { kind, spec: op_spec.into_inner(), }, + refresh_options: refresh_options.map(|o| o.into_inner()).unwrap_or_default(), }, }; let analyzer_ctx = AnalyzerContext { diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 1d3b95e6f..bfca28648 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -1,9 +1,5 @@ -use std::sync::Arc; +use crate::prelude::*; -use serde::Serialize; - -use crate::base::schema::ValueType; -use crate::base::value; use crate::execution::db_tracking_setup; use crate::ops::interface::*; use crate::utils::fingerprint::{Fingerprint, Fingerprinter}; @@ -60,7 +56,8 @@ pub struct AnalyzedImportOp { pub source_id: i32, pub executor: Box, pub output: AnalyzedOpOutput, - pub primary_key_type: ValueType, + pub primary_key_type: schema::ValueType, + pub refresh_options: spec::SourceRefreshOptions, } pub struct AnalyzedFunctionExecInfo { @@ -69,7 +66,7 @@ pub struct AnalyzedFunctionExecInfo { /// Fingerprinter of the function's behavior. pub fingerprinter: Fingerprinter, - pub output_type: ValueType, + pub output_type: schema::ValueType, } pub struct AnalyzedTransformOp { @@ -106,7 +103,7 @@ pub struct AnalyzedExportOp { pub executor: Arc, pub query_target: Option>, pub primary_key_def: AnalyzedPrimaryKeyDef, - pub primary_key_type: ValueType, + pub primary_key_type: schema::ValueType, /// idx for value fields - excluding the primary key field. pub value_fields: Vec, /// If true, value is never changed on the same primary key. From bdd65a2a104393a76fb23fa88619642d8d85bc2d Mon Sep 17 00:00:00 2001 From: LJ Date: Mon, 31 Mar 2025 13:05:22 -0700 Subject: [PATCH 2/2] Add a synchronizer and expose `keep_in_sync` API to Python SDK. --- src/base/spec.rs | 2 +- src/execution/mod.rs | 3 + src/execution/source_indexer.rs | 39 ++++-------- src/execution/synchronizer.rs | 106 ++++++++++++++++++++++++++++++++ src/py/mod.rs | 80 +++++++++++++++++++++++- src/service/flows.rs | 14 ++++- 6 files changed, 211 insertions(+), 33 deletions(-) create mode 100644 src/execution/synchronizer.rs diff --git a/src/base/spec.rs b/src/base/spec.rs index 1c1ec154a..020be201a 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -164,7 +164,7 @@ pub struct OpSpec { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct SourceRefreshOptions { - pub refresh_interval: Option, + pub refresh_interval: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/execution/mod.rs b/src/execution/mod.rs index fce1a8c80..c3cf7986d 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -7,4 +7,7 @@ pub(crate) mod row_indexer; pub(crate) mod source_indexer; pub(crate) mod stats; +mod synchronizer; +pub(crate) use synchronizer::*; + mod db_tracking; diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 664c7aec0..7f78823ef 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -1,15 +1,14 @@ -use std::collections::{hash_map, HashMap}; - use crate::prelude::*; +use sqlx::PgPool; +use std::collections::{hash_map, HashMap}; +use tokio::{sync::Semaphore, task::JoinSet}; + use super::{ db_tracking, row_indexer::{self, SkippedOr, SourceVersion}, stats, }; -use futures::future::try_join_all; -use sqlx::PgPool; -use tokio::{sync::Semaphore, task::JoinSet}; struct SourceRowIndexingState { source_version: SourceVersion, processing_sem: Arc, @@ -190,7 +189,11 @@ impl SourceIndexingContext { ); } - async fn update_source(self: &Arc, pool: &PgPool) -> Result { + pub async fn update( + self: &Arc, + pool: &PgPool, + update_stats: &Arc, + ) -> Result<()> { let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; let mut rows_stream = import_op @@ -204,13 +207,12 @@ impl SourceIndexingContext { state.scan_generation += 1; state.scan_generation }; - let update_stats = Arc::new(stats::UpdateStats::default()); while let Some(row) = rows_stream.next().await { for row in row? { self.process_source_key_if_newer( row.key, SourceVersion::from_current(row.ordinal), - &update_stats, + update_stats, pool, &mut join_set, ); @@ -252,25 +254,6 @@ impl SourceIndexingContext { } } - Ok(stats::SourceUpdateInfo { - source_name: import_op.name.clone(), - stats: Arc::unwrap_or_clone(update_stats), - }) + Ok(()) } } - -pub async fn update(flow_context: &FlowContext, pool: &PgPool) -> Result { - let plan = flow_context.flow.get_execution_plan().await?; - let source_update_stats = try_join_all( - (0..plan.import_ops.len()) - .map(|idx| async move { - let source_context = flow_context.get_source_indexing_context(idx, pool).await?; - source_context.update_source(pool).await - }) - .collect::>(), - ) - .await?; - Ok(stats::IndexUpdateInfo { - sources: source_update_stats, - }) -} diff --git a/src/execution/synchronizer.rs b/src/execution/synchronizer.rs new file mode 100644 index 000000000..ef85418dc --- /dev/null +++ b/src/execution/synchronizer.rs @@ -0,0 +1,106 @@ +use std::time::Instant; + +use crate::prelude::*; + +use super::stats; +use sqlx::PgPool; +use tokio::task::JoinSet; + +pub struct FlowSynchronizer { + flow_ctx: Arc, + tasks: JoinSet>, + sources_update_stats: Vec>, +} + +pub struct FlowSynchronizerOptions { + pub keep_refreshed: bool, +} + +async fn sync_source( + flow_ctx: Arc, + plan: Arc, + source_update_stats: Arc, + source_idx: usize, + pool: PgPool, + keep_refreshed: bool, +) -> Result<()> { + let source_context = flow_ctx + .get_source_indexing_context(source_idx, &pool) + .await?; + + let mut update_start = Instant::now(); + source_context.update(&pool, &source_update_stats).await?; + + let import_op = &plan.import_ops[source_idx]; + if let (true, Some(refresh_interval)) = + (keep_refreshed, import_op.refresh_options.refresh_interval) + { + loop { + let elapsed = update_start.elapsed(); + if elapsed < refresh_interval { + tokio::time::sleep(refresh_interval - elapsed).await; + } + update_start = Instant::now(); + source_context.update(&pool, &source_update_stats).await?; + } + } + Ok(()) +} + +impl FlowSynchronizer { + pub async fn start( + flow_ctx: Arc, + pool: &PgPool, + options: &FlowSynchronizerOptions, + ) -> Result { + let plan = flow_ctx.flow.get_execution_plan().await?; + + let mut tasks = JoinSet::new(); + let sources_update_stats = (0..plan.import_ops.len()) + .map(|source_idx| { + let source_update_stats = Arc::new(stats::UpdateStats::default()); + tasks.spawn(sync_source( + flow_ctx.clone(), + plan.clone(), + source_update_stats.clone(), + source_idx, + pool.clone(), + options.keep_refreshed, + )); + source_update_stats + }) + .collect(); + Ok(Self { + flow_ctx, + tasks, + sources_update_stats, + }) + } + + pub async fn join(&mut self) -> Result<()> { + while let Some(result) = self.tasks.join_next().await { + if let Err(e) = (|| anyhow::Ok(result??))() { + error!("{:?}", e.context("Error in synchronizing a source")); + } + } + Ok(()) + } + + pub fn abort(&mut self) { + self.tasks.abort_all(); + } + + pub fn index_update_info(&self) -> stats::IndexUpdateInfo { + stats::IndexUpdateInfo { + sources: std::iter::zip( + self.flow_ctx.flow.flow_instance.import_ops.iter(), + self.sources_update_stats.iter(), + ) + .map(|(import_op, stats)| stats::SourceUpdateInfo { + source_name: import_op.name.clone(), + stats: (&**stats).clone(), + }) + .collect(), + } + } +} diff --git a/src/py/mod.rs b/src/py/mod.rs index 4cb63c9c3..3f78cf050 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -92,6 +92,52 @@ impl IndexUpdateInfo { #[pyclass] pub struct Flow(pub Arc); +#[pyclass] +pub struct FlowSynchronizer(pub async_lock::RwLock); + +#[pymethods] +impl FlowSynchronizer { + pub fn join(&self, py: Python<'_>) -> PyResult<()> { + py.allow_threads(|| { + let lib_context = get_lib_context() + .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + lib_context + .runtime + .block_on(async { + let mut synchronizer = self.0.write().await; + synchronizer.join().await + }) + .into_py_result() + }) + } + + pub fn abort(&self, py: Python<'_>) -> PyResult<()> { + py.allow_threads(|| { + let lib_context = get_lib_context() + .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + lib_context.runtime.block_on(async { + let mut synchronizer = self.0.write().await; + synchronizer.abort(); + }); + Ok(()) + }) + } + + pub fn index_update_info(&self, py: Python<'_>) -> PyResult { + py.allow_threads(|| { + let lib_context = get_lib_context() + .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + lib_context + .runtime + .block_on(async { + let synchronizer = self.0.read().await; + anyhow::Ok(IndexUpdateInfo(synchronizer.index_update_info())) + }) + .into_py_result() + }) + } +} + #[pymethods] impl Flow { pub fn __str__(&self) -> String { @@ -113,13 +159,44 @@ impl Flow { let update_info = lib_context .runtime .block_on(async { - execution::source_indexer::update(&self.0, &lib_context.pool).await + let mut synchronizer = execution::FlowSynchronizer::start( + self.0.clone(), + &lib_context.pool, + &execution::FlowSynchronizerOptions { + keep_refreshed: false, + }, + ) + .await?; + synchronizer.join().await?; + anyhow::Ok(synchronizer.index_update_info()) }) .into_py_result()?; Ok(IndexUpdateInfo(update_info)) }) } + pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult { + py.allow_threads(|| { + let lib_context = get_lib_context() + .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + let synchronizer = lib_context + .runtime + .block_on(async { + let synchronizer = execution::FlowSynchronizer::start( + self.0.clone(), + &lib_context.pool, + &execution::FlowSynchronizerOptions { + keep_refreshed: false, + }, + ) + .await?; + anyhow::Ok(synchronizer) + }) + .into_py_result()?; + Ok(FlowSynchronizer(async_lock::RwLock::new(synchronizer))) + }) + } + pub fn evaluate_and_dump( &self, py: Python<'_>, @@ -308,6 +385,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/service/flows.rs b/src/service/flows.rs index 84ee7ae7a..020026a2a 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -1,11 +1,11 @@ use crate::prelude::*; +use crate::lib_context::LibContext; use crate::{base::schema::DataSchema, ops::interface::SourceExecutorListOptions}; use crate::{ execution::memoization, execution::{row_indexer, stats}, }; -use crate::{execution::source_indexer, lib_context::LibContext}; use axum::{ extract::{Path, State}, http::StatusCode, @@ -171,6 +171,14 @@ pub async fn update( State(lib_context): State>, ) -> Result, ApiError> { let flow_ctx = lib_context.get_flow_context(&flow_name)?; - let update_info = source_indexer::update(&flow_ctx, &lib_context.pool).await?; - Ok(Json(update_info)) + let mut synchronizer = execution::FlowSynchronizer::start( + flow_ctx.clone(), + &lib_context.pool, + &execution::FlowSynchronizerOptions { + keep_refreshed: false, + }, + ) + .await?; + synchronizer.join().await?; + Ok(Json(synchronizer.index_update_info())) }