@@ -23,6 +23,14 @@ pub trait Runtime {
23
23
F : Future < Output = ( ) > + Send + ' static ;
24
24
}
25
25
26
+ /// Extension trait for async/await runtimes that support spawning local tasks
27
+ pub trait SpawnLocalExt : Runtime {
28
+ /// Spawn a !Send future onto this runtime's event loop
29
+ fn spawn_local < F > ( fut : F ) -> Self :: JoinHandle
30
+ where
31
+ F : Future < Output = ( ) > + ' static ;
32
+ }
33
+
26
34
/// Run the event loop until the given Future completes
27
35
///
28
36
/// After this function returns, the event loop can be resumed with either [`run_until_complete`] or
@@ -236,3 +244,123 @@ where
236
244
237
245
Ok ( future_rx)
238
246
}
247
+
248
+ /// Convert a `!Send` Rust Future into a Python coroutine with a generic runtime
249
+ ///
250
+ /// # Arguments
251
+ /// * `py` - The current PyO3 GIL guard
252
+ /// * `fut` - The Rust future to be converted
253
+ ///
254
+ /// # Examples
255
+ ///
256
+ /// ```no_run
257
+ /// # use std::{task::{Context, Poll}, pin::Pin, future::Future};
258
+ /// #
259
+ /// # use pyo3_asyncio::generic::{JoinError, SpawnLocalExt, Runtime};
260
+ /// #
261
+ /// # struct MyCustomJoinError;
262
+ /// #
263
+ /// # impl JoinError for MyCustomJoinError {
264
+ /// # fn is_panic(&self) -> bool {
265
+ /// # unreachable!()
266
+ /// # }
267
+ /// # }
268
+ /// #
269
+ /// # struct MyCustomJoinHandle;
270
+ /// #
271
+ /// # impl Future for MyCustomJoinHandle {
272
+ /// # type Output = Result<(), MyCustomJoinError>;
273
+ /// #
274
+ /// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
275
+ /// # unreachable!()
276
+ /// # }
277
+ /// # }
278
+ /// #
279
+ /// # struct MyCustomRuntime;
280
+ /// #
281
+ /// # impl MyCustomRuntime {
282
+ /// # async fn sleep(_: Duration) {
283
+ /// # unreachable!()
284
+ /// # }
285
+ /// # }
286
+ /// #
287
+ /// # impl Runtime for MyCustomRuntime {
288
+ /// # type JoinError = MyCustomJoinError;
289
+ /// # type JoinHandle = MyCustomJoinHandle;
290
+ /// #
291
+ /// # fn spawn<F>(fut: F) -> Self::JoinHandle
292
+ /// # where
293
+ /// # F: Future<Output = ()> + Send + 'static
294
+ /// # {
295
+ /// # unreachable!()
296
+ /// # }
297
+ /// # }
298
+ /// #
299
+ /// # impl SpawnLocalExt for MyCustomRuntime {
300
+ /// # fn spawn_local<F>(fut: F) -> Self::JoinHandle
301
+ /// # where
302
+ /// # F: Future<Output = ()> + 'static
303
+ /// # {
304
+ /// # unreachable!()
305
+ /// # }
306
+ /// # }
307
+ /// #
308
+ /// use std::time::Duration;
309
+ ///
310
+ /// use pyo3::prelude::*;
311
+ ///
312
+ /// /// Awaitable sleep function
313
+ /// #[pyfunction]
314
+ /// fn sleep_for(py: Python, secs: &PyAny) -> PyResult<PyObject> {
315
+ /// let secs = secs.extract()?;
316
+ ///
317
+ /// pyo3_asyncio::generic::into_local_py_future::<MyCustomRuntime, _>(py, async move {
318
+ /// MyCustomRuntime::sleep(Duration::from_secs(secs)).await;
319
+ /// Python::with_gil(|py| Ok(py.None()))
320
+ /// })
321
+ /// }
322
+ /// ```
323
+ pub fn into_local_py_future < R , F > ( py : Python , fut : F ) -> PyResult < PyObject >
324
+ where
325
+ R : SpawnLocalExt ,
326
+ F : Future < Output = PyResult < PyObject > > + ' static ,
327
+ {
328
+ let future_rx = CREATE_FUTURE . get ( ) . expect ( EXPECT_INIT ) . call0 ( py) ?;
329
+ let future_tx1 = future_rx. clone ( ) ;
330
+ let future_tx2 = future_rx. clone ( ) ;
331
+
332
+ R :: spawn_local ( async move {
333
+ if let Err ( e) = R :: spawn_local ( async move {
334
+ let result = fut. await ;
335
+
336
+ Python :: with_gil ( move |py| {
337
+ if set_result ( py, future_tx1. as_ref ( py) , result)
338
+ . map_err ( dump_err ( py) )
339
+ . is_err ( )
340
+ {
341
+
342
+ // Cancelled
343
+ }
344
+ } ) ;
345
+ } )
346
+ . await
347
+ {
348
+ if e. is_panic ( ) {
349
+ Python :: with_gil ( move |py| {
350
+ if set_result (
351
+ py,
352
+ future_tx2. as_ref ( py) ,
353
+ Err ( PyException :: new_err ( "rust future panicked" ) ) ,
354
+ )
355
+ . map_err ( dump_err ( py) )
356
+ . is_err ( )
357
+ {
358
+ // Cancelled
359
+ }
360
+ } ) ;
361
+ }
362
+ }
363
+ } ) ;
364
+
365
+ Ok ( future_rx)
366
+ }
0 commit comments