Skip to content

Commit 58afb66

Browse files
author
Andrew J Westlake
committed
Added stream conversions under the unstable-streams feature
2 parents 6b08c94 + 2c30686 commit 58afb66

File tree

8 files changed

+490
-11
lines changed

8 files changed

+490
-11
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ jobs:
139139
- name: Build
140140
run: cargo build --features=${{env.features}} --verbose --target ${{ matrix.platform.rust-target }}
141141

142-
# uvloop doesn't compile under Windows or Python 3.11-dev
143-
- if: ${{ matrix.platform.os != 'windows-latest' && matrix.python-version != '3.11-dev' }}
142+
# uvloop doesn't compile under Windows, Python 3.11-dev, and PyPy
143+
- if: ${{ matrix.platform.os != 'windows-latest' && matrix.python-version != '3.11-dev' && !startsWith(matrix.python-version, 'pypy') }}
144144
name: Install pyo3-asyncio test dependencies
145145
run: |
146146
python -m pip install -U uvloop

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ members = ["pyo3-asyncio-macros"]
1818

1919
[features]
2020
async-std-runtime = ["async-std"]
21-
attributes = ["pyo3-asyncio-macros"]
22-
testing = ["clap"]
21+
attributes = ["pyo3-asyncio-macros", "inventory"]
22+
testing = ["clap", "inventory"]
2323
tokio-runtime = ["tokio"]
24-
unstable-streams = []
24+
unstable-streams = ["async-channel"]
2525
default = []
2626

2727
[package.metadata.docs.rs]
@@ -103,14 +103,14 @@ harness = false
103103
required-features = ["tokio-runtime", "testing"]
104104

105105
[dependencies]
106+
async-channel = { version = "1.6", optional = true }
106107
clap = { version = "3.1.5", optional = true }
107108
futures = "0.3"
108-
inventory = "0.2"
109+
inventory = { version = "0.2", optional = true }
109110
once_cell = "1.5"
110111
pin-project-lite = "0.2"
111112
pyo3 = "0.16"
112113
pyo3-asyncio-macros = { path = "pyo3-asyncio-macros", version = "=0.16.0", optional = true }
113-
stability = "0.1"
114114

115115
[dev-dependencies]
116116
pyo3 = { version = "0.16", features = ["macros"] }

pytests/test_async_std_asyncio.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use pyo3::{
1515
};
1616
use pyo3_asyncio::TaskLocals;
1717

18+
#[cfg(feature = "unstable-streams")]
19+
use futures::{StreamExt, TryStreamExt};
20+
1821
#[pyfunction]
1922
fn sleep<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> {
2023
let secs = secs.extract()?;
@@ -169,6 +172,40 @@ async fn test_cancel() -> PyResult<()> {
169172
Ok(())
170173
}
171174

175+
#[cfg(feature = "unstable-streams")]
176+
const ASYNC_STD_TEST_MOD: &str = r#"
177+
import asyncio
178+
179+
async def gen():
180+
for i in range(10):
181+
await asyncio.sleep(0.1)
182+
yield i
183+
"#;
184+
185+
#[cfg(feature = "unstable-streams")]
186+
#[pyo3_asyncio::async_std::test]
187+
async fn test_async_gen_v1() -> PyResult<()> {
188+
let stream = Python::with_gil(|py| {
189+
let test_mod = PyModule::from_code(
190+
py,
191+
ASYNC_STD_TEST_MOD,
192+
"test_rust_coroutine/async_std_test_mod.py",
193+
"async_std_test_mod",
194+
)?;
195+
196+
pyo3_asyncio::async_std::into_stream_v1(test_mod.call_method0("gen")?)
197+
})?;
198+
199+
let vals = stream
200+
.map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
201+
.try_collect::<Vec<i32>>()
202+
.await?;
203+
204+
assert_eq!((0..10).collect::<Vec<i32>>(), vals);
205+
206+
Ok(())
207+
}
208+
172209
#[pyo3_asyncio::async_std::test]
173210
fn test_local_cancel(event_loop: PyObject) -> PyResult<()> {
174211
let locals = Python::with_gil(|py| -> PyResult<TaskLocals> {
@@ -284,6 +321,30 @@ fn cvars_mod(_py: Python, m: &PyModule) -> PyResult<()> {
284321
Ok(())
285322
}
286323

324+
#[cfg(feature = "unstable-streams")]
325+
#[pyo3_asyncio::async_std::test]
326+
async fn test_async_gen_v2() -> PyResult<()> {
327+
let stream = Python::with_gil(|py| {
328+
let test_mod = PyModule::from_code(
329+
py,
330+
ASYNC_STD_TEST_MOD,
331+
"test_rust_coroutine/async_std_test_mod.py",
332+
"async_std_test_mod",
333+
)?;
334+
335+
pyo3_asyncio::async_std::into_stream_v2(test_mod.call_method0("gen")?)
336+
})?;
337+
338+
let vals = stream
339+
.map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
340+
.try_collect::<Vec<i32>>()
341+
.await?;
342+
343+
assert_eq!((0..10).collect::<Vec<i32>>(), vals);
344+
345+
Ok(())
346+
}
347+
287348
const CONTEXTVARS_CODE: &str = r#"
288349
cx = contextvars.ContextVar("cx")
289350

pytests/tokio_asyncio/mod.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use pyo3::{
1212
};
1313
use pyo3_asyncio::TaskLocals;
1414

15+
#[cfg(feature = "unstable-streams")]
16+
use futures::{StreamExt, TryStreamExt};
17+
1518
use crate::common;
1619

1720
#[pyfunction]
@@ -288,6 +291,64 @@ fn cvars_mod(_py: Python, m: &PyModule) -> PyResult<()> {
288291
Ok(())
289292
}
290293

294+
#[cfg(feature = "unstable-streams")]
295+
const TOKIO_TEST_MOD: &str = r#"
296+
import asyncio
297+
298+
async def gen():
299+
for i in range(10):
300+
await asyncio.sleep(0.1)
301+
yield i
302+
"#;
303+
304+
#[cfg(feature = "unstable-streams")]
305+
#[pyo3_asyncio::tokio::test]
306+
async fn test_async_gen_v1() -> PyResult<()> {
307+
let stream = Python::with_gil(|py| {
308+
let test_mod = PyModule::from_code(
309+
py,
310+
TOKIO_TEST_MOD,
311+
"test_rust_coroutine/tokio_test_mod.py",
312+
"tokio_test_mod",
313+
)?;
314+
315+
pyo3_asyncio::tokio::into_stream_v1(test_mod.call_method0("gen")?)
316+
})?;
317+
318+
let vals = stream
319+
.map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item?.as_ref(py).extract()?) }))
320+
.try_collect::<Vec<i32>>()
321+
.await?;
322+
323+
assert_eq!((0..10).collect::<Vec<i32>>(), vals);
324+
325+
Ok(())
326+
}
327+
328+
#[cfg(feature = "unstable-streams")]
329+
#[pyo3_asyncio::tokio::test]
330+
async fn test_async_gen_v2() -> PyResult<()> {
331+
let stream = Python::with_gil(|py| {
332+
let test_mod = PyModule::from_code(
333+
py,
334+
TOKIO_TEST_MOD,
335+
"test_rust_coroutine/tokio_test_mod.py",
336+
"tokio_test_mod",
337+
)?;
338+
339+
pyo3_asyncio::tokio::into_stream_v2(test_mod.call_method0("gen")?)
340+
})?;
341+
342+
let vals = stream
343+
.map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
344+
.try_collect::<Vec<i32>>()
345+
.await?;
346+
347+
assert_eq!((0..10).collect::<Vec<i32>>(), vals);
348+
349+
Ok(())
350+
}
351+
291352
const CONTEXTVARS_CODE: &str = r#"
292353
cx = contextvars.ContextVar("cx")
293354

src/async_std.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin};
22

33
use async_std::task;
4-
use futures::prelude::*;
4+
use futures::FutureExt;
55
use pyo3::prelude::*;
66

77
use crate::{
@@ -488,3 +488,61 @@ where
488488
pub fn into_future(awaitable: &PyAny) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> {
489489
generic::into_future::<AsyncStdRuntime>(awaitable)
490490
}
491+
492+
/// Convert async generator into a stream
493+
///
494+
/// # Availability
495+
///
496+
/// **This API is marked as unstable** and is only available when the
497+
/// `unstable-streams` crate feature is enabled. This comes with no
498+
/// stability guarantees, and could be changed or removed at any time.
499+
#[cfg(feature = "unstable-streams")]
500+
pub fn into_stream_v1<'p>(
501+
gen: &'p PyAny,
502+
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
503+
generic::into_stream_v1::<AsyncStdRuntime>(gen)
504+
}
505+
506+
/// Convert async generator into a stream
507+
///
508+
/// # Availability
509+
///
510+
/// **This API is marked as unstable** and is only available when the
511+
/// `unstable-streams` crate feature is enabled. This comes with no
512+
/// stability guarantees, and could be changed or removed at any time.
513+
#[cfg(feature = "unstable-streams")]
514+
pub fn into_stream_with_locals_v1<'p>(
515+
locals: TaskLocals,
516+
gen: &'p PyAny,
517+
) -> PyResult<impl futures::Stream<Item = PyResult<PyObject>> + 'static> {
518+
generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)
519+
}
520+
521+
/// Convert async generator into a stream
522+
///
523+
/// # Availability
524+
///
525+
/// **This API is marked as unstable** and is only available when the
526+
/// `unstable-streams` crate feature is enabled. This comes with no
527+
/// stability guarantees, and could be changed or removed at any time.
528+
#[cfg(feature = "unstable-streams")]
529+
pub fn into_stream_with_locals_v2<'p>(
530+
locals: TaskLocals,
531+
gen: &'p PyAny,
532+
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
533+
generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)
534+
}
535+
536+
/// Convert async generator into a stream
537+
///
538+
/// # Availability
539+
///
540+
/// **This API is marked as unstable** and is only available when the
541+
/// `unstable-streams` crate feature is enabled. This comes with no
542+
/// stability guarantees, and could be changed or removed at any time.
543+
#[cfg(feature = "unstable-streams")]
544+
pub fn into_stream_v2<'p>(
545+
gen: &'p PyAny,
546+
) -> PyResult<impl futures::Stream<Item = PyObject> + 'static> {
547+
generic::into_stream_v2::<AsyncStdRuntime>(gen)
548+
}

0 commit comments

Comments
 (0)