Skip to content

Commit 0ffcea7

Browse files
author
Andrew J Westlake
committed
Merged in lazy tokio initialization
2 parents 211b9d1 + 2d315cf commit 0ffcea7

File tree

8 files changed

+54
-111
lines changed

8 files changed

+54
-111
lines changed

README.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ Export an async function that makes use of `async-std`:
124124
use pyo3::{prelude::*, wrap_pyfunction};
125125

126126
#[pyfunction]
127-
fn rust_sleep(py: Python) -> PyResult<PyObject> {
128-
pyo3_asyncio::async_std::into_coroutine(py, async {
127+
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
128+
pyo3_asyncio::async_std::future_into_py(py, async {
129129
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
130130
Ok(Python::with_gil(|py| py.None()))
131131
})
@@ -148,19 +148,15 @@ If you want to use `tokio` instead, here's what your module should look like:
148148
use pyo3::{prelude::*, wrap_pyfunction};
149149

150150
#[pyfunction]
151-
fn rust_sleep(py: Python) -> PyResult<PyObject> {
152-
pyo3_asyncio::tokio::into_coroutine(py, async {
151+
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
152+
pyo3_asyncio::tokio::future_into_py(py, async {
153153
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
154154
Ok(Python::with_gil(|py| py.None()))
155155
})
156156
}
157157

158158
#[pymodule]
159159
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
160-
// Tokio needs explicit initialization before any pyo3-asyncio conversions.
161-
// The module import is a prime place to do this.
162-
pyo3_asyncio::tokio::init_multi_thread_once();
163-
164160
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
165161

166162
Ok(())

pyo3-asyncio-macros/src/tokio.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,16 +219,23 @@ fn parse_knobs(
219219

220220
let config = config.build()?;
221221

222-
let mut rt = match config.flavor {
222+
let builder = match config.flavor {
223223
RuntimeFlavor::CurrentThread => quote! {
224224
pyo3_asyncio::tokio::re_exports::runtime::Builder::new_current_thread()
225225
},
226226
RuntimeFlavor::Threaded => quote! {
227227
pyo3_asyncio::tokio::re_exports::runtime::Builder::new_multi_thread()
228228
},
229229
};
230+
231+
let mut builder_init = quote! {
232+
builder.enable_all();
233+
};
230234
if let Some(v) = config.worker_threads {
231-
rt = quote! { #rt.worker_threads(#v) };
235+
builder_init = quote! {
236+
builder.worker_threads(#v);
237+
#builder_init;
238+
};
232239
}
233240

234241
let rt_init = match config.flavor {
@@ -249,12 +256,10 @@ fn parse_knobs(
249256

250257
pyo3::prepare_freethreaded_python();
251258

252-
pyo3_asyncio::tokio::init(
253-
#rt
254-
.enable_all()
255-
.build()
256-
.unwrap()
257-
);
259+
let mut builder = #builder;
260+
#builder_init;
261+
262+
pyo3_asyncio::tokio::init(builder);
258263

259264
#rt_init
260265

pytests/test_tokio_current_thread_asyncio.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@ fn main() -> pyo3::PyResult<()> {
1010
Python::with_gil(|py| {
1111
// into_coroutine requires the 0.13 API
1212
pyo3_asyncio::try_init(py)?;
13-
pyo3_asyncio::tokio::init_current_thread();
13+
14+
let mut builder = tokio::runtime::Builder::new_current_thread();
15+
builder.enable_all();
16+
17+
pyo3_asyncio::tokio::init(builder);
18+
std::thread::spawn(move || {
19+
pyo3_asyncio::tokio::get_runtime().block_on(futures::future::pending::<()>());
20+
});
21+
1422
pyo3_asyncio::tokio::run(py, pyo3_asyncio::testing::main())
1523
})
1624
}

pytests/test_tokio_current_thread_run_forever.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@ mod tokio_run_forever;
22

33
fn main() {
44
pyo3::prepare_freethreaded_python();
5-
pyo3_asyncio::tokio::init_current_thread();
5+
6+
let mut builder = tokio::runtime::Builder::new_current_thread();
7+
builder.enable_all();
8+
9+
pyo3_asyncio::tokio::init(builder);
10+
std::thread::spawn(move || {
11+
pyo3_asyncio::tokio::get_runtime().block_on(futures::future::pending::<()>());
12+
});
613

714
tokio_run_forever::test_main();
815
}

pytests/test_tokio_multi_thread_asyncio.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ fn main() -> pyo3::PyResult<()> {
1010
Python::with_gil(|py| {
1111
// into_coroutine requires the 0.13 API
1212
pyo3_asyncio::try_init(py)?;
13-
pyo3_asyncio::tokio::init_multi_thread();
1413
pyo3_asyncio::tokio::run(py, pyo3_asyncio::testing::main())
1514
})
1615
}

pytests/test_tokio_multi_thread_run_forever.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,5 @@ mod tokio_run_forever;
22

33
fn main() {
44
pyo3::prepare_freethreaded_python();
5-
6-
pyo3_asyncio::tokio::init_multi_thread();
7-
85
tokio_run_forever::test_main();
96
}

pytests/tokio_asyncio/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,16 +124,6 @@ async fn test_other_awaitables() -> PyResult<()> {
124124
.await
125125
}
126126

127-
#[pyo3_asyncio::tokio::test]
128-
fn test_init_tokio_twice() -> PyResult<()> {
129-
// tokio has already been initialized in test main. call these functions to
130-
// make sure they don't cause problems with the other tests.
131-
pyo3_asyncio::tokio::init_multi_thread_once();
132-
pyo3_asyncio::tokio::init_current_thread_once();
133-
134-
Ok(())
135-
}
136-
137127
#[pyo3_asyncio::tokio::test]
138128
fn test_local_future_into_py(event_loop: PyObject) -> PyResult<()> {
139129
tokio::task::LocalSet::new().block_on(pyo3_asyncio::tokio::get_runtime(), async {

src/tokio.rs

Lines changed: 20 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
use std::{future::Future, pin::Pin, thread};
1+
use std::{future::Future, pin::Pin, sync::Mutex};
22

33
use ::tokio::{
44
runtime::{Builder, Runtime},
55
task,
66
};
7-
use futures::future::pending;
8-
use once_cell::{sync::OnceCell, unsync::OnceCell as UnsyncOnceCell};
7+
use once_cell::{
8+
sync::{Lazy, OnceCell},
9+
unsync::OnceCell as UnsyncOnceCell,
10+
};
911
use pyo3::prelude::*;
1012

1113
use crate::generic::{self, Runtime as GenericRuntime, SpawnLocalExt};
@@ -30,10 +32,9 @@ pub use pyo3_asyncio_macros::tokio_main as main;
3032
#[cfg(all(feature = "attributes", feature = "testing"))]
3133
pub use pyo3_asyncio_macros::tokio_test as test;
3234

35+
static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_thread()));
3336
static TOKIO_RUNTIME: OnceCell<Runtime> = OnceCell::new();
3437

35-
const EXPECT_TOKIO_INIT: &str = "Tokio runtime must be initialized";
36-
3738
impl generic::JoinError for task::JoinError {
3839
fn is_panic(&self) -> bool {
3940
task::JoinError::is_panic(self)
@@ -121,79 +122,26 @@ pub fn get_current_loop(py: Python) -> PyResult<&PyAny> {
121122
generic::get_current_loop::<TokioRuntime>(py)
122123
}
123124

124-
/// Initialize the Tokio Runtime with a custom build
125-
pub fn init(runtime: Runtime) {
126-
TOKIO_RUNTIME
127-
.set(runtime)
128-
.expect("Tokio Runtime has already been initialized");
129-
}
130-
131-
fn current_thread() -> Runtime {
132-
Builder::new_current_thread()
133-
.enable_all()
134-
.build()
135-
.expect("Couldn't build the current-thread Tokio runtime")
136-
}
137-
138-
fn start_current_thread() {
139-
thread::spawn(move || {
140-
TOKIO_RUNTIME.get().unwrap().block_on(pending::<()>());
141-
});
142-
}
143-
144-
/// Initialize the Tokio Runtime with current-thread scheduler
145-
///
146-
/// # Panics
147-
/// This function will panic if called a second time. See [`init_current_thread_once`] if you want
148-
/// to avoid this panic.
149-
pub fn init_current_thread() {
150-
init(current_thread());
151-
start_current_thread();
125+
/// Initialize the Tokio runtime with a custom build
126+
pub fn init(builder: Builder) {
127+
*TOKIO_BUILDER.lock().unwrap() = builder
152128
}
153129

154130
/// Get a reference to the current tokio runtime
155131
pub fn get_runtime<'a>() -> &'a Runtime {
156-
TOKIO_RUNTIME.get().expect(EXPECT_TOKIO_INIT)
157-
}
158-
159-
fn multi_thread() -> Runtime {
160-
Builder::new_multi_thread()
161-
.enable_all()
162-
.build()
163-
.expect("Couldn't build the multi-thread Tokio runtime")
164-
}
165-
166-
/// Initialize the Tokio Runtime with the multi-thread scheduler
167-
///
168-
/// # Panics
169-
/// This function will panic if called a second time. See [`init_multi_thread_once`] if you want to
170-
/// avoid this panic.
171-
pub fn init_multi_thread() {
172-
init(multi_thread());
173-
}
174-
175-
/// Ensure that the Tokio Runtime is initialized
176-
///
177-
/// If the runtime has not been initialized already, the multi-thread scheduler
178-
/// is used. Calling this function a second time is a no-op.
179-
pub fn init_multi_thread_once() {
180-
TOKIO_RUNTIME.get_or_init(|| multi_thread());
181-
}
182-
183-
/// Ensure that the Tokio Runtime is initialized
184-
///
185-
/// If the runtime has not been initialized already, the current-thread
186-
/// scheduler is used. Calling this function a second time is a no-op.
187-
pub fn init_current_thread_once() {
188-
let mut initialized = false;
189132
TOKIO_RUNTIME.get_or_init(|| {
190-
initialized = true;
191-
current_thread()
192-
});
133+
TOKIO_BUILDER
134+
.lock()
135+
.unwrap()
136+
.build()
137+
.expect("Unable to build Tokio runtime")
138+
})
139+
}
193140

194-
if initialized {
195-
start_current_thread();
196-
}
141+
fn multi_thread() -> Builder {
142+
let mut builder = Builder::new_multi_thread();
143+
builder.enable_all();
144+
builder
197145
}
198146

199147
/// Run the event loop until the given Future completes
@@ -213,17 +161,10 @@ pub fn init_current_thread_once() {
213161
/// # use std::time::Duration;
214162
/// #
215163
/// # use pyo3::prelude::*;
216-
/// # use tokio::runtime::{Builder, Runtime};
217-
/// #
218-
/// # let runtime = Builder::new_current_thread()
219-
/// # .enable_all()
220-
/// # .build()
221-
/// # .expect("Couldn't build the runtime");
222164
/// #
223165
/// # pyo3::prepare_freethreaded_python();
224166
/// # Python::with_gil(|py| {
225167
/// # pyo3_asyncio::with_runtime(py, || {
226-
/// # pyo3_asyncio::tokio::init_current_thread();
227168
/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?;
228169
/// pyo3_asyncio::tokio::run_until_complete(event_loop, async move {
229170
/// tokio::time::sleep(Duration::from_secs(1)).await;

0 commit comments

Comments
 (0)