1- use std:: { rc:: Rc , time:: Duration } ;
1+ use std:: {
2+ rc:: Rc ,
3+ sync:: { Arc , Mutex } ,
4+ time:: Duration ,
5+ } ;
26
37use pyo3:: {
48 prelude:: * ,
@@ -168,12 +172,19 @@ async fn test_panic() -> PyResult<()> {
168172
169173#[ pyo3_asyncio:: tokio:: test]
170174async fn test_cancel ( ) -> PyResult < ( ) > {
175+ let completed = Arc :: new ( Mutex :: new ( false ) ) ;
176+
171177 let py_future = Python :: with_gil ( |py| -> PyResult < PyObject > {
172- Ok ( pyo3_asyncio:: tokio:: future_into_py ( py, async {
173- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
174- Ok ( Python :: with_gil ( |py| py. None ( ) ) )
175- } ) ?
176- . into ( ) )
178+ let completed = Arc :: clone ( & completed) ;
179+ Ok (
180+ pyo3_asyncio:: tokio:: cancellable_future_into_py ( py, async move {
181+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
182+ * completed. lock ( ) . unwrap ( ) = true ;
183+
184+ Ok ( Python :: with_gil ( |py| py. None ( ) ) )
185+ } ) ?
186+ . into ( ) ,
187+ )
177188 } ) ?;
178189
179190 if let Err ( e) = Python :: with_gil ( |py| -> PyResult < _ > {
@@ -195,8 +206,62 @@ async fn test_cancel() -> PyResult<()> {
195206 panic ! ( "expected CancelledError" ) ;
196207 }
197208
209+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
210+ if * completed. lock ( ) . unwrap ( ) {
211+ panic ! ( "future still completed" )
212+ }
213+
198214 Ok ( ( ) )
199215}
216+
217+ #[ pyo3_asyncio:: tokio:: test]
218+ fn test_local_cancel ( event_loop : PyObject ) -> PyResult < ( ) > {
219+ tokio:: task:: LocalSet :: new ( ) . block_on (
220+ pyo3_asyncio:: tokio:: get_runtime ( ) ,
221+ pyo3_asyncio:: tokio:: scope_local ( event_loop, async {
222+ let completed = Arc :: new ( Mutex :: new ( false ) ) ;
223+ let py_future = Python :: with_gil ( |py| -> PyResult < PyObject > {
224+ let completed = Arc :: clone ( & completed) ;
225+ Ok (
226+ pyo3_asyncio:: tokio:: local_cancellable_future_into_py ( py, async move {
227+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
228+ * completed. lock ( ) . unwrap ( ) = true ;
229+ Ok ( Python :: with_gil ( |py| py. None ( ) ) )
230+ } ) ?
231+ . into ( ) ,
232+ )
233+ } ) ?;
234+
235+ if let Err ( e) = Python :: with_gil ( |py| -> PyResult < _ > {
236+ py_future. as_ref ( py) . call_method0 ( "cancel" ) ?;
237+ pyo3_asyncio:: tokio:: into_future ( py_future. as_ref ( py) )
238+ } ) ?
239+ . await
240+ {
241+ Python :: with_gil ( |py| -> PyResult < ( ) > {
242+ assert ! ( py
243+ . import( "asyncio" ) ?
244+ . getattr( "CancelledError" ) ?
245+ . downcast:: <PyType >( )
246+ . unwrap( )
247+ . is_instance( e. pvalue( py) ) ?) ;
248+ Ok ( ( ) )
249+ } ) ?;
250+ } else {
251+ panic ! ( "expected CancelledError" ) ;
252+ }
253+
254+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
255+
256+ if * completed. lock ( ) . unwrap ( ) {
257+ panic ! ( "future still completed" )
258+ }
259+
260+ Ok ( ( ) )
261+ } ) ,
262+ )
263+ }
264+
200265/// This module is implemented in Rust.
201266#[ pymodule]
202267fn test_mod ( _py : Python , m : & PyModule ) -> PyResult < ( ) > {
0 commit comments