Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,24 @@ def setup(delete_legacy_flows):

@cli.command()
@click.argument("flow_name", type=str, required=False)
def update(flow_name: str | None):
@click.option(
"-L", "--live", is_flag=True, show_default=True, default=False,
help="If true, it will continuously watch changes from data sources and apply to the target index.")
def update(flow_name: str | None, live: bool):
"""
Update the index defined by the flow.
Update the index to reflect the latest data from data sources.
"""
stats = asyncio.run(_flow_by_name(flow_name).update())
print(stats)
async def _update_all():
async def _update_flow(fl: flow.Flow):
updater = flow.FlowLiveUpdater(fl, flow.FlowLiveUpdaterOptions(live_mode=live))
await updater.wait()
print(f"Updated index for {fl.name}:")
for line in str(updater.update_stats()).split("\n"):
if line := line.strip():
print(f" {line}")
print()
await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name)))
asyncio.run(_update_all())

@cli.command()
@click.argument("flow_name", type=str, required=False)
Expand Down Expand Up @@ -112,3 +124,9 @@ def _flow_name(name: str | None) -> str:

def _flow_by_name(name: str | None) -> flow.Flow:
return flow.flow_by_name(_flow_name(name))

def _flows_by_name(name: str | None) -> list[flow.Flow]:
if name is None:
return flow.flows()
else:
return [flow.flow_by_name(name)]
48 changes: 47 additions & 1 deletion python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,43 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
),
name
)

@dataclass
class FlowLiveUpdaterOptions:
"""
Options for live updating a flow.
"""
live_mode: bool = False

class FlowLiveUpdater:
"""
A live updater for a flow.
"""
_engine_live_updater: _engine.FlowLiveUpdater

def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions):
self._engine_live_updater = _engine.FlowLiveUpdater(
fl._lazy_engine_flow(), _dump_engine_object(options))

async def wait(self):
"""
Wait for the live updater to finish.
"""
return await self._engine_live_updater.wait()

def abort(self):
"""
Abort the live updater.
"""
self._engine_live_updater.abort()

def update_stats(self) -> _engine.IndexUpdateInfo:
"""
Get the index update info.
"""
return self._engine_live_updater.index_update_info()


@dataclass
class EvaluateAndDumpOptions:
"""
Expand Down Expand Up @@ -388,7 +425,9 @@ async def update(self):
Update the index defined by the flow.
Once the function returns, the indice is fresh up to the moment when the function is called.
"""
return await self._lazy_engine_flow().update()
updater = FlowLiveUpdater(self, FlowLiveUpdaterOptions(live_mode=False))
await updater.wait()
return updater.update_stats()

def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
"""
Expand Down Expand Up @@ -441,6 +480,13 @@ def flow_names() -> list[str]:
with _flows_lock:
return list(_flows.keys())

def flows() -> list[Flow]:
"""
Get all flows.
"""
with _flows_lock:
return list(_flows.values())

def flow_by_name(name: str) -> Flow:
"""
Get a flow by name.
Expand Down
28 changes: 15 additions & 13 deletions src/execution/synchronizer.rs → src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ use super::stats;
use sqlx::PgPool;
use tokio::task::JoinSet;

pub struct FlowSynchronizer {
pub struct FlowLiveUpdater {
flow_ctx: Arc<FlowContext>,
tasks: JoinSet<Result<()>>,
sources_update_stats: Vec<Arc<stats::UpdateStats>>,
}

pub struct FlowSynchronizerOptions {
pub keep_refreshed: bool,
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FlowLiveUpdaterOptions {
/// If true, the updater will keep refreshing the index.
/// Otherwise, it will only apply changes from the source up to the current time.
pub live_mode: bool,
}

async fn sync_source(
async fn update_source(
flow_ctx: Arc<FlowContext>,
plan: Arc<plan::ExecutionPlan>,
source_update_stats: Arc<stats::UpdateStats>,
source_idx: usize,
pool: PgPool,
keep_refreshed: bool,
live_mode: bool,
) -> Result<()> {
let source_context = flow_ctx
.get_source_indexing_context(source_idx, &pool)
Expand All @@ -32,8 +35,7 @@ async fn sync_source(
source_context.update(&pool, &source_update_stats).await?;

let import_op = &plan.import_ops[source_idx];
if let (true, Some(refresh_interval)) =
(keep_refreshed, import_op.refresh_options.refresh_interval)
if let (true, Some(refresh_interval)) = (live_mode, import_op.refresh_options.refresh_interval)
{
loop {
let elapsed = update_start.elapsed();
Expand All @@ -47,25 +49,25 @@ async fn sync_source(
Ok(())
}

impl FlowSynchronizer {
impl FlowLiveUpdater {
pub async fn start(
flow_ctx: Arc<FlowContext>,
pool: &PgPool,
options: &FlowSynchronizerOptions,
options: FlowLiveUpdaterOptions,
) -> Result<Self> {
let plan = flow_ctx.flow.get_execution_plan().await?;

let mut tasks = JoinSet::new();
let sources_update_stats = (0..plan.import_ops.len())
.map(|source_idx| {
let source_update_stats = Arc::new(stats::UpdateStats::default());
tasks.spawn(sync_source(
tasks.spawn(update_source(
flow_ctx.clone(),
plan.clone(),
source_update_stats.clone(),
source_idx,
pool.clone(),
options.keep_refreshed,
options.live_mode,
));
source_update_stats
})
Expand All @@ -77,10 +79,10 @@ impl FlowSynchronizer {
})
}

pub async fn join(&mut self) -> Result<()> {
pub async fn wait(&mut self) -> Result<()> {
while let Some(result) = self.tasks.join_next().await {
if let Err(e) = (|| anyhow::Ok(result??))() {
error!("{:?}", e.context("Error in synchronizing a source"));
error!("{:?}", e.context("Error in applying changes from a source"));
}
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(crate) mod row_indexer;
pub(crate) mod source_indexer;
pub(crate) mod stats;

mod synchronizer;
pub(crate) use synchronizer::*;
mod live_updater;
pub(crate) use live_updater::*;

mod db_tracking;
88 changes: 35 additions & 53 deletions src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::server::{self, ServerSettings};
use crate::settings::Settings;
use crate::setup;
use pyo3::{exceptions::PyException, prelude::*};
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::btree_map;

mod convert;
Expand Down Expand Up @@ -82,29 +83,51 @@ impl IndexUpdateInfo {
pub struct Flow(pub Arc<FlowContext>);

#[pyclass]
pub struct FlowSynchronizer(pub Arc<tokio::sync::RwLock<execution::FlowSynchronizer>>);
pub struct FlowLiveUpdater(pub Arc<tokio::sync::RwLock<execution::FlowLiveUpdater>>);

#[pymethods]
impl FlowSynchronizer {
pub fn join<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let synchronizer = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let mut synchronizer = synchronizer.write().await;
synchronizer.join().await.into_py_result()
impl FlowLiveUpdater {
#[new]
pub fn new(
py: Python<'_>,
flow: &Flow,
options: Pythonized<execution::FlowLiveUpdaterOptions>,
) -> PyResult<Self> {
py.allow_threads(|| {
let live_updater = get_runtime()
.block_on(async {
let live_updater = execution::FlowLiveUpdater::start(
flow.0.clone(),
&get_lib_context()?.pool,
options.into_inner(),
)
.await?;
anyhow::Ok(live_updater)
})
.into_py_result()?;
Ok(Self(Arc::new(tokio::sync::RwLock::new(live_updater))))
})
}

pub fn wait<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let live_updater = self.0.clone();
future_into_py(py, async move {
let mut live_updater = live_updater.write().await;
live_updater.wait().await.into_py_result()
})
}

pub fn abort(&self, py: Python<'_>) {
py.allow_threads(|| {
let mut synchronizer = self.0.blocking_write();
synchronizer.abort();
let mut live_updater = self.0.blocking_write();
live_updater.abort();
})
}

pub fn index_update_info(&self, py: Python<'_>) -> IndexUpdateInfo {
py.allow_threads(|| {
let synchronizer = self.0.blocking_read();
IndexUpdateInfo(synchronizer.index_update_info())
let live_updater = self.0.blocking_read();
IndexUpdateInfo(live_updater.index_update_info())
})
}
}
Expand All @@ -123,47 +146,6 @@ impl Flow {
&self.0.flow.flow_instance.name
}

pub fn update<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let flow_ctx = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let update_info = {
let mut synchronizer = execution::FlowSynchronizer::start(
flow_ctx,
&get_lib_context().into_py_result()?.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: false,
},
)
.await
.into_py_result()?;
synchronizer.join().await.into_py_result()?;
synchronizer.index_update_info()
};
Ok(IndexUpdateInfo(update_info))
})
}

pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult<FlowSynchronizer> {
py.allow_threads(|| {
let synchronizer = get_runtime()
.block_on(async {
let synchronizer = execution::FlowSynchronizer::start(
self.0.clone(),
&get_lib_context()?.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: false,
},
)
.await?;
anyhow::Ok(synchronizer)
})
.into_py_result()?;
Ok(FlowSynchronizer(Arc::new(tokio::sync::RwLock::new(
synchronizer,
))))
})
}

pub fn evaluate_and_dump(
&self,
py: Python<'_>,
Expand Down Expand Up @@ -340,7 +322,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<builder::flow_builder::DataSlice>()?;
m.add_class::<builder::flow_builder::DataScopeRef>()?;
m.add_class::<Flow>()?;
m.add_class::<FlowSynchronizer>()?;
m.add_class::<FlowLiveUpdater>()?;
m.add_class::<TransientFlow>()?;
m.add_class::<IndexUpdateInfo>()?;
m.add_class::<SimpleSemanticsQueryHandler>()?;
Expand Down
10 changes: 4 additions & 6 deletions src/service/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,12 @@ pub async fn update(
State(lib_context): State<Arc<LibContext>>,
) -> Result<Json<stats::IndexUpdateInfo>, ApiError> {
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
let mut synchronizer = execution::FlowSynchronizer::start(
let mut live_updater = execution::FlowLiveUpdater::start(
flow_ctx.clone(),
&lib_context.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: false,
},
execution::FlowLiveUpdaterOptions { live_mode: false },
)
.await?;
synchronizer.join().await?;
Ok(Json(synchronizer.index_update_info()))
live_updater.wait().await?;
Ok(Json(live_updater.index_update_info()))
}