diff --git a/CHANGELOG.md b/CHANGELOG.md index e38c81a..5abc0b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ To see unreleased changes, please see the CHANGELOG on the main branch. - Avoid attaching to the runtime when cloning TaskLocals by using std::sync::Arc. [#62](https://github.com/PyO3/pyo3-async-runtimes/pull/62) +- **Breaking**: Finalize the future without holding GIL inside async-std/tokio runtime. + Trait `Runtime` now requires `spawn_blocking` function, + `future_into_py` functions now require future return type to be `Send`. + [#60](https://github.com/PyO3/pyo3-async-runtimes/pull/60) ## [0.26.0] - 2025-09-02 diff --git a/src/async_std.rs b/src/async_std.rs index 3014ef6..66f4454 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -17,7 +17,7 @@ use async_std::task; use futures::FutureExt; use pyo3::prelude::*; -use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin}; +use std::{any::Any, cell::RefCell, future::Future, panic, panic::AssertUnwindSafe, pin::Pin}; use crate::{ generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt}, @@ -74,6 +74,15 @@ impl Runtime for AsyncStdRuntime { .map_err(AsyncStdJoinErr) }) } + + fn spawn_blocking(f: F) -> Self::JoinHandle + where + F: FnOnce() + Send + 'static, + { + task::spawn_blocking(move || { + panic::catch_unwind(AssertUnwindSafe(f)).map_err(|e| AsyncStdJoinErr(Box::new(e))) + }) + } } impl ContextExt for AsyncStdRuntime { @@ -276,7 +285,7 @@ pub fn future_into_py_with_locals( ) -> PyResult> where F: Future> + Send + 'static, - T: for<'py> IntoPyObject<'py>, + T: for<'py> IntoPyObject<'py> + Send + 'static, { generic::future_into_py_with_locals::(py, locals, fut) } @@ -322,7 +331,7 @@ where pub fn future_into_py(py: Python, fut: F) -> PyResult> where F: Future> + Send + 'static, - T: for<'py> IntoPyObject<'py>, + T: for<'py> IntoPyObject<'py> + Send + 'static, { generic::future_into_py::(py, fut) } diff --git a/src/generic.rs b/src/generic.rs index fc60281..2440d96 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -52,6 +52,11 @@ pub trait Runtime: Send + 'static { fn spawn(fut: F) -> Self::JoinHandle where F: Future + Send + 'static; + + /// Spawn a function onto this runtime's blocking event loop + fn spawn_blocking(f: F) -> Self::JoinHandle + where + F: FnOnce() + Send + 'static; } /// Extension trait for async/await runtimes that support spawning local tasks @@ -161,6 +166,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -265,6 +274,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -415,6 +428,10 @@ fn set_result( /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -540,6 +557,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -581,7 +602,7 @@ pub fn future_into_py_with_locals( where R: Runtime + ContextExt, F: Future> + Send + 'static, - T: for<'py> IntoPyObject<'py>, + T: for<'py> IntoPyObject<'py> + Send + 'static, { let (cancel_tx, cancel_rx) = oneshot::channel(); @@ -606,44 +627,50 @@ where ) .await; - Python::attach(move |py| { - if cancelled(future_tx1.bind(py)) - .map_err(dump_err(py)) - .unwrap_or(false) - { - return; - } - - let _ = set_result( - &locals2.event_loop(py), - future_tx1.bind(py), - result.and_then(|val| val.into_py_any(py)), - ) - .map_err(dump_err(py)); - }); - }) - .await - { - if e.is_panic() { + // We should not hold GIL inside async-std/tokio event loop, + // because a blocked task may prevent other tasks from progressing. + R::spawn_blocking(|| { Python::attach(move |py| { - if cancelled(future_tx2.bind(py)) + if cancelled(future_tx1.bind(py)) .map_err(dump_err(py)) .unwrap_or(false) { return; } - let panic_message = format!( - "rust future panicked: {}", - get_panic_message(&e.into_panic()) - ); let _ = set_result( - locals.0.event_loop.bind(py), - future_tx2.bind(py), - Err(RustPanic::new_err(panic_message)), + &locals2.event_loop(py), + future_tx1.bind(py), + result.and_then(|val| val.into_py_any(py)), ) .map_err(dump_err(py)); }); + }); + }) + .await + { + if e.is_panic() { + R::spawn_blocking(|| { + Python::attach(move |py| { + if cancelled(future_tx2.bind(py)) + .map_err(dump_err(py)) + .unwrap_or(false) + { + return; + } + + let panic_message = format!( + "rust future panicked: {}", + get_panic_message(&e.into_panic()) + ); + let _ = set_result( + locals.0.event_loop.bind(py), + future_tx2.bind(py), + Err(RustPanic::new_err(panic_message)), + ) + .map_err(dump_err(py)); + }); + }); } } }); @@ -812,6 +839,10 @@ impl PyDoneCallback { /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -844,7 +875,7 @@ pub fn future_into_py(py: Python, fut: F) -> PyResult> where R: Runtime + ContextExt, F: Future> + Send + 'static, - T: for<'py> IntoPyObject<'py>, + T: for<'py> IntoPyObject<'py> + Send + 'static, { future_into_py_with_locals::(py, get_current_locals::(py)?, fut) } @@ -921,6 +952,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -1126,6 +1161,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -1240,6 +1279,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -1389,6 +1432,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -1584,6 +1631,10 @@ async def forward(gen, sender): /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { @@ -1737,6 +1788,10 @@ where /// # { /// # unreachable!() /// # } +/// # +/// # fn spawn_blocking(f: F) -> Self::JoinHandle where F: FnOnce() + Send + 'static { +/// # unreachable!() +/// # } /// # } /// # /// # impl ContextExt for MyCustomRuntime { diff --git a/src/tokio.rs b/src/tokio.rs index 4865b93..0c09261 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -94,6 +94,13 @@ impl GenericRuntime for TokioRuntime { fut.await; }) } + + fn spawn_blocking(f: F) -> Self::JoinHandle + where + F: FnOnce() + Send + 'static, + { + get_runtime().spawn_blocking(f) + } } impl ContextExt for TokioRuntime { @@ -318,7 +325,7 @@ pub fn future_into_py_with_locals( ) -> PyResult> where F: Future> + Send + 'static, - T: for<'py> IntoPyObject<'py>, + T: for<'py> IntoPyObject<'py> + Send + 'static, { generic::future_into_py_with_locals::(py, locals, fut) } @@ -364,7 +371,7 @@ where pub fn future_into_py(py: Python, fut: F) -> PyResult> where F: Future> + Send + 'static, - T: for<'py> IntoPyObject<'py>, + T: for<'py> IntoPyObject<'py> + Send + 'static, { generic::future_into_py::(py, fut) }