Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/docs/core/flow_methods.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ cocoindex update --setup main.py

The `Flow.update()` method creates/updates data in the target.

Once the function returns, the target data is fresh up to the moment when the function is called.

```python
stats = demo_flow.update()
Expand All @@ -171,6 +170,16 @@ stats = await demo_flow.update_async()
print(stats)
```

Once the function finishes, the target data is fresh up to the moment when the function is called.

:::info

`update()` and `update_async()` can be called concurrently, even if a previous call has not finished yet.
Doing so is cheap: CocoIndex automatically combines multiple calls into a single update whenever it can do so while still guaranteeing that the target data is fresh up to the moment the last call was initiated.

:::


</TabItem>
</Tabs>

Expand Down
50 changes: 48 additions & 2 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::prelude::*;
use crate::{
prelude::*,
service::error::{SharedError, SharedResult, SharedResultExt},
};

use futures::future::Ready;
use sqlx::PgPool;
Expand Down Expand Up @@ -33,9 +36,12 @@ struct SourceIndexingState {
rows: HashMap<value::KeyValue, SourceRowIndexingState>,
scan_generation: usize,
}

/// Per-source indexing context: the row-level indexing state for one source
/// of a flow, plus the bookkeeping used to coalesce concurrent `update()` calls.
pub struct SourceIndexingContext {
// The analyzed flow this source belongs to; its execution plan is fetched from here.
flow: Arc<builder::AnalyzedFlow>,
// Index of this source within the execution plan's `import_ops`.
source_idx: usize,
// Shared future for an update that has been requested but has not started
// running yet. `update()` callers that find it set await a clone of it
// instead of scheduling another update; it is reset to `None` once the
// scheduled update actually starts.
pending_update: Mutex<Option<Shared<BoxFuture<'static, SharedResult<()>>>>>,
// Permit held by a spawned update task while it runs `update_once`.
update_sem: Semaphore,
// Per-row indexing state and the current scan generation.
state: Mutex<SourceIndexingState>,
// NOTE(review): presumably the setup-time execution context of the flow — confirm against its uses.
setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
}
Expand Down Expand Up @@ -88,6 +94,8 @@ impl SourceIndexingContext {
rows,
scan_generation,
}),
pending_update: Mutex::new(None),
update_sem: Semaphore::new(1),
setup_execution_ctx,
})
}
Expand Down Expand Up @@ -269,11 +277,49 @@ impl SourceIndexingContext {
pool.clone(),
))
}

/// Runs an update for this source, coalescing with a pending update if one exists.
///
/// If an update has been scheduled but has not started running yet, this call
/// awaits that same update instead of scheduling a new one. Otherwise it spawns
/// a task that waits for a permit from `update_sem`, runs `update_once`, and is
/// registered in `pending_update` so that other callers can share it.
///
/// NOTE(review): when joining an already-pending update, the `pool` and
/// `update_stats` passed to this call are not used — the spawned task uses the
/// ones captured by the caller that scheduled it. Confirm this is intended.
///
/// # Errors
///
/// Returns an error if the spawned task fails to join, or if the task itself
/// returns an error (failure to acquire the permit, or an error from
/// `update_once`). Every caller sharing the same pending update sees the same
/// outcome.
pub async fn update(
self: &Arc<Self>,
pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
) -> Result<()> {
// The mutex guard lives only inside this block, so it is released before
// the `.await` on the shared future below.
let pending_update_fut = {
let mut pending_update = self.pending_update.lock().unwrap();
if let Some(pending_update_fut) = &*pending_update {
// An update is scheduled but has not started yet: share it.
pending_update_fut.clone()
} else {
// Nothing pending: schedule a new update on a spawned task, with owned
// clones of everything the task needs.
let slf = self.clone();
let pool = pool.clone();
let update_stats = update_stats.clone();
let task = tokio::spawn(async move {
{
// Wait for the permit before starting; it is held until the end of
// this block, i.e. for the whole `update_once` run.
let _permit = slf.update_sem.acquire().await?;
{
// Clear the pending slot right before the update starts, so that
// calls arriving from now on schedule a new update rather than
// joining this one.
let mut pending_update = slf.pending_update.lock().unwrap();
*pending_update = None;
}
slf.update_once(&pool, &update_stats).await?;
}
anyhow::Ok(())
});
// Wrap the join handle in a shared future: join failures and task errors
// are both converted to `SharedError`, so every clone of the future
// yields the same result.
let pending_update_fut = async move {
task.await
.map_err(SharedError::from)?
.map_err(SharedError::new)
}
.boxed()
.shared();
// Publish the future while still holding the lock; the spawned task
// cannot clear the slot until this guard is released.
*pending_update = Some(pending_update_fut.clone());
pending_update_fut
}
};
// NOTE(review): `std_result()` presumably converts the `SharedResult` into
// this function's `Result` type — confirm in `service::error`.
pending_update_fut.await.std_result()?;
Ok(())
}

async fn update_once(
self: &Arc<Self>,
pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
) -> Result<()> {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
Expand Down
2 changes: 2 additions & 0 deletions src/service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub fn shared_ok<T>(value: T) -> Result<T, SharedError> {
Ok(value)
}

/// Shorthand for a `Result` whose error type is [`SharedError`].
pub type SharedResult<T> = Result<T, SharedError>;

pub struct SharedErrorWrapper(SharedError);

impl Display for SharedErrorWrapper {
Expand Down