Skip to content

Commit 49c836d

Browse files
authored
Support periodic refresh via Synchronizer (#235)
* Add `refresh_options` to `ImportOpSpec`. * Add a synchronizer and expose `keep_in_sync` API to Python SDK.
1 parent 8f10543 commit 49c836d

File tree

9 files changed

+229
-44
lines changed

9 files changed

+229
-44
lines changed

src/base/spec.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use std::ops::Deref;
2-
3-
use serde::{Deserialize, Serialize};
1+
use crate::prelude::*;
42

53
use super::schema::{EnrichedValueType, FieldSchema};
4+
use std::ops::Deref;
65

76
#[derive(Debug, Clone, Serialize, Deserialize)]
87
#[serde(tag = "kind")]
@@ -163,9 +162,17 @@ pub struct OpSpec {
163162
pub spec: serde_json::Map<String, serde_json::Value>,
164163
}
165164

165+
/// Refresh settings carried by an import (source) operation.
///
/// Derives `Default`, so an `ImportOpSpec` deserialized without a
/// `refresh_options` entry gets `refresh_interval: None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
166+
pub struct SourceRefreshOptions {
167+
/// Target time between the starts of two consecutive updates of the source.
/// When `None`, the synchronizer added in this commit runs a single update
/// for the source and does not enter its refresh loop.
pub refresh_interval: Option<std::time::Duration>,
168+
}
169+
166170
#[derive(Debug, Clone, Serialize, Deserialize)]
167171
pub struct ImportOpSpec {
168172
pub source: OpSpec,
173+
174+
#[serde(default)]
175+
pub refresh_options: SourceRefreshOptions,
169176
}
170177

171178
/// Transform data using a given operator.

src/builder/analyzer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,7 @@ impl AnalyzerContext<'_> {
668668
.typ
669669
.clone(),
670670
name: op_name,
671+
refresh_options: import_op.spec.refresh_options,
671672
})
672673
};
673674
Ok(result_fut)

src/builder/flow_builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,13 +380,14 @@ impl FlowBuilder {
380380
self.root_data_scope_ref.clone()
381381
}
382382

383-
#[pyo3(signature = (kind, op_spec, target_scope, name))]
383+
#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))]
384384
pub fn add_source(
385385
&mut self,
386386
kind: String,
387387
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
388388
target_scope: Option<DataScopeRef>,
389389
name: String,
390+
refresh_options: Option<py::Pythonized<spec::SourceRefreshOptions>>,
390391
) -> PyResult<DataSlice> {
391392
if let Some(target_scope) = target_scope {
392393
if !Arc::ptr_eq(&target_scope.0, &self.root_data_scope_ref.0) {
@@ -402,6 +403,7 @@ impl FlowBuilder {
402403
kind,
403404
spec: op_spec.into_inner(),
404405
},
406+
refresh_options: refresh_options.map(|o| o.into_inner()).unwrap_or_default(),
405407
},
406408
};
407409
let analyzer_ctx = AnalyzerContext {

src/builder/plan.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
1-
use std::sync::Arc;
1+
use crate::prelude::*;
22

3-
use serde::Serialize;
4-
5-
use crate::base::schema::ValueType;
6-
use crate::base::value;
73
use crate::execution::db_tracking_setup;
84
use crate::ops::interface::*;
95
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
@@ -60,7 +56,8 @@ pub struct AnalyzedImportOp {
6056
pub source_id: i32,
6157
pub executor: Box<dyn SourceExecutor>,
6258
pub output: AnalyzedOpOutput,
63-
pub primary_key_type: ValueType,
59+
pub primary_key_type: schema::ValueType,
60+
pub refresh_options: spec::SourceRefreshOptions,
6461
}
6562

6663
pub struct AnalyzedFunctionExecInfo {
@@ -69,7 +66,7 @@ pub struct AnalyzedFunctionExecInfo {
6966

7067
/// Fingerprinter of the function's behavior.
7168
pub fingerprinter: Fingerprinter,
72-
pub output_type: ValueType,
69+
pub output_type: schema::ValueType,
7370
}
7471

7572
pub struct AnalyzedTransformOp {
@@ -106,7 +103,7 @@ pub struct AnalyzedExportOp {
106103
pub executor: Arc<dyn ExportTargetExecutor>,
107104
pub query_target: Option<Arc<dyn QueryTarget>>,
108105
pub primary_key_def: AnalyzedPrimaryKeyDef,
109-
pub primary_key_type: ValueType,
106+
pub primary_key_type: schema::ValueType,
110107
/// idx for value fields - excluding the primary key field.
111108
pub value_fields: Vec<u32>,
112109
/// If true, value is never changed on the same primary key.

src/execution/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,7 @@ pub(crate) mod row_indexer;
77
pub(crate) mod source_indexer;
88
pub(crate) mod stats;
99

10+
mod synchronizer;
11+
pub(crate) use synchronizer::*;
12+
1013
mod db_tracking;

src/execution/source_indexer.rs

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
use std::collections::{hash_map, HashMap};
2-
31
use crate::prelude::*;
42

3+
use sqlx::PgPool;
4+
use std::collections::{hash_map, HashMap};
5+
use tokio::{sync::Semaphore, task::JoinSet};
6+
57
use super::{
68
db_tracking,
79
row_indexer::{self, SkippedOr, SourceVersion},
810
stats,
911
};
10-
use futures::future::try_join_all;
11-
use sqlx::PgPool;
12-
use tokio::{sync::Semaphore, task::JoinSet};
1312
struct SourceRowIndexingState {
1413
source_version: SourceVersion,
1514
processing_sem: Arc<Semaphore>,
@@ -190,7 +189,11 @@ impl SourceIndexingContext {
190189
);
191190
}
192191

193-
async fn update_source(self: &Arc<Self>, pool: &PgPool) -> Result<stats::SourceUpdateInfo> {
192+
pub async fn update(
193+
self: &Arc<Self>,
194+
pool: &PgPool,
195+
update_stats: &Arc<stats::UpdateStats>,
196+
) -> Result<()> {
194197
let plan = self.flow.get_execution_plan().await?;
195198
let import_op = &plan.import_ops[self.source_idx];
196199
let mut rows_stream = import_op
@@ -204,13 +207,12 @@ impl SourceIndexingContext {
204207
state.scan_generation += 1;
205208
state.scan_generation
206209
};
207-
let update_stats = Arc::new(stats::UpdateStats::default());
208210
while let Some(row) = rows_stream.next().await {
209211
for row in row? {
210212
self.process_source_key_if_newer(
211213
row.key,
212214
SourceVersion::from_current(row.ordinal),
213-
&update_stats,
215+
update_stats,
214216
pool,
215217
&mut join_set,
216218
);
@@ -252,25 +254,6 @@ impl SourceIndexingContext {
252254
}
253255
}
254256

255-
Ok(stats::SourceUpdateInfo {
256-
source_name: import_op.name.clone(),
257-
stats: Arc::unwrap_or_clone(update_stats),
258-
})
257+
Ok(())
259258
}
260259
}
261-
262-
pub async fn update(flow_context: &FlowContext, pool: &PgPool) -> Result<stats::IndexUpdateInfo> {
263-
let plan = flow_context.flow.get_execution_plan().await?;
264-
let source_update_stats = try_join_all(
265-
(0..plan.import_ops.len())
266-
.map(|idx| async move {
267-
let source_context = flow_context.get_source_indexing_context(idx, pool).await?;
268-
source_context.update_source(pool).await
269-
})
270-
.collect::<Vec<_>>(),
271-
)
272-
.await?;
273-
Ok(stats::IndexUpdateInfo {
274-
sources: source_update_stats,
275-
})
276-
}

src/execution/synchronizer.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use std::time::Instant;
2+
3+
use crate::prelude::*;
4+
5+
use super::stats;
6+
use sqlx::PgPool;
7+
use tokio::task::JoinSet;
8+
9+
/// Owns the background tasks that synchronize the sources of one flow,
/// together with the update statistics those tasks write into.
pub struct FlowSynchronizer {
10+
/// The flow being synchronized; also used to look up import op names when
/// reporting statistics.
flow_ctx: Arc<FlowContext>,
11+
/// One spawned `sync_source` task per import op of the execution plan.
tasks: JoinSet<Result<()>>,
12+
/// One stats handle per import op, in import-op index order. Each handle is
/// shared (via `Arc`) with the task that syncs the same source.
sources_update_stats: Vec<Arc<stats::UpdateStats>>,
13+
}
14+
15+
/// Options controlling how a `FlowSynchronizer` runs its per-source tasks.
pub struct FlowSynchronizerOptions {
16+
/// When `true`, a source whose import op has a `refresh_interval` is
/// re-updated in a loop after its initial update. When `false`, every
/// source is updated exactly once.
pub keep_refreshed: bool,
17+
}
18+
19+
/// Synchronizes a single source of a flow.
///
/// Runs one update of the source identified by `source_idx`. If
/// `keep_refreshed` is set and the import op declares a `refresh_interval`,
/// it then keeps re-running the update in a loop, accumulating into the same
/// `source_update_stats`.
///
/// Returns `Ok(())` only when the refresh loop is not entered. Once inside the
/// loop, the function returns solely by propagating an error from `update`.
async fn sync_source(
20+
flow_ctx: Arc<FlowContext>,
21+
plan: Arc<plan::ExecutionPlan>,
22+
source_update_stats: Arc<stats::UpdateStats>,
23+
source_idx: usize,
24+
pool: PgPool,
25+
keep_refreshed: bool,
26+
) -> Result<()> {
27+
let source_context = flow_ctx
28+
.get_source_indexing_context(source_idx, &pool)
29+
.await?;
30+
31+
// Timestamp of the start of the most recent update; the refresh cadence is
// measured from update start, not from update completion.
let mut update_start = Instant::now();
32+
source_context.update(&pool, &source_update_stats).await?;
33+
34+
let import_op = &plan.import_ops[source_idx];
35+
// Refresh only when the caller asked for it AND the source has an interval.
if let (true, Some(refresh_interval)) =
36+
(keep_refreshed, import_op.refresh_options.refresh_interval)
37+
{
38+
loop {
39+
let elapsed = update_start.elapsed();
40+
// Sleep only for the remainder of the interval. If the previous update
// took at least `refresh_interval`, the next one starts immediately.
if elapsed < refresh_interval {
41+
tokio::time::sleep(refresh_interval - elapsed).await;
42+
}
43+
update_start = Instant::now();
44+
source_context.update(&pool, &source_update_stats).await?;
45+
}
46+
}
47+
Ok(())
48+
}
49+
50+
impl FlowSynchronizer {
51+
/// Starts synchronizing every source of `flow_ctx`.
///
/// Fetches the execution plan, then spawns one `sync_source` task per import
/// op onto a `JoinSet`. Returns as soon as the tasks are spawned; it does not
/// wait for any update to finish. Use `join` to wait and `abort` to cancel.
///
/// Fails only if the execution plan cannot be obtained.
pub async fn start(
52+
flow_ctx: Arc<FlowContext>,
53+
pool: &PgPool,
54+
options: &FlowSynchronizerOptions,
55+
) -> Result<Self> {
56+
let plan = flow_ctx.flow.get_execution_plan().await?;
57+
58+
let mut tasks = JoinSet::new();
59+
// Spawning happens as a side effect of the `map`, so the resulting vector
// holds the stats handles in the same order as the import ops.
let sources_update_stats = (0..plan.import_ops.len())
60+
.map(|source_idx| {
61+
let source_update_stats = Arc::new(stats::UpdateStats::default());
62+
tasks.spawn(sync_source(
63+
flow_ctx.clone(),
64+
plan.clone(),
65+
source_update_stats.clone(),
66+
source_idx,
67+
pool.clone(),
68+
options.keep_refreshed,
69+
));
70+
source_update_stats
71+
})
72+
.collect();
73+
Ok(Self {
74+
flow_ctx,
75+
tasks,
76+
sources_update_stats,
77+
})
78+
}
79+
80+
/// Waits until every source task has finished.
///
/// A failure of an individual task (either a join error or an error returned
/// by the task itself) is logged with `error!` and otherwise ignored, so this
/// method always returns `Ok(())`.
pub async fn join(&mut self) -> Result<()> {
81+
while let Some(result) = self.tasks.join_next().await {
82+
// `result??` flattens the join error and the task's own error into one
// `anyhow` error so both are reported the same way.
if let Err(e) = (|| anyhow::Ok(result??))() {
83+
error!("{:?}", e.context("Error in synchronizing a source"));
84+
}
85+
}
86+
Ok(())
87+
}
88+
89+
/// Requests cancellation of all source tasks via `JoinSet::abort_all`.
pub fn abort(&mut self) {
90+
self.tasks.abort_all();
91+
}
92+
93+
/// Returns a snapshot of the per-source update statistics.
///
/// Import ops and stats handles are paired by position; each stats value is
/// cloned at the time of the call, so it may be taken while tasks still run.
// NOTE(review): pairing assumes `flow_instance.import_ops` has the same order
// and length as the execution plan's `import_ops` used in `start` — confirm.
pub fn index_update_info(&self) -> stats::IndexUpdateInfo {
94+
stats::IndexUpdateInfo {
95+
sources: std::iter::zip(
96+
self.flow_ctx.flow.flow_instance.import_ops.iter(),
97+
self.sources_update_stats.iter(),
98+
)
99+
.map(|(import_op, stats)| stats::SourceUpdateInfo {
100+
source_name: import_op.name.clone(),
101+
stats: (&**stats).clone(),
102+
})
103+
.collect(),
104+
}
105+
}
106+
}

src/py/mod.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,52 @@ impl IndexUpdateInfo {
9292
#[pyclass]
9393
pub struct Flow(pub Arc<FlowContext>);
9494

95+
// Python-visible wrapper around `execution::FlowSynchronizer`.
// The inner synchronizer sits behind an async `RwLock` because the Python
// methods take `&self`, while `join` and `abort` on the inner type need
// `&mut self`.
#[pyclass]
96+
pub struct FlowSynchronizer(pub async_lock::RwLock<execution::FlowSynchronizer>);
97+
98+
// Python methods of the synchronizer handle. Each method releases the GIL
// (`allow_threads`), fails with a `PyException` if the library context is not
// initialized, and drives the async work to completion on the library runtime
// with `block_on`.
#[pymethods]
99+
impl FlowSynchronizer {
100+
// Blocks the calling Python thread until all source tasks have finished.
// NOTE(review): the write lock is held for the whole duration of the inner
// `join().await`, so a concurrent `abort` (write lock) or `index_update_info`
// (read lock) from another thread waits until `join` returns — confirm this
// is intended.
pub fn join(&self, py: Python<'_>) -> PyResult<()> {
101+
py.allow_threads(|| {
102+
let lib_context = get_lib_context()
103+
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
104+
lib_context
105+
.runtime
106+
.block_on(async {
107+
let mut synchronizer = self.0.write().await;
108+
synchronizer.join().await
109+
})
110+
.into_py_result()
111+
})
112+
}
113+
114+
// Requests cancellation of all source tasks of the wrapped synchronizer.
pub fn abort(&self, py: Python<'_>) -> PyResult<()> {
115+
py.allow_threads(|| {
116+
let lib_context = get_lib_context()
117+
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
118+
lib_context.runtime.block_on(async {
119+
let mut synchronizer = self.0.write().await;
120+
synchronizer.abort();
121+
});
122+
Ok(())
123+
})
124+
}
125+
126+
// Returns a snapshot of the per-source update statistics, wrapped in the
// Python-visible `IndexUpdateInfo`.
pub fn index_update_info(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
127+
py.allow_threads(|| {
128+
let lib_context = get_lib_context()
129+
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
130+
lib_context
131+
.runtime
132+
.block_on(async {
133+
let synchronizer = self.0.read().await;
134+
anyhow::Ok(IndexUpdateInfo(synchronizer.index_update_info()))
135+
})
136+
.into_py_result()
137+
})
138+
}
139+
}
140+
95141
#[pymethods]
96142
impl Flow {
97143
pub fn __str__(&self) -> String {
@@ -113,13 +159,44 @@ impl Flow {
113159
let update_info = lib_context
114160
.runtime
115161
.block_on(async {
116-
execution::source_indexer::update(&self.0, &lib_context.pool).await
162+
let mut synchronizer = execution::FlowSynchronizer::start(
163+
self.0.clone(),
164+
&lib_context.pool,
165+
&execution::FlowSynchronizerOptions {
166+
keep_refreshed: false,
167+
},
168+
)
169+
.await?;
170+
synchronizer.join().await?;
171+
anyhow::Ok(synchronizer.index_update_info())
117172
})
118173
.into_py_result()?;
119174
Ok(IndexUpdateInfo(update_info))
120175
})
121176
}
122177

178+
// Starts background synchronization of this flow and returns a handle to it.
//
// Unlike `update`, this does not wait for the sync tasks: it only starts them
// and hands back a `FlowSynchronizer` on which the caller can `join`, `abort`
// or query `index_update_info`.
//
// Fails with a `PyException` if the library context is not initialized, or if
// the synchronizer cannot be started.
pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult<FlowSynchronizer> {
179+
py.allow_threads(|| {
180+
let lib_context = get_lib_context()
181+
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
182+
let synchronizer = lib_context
183+
.runtime
184+
.block_on(async {
185+
let synchronizer = execution::FlowSynchronizer::start(
186+
self.0.clone(),
187+
&lib_context.pool,
188+
&execution::FlowSynchronizerOptions {
189+
// Must be `true` here: with `false` each source is updated once and
// the periodic refresh loop is never entered, which would make this
// method equivalent to a non-blocking one-shot `update`.
keep_refreshed: true,
190+
},
191+
)
192+
.await?;
193+
anyhow::Ok(synchronizer)
194+
})
195+
.into_py_result()?;
196+
Ok(FlowSynchronizer(async_lock::RwLock::new(synchronizer)))
197+
})
198+
}
199+
123200
pub fn evaluate_and_dump(
124201
&self,
125202
py: Python<'_>,
@@ -308,6 +385,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
308385
m.add_class::<builder::flow_builder::DataSlice>()?;
309386
m.add_class::<builder::flow_builder::DataScopeRef>()?;
310387
m.add_class::<Flow>()?;
388+
m.add_class::<FlowSynchronizer>()?;
311389
m.add_class::<TransientFlow>()?;
312390
m.add_class::<IndexUpdateInfo>()?;
313391
m.add_class::<SimpleSemanticsQueryHandler>()?;

0 commit comments

Comments
 (0)