13 changes: 10 additions & 3 deletions src/base/spec.rs
@@ -1,8 +1,7 @@
use std::ops::Deref;

use serde::{Deserialize, Serialize};
use crate::prelude::*;

use super::schema::{EnrichedValueType, FieldSchema};
use std::ops::Deref;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
@@ -163,9 +162,17 @@ pub struct OpSpec {
pub spec: serde_json::Map<String, serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SourceRefreshOptions {
pub refresh_interval: Option<std::time::Duration>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportOpSpec {
pub source: OpSpec,

#[serde(default)]
pub refresh_options: SourceRefreshOptions,
}

/// Transform data using a given operator.
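Because the new `refresh_options` field carries `#[serde(default)]`, specs written before this change keep deserializing. A minimal standalone sketch of the round-trip behavior (the structs below mirror the ones above rather than importing the crate; `ImportOpSpecSketch` omits the required `source` field for brevity):

```rust
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct SourceRefreshOptions {
    refresh_interval: Option<Duration>,
}

#[derive(Debug, Deserialize)]
struct ImportOpSpecSketch {
    #[serde(default)]
    refresh_options: SourceRefreshOptions,
}

fn main() {
    // A spec written before this change still parses; the default kicks in.
    let legacy: ImportOpSpecSketch = serde_json::from_str("{}").unwrap();
    assert!(legacy.refresh_options.refresh_interval.is_none());

    // serde encodes `std::time::Duration` as `{"secs": .., "nanos": ..}`.
    let refreshed: ImportOpSpecSketch = serde_json::from_str(
        r#"{"refresh_options": {"refresh_interval": {"secs": 10, "nanos": 0}}}"#,
    )
    .unwrap();
    assert_eq!(
        refreshed.refresh_options.refresh_interval,
        Some(Duration::from_secs(10))
    );
}
```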
1 change: 1 addition & 0 deletions src/builder/analyzer.rs
@@ -668,6 +668,7 @@ impl AnalyzerContext<'_> {
.typ
.clone(),
name: op_name,
refresh_options: import_op.spec.refresh_options,
})
};
Ok(result_fut)
4 changes: 3 additions & 1 deletion src/builder/flow_builder.rs
@@ -380,13 +380,14 @@ impl FlowBuilder {
self.root_data_scope_ref.clone()
}

#[pyo3(signature = (kind, op_spec, target_scope, name))]
#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))]
pub fn add_source(
&mut self,
kind: String,
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
target_scope: Option<DataScopeRef>,
name: String,
refresh_options: Option<py::Pythonized<spec::SourceRefreshOptions>>,
) -> PyResult<DataSlice> {
if let Some(target_scope) = target_scope {
if !Arc::ptr_eq(&target_scope.0, &self.root_data_scope_ref.0) {
@@ -402,6 +403,7 @@
kind,
spec: op_spec.into_inner(),
},
refresh_options: refresh_options.map(|o| o.into_inner()).unwrap_or_default(),
},
};
let analyzer_ctx = AnalyzerContext {
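The new pyo3 argument defaults to `None` on the Python side, and `.map(|o| o.into_inner()).unwrap_or_default()` turns that into `SourceRefreshOptions::default()`. A standalone sketch of just that defaulting step (the `Pythonized` wrapper here is a stand-in for the crate's `py::Pythonized`):

```rust
#[derive(Debug, Default, PartialEq)]
struct SourceRefreshOptions {
    refresh_interval: Option<std::time::Duration>,
}

// Stand-in for the crate's `py::Pythonized` newtype wrapper.
struct Pythonized<T>(T);

impl<T> Pythonized<T> {
    fn into_inner(self) -> T {
        self.0
    }
}

fn main() {
    // Python `None` arrives as `Option::None` and falls back to the default.
    let absent: Option<Pythonized<SourceRefreshOptions>> = None;
    let resolved = absent.map(|o| o.into_inner()).unwrap_or_default();
    assert_eq!(resolved, SourceRefreshOptions::default());
}
```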
13 changes: 5 additions & 8 deletions src/builder/plan.rs
@@ -1,9 +1,5 @@
use std::sync::Arc;
use crate::prelude::*;

use serde::Serialize;

use crate::base::schema::ValueType;
use crate::base::value;
use crate::execution::db_tracking_setup;
use crate::ops::interface::*;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
@@ -60,7 +56,8 @@ pub struct AnalyzedImportOp {
pub source_id: i32,
pub executor: Box<dyn SourceExecutor>,
pub output: AnalyzedOpOutput,
pub primary_key_type: ValueType,
pub primary_key_type: schema::ValueType,
pub refresh_options: spec::SourceRefreshOptions,
}

pub struct AnalyzedFunctionExecInfo {
@@ -69,7 +66,7 @@ pub struct AnalyzedFunctionExecInfo {

/// Fingerprinter of the function's behavior.
pub fingerprinter: Fingerprinter,
pub output_type: ValueType,
pub output_type: schema::ValueType,
}

pub struct AnalyzedTransformOp {
@@ -106,7 +103,7 @@ pub struct AnalyzedExportOp {
pub executor: Arc<dyn ExportTargetExecutor>,
pub query_target: Option<Arc<dyn QueryTarget>>,
pub primary_key_def: AnalyzedPrimaryKeyDef,
pub primary_key_type: ValueType,
pub primary_key_type: schema::ValueType,
/// idx for value fields - excluding the primary key field.
pub value_fields: Vec<u32>,
/// If true, value is never changed on the same primary key.
3 changes: 3 additions & 0 deletions src/execution/mod.rs
@@ -7,4 +7,7 @@ pub(crate) mod row_indexer;
pub(crate) mod source_indexer;
pub(crate) mod stats;

mod synchronizer;
pub(crate) use synchronizer::*;

mod db_tracking;
39 changes: 11 additions & 28 deletions src/execution/source_indexer.rs
@@ -1,15 +1,14 @@
use std::collections::{hash_map, HashMap};

use crate::prelude::*;

use sqlx::PgPool;
use std::collections::{hash_map, HashMap};
use tokio::{sync::Semaphore, task::JoinSet};

use super::{
db_tracking,
row_indexer::{self, SkippedOr, SourceVersion},
stats,
};
use futures::future::try_join_all;
use sqlx::PgPool;
use tokio::{sync::Semaphore, task::JoinSet};
struct SourceRowIndexingState {
source_version: SourceVersion,
processing_sem: Arc<Semaphore>,
@@ -190,7 +189,11 @@ impl SourceIndexingContext {
);
}

async fn update_source(self: &Arc<Self>, pool: &PgPool) -> Result<stats::SourceUpdateInfo> {
pub async fn update(
self: &Arc<Self>,
pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
) -> Result<()> {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let mut rows_stream = import_op
@@ -204,13 +207,12 @@
state.scan_generation += 1;
state.scan_generation
};
let update_stats = Arc::new(stats::UpdateStats::default());
while let Some(row) = rows_stream.next().await {
for row in row? {
self.process_source_key_if_newer(
row.key,
SourceVersion::from_current(row.ordinal),
&update_stats,
update_stats,
pool,
&mut join_set,
);
@@ -252,25 +254,6 @@ impl SourceIndexingContext {
}
}

Ok(stats::SourceUpdateInfo {
source_name: import_op.name.clone(),
stats: Arc::unwrap_or_clone(update_stats),
})
Ok(())
}
}

pub async fn update(flow_context: &FlowContext, pool: &PgPool) -> Result<stats::IndexUpdateInfo> {
let plan = flow_context.flow.get_execution_plan().await?;
let source_update_stats = try_join_all(
(0..plan.import_ops.len())
.map(|idx| async move {
let source_context = flow_context.get_source_indexing_context(idx, pool).await?;
source_context.update_source(pool).await
})
.collect::<Vec<_>>(),
)
.await?;
Ok(stats::IndexUpdateInfo {
sources: source_update_stats,
})
}
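The net effect of this refactor is an ownership change: `update` no longer creates and returns its own per-call stats; the caller supplies a shared accumulator, so repeated refresh passes add into the same counters. A standalone sketch of that pattern (stand-in types, not the crate's `SourceIndexingContext`/`UpdateStats`; assumes a tokio runtime):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct UpdateStats {
    num_rows: AtomicU64,
}

struct SourceContext;

impl SourceContext {
    async fn update(&self, stats: &Arc<UpdateStats>) {
        // Pretend one pass over the source touched 3 rows.
        stats.num_rows.fetch_add(3, Ordering::Relaxed);
    }
}

#[tokio::main]
async fn main() {
    let ctx = SourceContext;
    let stats = Arc::new(UpdateStats::default());
    ctx.update(&stats).await; // initial pass
    ctx.update(&stats).await; // refresh pass accumulates into the same stats
    assert_eq!(stats.num_rows.load(Ordering::Relaxed), 6);
}
```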
106 changes: 106 additions & 0 deletions src/execution/synchronizer.rs
@@ -0,0 +1,106 @@
use std::time::Instant;

use crate::prelude::*;

use super::stats;
use sqlx::PgPool;
use tokio::task::JoinSet;

pub struct FlowSynchronizer {
flow_ctx: Arc<FlowContext>,
tasks: JoinSet<Result<()>>,
sources_update_stats: Vec<Arc<stats::UpdateStats>>,
}

pub struct FlowSynchronizerOptions {
pub keep_refreshed: bool,
}

async fn sync_source(
flow_ctx: Arc<FlowContext>,
plan: Arc<plan::ExecutionPlan>,
source_update_stats: Arc<stats::UpdateStats>,
source_idx: usize,
pool: PgPool,
keep_refreshed: bool,
) -> Result<()> {
let source_context = flow_ctx
.get_source_indexing_context(source_idx, &pool)
.await?;

let mut update_start = Instant::now();
source_context.update(&pool, &source_update_stats).await?;

let import_op = &plan.import_ops[source_idx];
if let (true, Some(refresh_interval)) =
(keep_refreshed, import_op.refresh_options.refresh_interval)
{
loop {
let elapsed = update_start.elapsed();
if elapsed < refresh_interval {
tokio::time::sleep(refresh_interval - elapsed).await;
}
update_start = Instant::now();
source_context.update(&pool, &source_update_stats).await?;
}
}
Ok(())
}

impl FlowSynchronizer {
pub async fn start(
flow_ctx: Arc<FlowContext>,
pool: &PgPool,
options: &FlowSynchronizerOptions,
) -> Result<Self> {
let plan = flow_ctx.flow.get_execution_plan().await?;

let mut tasks = JoinSet::new();
let sources_update_stats = (0..plan.import_ops.len())
.map(|source_idx| {
let source_update_stats = Arc::new(stats::UpdateStats::default());
tasks.spawn(sync_source(
flow_ctx.clone(),
plan.clone(),
source_update_stats.clone(),
source_idx,
pool.clone(),
options.keep_refreshed,
));
source_update_stats
})
.collect();
Ok(Self {
flow_ctx,
tasks,
sources_update_stats,
})
}

pub async fn join(&mut self) -> Result<()> {
while let Some(result) = self.tasks.join_next().await {
if let Err(e) = (|| anyhow::Ok(result??))() {
error!("{:?}", e.context("Error in synchronizing a source"));
}
}
Ok(())
}

pub fn abort(&mut self) {
self.tasks.abort_all();
}

pub fn index_update_info(&self) -> stats::IndexUpdateInfo {
stats::IndexUpdateInfo {
sources: std::iter::zip(
self.flow_ctx.flow.flow_instance.import_ops.iter(),
self.sources_update_stats.iter(),
)
.map(|(import_op, stats)| stats::SourceUpdateInfo {
source_name: import_op.name.clone(),
stats: (&**stats).clone(),
})
.collect(),
}
}
}
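The loop in `sync_source` measures the interval from the start of the previous pass, so a pass that runs longer than `refresh_interval` triggers the next one immediately instead of adding a full sleep on top. A runnable sketch of just that cadence (interval value assumed):

```rust
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let refresh_interval = Duration::from_millis(50); // assumed value
    let mut update_start = Instant::now();
    for _pass in 0..3 {
        // ... one update pass would run here ...
        let elapsed = update_start.elapsed();
        if elapsed < refresh_interval {
            // Sleep only for the remainder of the interval; a slow pass
            // starts the next one right away.
            tokio::time::sleep(refresh_interval - elapsed).await;
        }
        update_start = Instant::now();
    }
}
```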
80 changes: 79 additions & 1 deletion src/py/mod.rs
@@ -92,6 +92,52 @@ impl IndexUpdateInfo {
#[pyclass]
pub struct Flow(pub Arc<FlowContext>);

#[pyclass]
pub struct FlowSynchronizer(pub async_lock::RwLock<execution::FlowSynchronizer>);

#[pymethods]
impl FlowSynchronizer {
pub fn join(&self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
lib_context
.runtime
.block_on(async {
let mut synchronizer = self.0.write().await;
synchronizer.join().await
})
.into_py_result()
})
}

pub fn abort(&self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
lib_context.runtime.block_on(async {
let mut synchronizer = self.0.write().await;
synchronizer.abort();
});
Ok(())
})
}

pub fn index_update_info(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
lib_context
.runtime
.block_on(async {
let synchronizer = self.0.read().await;
anyhow::Ok(IndexUpdateInfo(synchronizer.index_update_info()))
})
.into_py_result()
})
}
}

#[pymethods]
impl Flow {
pub fn __str__(&self) -> String {
@@ -113,13 +159,44 @@
let update_info = lib_context
.runtime
.block_on(async {
execution::source_indexer::update(&self.0, &lib_context.pool).await
let mut synchronizer = execution::FlowSynchronizer::start(
self.0.clone(),
&lib_context.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: false,
},
)
.await?;
synchronizer.join().await?;
anyhow::Ok(synchronizer.index_update_info())
})
.into_py_result()?;
Ok(IndexUpdateInfo(update_info))
})
}

pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult<FlowSynchronizer> {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
let synchronizer = lib_context
.runtime
.block_on(async {
let synchronizer = execution::FlowSynchronizer::start(
self.0.clone(),
&lib_context.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: true,
},
)
.await?;
anyhow::Ok(synchronizer)
})
.into_py_result()?;
Ok(FlowSynchronizer(async_lock::RwLock::new(synchronizer)))
})
}

pub fn evaluate_and_dump(
&self,
py: Python<'_>,
@@ -308,6 +385,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<builder::flow_builder::DataSlice>()?;
m.add_class::<builder::flow_builder::DataScopeRef>()?;
m.add_class::<Flow>()?;
m.add_class::<FlowSynchronizer>()?;
m.add_class::<TransientFlow>()?;
m.add_class::<IndexUpdateInfo>()?;
m.add_class::<SimpleSemanticsQueryHandler>()?;
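All three `FlowSynchronizer` bindings follow the same GIL-handling pattern: release the GIL with `allow_threads` before blocking on the Tokio runtime, so other Python threads can make progress while Rust awaits. A minimal sketch of that pattern in isolation (plain pyo3 and tokio; the function name and return value are illustrative only):

```rust
use pyo3::prelude::*;

// Release the GIL, then block the current thread on an async computation.
fn blocking_call(py: Python<'_>, runtime: &tokio::runtime::Runtime) -> PyResult<u64> {
    py.allow_threads(|| {
        let value = runtime.block_on(async {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            42u64
        });
        Ok(value)
    })
}
```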