-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathtest_tokio_current_thread_uvloop.rs
More file actions
53 lines (42 loc) · 1.68 KB
/
test_tokio_current_thread_uvloop.rs
File metadata and controls
53 lines (42 loc) · 1.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Test entry point (non-Windows): installs uvloop as the asyncio event loop
// policy, then checks that a future driven through
// `pyo3_async_runtimes::tokio::run` on a current-thread Tokio runtime observes
// a `uvloop.Loop` as its current event loop.
//
// Returns `Ok(())` early, without running the check, on free-threaded Python
// builds and on Python 3.14 or newer.
#[cfg(not(target_os = "windows"))]
fn main() -> pyo3::PyResult<()> {
    use pyo3::{prelude::*, types::PyType};

    Python::initialize();

    // Hand a current-thread Tokio runtime builder to pyo3_async_runtimes.
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();
    pyo3_async_runtimes::tokio::init(runtime_builder);

    // Park a background thread on a future that never completes so that the
    // runtime is continuously blocked on for the lifetime of the process.
    std::thread::spawn(|| {
        // `allow(deprecated)` is carried over from the original call site.
        #[allow(deprecated)]
        pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>());
    });

    Python::attach(|py| {
        // uvloop not supported on the free-threaded build yet
        // https://github.com/MagicStack/uvloop/issues/642
        let gil_disabled = py
            .import("sysconfig")?
            .call_method1("get_config_var", ("Py_GIL_DISABLED",))?;

        // uvloop not yet supported on 3.14
        if gil_disabled.is_truthy()? || py.version_info() >= (3, 14) {
            return Ok(());
        }

        let uvloop_module = py.import("uvloop")?;
        uvloop_module.call_method0("install")?;

        // Keep an owned, GIL-independent handle so the async block can use the
        // module for the assertion below.
        let uvloop_handle: Py<PyAny> = uvloop_module.into();

        pyo3_async_runtimes::tokio::run(py, async move {
            // verify that we are on a uvloop.Loop
            Python::attach(|py| -> PyResult<()> {
                let current_loop = pyo3_async_runtimes::tokio::get_current_loop(py)?;
                let loop_attr = uvloop_handle.bind(py).getattr("Loop")?;
                let loop_type = loop_attr.cast::<PyType>().unwrap();
                assert!(current_loop.is_instance(loop_type)?);
                Ok(())
            })?;

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;

            println!("test test_tokio_current_thread_uvloop ... ok");
            Ok(())
        })
    })
}
// No-op entry point for Windows builds: the uvloop test `main` above is
// compiled out on this target, so the binary just exits successfully.
// NOTE(review): presumably because uvloop is not available on Windows — confirm.
#[cfg(target_os = "windows")]
fn main() {}