Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "pyo3-async-runtimes"
description = "PyO3 bridges from Rust runtimes to Python's Asyncio library"
version = "0.22.0"
version = "0.23.0"
authors = [
"Andrew J Westlake <[email protected]>",
"David Hewitt <[email protected]>",
Expand Down Expand Up @@ -120,11 +120,11 @@ futures = "0.3"
inventory = { version = "0.3", optional = true }
once_cell = "1.14"
pin-project-lite = "0.2"
pyo3 = "0.22"
pyo3-async-runtimes-macros = { path = "pyo3-async-runtimes-macros", version = "=0.22.0", optional = true }
pyo3 = "0.23"
pyo3-async-runtimes-macros = { path = "pyo3-async-runtimes-macros", version = "=0.23.0", optional = true }

[dev-dependencies]
pyo3 = { version = "0.22", features = ["macros"] }
pyo3 = { version = "0.23", features = ["macros"] }

[dependencies.async-std]
version = "1.12"
Expand Down
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ Here we initialize the runtime, import Python's `asyncio` library and run the gi
```toml
# Cargo.toml dependencies
[dependencies]
pyo3 = { version = "0.22" }
pyo3-async-runtimes = { version = "0.22", features = ["attributes", "async-std-runtime"] }
pyo3 = { version = "0.23" }
pyo3-async-runtimes = { version = "0.23", features = ["attributes", "async-std-runtime"] }
async-std = "1.13"
```

Expand Down Expand Up @@ -84,8 +84,8 @@ attribute.
```toml
# Cargo.toml dependencies
[dependencies]
pyo3 = { version = "0.22" }
pyo3-async-runtimes = { version = "0.22", features = ["attributes", "tokio-runtime"] }
pyo3 = { version = "0.23" }
pyo3-async-runtimes = { version = "0.23", features = ["attributes", "tokio-runtime"] }
tokio = "1.40"
```

Expand Down Expand Up @@ -130,8 +130,8 @@ For `async-std`:

```toml
[dependencies]
pyo3 = { version = "0.22", features = ["extension-module"] }
pyo3-async-runtimes = { version = "0.22", features = ["async-std-runtime"] }
pyo3 = { version = "0.23", features = ["extension-module"] }
pyo3-async-runtimes = { version = "0.23", features = ["async-std-runtime"] }
async-std = "1.13"
```

Expand All @@ -140,7 +140,7 @@ For `tokio`:
```toml
[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"] }
pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"] }
tokio = "1.40"
```

Expand Down Expand Up @@ -434,8 +434,8 @@ name = "my_async_module"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.22", features = ["extension-module"] }
pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"] }
pyo3 = { version = "0.23", features = ["extension-module"] }
pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"] }
async-std = "1.13"
tokio = "1.40"
```
Expand Down Expand Up @@ -494,8 +494,8 @@ event loop before we can install the `uvloop` policy.
```toml
[dependencies]
async-std = "1.13"
pyo3 = "0.22"
pyo3-async-runtimes = { version = "0.22", features = ["async-std-runtime"] }
pyo3 = "0.23"
pyo3-async-runtimes = { version = "0.23", features = ["async-std-runtime"] }
```

```rust no_run
Expand Down
2 changes: 1 addition & 1 deletion pyo3-async-runtimes-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "pyo3-async-runtimes-macros"
description = "Proc Macro Attributes for `pyo3-async-runtimes`"
version = "0.22.0"
version = "0.23.0"
authors = [
"Andrew J Westlake <[email protected]>",
"David Hewitt <[email protected]>",
Expand Down
10 changes: 5 additions & 5 deletions src/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//!
//! ```toml
//! [dependencies.pyo3-async-runtimes]
//! version = "0.22"
//! version = "0.23"
//! features = ["unstable-streams"]
//! ```

Expand Down Expand Up @@ -279,7 +279,7 @@ pub fn future_into_py_with_locals<F, T>(
) -> PyResult<Bound<PyAny>>
where
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
generic::future_into_py_with_locals::<AsyncStdRuntime, F, T>(py, locals, fut)
}
Expand Down Expand Up @@ -325,7 +325,7 @@ where
pub fn future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
where
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
generic::future_into_py::<AsyncStdRuntime, _, T>(py, fut)
}
Expand Down Expand Up @@ -400,7 +400,7 @@ pub fn local_future_into_py_with_locals<F, T>(
) -> PyResult<Bound<PyAny>>
where
F: Future<Output = PyResult<T>> + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
generic::local_future_into_py_with_locals::<AsyncStdRuntime, _, T>(py, locals, fut)
}
Expand Down Expand Up @@ -466,7 +466,7 @@ where
pub fn local_future_into_py<F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
where
F: Future<Output = PyResult<T>> + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
generic::local_future_into_py::<AsyncStdRuntime, _, T>(py, fut)
}
Expand Down
72 changes: 47 additions & 25 deletions src/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
//!
//! ```toml
//! [dependencies.pyo3-async-runtimes]
//! version = "0.22"
//! version = "0.23"
//! features = ["unstable-streams"]
//! ```

use std::{
ffi::CString,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
Expand All @@ -31,6 +32,7 @@ use futures::{channel::mpsc, SinkExt};
use once_cell::sync::OnceCell;
use pin_project_lite::pin_project;
use pyo3::prelude::*;
use pyo3::BoundObject;
#[cfg(feature = "unstable-streams")]
use std::marker::PhantomData;

Expand Down Expand Up @@ -347,8 +349,11 @@ fn set_result(
let none = py.None().into_bound(py);

let (complete, val) = match result {
Ok(val) => (future.getattr("set_result")?, val.into_py(py)),
Err(err) => (future.getattr("set_exception")?, err.into_py(py)),
Ok(val) => (future.getattr("set_result")?, val.into_pyobject(py)?),
Err(err) => (
future.getattr("set_exception")?,
err.into_pyobject(py)?.into_any(),
),
};
call_soon_threadsafe(event_loop, &none, (CheckedCompletor, future, complete, val))?;

Expand Down Expand Up @@ -581,7 +586,7 @@ pub fn future_into_py_with_locals<R, F, T>(
where
R: Runtime + ContextExt,
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
let (cancel_tx, cancel_rx) = oneshot::channel();

Expand Down Expand Up @@ -617,7 +622,10 @@ where
let _ = set_result(
&locals2.event_loop(py),
future_tx1.bind(py),
result.map(|val| val.into_py(py)),
result.and_then(|val| match val.into_pyobject(py) {
Ok(obj) => Ok(obj.into_any().unbind()),
Err(err) => Err(err.into()),
}),
)
.map_err(dump_err(py));
});
Expand Down Expand Up @@ -686,10 +694,10 @@ impl<T> Cancellable<T> {
}
}

impl<F, T> Future for Cancellable<F>
impl<'py, F, T> Future for Cancellable<F>
where
F: Future<Output = PyResult<T>>,
T: IntoPy<PyObject>,
T: IntoPyObject<'py>,
{
type Output = F::Output;

Expand Down Expand Up @@ -844,7 +852,7 @@ pub fn future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
where
R: Runtime + ContextExt,
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
}
Expand Down Expand Up @@ -986,7 +994,7 @@ pub fn local_future_into_py_with_locals<R, F, T>(
where
R: Runtime + SpawnLocalExt + LocalContextExt,
F: Future<Output = PyResult<T>> + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
let (cancel_tx, cancel_rx) = oneshot::channel();

Expand Down Expand Up @@ -1022,7 +1030,10 @@ where
let _ = set_result(
locals2.event_loop.bind(py),
future_tx1.bind(py),
result.map(|val| val.into_py(py)),
result.and_then(|val| match val.into_pyobject(py) {
Ok(obj) => Ok(obj.into_any().unbind()),
Err(err) => Err(err.into()),
}),
)
.map_err(dump_err(py));
});
Expand Down Expand Up @@ -1183,7 +1194,7 @@ pub fn local_future_into_py<R, F, T>(py: Python, fut: F) -> PyResult<Bound<PyAny
where
R: Runtime + ContextExt + SpawnLocalExt + LocalContextExt,
F: Future<Output = PyResult<T>> + 'static,
T: IntoPy<PyObject>,
T: for<'py> IntoPyObject<'py>,
{
local_future_into_py_with_locals::<R, F, T>(py, get_current_locals::<R>(py)?, fut)
}
Expand Down Expand Up @@ -1467,7 +1478,7 @@ where
{
fn send(&mut self, py: Python, locals: TaskLocals, item: PyObject) -> PyResult<PyObject> {
match self.tx.try_send(item.clone_ref(py)) {
Ok(_) => Ok(true.into_py(py)),
Ok(_) => Ok(true.into_pyobject(py)?.into_any().unbind()),
Err(e) => {
if e.is_full() {
let mut tx = self.tx.clone();
Expand All @@ -1476,19 +1487,25 @@ where
future_into_py_with_locals::<R, _, PyObject>(py, locals, async move {
if tx.flush().await.is_err() {
// receiving side disconnected
return Python::with_gil(|py| Ok(false.into_py(py)));
return Python::with_gil(|py| {
Ok(false.into_pyobject(py)?.into_any().unbind())
});
}
if tx.send(item).await.is_err() {
// receiving side disconnected
return Python::with_gil(|py| Ok(false.into_py(py)));
return Python::with_gil(|py| {
Ok(false.into_pyobject(py)?.into_any().unbind())
});
}
Python::with_gil(|py| Ok(true.into_py(py)))
Python::with_gil(|py| {
Ok(true.into_pyobject(py)?.into_any().unbind())
})
})?
.into(),
)
})
} else {
Ok(false.into_py(py))
Ok(false.into_pyobject(py)?.into_any().unbind())
}
}
}
Expand All @@ -1502,15 +1519,20 @@ where
#[pyclass]
struct SenderGlue {
locals: TaskLocals,
tx: Box<dyn Sender>,
tx: Arc<Mutex<dyn Sender>>,
}
#[pymethods]
impl SenderGlue {
pub fn send(&mut self, item: PyObject) -> PyResult<PyObject> {
Python::with_gil(|py| self.tx.send(py, self.locals.clone_ref(py), item))
Python::with_gil(|py| {
self.tx
.lock()
.unwrap()
.send(py, self.locals.clone_ref(py), item)
})
}
pub fn close(&mut self) -> PyResult<()> {
self.tx.close()
self.tx.lock().unwrap().close()
}
}

Expand Down Expand Up @@ -1648,11 +1670,11 @@ where
let py = gen.py();
let glue = GLUE_MOD
.get_or_try_init(|| -> PyResult<PyObject> {
Ok(PyModule::from_code_bound(
Ok(PyModule::from_code(
py,
STREAM_GLUE,
"pyo3_async_runtimes/pyo3_async_runtimes_glue.py",
"pyo3_async_runtimes_glue",
&CString::new(STREAM_GLUE).unwrap(),
&CString::new("pyo3_async_runtimes/pyo3_async_runtimes_glue.py").unwrap(),
&CString::new("pyo3_async_runtimes_glue").unwrap(),
)?
.into())
})?
Expand All @@ -1670,10 +1692,10 @@ where
gen,
SenderGlue {
locals,
tx: Box::new(GenericSender {
tx: Arc::new(Mutex::new(GenericSender {
runtime: PhantomData::<R>,
tx,
}),
})),
},
),
)?,
Expand Down
Loading
Loading