Skip to content

Commit 2d315cf

Browse files
author
Andrew J Westlake
committed
Made tokio initialization lazy, current thread scheduler init is a little more complicated now
1 parent 85b4c08 commit 2d315cf

File tree

6 files changed

+38
-104
lines changed

6 files changed

+38
-104
lines changed

README.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,7 @@ fn rust_sleep(py: Python) -> PyResult<PyObject> {
162162
#[pymodule]
163163
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
164164
pyo3_asyncio::try_init(py)?;
165-
// Tokio needs explicit initialization before any pyo3-asyncio conversions.
166-
// The module import is a prime place to do this.
167-
pyo3_asyncio::tokio::init_multi_thread_once();
168-
165+
169166
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
170167

171168
Ok(())

pyo3-asyncio-macros/src/tokio.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,16 +219,23 @@ fn parse_knobs(
219219

220220
let config = config.build()?;
221221

222-
let mut rt = match config.flavor {
222+
let builder = match config.flavor {
223223
RuntimeFlavor::CurrentThread => quote! {
224224
pyo3_asyncio::tokio::re_exports::runtime::Builder::new_current_thread()
225225
},
226226
RuntimeFlavor::Threaded => quote! {
227227
pyo3_asyncio::tokio::re_exports::runtime::Builder::new_multi_thread()
228228
},
229229
};
230+
231+
let mut builder_init = quote! {
232+
builder.enable_all();
233+
};
230234
if let Some(v) = config.worker_threads {
231-
rt = quote! { #rt.worker_threads(#v) };
235+
builder_init = quote! {
236+
builder.worker_threads(#v);
237+
#builder_init;
238+
};
232239
}
233240

234241
let rt_init = match config.flavor {
@@ -247,12 +254,10 @@ fn parse_knobs(
247254
#body
248255
}
249256

250-
pyo3_asyncio::tokio::init(
251-
#rt
252-
.enable_all()
253-
.build()
254-
.unwrap()
255-
);
257+
let mut builder = #builder;
258+
#builder_init;
259+
260+
pyo3_asyncio::tokio::init(builder);
256261

257262
#rt_init
258263

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
mod tokio_run_forever;
22

33
fn main() {
4-
pyo3_asyncio::tokio::init_current_thread();
4+
let mut builder = tokio::runtime::Builder::new_current_thread();
5+
builder.enable_all();
6+
7+
pyo3_asyncio::tokio::init(builder);
8+
std::thread::spawn(move || {
9+
pyo3_asyncio::tokio::get_runtime().block_on(std::future::pending::<()>());
10+
});
511

612
tokio_run_forever::test_main();
713
}
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
mod tokio_run_forever;
22

33
fn main() {
4-
pyo3_asyncio::tokio::init_multi_thread();
5-
64
tokio_run_forever::test_main();
75
}

pytests/tokio_asyncio/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,6 @@ fn test_init_twice() -> PyResult<()> {
7373
common::test_init_twice()
7474
}
7575

76-
#[pyo3_asyncio::tokio::test]
77-
fn test_init_tokio_twice() -> PyResult<()> {
78-
// tokio has already been initialized in test main. call these functions to
79-
// make sure they don't cause problems with the other tests.
80-
pyo3_asyncio::tokio::init_multi_thread_once();
81-
pyo3_asyncio::tokio::init_current_thread_once();
82-
83-
Ok(())
84-
}
85-
8676
#[pyo3_asyncio::tokio::test]
8777
fn test_local_set_coroutine() -> PyResult<()> {
8878
tokio::task::LocalSet::new().block_on(pyo3_asyncio::tokio::get_runtime(), async {

src/tokio.rs

Lines changed: 17 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
use std::{future::Future, thread};
1+
use std::{future::Future, sync::Mutex};
22

33
use ::tokio::{
44
runtime::{Builder, Runtime},
55
task,
66
};
7-
use futures::future::pending;
8-
use once_cell::sync::OnceCell;
7+
use once_cell::sync::{Lazy, OnceCell};
98
use pyo3::prelude::*;
109

1110
use crate::generic;
@@ -30,10 +29,9 @@ pub use pyo3_asyncio_macros::tokio_main as main;
3029
#[cfg(all(feature = "attributes", feature = "testing"))]
3130
pub use pyo3_asyncio_macros::tokio_test as test;
3231

32+
static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
3333
static TOKIO_RUNTIME: OnceCell<Runtime> = OnceCell::new();
3434

35-
const EXPECT_TOKIO_INIT: &str = "Tokio runtime must be initialized";
36-
3735
impl generic::JoinError for task::JoinError {
3836
fn is_panic(&self) -> bool {
3937
task::JoinError::is_panic(self)
@@ -65,79 +63,26 @@ impl generic::SpawnLocalExt for TokioRuntime {
6563
}
6664
}
6765

68-
/// Initialize the Tokio Runtime with a custom build
69-
pub fn init(runtime: Runtime) {
70-
TOKIO_RUNTIME
71-
.set(runtime)
72-
.expect("Tokio Runtime has already been initialized");
73-
}
74-
75-
fn current_thread() -> Runtime {
76-
Builder::new_current_thread()
77-
.enable_all()
78-
.build()
79-
.expect("Couldn't build the current-thread Tokio runtime")
80-
}
81-
82-
fn start_current_thread() {
83-
thread::spawn(move || {
84-
TOKIO_RUNTIME.get().unwrap().block_on(pending::<()>());
85-
});
86-
}
87-
88-
/// Initialize the Tokio Runtime with current-thread scheduler
89-
///
90-
/// # Panics
91-
/// This function will panic if called a second time. See [`init_current_thread_once`] if you want
92-
/// to avoid this panic.
93-
pub fn init_current_thread() {
94-
init(current_thread());
95-
start_current_thread();
66+
/// Initialize the Tokio runtime with a custom build
67+
pub fn init(builder: Builder) {
68+
*TOKIO_BUILDER.lock().unwrap() = builder
9669
}
9770

9871
/// Get a reference to the current tokio runtime
9972
pub fn get_runtime<'a>() -> &'a Runtime {
100-
TOKIO_RUNTIME.get().expect(EXPECT_TOKIO_INIT)
101-
}
102-
103-
fn multi_thread() -> Runtime {
104-
Builder::new_multi_thread()
105-
.enable_all()
106-
.build()
107-
.expect("Couldn't build the multi-thread Tokio runtime")
108-
}
109-
110-
/// Initialize the Tokio Runtime with the multi-thread scheduler
111-
///
112-
/// # Panics
113-
/// This function will panic if called a second time. See [`init_multi_thread_once`] if you want to
114-
/// avoid this panic.
115-
pub fn init_multi_thread() {
116-
init(multi_thread());
117-
}
118-
119-
/// Ensure that the Tokio Runtime is initialized
120-
///
121-
/// If the runtime has not been initialized already, the multi-thread scheduler
122-
/// is used. Calling this function a second time is a no-op.
123-
pub fn init_multi_thread_once() {
124-
TOKIO_RUNTIME.get_or_init(|| multi_thread());
125-
}
126-
127-
/// Ensure that the Tokio Runtime is initialized
128-
///
129-
/// If the runtime has not been initialized already, the current-thread
130-
/// scheduler is used. Calling this function a second time is a no-op.
131-
pub fn init_current_thread_once() {
132-
let mut initialized = false;
13373
TOKIO_RUNTIME.get_or_init(|| {
134-
initialized = true;
135-
current_thread()
136-
});
74+
TOKIO_BUILDER
75+
.lock()
76+
.unwrap()
77+
.build()
78+
.expect("Unable to build Tokio runtime")
79+
})
80+
}
13781

138-
if initialized {
139-
start_current_thread();
140-
}
82+
fn multi_thread() -> Builder {
83+
let mut builder = Builder::new_multi_thread();
84+
builder.enable_all();
85+
builder
14186
}
14287

14388
/// Run the event loop until the given Future completes
@@ -157,16 +102,9 @@ pub fn init_current_thread_once() {
157102
/// # use std::time::Duration;
158103
/// #
159104
/// # use pyo3::prelude::*;
160-
/// # use tokio::runtime::{Builder, Runtime};
161-
/// #
162-
/// # let runtime = Builder::new_current_thread()
163-
/// # .enable_all()
164-
/// # .build()
165-
/// # .expect("Couldn't build the runtime");
166105
/// #
167106
/// # Python::with_gil(|py| {
168107
/// # pyo3_asyncio::with_runtime(py, || {
169-
/// # pyo3_asyncio::tokio::init_current_thread();
170108
/// pyo3_asyncio::tokio::run_until_complete(py, async move {
171109
/// tokio::time::sleep(Duration::from_secs(1)).await;
172110
/// Ok(())

0 commit comments

Comments
 (0)