diff --git a/Cargo.toml b/Cargo.toml index 722a5186e..b7e26ab13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,10 @@ name = "cocoindex_engine" crate-type = ["cdylib"] [dependencies] -pyo3 = { version = "0.23.5", features = ["chrono"] } +pyo3 = { version = "0.24.1", features = ["chrono"] } +pythonize = "0.24.0" +pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] } + anyhow = { version = "1.0.97", features = ["std"] } async-trait = "0.1.88" axum = "0.7.9" @@ -21,7 +24,7 @@ chrono = "0.4.40" config = "0.14.1" const_format = "0.2.34" futures = "0.3.31" -log = "0.4.26" +log = "0.4.27" regex = "1.11.1" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" @@ -31,7 +34,14 @@ sqlx = { version = "0.8.3", features = [ "runtime-tokio", "uuid", ] } -tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread", "full", "tracing", "fs"] } +tokio = { version = "1.44.1", features = [ + "macros", + "rt-multi-thread", + "full", + "tracing", + "fs", + "sync", +] } tower = "0.5.2" tower-http = { version = "0.6.2", features = ["cors", "trace"] } indexmap = { version = "2.8.0", features = ["serde"] } @@ -40,9 +50,7 @@ pgvector = { version = "0.4.0", features = ["sqlx"] } indenter = "0.3.3" itertools = "0.14.0" derivative = "2.2.0" -async-lock = "3.4.0" hex = "0.4.3" -pythonize = "0.23.0" schemars = "0.8.22" console-subscriber = "0.4.1" env_logger = "0.11.7" @@ -82,12 +90,12 @@ tree-sitter-yaml = "0.7.0" globset = "0.4.16" unicase = "2.8.1" google-drive3 = "6.0.0" -hyper-util = "0.1.10" +hyper-util = "0.1.11" hyper-rustls = { version = "0.27.5" } yup-oauth2 = "12.1.0" rustls = { version = "0.23.25" } http-body-util = "0.1.3" -yaml-rust2 = "0.10.0" +yaml-rust2 = "0.10.1" urlencoding = "2.1.3" uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] } tokio-stream = "0.1.17" diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index e474fb1d8..1ae99444e 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -1,3 +1,4 @@ +import asyncio import click import datetime @@ -50,7 +51,7 @@ def update(flow_name: str | None): """ Update the index defined by the flow. """ - stats = _flow_by_name(flow_name).update() + stats = asyncio.run(_flow_by_name(flow_name).update()) print(stats) @cli.command() diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index b5925a3e7..890e7d711 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -383,12 +383,12 @@ def name(self) -> str: """ return self._lazy_engine_flow().name() - def update(self): + async def update(self): """ Update the index defined by the flow. Once the function returns, the indice is fresh up to the moment when the function is called. """ - return self._lazy_engine_flow().update() + return await self._lazy_engine_flow().update() def evaluate_and_dump(self, options: EvaluateAndDumpOptions): """ diff --git a/src/execution/memoization.rs b/src/execution/memoization.rs index b51d75124..923722d17 100644 --- a/src/execution/memoization.rs +++ b/src/execution/memoization.rs @@ -27,7 +27,7 @@ pub struct StoredMemoizationInfo { pub uuids: HashMap>, } -pub type CacheEntryCell = Arc>>; +pub type CacheEntryCell = Arc>>; enum CacheData { /// Existing entry in previous runs, but not in current run yet. Previous(serde_json::Value), @@ -180,7 +180,7 @@ impl EvaluationMemory { match &mut entry_mut.data { CacheData::Previous(value) => { let value = value::Value::from_json(std::mem::take(value), typ)?; - let cell = Arc::new(async_lock::OnceCell::from(Ok(value))); + let cell = Arc::new(tokio::sync::OnceCell::from(Ok(value))); let time = entry_mut.time; entry.insert(CacheEntry { time, @@ -192,7 +192,7 @@ impl EvaluationMemory { } } entry => { - let cell = Arc::new(async_lock::OnceCell::new()); + let cell = Arc::new(tokio::sync::OnceCell::new()); entry.insert_entry(CacheEntry { time: self.current_time, data: CacheData::Current(cell.clone()), diff --git a/src/lib_context.rs b/src/lib_context.rs index 281da49a9..104ed2789 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -8,22 +8,22 @@ use crate::service::error::ApiError; use crate::settings; use crate::setup; use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler}; -use async_lock::OnceCell; use axum::http::StatusCode; use sqlx::PgPool; use tokio::runtime::Runtime; pub struct FlowContext { pub flow: Arc, - pub source_indexing_contexts: Vec>>, + pub source_indexing_contexts: Vec>>, pub query_handlers: Mutex>>, } impl FlowContext { pub fn new(flow: Arc) -> Self { let mut source_indexing_contexts = Vec::new(); - source_indexing_contexts - .resize_with(flow.flow_instance.import_ops.len(), || OnceCell::new()); + source_indexing_contexts.resize_with(flow.flow_instance.import_ops.len(), || { + tokio::sync::OnceCell::new() + }); Self { flow, source_indexing_contexts, diff --git a/src/py/mod.rs b/src/py/mod.rs index 3f78cf050..fbcf9d096 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -93,47 +93,29 @@ impl IndexUpdateInfo { pub struct Flow(pub Arc); #[pyclass] -pub struct FlowSynchronizer(pub async_lock::RwLock); +pub struct FlowSynchronizer(pub Arc>); #[pymethods] impl FlowSynchronizer { - pub fn join(&self, py: Python<'_>) -> PyResult<()> { - py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - lib_context - .runtime - .block_on(async { - let mut synchronizer = self.0.write().await; - synchronizer.join().await - }) - .into_py_result() + pub fn join<'py>(&self, py: Python<'py>) -> PyResult> { + let synchronizer = self.0.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let mut synchronizer = synchronizer.write().await; + synchronizer.join().await.into_py_result() }) } - pub fn abort(&self, py: Python<'_>) -> PyResult<()> { + pub fn abort(&self, py: Python<'_>) { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - lib_context.runtime.block_on(async { - let mut synchronizer = self.0.write().await; - synchronizer.abort(); - }); - Ok(()) + let mut synchronizer = self.0.blocking_write(); + synchronizer.abort(); }) } - pub fn index_update_info(&self, py: Python<'_>) -> PyResult { + pub fn index_update_info(&self, py: Python<'_>) -> IndexUpdateInfo { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - lib_context - .runtime - .block_on(async { - let synchronizer = self.0.read().await; - anyhow::Ok(IndexUpdateInfo(synchronizer.index_update_info())) - }) - .into_py_result() + let synchronizer = self.0.blocking_read(); + IndexUpdateInfo(synchronizer.index_update_info()) }) } } @@ -152,25 +134,24 @@ impl Flow { &self.0.flow.flow_instance.name } - pub fn update(&self, py: Python<'_>) -> PyResult { - py.allow_threads(|| { + pub fn update<'py>(&self, py: Python<'py>) -> PyResult> { + let flow_ctx = self.0.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { let lib_context = get_lib_context() .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - let update_info = lib_context - .runtime - .block_on(async { - let mut synchronizer = execution::FlowSynchronizer::start( - self.0.clone(), - &lib_context.pool, - &execution::FlowSynchronizerOptions { - keep_refreshed: false, - }, - ) - .await?; - synchronizer.join().await?; - anyhow::Ok(synchronizer.index_update_info()) - }) + let update_info = { + let mut synchronizer = execution::FlowSynchronizer::start( + flow_ctx, + &lib_context.pool, + &execution::FlowSynchronizerOptions { + keep_refreshed: false, + }, + ) + .await .into_py_result()?; + synchronizer.join().await.into_py_result()?; + synchronizer.index_update_info() + }; Ok(IndexUpdateInfo(update_info)) }) } @@ -193,7 +174,9 @@ impl Flow { anyhow::Ok(synchronizer) }) .into_py_result()?; - Ok(FlowSynchronizer(async_lock::RwLock::new(synchronizer))) + Ok(FlowSynchronizer(Arc::new(tokio::sync::RwLock::new( + synchronizer, + )))) }) }