diff --git a/docs/docs/core/flow_methods.mdx b/docs/docs/core/flow_methods.mdx index f67425d7d..b06c07409 100644 --- a/docs/docs/core/flow_methods.mdx +++ b/docs/docs/core/flow_methods.mdx @@ -157,7 +157,6 @@ cocoindex update --setup main.py The `Flow.update()` method creates/updates data in the target. -Once the function returns, the target data is fresh up to the moment when the function is called. ```python stats = demo_flow.update() @@ -171,6 +170,16 @@ stats = await demo_flow.update_async() print(stats) ``` +Once the function finishes, the target data is fresh up to the moment when the function was called. + +:::info + +`update()` and `update_async()` can be called simultaneously, even if a previous call has not finished yet. +Doing so is cheap: CocoIndex automatically combines multiple concurrent calls into a single update, while still guaranteeing that the target data is fresh up to the moment the last call was initiated. + +::: + + diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index a24697357..d0a99ba22 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -1,4 +1,7 @@ -use crate::prelude::*; +use crate::{ + prelude::*, + service::error::{SharedError, SharedResult, SharedResultExt}, +}; use futures::future::Ready; use sqlx::PgPool; @@ -33,9 +36,12 @@ struct SourceIndexingState { rows: HashMap, scan_generation: usize, } + pub struct SourceIndexingContext { flow: Arc, source_idx: usize, + pending_update: Mutex<Option<Shared<BoxFuture<'static, SharedResult<()>>>>>, + update_sem: Semaphore, state: Mutex<SourceIndexingState>, setup_execution_ctx: Arc, } @@ -88,6 +94,8 @@ impl SourceIndexingContext { rows, scan_generation, }), + pending_update: Mutex::new(None), + update_sem: Semaphore::new(1), setup_execution_ctx, }) } @@ -269,11 +277,49 @@ impl SourceIndexingContext { pool.clone(), )) } - pub async fn update( self: &Arc<Self>, pool: &PgPool, update_stats: &Arc, ) -> Result<()> { let pending_update_fut = { let mut pending_update = 
self.pending_update.lock().unwrap(); if let Some(pending_update_fut) = &*pending_update { pending_update_fut.clone() } else { let slf = self.clone(); let pool = pool.clone(); let update_stats = update_stats.clone(); let task = tokio::spawn(async move { { let _permit = slf.update_sem.acquire().await?; { let mut pending_update = slf.pending_update.lock().unwrap(); *pending_update = None; } slf.update_once(&pool, &update_stats).await?; } anyhow::Ok(()) }); let pending_update_fut = async move { task.await .map_err(SharedError::from)? .map_err(SharedError::new) } .boxed() .shared(); *pending_update = Some(pending_update_fut.clone()); pending_update_fut } }; pending_update_fut.await.std_result()?; Ok(()) } + + async fn update_once( self: &Arc<Self>, pool: &PgPool, update_stats: &Arc, ) -> Result<()> { let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; diff --git a/src/service/error.rs b/src/service/error.rs index bb57cec5c..f28aaf73d 100644 --- a/src/service/error.rs +++ b/src/service/error.rs @@ -117,6 +117,8 @@ pub fn shared_ok<T>(value: T) -> Result<T, SharedError> { Ok(value) } +pub type SharedResult<T> = Result<T, SharedError>; + pub struct SharedErrorWrapper(SharedError); impl Display for SharedErrorWrapper {