@@ -24,6 +24,7 @@ lazy_static! {
24
24
}
25
25
26
26
static EVENT_LOOP : OnceCell < PyObject > = OnceCell :: new ( ) ;
27
+ static EXECUTOR : OnceCell < PyObject > = OnceCell :: new ( ) ;
27
28
static CALL_SOON : OnceCell < PyObject > = OnceCell :: new ( ) ;
28
29
static CREATE_TASK : OnceCell < PyObject > = OnceCell :: new ( ) ;
29
30
static CREATE_FUTURE : OnceCell < PyObject > = OnceCell :: new ( ) ;
@@ -34,12 +35,16 @@ static CREATE_FUTURE: OnceCell<PyObject> = OnceCell::new();
34
35
pub fn try_init ( py : Python ) -> PyResult < ( ) > {
35
36
let asyncio = py. import ( "asyncio" ) ?;
36
37
let event_loop = asyncio. call_method0 ( "get_event_loop" ) ?;
38
+ let executor = py. import ( "concurrent.futures.thread" ) ?. getattr ( "ThreadPoolExecutor" ) ?. call0 ( ) ?;
39
+
40
+ event_loop. call_method1 ( "set_default_executor" , ( executor, ) ) ?;
37
41
38
42
let call_soon = event_loop. getattr ( "call_soon_threadsafe" ) ?;
39
43
let create_task = asyncio. getattr ( "run_coroutine_threadsafe" ) ?;
40
44
let create_future = event_loop. getattr ( "create_future" ) ?;
41
45
42
46
EVENT_LOOP . get_or_init ( || event_loop. into ( ) ) ;
47
+ EXECUTOR . get_or_init ( || executor. into ( ) ) ;
43
48
CALL_SOON . get_or_init ( || call_soon. into ( ) ) ;
44
49
CREATE_TASK . get_or_init ( || create_task. into ( ) ) ;
45
50
CREATE_FUTURE . get_or_init ( || create_future. into ( ) ) ;
87
92
}
88
93
89
94
pub fn close ( py : Python ) -> PyResult < ( ) > {
95
+ // Shutdown the executor and wait until all threads are cleaned up
96
+ EXECUTOR . get ( ) . unwrap ( ) . call_method0 ( py, "shutdown" ) ?;
97
+
98
+ EVENT_LOOP
99
+ . get ( )
100
+ . unwrap ( )
101
+ . call_method0 ( py, "stop" ) ?;
90
102
EVENT_LOOP . get ( ) . unwrap ( ) . call_method0 ( py, "close" ) ?;
103
+
91
104
Ok ( ( ) )
92
105
}
93
106
0 commit comments