Skip to content

Commit 130bdfd

Browse files
committed
Expose update() and FlowSynchronizer::join() as async Python function
1 parent 49c836d commit 130bdfd

File tree

6 files changed

+56
-64
lines changed

6 files changed

+56
-64
lines changed

Cargo.toml

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ name = "cocoindex_engine"
1111
crate-type = ["cdylib"]
1212

1313
[dependencies]
14-
pyo3 = { version = "0.23.5", features = ["chrono"] }
14+
pyo3 = { version = "0.24.1", features = ["chrono"] }
15+
pythonize = "0.24.0"
16+
pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
17+
1518
anyhow = { version = "1.0.97", features = ["std"] }
1619
async-trait = "0.1.88"
1720
axum = "0.7.9"
@@ -21,7 +24,7 @@ chrono = "0.4.40"
2124
config = "0.14.1"
2225
const_format = "0.2.34"
2326
futures = "0.3.31"
24-
log = "0.4.26"
27+
log = "0.4.27"
2528
regex = "1.11.1"
2629
serde = { version = "1.0.219", features = ["derive"] }
2730
serde_json = "1.0.140"
@@ -31,7 +34,14 @@ sqlx = { version = "0.8.3", features = [
3134
"runtime-tokio",
3235
"uuid",
3336
] }
34-
tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread", "full", "tracing", "fs"] }
37+
tokio = { version = "1.44.1", features = [
38+
"macros",
39+
"rt-multi-thread",
40+
"full",
41+
"tracing",
42+
"fs",
43+
"sync",
44+
] }
3545
tower = "0.5.2"
3646
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
3747
indexmap = { version = "2.8.0", features = ["serde"] }
@@ -40,9 +50,7 @@ pgvector = { version = "0.4.0", features = ["sqlx"] }
4050
indenter = "0.3.3"
4151
itertools = "0.14.0"
4252
derivative = "2.2.0"
43-
async-lock = "3.4.0"
4453
hex = "0.4.3"
45-
pythonize = "0.23.0"
4654
schemars = "0.8.22"
4755
console-subscriber = "0.4.1"
4856
env_logger = "0.11.7"
@@ -82,12 +90,12 @@ tree-sitter-yaml = "0.7.0"
8290
globset = "0.4.16"
8391
unicase = "2.8.1"
8492
google-drive3 = "6.0.0"
85-
hyper-util = "0.1.10"
93+
hyper-util = "0.1.11"
8694
hyper-rustls = { version = "0.27.5" }
8795
yup-oauth2 = "12.1.0"
8896
rustls = { version = "0.23.25" }
8997
http-body-util = "0.1.3"
90-
yaml-rust2 = "0.10.0"
98+
yaml-rust2 = "0.10.1"
9199
urlencoding = "2.1.3"
92100
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
93101
tokio-stream = "0.1.17"

python/cocoindex/cli.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import click
23
import datetime
34

@@ -50,7 +51,7 @@ def update(flow_name: str | None):
5051
"""
5152
Update the index defined by the flow.
5253
"""
53-
stats = _flow_by_name(flow_name).update()
54+
stats = asyncio.run(_flow_by_name(flow_name).update())
5455
print(stats)
5556

5657
@cli.command()

python/cocoindex/flow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,12 +383,12 @@ def name(self) -> str:
383383
"""
384384
return self._lazy_engine_flow().name()
385385

386-
def update(self):
386+
async def update(self):
387387
"""
388388
Update the index defined by the flow.
389389
Once the function returns, the indice is fresh up to the moment when the function is called.
390390
"""
391-
return self._lazy_engine_flow().update()
391+
return await self._lazy_engine_flow().update()
392392

393393
def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
394394
"""

src/execution/memoization.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub struct StoredMemoizationInfo {
2727
pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
2828
}
2929

30-
pub type CacheEntryCell = Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>;
30+
pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
3131
enum CacheData {
3232
/// Existing entry in previous runs, but not in current run yet.
3333
Previous(serde_json::Value),
@@ -180,7 +180,7 @@ impl EvaluationMemory {
180180
match &mut entry_mut.data {
181181
CacheData::Previous(value) => {
182182
let value = value::Value::from_json(std::mem::take(value), typ)?;
183-
let cell = Arc::new(async_lock::OnceCell::from(Ok(value)));
183+
let cell = Arc::new(tokio::sync::OnceCell::from(Ok(value)));
184184
let time = entry_mut.time;
185185
entry.insert(CacheEntry {
186186
time,
@@ -192,7 +192,7 @@ impl EvaluationMemory {
192192
}
193193
}
194194
entry => {
195-
let cell = Arc::new(async_lock::OnceCell::new());
195+
let cell = Arc::new(tokio::sync::OnceCell::new());
196196
entry.insert_entry(CacheEntry {
197197
time: self.current_time,
198198
data: CacheData::Current(cell.clone()),

src/lib_context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@ use crate::service::error::ApiError;
88
use crate::settings;
99
use crate::setup;
1010
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
11-
use async_lock::OnceCell;
1211
use axum::http::StatusCode;
1312
use sqlx::PgPool;
1413
use tokio::runtime::Runtime;
1514

1615
pub struct FlowContext {
1716
pub flow: Arc<AnalyzedFlow>,
18-
pub source_indexing_contexts: Vec<OnceCell<Arc<SourceIndexingContext>>>,
17+
pub source_indexing_contexts: Vec<tokio::sync::OnceCell<Arc<SourceIndexingContext>>>,
1918
pub query_handlers: Mutex<BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>>,
2019
}
2120

2221
impl FlowContext {
2322
pub fn new(flow: Arc<AnalyzedFlow>) -> Self {
2423
let mut source_indexing_contexts = Vec::new();
25-
source_indexing_contexts
26-
.resize_with(flow.flow_instance.import_ops.len(), || OnceCell::new());
24+
source_indexing_contexts.resize_with(flow.flow_instance.import_ops.len(), || {
25+
tokio::sync::OnceCell::new()
26+
});
2727
Self {
2828
flow,
2929
source_indexing_contexts,

src/py/mod.rs

Lines changed: 30 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -93,47 +93,29 @@ impl IndexUpdateInfo {
9393
pub struct Flow(pub Arc<FlowContext>);
9494

9595
#[pyclass]
96-
pub struct FlowSynchronizer(pub async_lock::RwLock<execution::FlowSynchronizer>);
96+
pub struct FlowSynchronizer(pub Arc<tokio::sync::RwLock<execution::FlowSynchronizer>>);
9797

9898
#[pymethods]
9999
impl FlowSynchronizer {
100-
pub fn join(&self, py: Python<'_>) -> PyResult<()> {
101-
py.allow_threads(|| {
102-
let lib_context = get_lib_context()
103-
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
104-
lib_context
105-
.runtime
106-
.block_on(async {
107-
let mut synchronizer = self.0.write().await;
108-
synchronizer.join().await
109-
})
110-
.into_py_result()
100+
pub fn join<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
101+
let synchronizer = self.0.clone();
102+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
103+
let mut synchronizer = synchronizer.write().await;
104+
synchronizer.join().await.into_py_result()
111105
})
112106
}
113107

114-
pub fn abort(&self, py: Python<'_>) -> PyResult<()> {
108+
pub fn abort(&self, py: Python<'_>) {
115109
py.allow_threads(|| {
116-
let lib_context = get_lib_context()
117-
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
118-
lib_context.runtime.block_on(async {
119-
let mut synchronizer = self.0.write().await;
120-
synchronizer.abort();
121-
});
122-
Ok(())
110+
let mut synchronizer = self.0.blocking_write();
111+
synchronizer.abort();
123112
})
124113
}
125114

126-
pub fn index_update_info(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
115+
pub fn index_update_info(&self, py: Python<'_>) -> IndexUpdateInfo {
127116
py.allow_threads(|| {
128-
let lib_context = get_lib_context()
129-
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
130-
lib_context
131-
.runtime
132-
.block_on(async {
133-
let synchronizer = self.0.read().await;
134-
anyhow::Ok(IndexUpdateInfo(synchronizer.index_update_info()))
135-
})
136-
.into_py_result()
117+
let synchronizer = self.0.blocking_read();
118+
IndexUpdateInfo(synchronizer.index_update_info())
137119
})
138120
}
139121
}
@@ -152,25 +134,24 @@ impl Flow {
152134
&self.0.flow.flow_instance.name
153135
}
154136

155-
pub fn update(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
156-
py.allow_threads(|| {
137+
pub fn update<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
138+
let flow_ctx = self.0.clone();
139+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
157140
let lib_context = get_lib_context()
158141
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
159-
let update_info = lib_context
160-
.runtime
161-
.block_on(async {
162-
let mut synchronizer = execution::FlowSynchronizer::start(
163-
self.0.clone(),
164-
&lib_context.pool,
165-
&execution::FlowSynchronizerOptions {
166-
keep_refreshed: false,
167-
},
168-
)
169-
.await?;
170-
synchronizer.join().await?;
171-
anyhow::Ok(synchronizer.index_update_info())
172-
})
142+
let update_info = {
143+
let mut synchronizer = execution::FlowSynchronizer::start(
144+
flow_ctx,
145+
&lib_context.pool,
146+
&execution::FlowSynchronizerOptions {
147+
keep_refreshed: false,
148+
},
149+
)
150+
.await
173151
.into_py_result()?;
152+
synchronizer.join().await.into_py_result()?;
153+
synchronizer.index_update_info()
154+
};
174155
Ok(IndexUpdateInfo(update_info))
175156
})
176157
}
@@ -193,7 +174,9 @@ impl Flow {
193174
anyhow::Ok(synchronizer)
194175
})
195176
.into_py_result()?;
196-
Ok(FlowSynchronizer(async_lock::RwLock::new(synchronizer)))
177+
Ok(FlowSynchronizer(Arc::new(tokio::sync::RwLock::new(
178+
synchronizer,
179+
))))
197180
})
198181
}
199182

0 commit comments

Comments
 (0)