Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use anyhow::{anyhow, bail, Result};
use crate::prelude::*;

use pyo3::{exceptions::PyException, prelude::*};
use std::{
collections::{btree_map, hash_map::Entry, HashMap},
ops::Deref,
sync::{Arc, Mutex, Weak},
};

use super::analyzer::{
build_flow_instance_context, AnalyzerContext, CollectorBuilder, DataScopeBuilder,
ExecutionScope, ValueTypeBuilder,
};
use crate::{
api_bail,
base::{
schema::{self, CollectorSchema, FieldSchema},
spec::{self, FieldName, NamedSpec},
schema::{CollectorSchema, FieldSchema},
spec::{FieldName, NamedSpec},
},
get_lib_context,
lib_context::LibContext,
Expand Down Expand Up @@ -649,21 +648,21 @@ impl FlowBuilder {
))
})
.into_py_result()?;
let analyzed_flow = Arc::new(analyzed_flow);

let mut analyzed_flows = self.lib_context.flows.write().unwrap();
match analyzed_flows.entry(self.flow_instance_name.clone()) {
let mut flow_ctxs = self.lib_context.flows.lock().unwrap();
let flow_ctx = match flow_ctxs.entry(self.flow_instance_name.clone()) {
btree_map::Entry::Occupied(_) => {
return Err(PyException::new_err(format!(
"flow instance name already exists: {}",
self.flow_instance_name
)));
}
btree_map::Entry::Vacant(entry) => {
entry.insert(FlowContext::new(analyzed_flow.clone()));
let flow_ctx = Arc::new(FlowContext::new(Arc::new(analyzed_flow)));
entry.insert(flow_ctx.clone());
flow_ctx
}
}
Ok(py::Flow(analyzed_flow))
};
Ok(py::Flow(flow_ctx))
}

pub fn build_transient_flow(&self, py: Python<'_>) -> PyResult<py::TransientFlow> {
Expand Down
29 changes: 13 additions & 16 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use super::{
use futures::future::try_join_all;
use sqlx::PgPool;
use tokio::{sync::Semaphore, task::JoinSet};
struct SourceRowState {
struct SourceRowIndexingState {
source_version: SourceVersion,
processing_sem: Arc<Semaphore>,
touched_generation: usize,
}

impl Default for SourceRowState {
impl Default for SourceRowIndexingState {
fn default() -> Self {
Self {
source_version: SourceVersion::default(),
Expand All @@ -26,17 +26,17 @@ impl Default for SourceRowState {
}
}

struct SourceState {
rows: HashMap<value::KeyValue, SourceRowState>,
struct SourceIndexingState {
rows: HashMap<value::KeyValue, SourceRowIndexingState>,
scan_generation: usize,
}
pub struct SourceContext {
pub struct SourceIndexingContext {
flow: Arc<builder::AnalyzedFlow>,
source_idx: usize,
state: Mutex<SourceState>,
state: Mutex<SourceIndexingState>,
}

impl SourceContext {
impl SourceIndexingContext {
pub async fn load(
flow: Arc<builder::AnalyzedFlow>,
source_idx: usize,
Expand All @@ -58,7 +58,7 @@ impl SourceContext {
.into_key()?;
rows.insert(
source_key,
SourceRowState {
SourceRowIndexingState {
source_version: SourceVersion::from_stored(
key_metadata.processed_source_ordinal,
&key_metadata.process_logic_fingerprint,
Expand All @@ -72,7 +72,7 @@ impl SourceContext {
Ok(Self {
flow,
source_idx,
state: Mutex::new(SourceState {
state: Mutex::new(SourceIndexingState {
rows,
scan_generation,
}),
Expand Down Expand Up @@ -144,7 +144,7 @@ impl SourceContext {
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert(SourceRowState {
entry.insert(SourceRowIndexingState {
source_version: target_source_version,
touched_generation: scan_generation,
..Default::default()
Expand Down Expand Up @@ -259,15 +259,12 @@ impl SourceContext {
}
}

pub async fn update(
flow: &Arc<builder::AnalyzedFlow>,
pool: &PgPool,
) -> Result<stats::IndexUpdateInfo> {
let plan = flow.get_execution_plan().await?;
pub async fn update(flow_context: &FlowContext, pool: &PgPool) -> Result<stats::IndexUpdateInfo> {
let plan = flow_context.flow.get_execution_plan().await?;
let source_update_stats = try_join_all(
(0..plan.source_ops.len())
.map(|idx| async move {
let source_context = Arc::new(SourceContext::load(flow.clone(), idx, pool).await?);
let source_context = flow_context.get_source_indexing_context(idx, pool).await?;
source_context.update_source(pool).await
})
.collect::<Vec<_>>(),
Expand Down
71 changes: 53 additions & 18 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,85 @@
use crate::prelude::*;

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

use crate::execution::source_indexer::SourceIndexingContext;
use crate::service::error::ApiError;
use crate::settings;
use crate::setup;
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
use anyhow::Result;
use async_lock::OnceCell;
use axum::http::StatusCode;
use sqlx::PgPool;
use tokio::runtime::Runtime;

pub struct FlowContext {
pub flow: Arc<AnalyzedFlow>,
pub query_handlers: BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>,
pub source_indexing_contexts: Vec<OnceCell<Arc<SourceIndexingContext>>>,
pub query_handlers: Mutex<BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>>,
}

impl FlowContext {
pub fn new(flow: Arc<AnalyzedFlow>) -> Self {
let mut source_indexing_contexts = Vec::new();
source_indexing_contexts
.resize_with(flow.flow_instance.source_ops.len(), || OnceCell::new());
Self {
flow,
query_handlers: BTreeMap::new(),
source_indexing_contexts,
query_handlers: Mutex::new(BTreeMap::new()),
}
}

pub async fn get_source_indexing_context(
&self,
source_idx: usize,
pool: &PgPool,
) -> Result<&Arc<SourceIndexingContext>> {
self.source_indexing_contexts[source_idx]
.get_or_try_init(|| async move {
Ok(Arc::new(
SourceIndexingContext::load(self.flow.clone(), source_idx, pool).await?,
))
})
.await
}

/// Looks up the query handler registered under `name`.
///
/// Returns a clone of the stored `Arc`. When no handler is registered under that
/// name, returns an `ApiError` carrying `StatusCode::NOT_FOUND`.
///
/// # Panics
///
/// Panics if the `query_handlers` mutex is poisoned.
pub fn get_query_handler(&self, name: &str) -> Result<Arc<SimpleSemanticsQueryHandler>> {
    let handlers = self.query_handlers.lock().unwrap();
    match handlers.get(name) {
        Some(handler) => Ok(Arc::clone(handler)),
        None => Err(ApiError::new(
            &format!("Query handler not found: {name}"),
            StatusCode::NOT_FOUND,
        )
        .into()),
    }
}
}

pub struct LibContext {
pub runtime: Runtime,
pub pool: PgPool,
pub flows: RwLock<BTreeMap<String, FlowContext>>,
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
pub combined_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
}

impl LibContext {
pub fn with_flow_context<R>(
&self,
flow_name: &str,
f: impl FnOnce(&FlowContext) -> R,
) -> Result<R, ApiError> {
let flows = self.flows.read().unwrap();
let flow_context = flows.get(flow_name).ok_or_else(|| {
ApiError::new(
&format!("Flow instance not found: {flow_name}"),
StatusCode::NOT_FOUND,
)
})?;
Ok(f(flow_context))
/// Returns a shared handle to the flow context registered under `flow_name`.
///
/// The `Arc` is cloned out of the `flows` map while the lock is held. When no flow
/// is registered under that name, returns an `ApiError` carrying
/// `StatusCode::NOT_FOUND`.
///
/// # Panics
///
/// Panics if the `flows` mutex is poisoned.
pub fn get_flow_context(&self, flow_name: &str) -> Result<Arc<FlowContext>> {
    let registry = self.flows.lock().unwrap();
    let found = registry.get(flow_name).cloned();
    found.ok_or_else(|| {
        ApiError::new(
            &format!("Flow instance not found: {flow_name}"),
            StatusCode::NOT_FOUND,
        )
        .into()
    })
}
}

Expand All @@ -62,6 +97,6 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
runtime,
pool,
combined_setup_states: RwLock::new(all_css),
flows: RwLock::new(BTreeMap::new()),
flows: Mutex::new(BTreeMap::new()),
})
}
5 changes: 4 additions & 1 deletion src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
pub(crate) use futures::{FutureExt, StreamExt};
pub(crate) use itertools::Itertools;
pub(crate) use serde::{Deserialize, Serialize};
pub(crate) use std::sync::{Arc, Mutex};
pub(crate) use std::sync::{Arc, Mutex, Weak};

pub(crate) use crate::base::{schema, spec, value};
pub(crate) use crate::builder::{self, plan};
pub(crate) use crate::execution;
pub(crate) use crate::lib_context::{FlowContext, LibContext};
pub(crate) use crate::ops::interface;
pub(crate) use crate::service::error::ApiError;

pub(crate) use crate::{api_bail, api_error};

pub(crate) use anyhow::{anyhow, bail};
pub(crate) use log::{debug, error, info, trace, warn};
31 changes: 15 additions & 16 deletions src/py/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::prelude::*;

use crate::base::spec::VectorSimilarityMetric;
use crate::execution::query;
use crate::get_lib_context;
Expand All @@ -7,13 +9,10 @@ use crate::ops::py_factory::PyOpArgSchema;
use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory};
use crate::server::{self, ServerSettings};
use crate::settings::Settings;
use crate::setup;
use crate::LIB_CONTEXT;
use crate::{api_error, setup};
use crate::{builder, execution};
use anyhow::anyhow;
use pyo3::{exceptions::PyException, prelude::*};
use std::collections::btree_map;
use std::sync::Arc;

mod convert;
pub use convert::*;
Expand Down Expand Up @@ -91,20 +90,20 @@ impl IndexUpdateInfo {
}

#[pyclass]
pub struct Flow(pub Arc<builder::AnalyzedFlow>);
pub struct Flow(pub Arc<FlowContext>);

#[pymethods]
impl Flow {
pub fn __str__(&self) -> String {
serde_json::to_string_pretty(&self.0.flow_instance).unwrap()
serde_json::to_string_pretty(&self.0.flow.flow_instance).unwrap()
}

pub fn __repr__(&self) -> String {
self.__str__()
}

pub fn name(&self) -> &str {
&self.0.flow_instance.name
&self.0.flow.flow_instance.name
}

pub fn update(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
Expand Down Expand Up @@ -132,10 +131,10 @@ impl Flow {
lib_context
.runtime
.block_on(async {
let exec_plan = self.0.get_execution_plan().await?;
let exec_plan = self.0.flow.get_execution_plan().await?;
execution::dumper::evaluate_and_dump(
&exec_plan,
&self.0.data_schema,
&self.0.flow.data_schema,
options.into_inner(),
&lib_context.pool,
)
Expand Down Expand Up @@ -181,7 +180,7 @@ impl SimpleSemanticsQueryHandler {
let handler = lib_context
.runtime
.block_on(query::SimpleSemanticsQueryHandler::new(
flow.0.clone(),
flow.0.flow.clone(),
target_name,
query_transform_flow.0.clone(),
default_similarity_metric.0,
Expand All @@ -194,11 +193,11 @@ impl SimpleSemanticsQueryHandler {
pub fn register_query_handler(&self, name: String) -> PyResult<()> {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
let mut flows = lib_context.flows.write().unwrap();
let flow_ctx = flows
.get_mut(&self.0.flow_name)
.ok_or_else(|| PyException::new_err(format!("flow not found: {}", self.0.flow_name)))?;
match flow_ctx.query_handlers.entry(name) {
let flow_ctx = lib_context
.get_flow_context(&self.0.flow_name)
.into_py_result()?;
let mut query_handlers = flow_ctx.query_handlers.lock().unwrap();
match query_handlers.entry(name) {
btree_map::Entry::Occupied(entry) => {
return Err(PyException::new_err(format!(
"query handler name already exists: {}",
Expand Down Expand Up @@ -270,8 +269,8 @@ fn check_setup_status(
) -> PyResult<SetupStatusCheck> {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
let flows = lib_context.flows.lock().unwrap();
let all_css = lib_context.combined_setup_states.read().unwrap();
let flows = lib_context.flows.read().unwrap();
let setup_status =
setup::check_setup_status(&flows, &all_css, options.into_inner()).into_py_result()?;
Ok(SetupStatusCheck(setup_status))
Expand Down
Loading