diff --git a/Cargo.toml b/Cargo.toml index 8aec94148..7d068f123 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ tower-http = { version = "0.6.2", features = ["cors", "trace"] } indexmap = { version = "2.8.0", features = ["serde"] } blake2 = "0.10.6" pgvector = { version = "0.4.0", features = ["sqlx"] } -blocking = "1.6.1" indenter = "0.3.3" itertools = "0.14.0" derivative = "2.2.0" diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index fc2794e0c..dfafee0b4 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -1,7 +1,6 @@ use std::{collections::BTreeMap, sync::Arc}; use axum::async_trait; -use blocking::unblock; use futures::FutureExt; use pyo3::{ exceptions::PyException, @@ -219,7 +218,7 @@ struct PyFunctionExecutor { impl SimpleFunctionExecutor for Arc { async fn evaluate(&self, input: Vec) -> Result { let self = self.clone(); - unblock(move || { + let result = tokio::task::spawn_blocking(move || { Python::with_gil(|py| -> Result<_> { let mut args = Vec::with_capacity(self.num_positional_args); for v in input[0..self.num_positional_args].iter() { @@ -255,7 +254,8 @@ impl SimpleFunctionExecutor for Arc { )?) }) }) - .await + .await??; + Ok(result) } fn enable_cache(&self) -> bool { @@ -323,27 +323,31 @@ impl SimpleFunctionFactory for PyFunctionFactory { let executor_fut = { let result_type = result_type.clone(); - unblock(move || { - let (enable_cache, behavior_version) = - Python::with_gil(|py| -> anyhow::Result<_> { - executor.call_method(py, "prepare", (), None)?; - let enable_cache = executor - .call_method(py, "enable_cache", (), None)? - .extract::(py)?; - let behavior_version = executor - .call_method(py, "behavior_version", (), None)? - .extract::>(py)?; - Ok((enable_cache, behavior_version)) - })?; - Ok(Box::new(Arc::new(PyFunctionExecutor { - py_function_executor: executor, - num_positional_args, - kw_args_names, - result_type, - enable_cache, - behavior_version, - })) as Box) - }) + async move { + let executor = tokio::task::spawn_blocking(move || -> Result<_> { + let (enable_cache, behavior_version) = + Python::with_gil(|py| -> anyhow::Result<_> { + executor.call_method(py, "prepare", (), None)?; + let enable_cache = executor + .call_method(py, "enable_cache", (), None)? + .extract::(py)?; + let behavior_version = executor + .call_method(py, "behavior_version", (), None)? + .extract::>(py)?; + Ok((enable_cache, behavior_version)) + })?; + Ok(Box::new(Arc::new(PyFunctionExecutor { + py_function_executor: executor, + num_positional_args, + kw_args_names, + result_type, + enable_cache, + behavior_version, + })) as Box) + }) + .await??; + Ok(executor) + } }; Ok((result_type, executor_fut.boxed()))