Skip to content

Commit 0ffbe2a

Browse files
authored
Switch from blocking crate to tokio::task::spawn_blocking(). (#91)
1 parent c399325 commit 0ffbe2a

File tree

2 files changed

+28
-25
lines changed

2 files changed

+28
-25
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ tower-http = { version = "0.6.2", features = ["cors", "trace"] }
3737
indexmap = { version = "2.8.0", features = ["serde"] }
3838
blake2 = "0.10.6"
3939
pgvector = { version = "0.4.0", features = ["sqlx"] }
40-
blocking = "1.6.1"
4140
indenter = "0.3.3"
4241
itertools = "0.14.0"
4342
derivative = "2.2.0"

src/ops/py_factory.rs

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{collections::BTreeMap, sync::Arc};
22

33
use axum::async_trait;
4-
use blocking::unblock;
54
use futures::FutureExt;
65
use pyo3::{
76
exceptions::PyException,
@@ -219,7 +218,7 @@ struct PyFunctionExecutor {
219218
impl SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
220219
async fn evaluate(&self, input: Vec<value::Value>) -> Result<value::Value> {
221220
let self = self.clone();
222-
unblock(move || {
221+
let result = tokio::task::spawn_blocking(move || {
223222
Python::with_gil(|py| -> Result<_> {
224223
let mut args = Vec::with_capacity(self.num_positional_args);
225224
for v in input[0..self.num_positional_args].iter() {
@@ -255,7 +254,8 @@ impl SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
255254
)?)
256255
})
257256
})
258-
.await
257+
.await??;
258+
Ok(result)
259259
}
260260

261261
fn enable_cache(&self) -> bool {
@@ -323,27 +323,31 @@ impl SimpleFunctionFactory for PyFunctionFactory {
323323

324324
let executor_fut = {
325325
let result_type = result_type.clone();
326-
unblock(move || {
327-
let (enable_cache, behavior_version) =
328-
Python::with_gil(|py| -> anyhow::Result<_> {
329-
executor.call_method(py, "prepare", (), None)?;
330-
let enable_cache = executor
331-
.call_method(py, "enable_cache", (), None)?
332-
.extract::<bool>(py)?;
333-
let behavior_version = executor
334-
.call_method(py, "behavior_version", (), None)?
335-
.extract::<Option<u32>>(py)?;
336-
Ok((enable_cache, behavior_version))
337-
})?;
338-
Ok(Box::new(Arc::new(PyFunctionExecutor {
339-
py_function_executor: executor,
340-
num_positional_args,
341-
kw_args_names,
342-
result_type,
343-
enable_cache,
344-
behavior_version,
345-
})) as Box<dyn SimpleFunctionExecutor>)
346-
})
326+
async move {
327+
let executor = tokio::task::spawn_blocking(move || -> Result<_> {
328+
let (enable_cache, behavior_version) =
329+
Python::with_gil(|py| -> anyhow::Result<_> {
330+
executor.call_method(py, "prepare", (), None)?;
331+
let enable_cache = executor
332+
.call_method(py, "enable_cache", (), None)?
333+
.extract::<bool>(py)?;
334+
let behavior_version = executor
335+
.call_method(py, "behavior_version", (), None)?
336+
.extract::<Option<u32>>(py)?;
337+
Ok((enable_cache, behavior_version))
338+
})?;
339+
Ok(Box::new(Arc::new(PyFunctionExecutor {
340+
py_function_executor: executor,
341+
num_positional_args,
342+
kw_args_names,
343+
result_type,
344+
enable_cache,
345+
behavior_version,
346+
})) as Box<dyn SimpleFunctionExecutor>)
347+
})
348+
.await??;
349+
Ok(executor)
350+
}
347351
};
348352

349353
Ok((result_type, executor_fut.boxed()))

0 commit comments

Comments
 (0)