Skip to content

Commit 42ff48a

Browse files
authored
Expose --live option to update CLI subcommand. (#240)
1 parent c59163a commit 42ff48a

File tree

6 files changed

+125
-79
lines changed

6 files changed

+125
-79
lines changed

python/cocoindex/cli.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,24 @@ def setup(delete_legacy_flows):
4747

4848
@cli.command()
4949
@click.argument("flow_name", type=str, required=False)
50-
def update(flow_name: str | None):
50+
@click.option(
51+
"-L", "--live", is_flag=True, show_default=True, default=False,
52+
help="If true, it will continuously watch changes from data sources and apply to the target index.")
53+
def update(flow_name: str | None, live: bool):
5154
"""
52-
Update the index defined by the flow.
55+
Update the index to reflect the latest data from data sources.
5356
"""
54-
stats = asyncio.run(_flow_by_name(flow_name).update())
55-
print(stats)
57+
async def _update_all():
58+
async def _update_flow(fl: flow.Flow):
59+
updater = flow.FlowLiveUpdater(fl, flow.FlowLiveUpdaterOptions(live_mode=live))
60+
await updater.wait()
61+
print(f"Updated index for {fl.name}:")
62+
for line in str(updater.update_stats()).split("\n"):
63+
if line := line.strip():
64+
print(f" {line}")
65+
print()
66+
await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name)))
67+
asyncio.run(_update_all())
5668

5769
@cli.command()
5870
@click.argument("flow_name", type=str, required=False)
@@ -112,3 +124,9 @@ def _flow_name(name: str | None) -> str:
112124

113125
def _flow_by_name(name: str | None) -> flow.Flow:
114126
return flow.flow_by_name(_flow_name(name))
127+
128+
def _flows_by_name(name: str | None) -> list[flow.Flow]:
129+
if name is None:
130+
return flow.flows()
131+
else:
132+
return [flow.flow_by_name(name)]

python/cocoindex/flow.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,43 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
344344
),
345345
name
346346
)
347+
348+
@dataclass
349+
class FlowLiveUpdaterOptions:
350+
"""
351+
Options for live updating a flow.
352+
"""
353+
live_mode: bool = False
354+
355+
class FlowLiveUpdater:
356+
"""
357+
A live updater for a flow.
358+
"""
359+
_engine_live_updater: _engine.FlowLiveUpdater
360+
361+
def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions):
362+
self._engine_live_updater = _engine.FlowLiveUpdater(
363+
fl._lazy_engine_flow(), _dump_engine_object(options))
364+
365+
async def wait(self):
366+
"""
367+
Wait for the live updater to finish.
368+
"""
369+
return await self._engine_live_updater.wait()
370+
371+
def abort(self):
372+
"""
373+
Abort the live updater.
374+
"""
375+
self._engine_live_updater.abort()
376+
377+
def update_stats(self) -> _engine.IndexUpdateInfo:
378+
"""
379+
Get the index update info.
380+
"""
381+
return self._engine_live_updater.index_update_info()
382+
383+
347384
@dataclass
348385
class EvaluateAndDumpOptions:
349386
"""
@@ -388,7 +425,9 @@ async def update(self):
388425
Update the index defined by the flow.
389426
Once the function returns, the index is fresh up to the moment when the function is called.
390427
"""
391-
return await self._lazy_engine_flow().update()
428+
updater = FlowLiveUpdater(self, FlowLiveUpdaterOptions(live_mode=False))
429+
await updater.wait()
430+
return updater.update_stats()
392431

393432
def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
394433
"""
@@ -441,6 +480,13 @@ def flow_names() -> list[str]:
441480
with _flows_lock:
442481
return list(_flows.keys())
443482

483+
def flows() -> list[Flow]:
484+
"""
485+
Get all flows.
486+
"""
487+
with _flows_lock:
488+
return list(_flows.values())
489+
444490
def flow_by_name(name: str) -> Flow:
445491
"""
446492
Get a flow by name.

src/execution/synchronizer.rs renamed to src/execution/live_updater.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,26 @@ use super::stats;
66
use sqlx::PgPool;
77
use tokio::task::JoinSet;
88

9-
pub struct FlowSynchronizer {
9+
pub struct FlowLiveUpdater {
1010
flow_ctx: Arc<FlowContext>,
1111
tasks: JoinSet<Result<()>>,
1212
sources_update_stats: Vec<Arc<stats::UpdateStats>>,
1313
}
1414

15-
pub struct FlowSynchronizerOptions {
16-
pub keep_refreshed: bool,
15+
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
16+
pub struct FlowLiveUpdaterOptions {
17+
/// If true, the updater will keep refreshing the index.
18+
/// Otherwise, it will only apply changes from the source up to the current time.
19+
pub live_mode: bool,
1720
}
1821

19-
async fn sync_source(
22+
async fn update_source(
2023
flow_ctx: Arc<FlowContext>,
2124
plan: Arc<plan::ExecutionPlan>,
2225
source_update_stats: Arc<stats::UpdateStats>,
2326
source_idx: usize,
2427
pool: PgPool,
25-
keep_refreshed: bool,
28+
live_mode: bool,
2629
) -> Result<()> {
2730
let source_context = flow_ctx
2831
.get_source_indexing_context(source_idx, &pool)
@@ -32,8 +35,7 @@ async fn sync_source(
3235
source_context.update(&pool, &source_update_stats).await?;
3336

3437
let import_op = &plan.import_ops[source_idx];
35-
if let (true, Some(refresh_interval)) =
36-
(keep_refreshed, import_op.refresh_options.refresh_interval)
38+
if let (true, Some(refresh_interval)) = (live_mode, import_op.refresh_options.refresh_interval)
3739
{
3840
loop {
3941
let elapsed = update_start.elapsed();
@@ -47,25 +49,25 @@ async fn sync_source(
4749
Ok(())
4850
}
4951

50-
impl FlowSynchronizer {
52+
impl FlowLiveUpdater {
5153
pub async fn start(
5254
flow_ctx: Arc<FlowContext>,
5355
pool: &PgPool,
54-
options: &FlowSynchronizerOptions,
56+
options: FlowLiveUpdaterOptions,
5557
) -> Result<Self> {
5658
let plan = flow_ctx.flow.get_execution_plan().await?;
5759

5860
let mut tasks = JoinSet::new();
5961
let sources_update_stats = (0..plan.import_ops.len())
6062
.map(|source_idx| {
6163
let source_update_stats = Arc::new(stats::UpdateStats::default());
62-
tasks.spawn(sync_source(
64+
tasks.spawn(update_source(
6365
flow_ctx.clone(),
6466
plan.clone(),
6567
source_update_stats.clone(),
6668
source_idx,
6769
pool.clone(),
68-
options.keep_refreshed,
70+
options.live_mode,
6971
));
7072
source_update_stats
7173
})
@@ -77,10 +79,10 @@ impl FlowSynchronizer {
7779
})
7880
}
7981

80-
pub async fn join(&mut self) -> Result<()> {
82+
pub async fn wait(&mut self) -> Result<()> {
8183
while let Some(result) = self.tasks.join_next().await {
8284
if let Err(e) = (|| anyhow::Ok(result??))() {
83-
error!("{:?}", e.context("Error in synchronizing a source"));
85+
error!("{:?}", e.context("Error in applying changes from a source"));
8486
}
8587
}
8688
Ok(())

src/execution/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub(crate) mod row_indexer;
77
pub(crate) mod source_indexer;
88
pub(crate) mod stats;
99

10-
mod synchronizer;
11-
pub(crate) use synchronizer::*;
10+
mod live_updater;
11+
pub(crate) use live_updater::*;
1212

1313
mod db_tracking;

src/py/mod.rs

Lines changed: 35 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::server::{self, ServerSettings};
1010
use crate::settings::Settings;
1111
use crate::setup;
1212
use pyo3::{exceptions::PyException, prelude::*};
13+
use pyo3_async_runtimes::tokio::future_into_py;
1314
use std::collections::btree_map;
1415

1516
mod convert;
@@ -82,29 +83,51 @@ impl IndexUpdateInfo {
8283
pub struct Flow(pub Arc<FlowContext>);
8384

8485
#[pyclass]
85-
pub struct FlowSynchronizer(pub Arc<tokio::sync::RwLock<execution::FlowSynchronizer>>);
86+
pub struct FlowLiveUpdater(pub Arc<tokio::sync::RwLock<execution::FlowLiveUpdater>>);
8687

8788
#[pymethods]
88-
impl FlowSynchronizer {
89-
pub fn join<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
90-
let synchronizer = self.0.clone();
91-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
92-
let mut synchronizer = synchronizer.write().await;
93-
synchronizer.join().await.into_py_result()
89+
impl FlowLiveUpdater {
90+
#[new]
91+
pub fn new(
92+
py: Python<'_>,
93+
flow: &Flow,
94+
options: Pythonized<execution::FlowLiveUpdaterOptions>,
95+
) -> PyResult<Self> {
96+
py.allow_threads(|| {
97+
let live_updater = get_runtime()
98+
.block_on(async {
99+
let live_updater = execution::FlowLiveUpdater::start(
100+
flow.0.clone(),
101+
&get_lib_context()?.pool,
102+
options.into_inner(),
103+
)
104+
.await?;
105+
anyhow::Ok(live_updater)
106+
})
107+
.into_py_result()?;
108+
Ok(Self(Arc::new(tokio::sync::RwLock::new(live_updater))))
109+
})
110+
}
111+
112+
pub fn wait<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
113+
let live_updater = self.0.clone();
114+
future_into_py(py, async move {
115+
let mut live_updater = live_updater.write().await;
116+
live_updater.wait().await.into_py_result()
94117
})
95118
}
96119

97120
pub fn abort(&self, py: Python<'_>) {
98121
py.allow_threads(|| {
99-
let mut synchronizer = self.0.blocking_write();
100-
synchronizer.abort();
122+
let mut live_updater = self.0.blocking_write();
123+
live_updater.abort();
101124
})
102125
}
103126

104127
pub fn index_update_info(&self, py: Python<'_>) -> IndexUpdateInfo {
105128
py.allow_threads(|| {
106-
let synchronizer = self.0.blocking_read();
107-
IndexUpdateInfo(synchronizer.index_update_info())
129+
let live_updater = self.0.blocking_read();
130+
IndexUpdateInfo(live_updater.index_update_info())
108131
})
109132
}
110133
}
@@ -123,47 +146,6 @@ impl Flow {
123146
&self.0.flow.flow_instance.name
124147
}
125148

126-
pub fn update<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
127-
let flow_ctx = self.0.clone();
128-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
129-
let update_info = {
130-
let mut synchronizer = execution::FlowSynchronizer::start(
131-
flow_ctx,
132-
&get_lib_context().into_py_result()?.pool,
133-
&execution::FlowSynchronizerOptions {
134-
keep_refreshed: false,
135-
},
136-
)
137-
.await
138-
.into_py_result()?;
139-
synchronizer.join().await.into_py_result()?;
140-
synchronizer.index_update_info()
141-
};
142-
Ok(IndexUpdateInfo(update_info))
143-
})
144-
}
145-
146-
pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult<FlowSynchronizer> {
147-
py.allow_threads(|| {
148-
let synchronizer = get_runtime()
149-
.block_on(async {
150-
let synchronizer = execution::FlowSynchronizer::start(
151-
self.0.clone(),
152-
&get_lib_context()?.pool,
153-
&execution::FlowSynchronizerOptions {
154-
keep_refreshed: false,
155-
},
156-
)
157-
.await?;
158-
anyhow::Ok(synchronizer)
159-
})
160-
.into_py_result()?;
161-
Ok(FlowSynchronizer(Arc::new(tokio::sync::RwLock::new(
162-
synchronizer,
163-
))))
164-
})
165-
}
166-
167149
pub fn evaluate_and_dump(
168150
&self,
169151
py: Python<'_>,
@@ -340,7 +322,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
340322
m.add_class::<builder::flow_builder::DataSlice>()?;
341323
m.add_class::<builder::flow_builder::DataScopeRef>()?;
342324
m.add_class::<Flow>()?;
343-
m.add_class::<FlowSynchronizer>()?;
325+
m.add_class::<FlowLiveUpdater>()?;
344326
m.add_class::<TransientFlow>()?;
345327
m.add_class::<IndexUpdateInfo>()?;
346328
m.add_class::<SimpleSemanticsQueryHandler>()?;

src/service/flows.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,12 @@ pub async fn update(
171171
State(lib_context): State<Arc<LibContext>>,
172172
) -> Result<Json<stats::IndexUpdateInfo>, ApiError> {
173173
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
174-
let mut synchronizer = execution::FlowSynchronizer::start(
174+
let mut live_updater = execution::FlowLiveUpdater::start(
175175
flow_ctx.clone(),
176176
&lib_context.pool,
177-
&execution::FlowSynchronizerOptions {
178-
keep_refreshed: false,
179-
},
177+
execution::FlowLiveUpdaterOptions { live_mode: false },
180178
)
181179
.await?;
182-
synchronizer.join().await?;
183-
Ok(Json(synchronizer.index_update_info()))
180+
live_updater.wait().await?;
181+
Ok(Json(live_updater.index_update_info()))
184182
}

0 commit comments

Comments
 (0)