Merged
22 changes: 15 additions & 7 deletions Cargo.toml
@@ -11,7 +11,10 @@ name = "cocoindex_engine"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.23.5", features = ["chrono"] }
pyo3 = { version = "0.24.1", features = ["chrono"] }
pythonize = "0.24.0"
pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }

anyhow = { version = "1.0.97", features = ["std"] }
async-trait = "0.1.88"
axum = "0.7.9"
@@ -21,7 +24,7 @@ chrono = "0.4.40"
config = "0.14.1"
const_format = "0.2.34"
futures = "0.3.31"
log = "0.4.26"
log = "0.4.27"
regex = "1.11.1"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
@@ -31,7 +34,14 @@ sqlx = { version = "0.8.3", features = [
"runtime-tokio",
"uuid",
] }
tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread", "full", "tracing", "fs"] }
tokio = { version = "1.44.1", features = [
"macros",
"rt-multi-thread",
"full",
"tracing",
"fs",
"sync",
] }
tower = "0.5.2"
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
indexmap = { version = "2.8.0", features = ["serde"] }
@@ -40,9 +50,7 @@ pgvector = { version = "0.4.0", features = ["sqlx"] }
indenter = "0.3.3"
itertools = "0.14.0"
derivative = "2.2.0"
async-lock = "3.4.0"
hex = "0.4.3"
pythonize = "0.23.0"
schemars = "0.8.22"
console-subscriber = "0.4.1"
env_logger = "0.11.7"
@@ -82,12 +90,12 @@ tree-sitter-yaml = "0.7.0"
globset = "0.4.16"
unicase = "2.8.1"
google-drive3 = "6.0.0"
hyper-util = "0.1.10"
hyper-util = "0.1.11"
hyper-rustls = { version = "0.27.5" }
yup-oauth2 = "12.1.0"
rustls = { version = "0.23.25" }
http-body-util = "0.1.3"
yaml-rust2 = "0.10.0"
yaml-rust2 = "0.10.1"
urlencoding = "2.1.3"
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
tokio-stream = "0.1.17"
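The dependency changes above drop async-lock in favor of tokio's own sync primitives and add pyo3-async-runtimes (with pyo3 bumped to 0.24.1 and pythonize to the matching 0.24 line). A minimal sketch, not part of the PR, of what the new dependency enables — exposing a tokio future to Python as an awaitable instead of blocking the calling thread; the function name here is hypothetical:

```rust
use pyo3::prelude::*;

/// Hypothetical example of the pattern enabled by `pyo3-async-runtimes`
/// (with its "tokio-runtime" feature): the returned object is a Python
/// awaitable backed by a future running on the shared tokio runtime.
#[pyfunction]
fn sleep_then_echo<'py>(py: Python<'py>, msg: String) -> PyResult<Bound<'py, PyAny>> {
    pyo3_async_runtimes::tokio::future_into_py(py, async move {
        // Runs on the tokio runtime; the GIL is not held here.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        Ok(msg)
    })
}
```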
3 changes: 2 additions & 1 deletion python/cocoindex/cli.py
@@ -1,3 +1,4 @@
import asyncio
import click
import datetime

@@ -50,7 +51,7 @@ def update(flow_name: str | None):
"""
Update the index defined by the flow.
"""
stats = _flow_by_name(flow_name).update()
stats = asyncio.run(_flow_by_name(flow_name).update())
print(stats)

@cli.command()
4 changes: 2 additions & 2 deletions python/cocoindex/flow.py
@@ -383,12 +383,12 @@ def name(self) -> str:
"""
return self._lazy_engine_flow().name()

def update(self):
async def update(self):
"""
Update the index defined by the flow.
Once the function returns, the indice is fresh up to the moment when the function is called.
"""
return self._lazy_engine_flow().update()
return await self._lazy_engine_flow().update()

def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
"""
6 changes: 3 additions & 3 deletions src/execution/memoization.rs
@@ -27,7 +27,7 @@ pub struct StoredMemoizationInfo {
pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
}

pub type CacheEntryCell = Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>;
pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
enum CacheData {
/// Existing entry in previous runs, but not in current run yet.
Previous(serde_json::Value),
@@ -180,7 +180,7 @@ impl EvaluationMemory {
match &mut entry_mut.data {
CacheData::Previous(value) => {
let value = value::Value::from_json(std::mem::take(value), typ)?;
let cell = Arc::new(async_lock::OnceCell::from(Ok(value)));
let cell = Arc::new(tokio::sync::OnceCell::from(Ok(value)));
let time = entry_mut.time;
entry.insert(CacheEntry {
time,
@@ -192,7 +192,7 @@
}
}
entry => {
let cell = Arc::new(async_lock::OnceCell::new());
let cell = Arc::new(tokio::sync::OnceCell::new());
entry.insert_entry(CacheEntry {
time: self.current_time,
data: CacheData::Current(cell.clone()),
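The cache cell switches from async_lock::OnceCell to tokio::sync::OnceCell. A toy sketch (simplified types, not the PR's code) of the two constructions the hunks rely on — a cell pre-seeded from a previously stored value via From, and an empty cell that the first computation of the current run fills:

```rust
use std::sync::Arc;
use tokio::sync::OnceCell;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Pre-seeded cell: the cached value from a previous run is already inside.
    let seeded: Arc<OnceCell<i64>> = Arc::new(OnceCell::from(42));
    assert_eq!(seeded.get(), Some(&42));

    // Empty cell: concurrent callers race to initialize; on success the value
    // is stored once and every later call observes the same result.
    let empty: Arc<OnceCell<i64>> = Arc::new(OnceCell::new());
    let v = empty
        .get_or_try_init(|| async { Ok::<_, anyhow::Error>(7) })
        .await?;
    assert_eq!(*v, 7);
    Ok(())
}
```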
8 changes: 4 additions & 4 deletions src/lib_context.rs
@@ -8,22 +8,22 @@ use crate::service::error::ApiError;
use crate::settings;
use crate::setup;
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
use async_lock::OnceCell;
use axum::http::StatusCode;
use sqlx::PgPool;
use tokio::runtime::Runtime;

pub struct FlowContext {
pub flow: Arc<AnalyzedFlow>,
pub source_indexing_contexts: Vec<OnceCell<Arc<SourceIndexingContext>>>,
pub source_indexing_contexts: Vec<tokio::sync::OnceCell<Arc<SourceIndexingContext>>>,
pub query_handlers: Mutex<BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>>,
}

impl FlowContext {
pub fn new(flow: Arc<AnalyzedFlow>) -> Self {
let mut source_indexing_contexts = Vec::new();
source_indexing_contexts
.resize_with(flow.flow_instance.import_ops.len(), || OnceCell::new());
source_indexing_contexts.resize_with(flow.flow_instance.import_ops.len(), || {
tokio::sync::OnceCell::new()
});
Self {
flow,
source_indexing_contexts,
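source_indexing_contexts becomes a Vec of tokio::sync::OnceCell, one slot per import op. A simplified sketch of that lazy-per-slot pattern, using hypothetical Ctx/Flowish types rather than the PR's: each slot is built on first access with get_or_init and shared afterwards.

```rust
use std::sync::Arc;
use tokio::sync::OnceCell;

struct Ctx {
    name: String,
}

struct Flowish {
    contexts: Vec<OnceCell<Arc<Ctx>>>,
}

impl Flowish {
    fn new(num_ops: usize) -> Self {
        // One empty cell per import op, mirroring resize_with in the diff.
        let mut contexts = Vec::new();
        contexts.resize_with(num_ops, OnceCell::new);
        Self { contexts }
    }

    async fn context(&self, idx: usize) -> &Arc<Ctx> {
        self.contexts[idx]
            .get_or_init(|| async move {
                // Expensive setup would go here; it runs at most once per slot.
                Arc::new(Ctx { name: format!("source-{idx}") })
            })
            .await
    }
}
```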
77 changes: 30 additions & 47 deletions src/py/mod.rs
@@ -93,47 +93,29 @@ impl IndexUpdateInfo {
pub struct Flow(pub Arc<FlowContext>);

#[pyclass]
pub struct FlowSynchronizer(pub async_lock::RwLock<execution::FlowSynchronizer>);
pub struct FlowSynchronizer(pub Arc<tokio::sync::RwLock<execution::FlowSynchronizer>>);

#[pymethods]
impl FlowSynchronizer {
pub fn join(&self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
lib_context
.runtime
.block_on(async {
let mut synchronizer = self.0.write().await;
synchronizer.join().await
})
.into_py_result()
pub fn join<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let synchronizer = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let mut synchronizer = synchronizer.write().await;
synchronizer.join().await.into_py_result()
})
}

pub fn abort(&self, py: Python<'_>) -> PyResult<()> {
pub fn abort(&self, py: Python<'_>) {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
lib_context.runtime.block_on(async {
let mut synchronizer = self.0.write().await;
synchronizer.abort();
});
Ok(())
let mut synchronizer = self.0.blocking_write();
synchronizer.abort();
})
}

pub fn index_update_info(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
pub fn index_update_info(&self, py: Python<'_>) -> IndexUpdateInfo {
py.allow_threads(|| {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
lib_context
.runtime
.block_on(async {
let synchronizer = self.0.read().await;
anyhow::Ok(IndexUpdateInfo(synchronizer.index_update_info()))
})
.into_py_result()
let synchronizer = self.0.blocking_read();
IndexUpdateInfo(synchronizer.index_update_info())
})
}
}
@@ -152,25 +134,24 @@ impl Flow {
&self.0.flow.flow_instance.name
}

pub fn update(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
py.allow_threads(|| {
pub fn update<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let flow_ctx = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
let update_info = lib_context
.runtime
.block_on(async {
let mut synchronizer = execution::FlowSynchronizer::start(
self.0.clone(),
&lib_context.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: false,
},
)
.await?;
synchronizer.join().await?;
anyhow::Ok(synchronizer.index_update_info())
})
let update_info = {
let mut synchronizer = execution::FlowSynchronizer::start(
flow_ctx,
&lib_context.pool,
&execution::FlowSynchronizerOptions {
keep_refreshed: false,
},
)
.await
.into_py_result()?;
synchronizer.join().await.into_py_result()?;
synchronizer.index_update_info()
};
Ok(IndexUpdateInfo(update_info))
})
}
@@ -193,7 +174,9 @@ impl Flow {
anyhow::Ok(synchronizer)
})
.into_py_result()?;
Ok(FlowSynchronizer(async_lock::RwLock::new(synchronizer)))
Ok(FlowSynchronizer(Arc::new(tokio::sync::RwLock::new(
synchronizer,
))))
})
}

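After this change the binding methods split into two styles: async-facing ones (join, update) clone an Arc and return a Python awaitable via pyo3_async_runtimes::tokio::future_into_py, while the remaining synchronous ones (abort, index_update_info) take the lock with blocking_write/blocking_read inside py.allow_threads. A hypothetical Counter pyclass sketching both styles side by side — not the PR's API, just the shape of the pattern:

```rust
use pyo3::prelude::*;
use std::sync::Arc;

#[pyclass]
struct Counter(Arc<tokio::sync::RwLock<u64>>);

#[pymethods]
impl Counter {
    /// Synchronous accessor: releases the GIL, then takes the lock with
    /// blocking_read. This is only safe because Python calls arrive on
    /// non-runtime threads; blocking_read would panic inside a tokio worker.
    fn get(&self, py: Python<'_>) -> u64 {
        py.allow_threads(|| *self.0.blocking_read())
    }

    /// Async-facing mutator: clone the Arc before moving it into the future,
    /// then lock with .write().await on the tokio runtime.
    fn increment<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let inner = self.0.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let mut guard = inner.write().await;
            *guard += 1;
            Ok(*guard)
        })
    }
}
```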