@@ -98,6 +98,9 @@ lazy_static! {
98
98
} ;
99
99
}
100
100
101
+ const EXPECT_INIT : & ' static str = "PyO3 Asyncio has not been initialized" ;
102
+
103
+ static ASYNCIO : OnceCell < PyObject > = OnceCell :: new ( ) ;
101
104
static EVENT_LOOP : OnceCell < PyObject > = OnceCell :: new ( ) ;
102
105
static EXECUTOR : OnceCell < PyObject > = OnceCell :: new ( ) ;
103
106
static CALL_SOON : OnceCell < PyObject > = OnceCell :: new ( ) ;
@@ -161,6 +164,7 @@ fn try_init(py: Python) -> PyResult<()> {
161
164
let create_task = asyncio. getattr ( "run_coroutine_threadsafe" ) ?;
162
165
let create_future = event_loop. getattr ( "create_future" ) ?;
163
166
167
+ ASYNCIO . get_or_init ( || asyncio. into ( ) ) ;
164
168
EVENT_LOOP . get_or_init ( || event_loop. into ( ) ) ;
165
169
EXECUTOR . get_or_init ( || executor. into ( ) ) ;
166
170
CALL_SOON . get_or_init ( || call_soon. into ( ) ) ;
@@ -176,10 +180,7 @@ fn try_init(py: Python) -> PyResult<()> {
176
180
177
181
/// Get a reference to the Python Event Loop from Rust
178
182
pub fn get_event_loop ( py : Python ) -> & PyAny {
179
- EVENT_LOOP
180
- . get ( )
181
- . expect ( "PyO3 Asyncio Event Loop has not been initialized" )
182
- . as_ref ( py)
183
+ EVENT_LOOP . get ( ) . expect ( EXPECT_INIT ) . as_ref ( py)
183
184
}
184
185
185
186
/// Run the event loop forever
@@ -227,7 +228,7 @@ pub fn get_event_loop(py: Python) -> &PyAny {
227
228
/// # .unwrap();
228
229
/// # })
229
230
pub fn run_forever ( py : Python ) -> PyResult < ( ) > {
230
- if let Err ( e) = EVENT_LOOP . get ( ) . unwrap ( ) . call_method0 ( py , "run_forever" ) {
231
+ if let Err ( e) = get_event_loop ( py ) . call_method0 ( "run_forever" ) {
231
232
if e. is_instance :: < PyKeyboardInterrupt > ( py) {
232
233
Ok ( ( ) )
233
234
} else {
@@ -279,21 +280,21 @@ where
279
280
Ok ( Python :: with_gil ( |py| py. None ( ) ) )
280
281
} ) ?;
281
282
282
- EVENT_LOOP
283
- . get ( )
284
- . unwrap ( )
285
- . call_method1 ( py, "run_until_complete" , ( coro, ) ) ?;
283
+ get_event_loop ( py) . call_method1 ( "run_until_complete" , ( coro, ) ) ?;
286
284
287
285
Ok ( ( ) )
288
286
}
289
287
290
288
/// Shutdown the event loops and perform any necessary cleanup
291
289
fn try_close ( py : Python ) -> PyResult < ( ) > {
292
290
// Shutdown the executor and wait until all threads are cleaned up
293
- EXECUTOR . get ( ) . unwrap ( ) . call_method0 ( py, "shutdown" ) ?;
291
+ EXECUTOR
292
+ . get ( )
293
+ . expect ( EXPECT_INIT )
294
+ . call_method0 ( py, "shutdown" ) ?;
294
295
295
- EVENT_LOOP . get ( ) . unwrap ( ) . call_method0 ( py , "stop" ) ?;
296
- EVENT_LOOP . get ( ) . unwrap ( ) . call_method0 ( py , "close" ) ?;
296
+ get_event_loop ( py ) . call_method0 ( "stop" ) ?;
297
+ get_event_loop ( py ) . call_method0 ( "close" ) ?;
297
298
Ok ( ( ) )
298
299
}
299
300
@@ -399,8 +400,14 @@ impl PyTaskCompleter {
399
400
Err ( e) => Err ( e) ,
400
401
} ;
401
402
402
- if self . tx . take ( ) . unwrap ( ) . send ( result) . is_err ( ) {
403
- // cancellation is not an error
403
+ // unclear to me whether or not this should be a panic or silent error.
404
+ //
405
+ // calling PyTaskCompleter twice should not be possible, but I don't think it really hurts
406
+ // anything if it happens.
407
+ if let Some ( tx) = self . tx . take ( ) {
408
+ if tx. send ( result) . is_err ( ) {
409
+ // cancellation is not an error
410
+ }
404
411
}
405
412
406
413
Ok ( ( ) )
@@ -460,24 +467,43 @@ pub fn into_future(
460
467
461
468
let task = CREATE_TASK
462
469
. get ( )
463
- . unwrap ( )
464
- . call1 ( py, ( coro, EVENT_LOOP . get ( ) . unwrap ( ) ) ) ?;
470
+ . expect ( EXPECT_INIT )
471
+ . call1 ( py, ( coro, get_event_loop ( py ) ) ) ?;
465
472
let on_complete = PyTaskCompleter { tx : Some ( tx) } ;
466
473
467
474
task. call_method1 ( py, "add_done_callback" , ( on_complete, ) ) ?;
468
475
469
- Ok ( async move { rx. await . unwrap ( ) } )
476
+ Ok ( async move {
477
+ match rx. await {
478
+ Ok ( item) => item,
479
+ Err ( _) => Python :: with_gil ( |py| {
480
+ Err ( PyErr :: from_instance (
481
+ ASYNCIO
482
+ . get ( )
483
+ . expect ( EXPECT_INIT )
484
+ . call_method0 ( py, "CancelledError" ) ?
485
+ . as_ref ( py) ,
486
+ ) )
487
+ } ) ,
488
+ }
489
+ } )
470
490
}
471
491
472
492
fn set_result ( py : Python , future : & PyAny , result : PyResult < PyObject > ) -> PyResult < ( ) > {
473
493
match result {
474
494
Ok ( val) => {
475
495
let set_result = future. getattr ( "set_result" ) ?;
476
- CALL_SOON . get ( ) . unwrap ( ) . call1 ( py, ( set_result, val) ) ?;
496
+ CALL_SOON
497
+ . get ( )
498
+ . expect ( EXPECT_INIT )
499
+ . call1 ( py, ( set_result, val) ) ?;
477
500
}
478
501
Err ( err) => {
479
502
let set_exception = future. getattr ( "set_exception" ) ?;
480
- CALL_SOON . get ( ) . unwrap ( ) . call1 ( py, ( set_exception, err) ) ?;
503
+ CALL_SOON
504
+ . get ( )
505
+ . expect ( EXPECT_INIT )
506
+ . call1 ( py, ( set_exception, err) ) ?;
481
507
}
482
508
}
483
509
@@ -520,7 +546,7 @@ pub fn into_coroutine<F>(py: Python, fut: F) -> PyResult<PyObject>
520
546
where
521
547
F : Future < Output = PyResult < PyObject > > + Send + ' static ,
522
548
{
523
- let future_rx = CREATE_FUTURE . get ( ) . unwrap ( ) . call0 ( py) ?;
549
+ let future_rx = CREATE_FUTURE . get ( ) . expect ( EXPECT_INIT ) . call0 ( py) ?;
524
550
let future_tx1 = future_rx. clone ( ) ;
525
551
let future_tx2 = future_rx. clone ( ) ;
526
552
@@ -529,22 +555,29 @@ where
529
555
let result = fut. await ;
530
556
531
557
Python :: with_gil ( move |py| {
532
- set_result ( py, future_tx1. as_ref ( py) , result)
558
+ if set_result ( py, future_tx1. as_ref ( py) , result)
533
559
. map_err ( dump_err ( py) )
534
- . unwrap ( )
560
+ . is_err ( )
561
+ {
562
+
563
+ // Cancelled
564
+ }
535
565
} ) ;
536
566
} )
537
567
. await
538
568
{
539
569
if e. is_panic ( ) {
540
570
Python :: with_gil ( move |py| {
541
- set_result (
571
+ if set_result (
542
572
py,
543
573
future_tx2. as_ref ( py) ,
544
574
Err ( PyException :: new_err ( "rust future panicked" ) ) ,
545
575
)
546
576
. map_err ( dump_err ( py) )
547
- . unwrap ( )
577
+ . is_err ( )
578
+ {
579
+ // Cancelled
580
+ }
548
581
} ) ;
549
582
}
550
583
}
0 commit comments