Skip to content

Commit 82007fb

Browse files
authored
Add SourceIndexingState into FlowContext. (#233)
1 parent afc4cac commit 82007fb

File tree

8 files changed

+136
-111
lines changed

8 files changed

+136
-111
lines changed

src/builder/flow_builder.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
1-
use anyhow::{anyhow, bail, Result};
1+
use crate::prelude::*;
2+
23
use pyo3::{exceptions::PyException, prelude::*};
34
use std::{
45
collections::{btree_map, hash_map::Entry, HashMap},
56
ops::Deref,
6-
sync::{Arc, Mutex, Weak},
77
};
88

99
use super::analyzer::{
1010
build_flow_instance_context, AnalyzerContext, CollectorBuilder, DataScopeBuilder,
1111
ExecutionScope, ValueTypeBuilder,
1212
};
1313
use crate::{
14-
api_bail,
1514
base::{
16-
schema::{self, CollectorSchema, FieldSchema},
17-
spec::{self, FieldName, NamedSpec},
15+
schema::{CollectorSchema, FieldSchema},
16+
spec::{FieldName, NamedSpec},
1817
},
1918
get_lib_context,
2019
lib_context::LibContext,
@@ -649,21 +648,21 @@ impl FlowBuilder {
649648
))
650649
})
651650
.into_py_result()?;
652-
let analyzed_flow = Arc::new(analyzed_flow);
653-
654-
let mut analyzed_flows = self.lib_context.flows.write().unwrap();
655-
match analyzed_flows.entry(self.flow_instance_name.clone()) {
651+
let mut flow_ctxs = self.lib_context.flows.lock().unwrap();
652+
let flow_ctx = match flow_ctxs.entry(self.flow_instance_name.clone()) {
656653
btree_map::Entry::Occupied(_) => {
657654
return Err(PyException::new_err(format!(
658655
"flow instance name already exists: {}",
659656
self.flow_instance_name
660657
)));
661658
}
662659
btree_map::Entry::Vacant(entry) => {
663-
entry.insert(FlowContext::new(analyzed_flow.clone()));
660+
let flow_ctx = Arc::new(FlowContext::new(Arc::new(analyzed_flow)));
661+
entry.insert(flow_ctx.clone());
662+
flow_ctx
664663
}
665-
}
666-
Ok(py::Flow(analyzed_flow))
664+
};
665+
Ok(py::Flow(flow_ctx))
667666
}
668667

669668
pub fn build_transient_flow(&self, py: Python<'_>) -> PyResult<py::TransientFlow> {

src/execution/source_indexer.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ use super::{
1010
use futures::future::try_join_all;
1111
use sqlx::PgPool;
1212
use tokio::{sync::Semaphore, task::JoinSet};
13-
struct SourceRowState {
13+
struct SourceRowIndexingState {
1414
source_version: SourceVersion,
1515
processing_sem: Arc<Semaphore>,
1616
touched_generation: usize,
1717
}
1818

19-
impl Default for SourceRowState {
19+
impl Default for SourceRowIndexingState {
2020
fn default() -> Self {
2121
Self {
2222
source_version: SourceVersion::default(),
@@ -26,17 +26,17 @@ impl Default for SourceRowState {
2626
}
2727
}
2828

29-
struct SourceState {
30-
rows: HashMap<value::KeyValue, SourceRowState>,
29+
struct SourceIndexingState {
30+
rows: HashMap<value::KeyValue, SourceRowIndexingState>,
3131
scan_generation: usize,
3232
}
33-
pub struct SourceContext {
33+
pub struct SourceIndexingContext {
3434
flow: Arc<builder::AnalyzedFlow>,
3535
source_idx: usize,
36-
state: Mutex<SourceState>,
36+
state: Mutex<SourceIndexingState>,
3737
}
3838

39-
impl SourceContext {
39+
impl SourceIndexingContext {
4040
pub async fn load(
4141
flow: Arc<builder::AnalyzedFlow>,
4242
source_idx: usize,
@@ -58,7 +58,7 @@ impl SourceContext {
5858
.into_key()?;
5959
rows.insert(
6060
source_key,
61-
SourceRowState {
61+
SourceRowIndexingState {
6262
source_version: SourceVersion::from_stored(
6363
key_metadata.processed_source_ordinal,
6464
&key_metadata.process_logic_fingerprint,
@@ -72,7 +72,7 @@ impl SourceContext {
7272
Ok(Self {
7373
flow,
7474
source_idx,
75-
state: Mutex::new(SourceState {
75+
state: Mutex::new(SourceIndexingState {
7676
rows,
7777
scan_generation,
7878
}),
@@ -144,7 +144,7 @@ impl SourceContext {
144144
}
145145
}
146146
hash_map::Entry::Vacant(entry) => {
147-
entry.insert(SourceRowState {
147+
entry.insert(SourceRowIndexingState {
148148
source_version: target_source_version,
149149
touched_generation: scan_generation,
150150
..Default::default()
@@ -259,15 +259,12 @@ impl SourceContext {
259259
}
260260
}
261261

262-
pub async fn update(
263-
flow: &Arc<builder::AnalyzedFlow>,
264-
pool: &PgPool,
265-
) -> Result<stats::IndexUpdateInfo> {
266-
let plan = flow.get_execution_plan().await?;
262+
pub async fn update(flow_context: &FlowContext, pool: &PgPool) -> Result<stats::IndexUpdateInfo> {
263+
let plan = flow_context.flow.get_execution_plan().await?;
267264
let source_update_stats = try_join_all(
268265
(0..plan.source_ops.len())
269266
.map(|idx| async move {
270-
let source_context = Arc::new(SourceContext::load(flow.clone(), idx, pool).await?);
267+
let source_context = flow_context.get_source_indexing_context(idx, pool).await?;
271268
source_context.update_source(pool).await
272269
})
273270
.collect::<Vec<_>>(),

src/lib_context.rs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,85 @@
1+
use crate::prelude::*;
2+
13
use std::collections::BTreeMap;
24
use std::sync::{Arc, RwLock};
35

6+
use crate::execution::source_indexer::SourceIndexingContext;
47
use crate::service::error::ApiError;
58
use crate::settings;
69
use crate::setup;
710
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
8-
use anyhow::Result;
11+
use async_lock::OnceCell;
912
use axum::http::StatusCode;
1013
use sqlx::PgPool;
1114
use tokio::runtime::Runtime;
1215

1316
pub struct FlowContext {
1417
pub flow: Arc<AnalyzedFlow>,
15-
pub query_handlers: BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>,
18+
pub source_indexing_contexts: Vec<OnceCell<Arc<SourceIndexingContext>>>,
19+
pub query_handlers: Mutex<BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>>,
1620
}
1721

1822
impl FlowContext {
1923
pub fn new(flow: Arc<AnalyzedFlow>) -> Self {
24+
let mut source_indexing_contexts = Vec::new();
25+
source_indexing_contexts
26+
.resize_with(flow.flow_instance.source_ops.len(), || OnceCell::new());
2027
Self {
2128
flow,
22-
query_handlers: BTreeMap::new(),
29+
source_indexing_contexts,
30+
query_handlers: Mutex::new(BTreeMap::new()),
2331
}
2432
}
33+
34+
pub async fn get_source_indexing_context(
35+
&self,
36+
source_idx: usize,
37+
pool: &PgPool,
38+
) -> Result<&Arc<SourceIndexingContext>> {
39+
self.source_indexing_contexts[source_idx]
40+
.get_or_try_init(|| async move {
41+
Ok(Arc::new(
42+
SourceIndexingContext::load(self.flow.clone(), source_idx, pool).await?,
43+
))
44+
})
45+
.await
46+
}
47+
48+
pub fn get_query_handler(&self, name: &str) -> Result<Arc<SimpleSemanticsQueryHandler>> {
49+
let query_handlers = self.query_handlers.lock().unwrap();
50+
let query_handler = query_handlers
51+
.get(name)
52+
.ok_or_else(|| {
53+
ApiError::new(
54+
&format!("Query handler not found: {name}"),
55+
StatusCode::NOT_FOUND,
56+
)
57+
})?
58+
.clone();
59+
Ok(query_handler)
60+
}
2561
}
2662

2763
pub struct LibContext {
2864
pub runtime: Runtime,
2965
pub pool: PgPool,
30-
pub flows: RwLock<BTreeMap<String, FlowContext>>,
66+
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
3167
pub combined_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
3268
}
3369

3470
impl LibContext {
35-
pub fn with_flow_context<R>(
36-
&self,
37-
flow_name: &str,
38-
f: impl FnOnce(&FlowContext) -> R,
39-
) -> Result<R, ApiError> {
40-
let flows = self.flows.read().unwrap();
41-
let flow_context = flows.get(flow_name).ok_or_else(|| {
42-
ApiError::new(
43-
&format!("Flow instance not found: {flow_name}"),
44-
StatusCode::NOT_FOUND,
45-
)
46-
})?;
47-
Ok(f(flow_context))
71+
pub fn get_flow_context(&self, flow_name: &str) -> Result<Arc<FlowContext>> {
72+
let flows = self.flows.lock().unwrap();
73+
let flow_ctx = flows
74+
.get(flow_name)
75+
.ok_or_else(|| {
76+
ApiError::new(
77+
&format!("Flow instance not found: {flow_name}"),
78+
StatusCode::NOT_FOUND,
79+
)
80+
})?
81+
.clone();
82+
Ok(flow_ctx)
4883
}
4984
}
5085

@@ -62,6 +97,6 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
6297
runtime,
6398
pool,
6499
combined_setup_states: RwLock::new(all_css),
65-
flows: RwLock::new(BTreeMap::new()),
100+
flows: Mutex::new(BTreeMap::new()),
66101
})
67102
}

src/prelude.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
66
pub(crate) use futures::{FutureExt, StreamExt};
77
pub(crate) use itertools::Itertools;
88
pub(crate) use serde::{Deserialize, Serialize};
9-
pub(crate) use std::sync::{Arc, Mutex};
9+
pub(crate) use std::sync::{Arc, Mutex, Weak};
1010

1111
pub(crate) use crate::base::{schema, spec, value};
1212
pub(crate) use crate::builder::{self, plan};
13+
pub(crate) use crate::execution;
14+
pub(crate) use crate::lib_context::{FlowContext, LibContext};
1315
pub(crate) use crate::ops::interface;
1416
pub(crate) use crate::service::error::ApiError;
1517

1618
pub(crate) use crate::{api_bail, api_error};
1719

20+
pub(crate) use anyhow::{anyhow, bail};
1821
pub(crate) use log::{debug, error, info, trace, warn};

src/py/mod.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::prelude::*;
2+
13
use crate::base::spec::VectorSimilarityMetric;
24
use crate::execution::query;
35
use crate::get_lib_context;
@@ -7,13 +9,10 @@ use crate::ops::py_factory::PyOpArgSchema;
79
use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory};
810
use crate::server::{self, ServerSettings};
911
use crate::settings::Settings;
12+
use crate::setup;
1013
use crate::LIB_CONTEXT;
11-
use crate::{api_error, setup};
12-
use crate::{builder, execution};
13-
use anyhow::anyhow;
1414
use pyo3::{exceptions::PyException, prelude::*};
1515
use std::collections::btree_map;
16-
use std::sync::Arc;
1716

1817
mod convert;
1918
pub use convert::*;
@@ -91,20 +90,20 @@ impl IndexUpdateInfo {
9190
}
9291

9392
#[pyclass]
94-
pub struct Flow(pub Arc<builder::AnalyzedFlow>);
93+
pub struct Flow(pub Arc<FlowContext>);
9594

9695
#[pymethods]
9796
impl Flow {
9897
pub fn __str__(&self) -> String {
99-
serde_json::to_string_pretty(&self.0.flow_instance).unwrap()
98+
serde_json::to_string_pretty(&self.0.flow.flow_instance).unwrap()
10099
}
101100

102101
pub fn __repr__(&self) -> String {
103102
self.__str__()
104103
}
105104

106105
pub fn name(&self) -> &str {
107-
&self.0.flow_instance.name
106+
&self.0.flow.flow_instance.name
108107
}
109108

110109
pub fn update(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
@@ -132,10 +131,10 @@ impl Flow {
132131
lib_context
133132
.runtime
134133
.block_on(async {
135-
let exec_plan = self.0.get_execution_plan().await?;
134+
let exec_plan = self.0.flow.get_execution_plan().await?;
136135
execution::dumper::evaluate_and_dump(
137136
&exec_plan,
138-
&self.0.data_schema,
137+
&self.0.flow.data_schema,
139138
options.into_inner(),
140139
&lib_context.pool,
141140
)
@@ -181,7 +180,7 @@ impl SimpleSemanticsQueryHandler {
181180
let handler = lib_context
182181
.runtime
183182
.block_on(query::SimpleSemanticsQueryHandler::new(
184-
flow.0.clone(),
183+
flow.0.flow.clone(),
185184
target_name,
186185
query_transform_flow.0.clone(),
187186
default_similarity_metric.0,
@@ -194,11 +193,11 @@ impl SimpleSemanticsQueryHandler {
194193
pub fn register_query_handler(&self, name: String) -> PyResult<()> {
195194
let lib_context = get_lib_context()
196195
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
197-
let mut flows = lib_context.flows.write().unwrap();
198-
let flow_ctx = flows
199-
.get_mut(&self.0.flow_name)
200-
.ok_or_else(|| PyException::new_err(format!("flow not found: {}", self.0.flow_name)))?;
201-
match flow_ctx.query_handlers.entry(name) {
196+
let flow_ctx = lib_context
197+
.get_flow_context(&self.0.flow_name)
198+
.into_py_result()?;
199+
let mut query_handlers = flow_ctx.query_handlers.lock().unwrap();
200+
match query_handlers.entry(name) {
202201
btree_map::Entry::Occupied(entry) => {
203202
return Err(PyException::new_err(format!(
204203
"query handler name already exists: {}",
@@ -270,8 +269,8 @@ fn check_setup_status(
270269
) -> PyResult<SetupStatusCheck> {
271270
let lib_context = get_lib_context()
272271
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
272+
let flows = lib_context.flows.lock().unwrap();
273273
let all_css = lib_context.combined_setup_states.read().unwrap();
274-
let flows = lib_context.flows.read().unwrap();
275274
let setup_status =
276275
setup::check_setup_status(&flows, &all_css, options.into_inner()).into_py_result()?;
277276
Ok(SetupStatusCheck(setup_status))

0 commit comments

Comments
 (0)