Skip to content

Commit 633a512

Browse files
author
Andrew J Westlake
committed
Added a 'cancellable' set of functions to allow Rust futures to be cancelled from Python (may become default behaviour in next release)
1 parent 97d8c01 commit 633a512

File tree

6 files changed

+459
-13
lines changed

6 files changed

+459
-13
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ clap = { version = "2.33", optional = true }
106106
futures = "0.3"
107107
inventory = "0.1"
108108
once_cell = "1.5"
109+
pin-project-lite = "0.2"
109110
pyo3 = "0.14"
110111
pyo3-asyncio-macros = { path = "pyo3-asyncio-macros", version = "=0.14.0", optional = true }
111112

pytests/test_async_std_asyncio.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
mod common;
22

3-
use std::{rc::Rc, time::Duration};
3+
use std::{
4+
rc::Rc,
5+
sync::{Arc, Mutex},
6+
time::Duration,
7+
};
48

59
use async_std::task;
610
use pyo3::{
@@ -171,12 +175,19 @@ async fn test_local_future_into_py() -> PyResult<()> {
171175

172176
#[pyo3_asyncio::async_std::test]
173177
async fn test_cancel() -> PyResult<()> {
178+
let completed = Arc::new(Mutex::new(false));
179+
174180
let py_future = Python::with_gil(|py| -> PyResult<PyObject> {
175-
Ok(pyo3_asyncio::async_std::future_into_py(py, async {
176-
async_std::task::sleep(Duration::from_secs(1)).await;
177-
Ok(Python::with_gil(|py| py.None()))
178-
})?
179-
.into())
181+
let completed = Arc::clone(&completed);
182+
Ok(
183+
pyo3_asyncio::async_std::cancellable_future_into_py(py, async move {
184+
async_std::task::sleep(Duration::from_secs(1)).await;
185+
*completed.lock().unwrap() = true;
186+
187+
Ok(Python::with_gil(|py| py.None()))
188+
})?
189+
.into(),
190+
)
180191
})?;
181192

182193
if let Err(e) = Python::with_gil(|py| -> PyResult<_> {
@@ -198,6 +209,11 @@ async fn test_cancel() -> PyResult<()> {
198209
panic!("expected CancelledError");
199210
}
200211

212+
async_std::task::sleep(Duration::from_secs(1)).await;
213+
if *completed.lock().unwrap() {
214+
panic!("future still completed")
215+
}
216+
201217
Ok(())
202218
}
203219

pytests/tokio_asyncio/mod.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{rc::Rc, time::Duration};
1+
use std::{
2+
rc::Rc,
3+
sync::{Arc, Mutex},
4+
time::Duration,
5+
};
26

37
use pyo3::{
48
prelude::*,
@@ -168,12 +172,19 @@ async fn test_panic() -> PyResult<()> {
168172

169173
#[pyo3_asyncio::tokio::test]
170174
async fn test_cancel() -> PyResult<()> {
175+
let completed = Arc::new(Mutex::new(false));
176+
171177
let py_future = Python::with_gil(|py| -> PyResult<PyObject> {
172-
Ok(pyo3_asyncio::tokio::future_into_py(py, async {
173-
tokio::time::sleep(Duration::from_secs(1)).await;
174-
Ok(Python::with_gil(|py| py.None()))
175-
})?
176-
.into())
178+
let completed = Arc::clone(&completed);
179+
Ok(
180+
pyo3_asyncio::tokio::cancellable_future_into_py(py, async move {
181+
tokio::time::sleep(Duration::from_secs(1)).await;
182+
*completed.lock().unwrap() = true;
183+
184+
Ok(Python::with_gil(|py| py.None()))
185+
})?
186+
.into(),
187+
)
177188
})?;
178189

179190
if let Err(e) = Python::with_gil(|py| -> PyResult<_> {
@@ -195,8 +206,14 @@ async fn test_cancel() -> PyResult<()> {
195206
panic!("expected CancelledError");
196207
}
197208

209+
tokio::time::sleep(Duration::from_secs(1)).await;
210+
if *completed.lock().unwrap() {
211+
panic!("future still completed")
212+
}
213+
198214
Ok(())
199215
}
216+
200217
/// This module is implemented in Rust.
201218
#[pymodule]
202219
fn test_mod(_py: Python, m: &PyModule) -> PyResult<()> {

src/async_std.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,42 @@ where
273273
generic::future_into_py_with_loop::<AsyncStdRuntime, F>(event_loop, fut)
274274
}
275275

276+
/// Convert a Rust Future into a Python awaitable
277+
///
278+
/// Unlike [`future_into_py_with_loop`], this function will stop the Rust future from running when
279+
/// the `asyncio.Future` is cancelled from Python.
280+
///
281+
/// # Arguments
282+
/// * `event_loop` - The Python event loop that the awaitable should be attached to
283+
/// * `fut` - The Rust future to be converted
284+
///
285+
/// # Examples
286+
///
287+
/// ```
288+
/// use std::time::Duration;
289+
///
290+
/// use pyo3::prelude::*;
291+
///
292+
/// /// Awaitable sleep function
293+
/// #[pyfunction]
294+
/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
295+
/// let secs = secs.extract()?;
296+
/// pyo3_asyncio::async_std::cancellable_future_into_py_with_loop(
297+
/// pyo3_asyncio::async_std::get_current_loop(py)?,
298+
/// async move {
299+
/// async_std::task::sleep(Duration::from_secs(secs)).await;
300+
/// Python::with_gil(|py| Ok(py.None()))
301+
/// }
302+
/// )
303+
/// }
304+
/// ```
305+
pub fn cancellable_future_into_py_with_loop<F>(event_loop: &PyAny, fut: F) -> PyResult<&PyAny>
306+
where
307+
F: Future<Output = PyResult<PyObject>> + Send + 'static,
308+
{
309+
generic::cancellable_future_into_py_with_loop::<AsyncStdRuntime, F>(event_loop, fut)
310+
}
311+
276312
/// Convert a Rust Future into a Python awaitable
277313
///
278314
/// # Arguments
@@ -303,6 +339,39 @@ where
303339
generic::future_into_py::<AsyncStdRuntime, _>(py, fut)
304340
}
305341

342+
/// Convert a Rust Future into a Python awaitable
343+
///
344+
/// Unlike [`future_into_py`], this function will stop the Rust future from running when
345+
/// the `asyncio.Future` is cancelled from Python.
346+
///
347+
/// # Arguments
348+
/// * `py` - The current PyO3 GIL guard
349+
/// * `fut` - The Rust future to be converted
350+
///
351+
/// # Examples
352+
///
353+
/// ```
354+
/// use std::time::Duration;
355+
///
356+
/// use pyo3::prelude::*;
357+
///
358+
/// /// Awaitable sleep function
359+
/// #[pyfunction]
360+
/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
361+
/// let secs = secs.extract()?;
362+
/// pyo3_asyncio::async_std::cancellable_future_into_py(py, async move {
363+
/// async_std::task::sleep(Duration::from_secs(secs)).await;
364+
/// Python::with_gil(|py| Ok(py.None()))
365+
/// })
366+
/// }
367+
/// ```
368+
pub fn cancellable_future_into_py<F>(py: Python, fut: F) -> PyResult<&PyAny>
369+
where
370+
F: Future<Output = PyResult<PyObject>> + Send + 'static,
371+
{
372+
generic::cancellable_future_into_py::<AsyncStdRuntime, _>(py, fut)
373+
}
374+
306375
/// Convert a `!Send` Rust Future into a Python awaitable
307376
///
308377
/// # Arguments

0 commit comments

Comments
 (0)