Skip to content

Commit 0a5ff9e

Browse files
committed
initial try_stream impl
1 parent 7f38236 commit 0a5ff9e

File tree

4 files changed

+125
-18
lines changed

4 files changed

+125
-18
lines changed

async-stream-impl/src/lib.rs

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use syn::visit_mut::VisitMut;
1010
struct AsyncStreamImpl {
1111
yielder: syn::Ident,
1212
stmts: Vec<syn::Stmt>,
13-
num_yield: u32,
1413
}
1514

1615
struct Scrub {
16+
is_try: bool,
1717
yielder: syn::Ident,
1818
unit: Box<syn::Expr>,
1919
num_yield: u32,
@@ -25,42 +25,51 @@ impl Parse for AsyncStreamImpl {
2525
input.parse::<Token![,]>()?;
2626

2727
let mut stmts = vec![];
28-
let mut scrub = Scrub {
29-
yielder,
30-
unit: syn::parse_quote!(()),
31-
num_yield: 0,
32-
};
3328

3429
while !input.is_empty() {
35-
let mut stmt = input.parse()?;
36-
scrub.visit_stmt_mut(&mut stmt);
37-
stmts.push(stmt);
30+
stmts.push(input.parse()?);
3831
}
3932

40-
let Scrub { yielder, num_yield, .. } = scrub;
41-
4233
Ok(AsyncStreamImpl {
4334
yielder,
4435
stmts,
45-
num_yield,
4636
})
4737
}
4838
}
4939

5040
impl VisitMut for Scrub {
5141
fn visit_expr_mut(&mut self, i: &mut syn::Expr) {
5242
match i {
53-
syn::Expr::Yield(expr) => {
43+
syn::Expr::Yield(yield_expr) => {
5444
self.num_yield += 1;
5545

56-
let value_expr = if let Some(ref e) = expr.expr {
46+
let value_expr = if let Some(ref e) = yield_expr.expr {
5747
e
5848
} else {
5949
&self.unit
6050
};
6151

6252
let ident = &self.yielder;
63-
*i = syn::parse_quote! { #ident.send(#value_expr).await };
53+
54+
*i = if self.is_try {
55+
syn::parse_quote! { #ident.send(Ok(#value_expr)).await }
56+
} else {
57+
syn::parse_quote! { #ident.send(#value_expr).await }
58+
};
59+
}
60+
syn::Expr::Try(try_expr) => {
61+
let ident = &self.yielder;
62+
let e = &try_expr.expr;
63+
64+
*i = syn::parse_quote! {
65+
match #e {
66+
Ok(v) => v,
67+
Err(e) => {
68+
#ident.send(Err(e)).await;
69+
return;
70+
}
71+
}
72+
};
6473
}
6574
expr => syn::visit_mut::visit_expr_mut(self, expr),
6675
}
@@ -71,11 +80,58 @@ impl VisitMut for Scrub {
7180
pub fn async_stream_impl(input: TokenStream) -> TokenStream {
7281
let AsyncStreamImpl {
7382
yielder,
74-
stmts,
75-
num_yield,
83+
mut stmts,
7684
} = syn::parse_macro_input!(input as AsyncStreamImpl);
7785

78-
if num_yield == 0 {
86+
let mut scrub = Scrub {
87+
is_try: false,
88+
yielder,
89+
unit: syn::parse_quote!(()),
90+
num_yield: 0,
91+
};
92+
93+
for mut stmt in &mut stmts {
94+
scrub.visit_stmt_mut(&mut stmt);
95+
}
96+
97+
if scrub.num_yield == 0 {
98+
let yielder = &scrub.yielder;
99+
100+
quote!({
101+
if false {
102+
#yielder.send(()).await;
103+
}
104+
105+
#(#stmts)*
106+
}).into()
107+
} else {
108+
quote!({
109+
#(#stmts)*
110+
}).into()
111+
}
112+
}
113+
114+
#[proc_macro_hack]
115+
pub fn async_try_stream_impl(input: TokenStream) -> TokenStream {
116+
let AsyncStreamImpl {
117+
yielder,
118+
mut stmts,
119+
} = syn::parse_macro_input!(input as AsyncStreamImpl);
120+
121+
let mut scrub = Scrub {
122+
is_try: true,
123+
yielder,
124+
unit: syn::parse_quote!(()),
125+
num_yield: 0,
126+
};
127+
128+
for mut stmt in &mut stmts {
129+
scrub.visit_stmt_mut(&mut stmt);
130+
}
131+
132+
if scrub.num_yield == 0 {
133+
let yielder = &scrub.yielder;
134+
79135
quote!({
80136
if false {
81137
#yielder.send(()).await;

async-stream/src/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ use proc_macro_hack::proc_macro_hack;
1616
#[proc_macro_hack]
1717
pub use async_stream_impl::async_stream_impl;
1818

19+
#[doc(hidden)]
20+
#[proc_macro_hack]
21+
pub use async_stream_impl::async_try_stream_impl;
22+
1923
/// Asynchronous stream
2024
#[macro_export]
2125
macro_rules! stream {
@@ -26,3 +30,14 @@ macro_rules! stream {
2630
})
2731
}}
2832
}
33+
34+
/// Asynchronous fallible stream
35+
#[macro_export]
36+
macro_rules! try_stream {
37+
($($body:tt)*) => {{
38+
let (mut __yield_tx, __yield_rx) = $crate::yielder::pair();
39+
$crate::AsyncStream::new(__yield_rx, async move {
40+
$crate::async_try_stream_impl!(__yield_tx, $($body)*)
41+
})
42+
}}
43+
}
File renamed without changes.

async-stream/tests/try_stream.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#![feature(async_await)]
2+
3+
use async_stream::try_stream;
4+
5+
use tokio::prelude::*;
6+
7+
#[tokio::test]
8+
async fn single_err() {
9+
let s = try_stream! {
10+
if true {
11+
Err("hello")?;
12+
} else {
13+
yield "world";
14+
}
15+
16+
unreachable!();
17+
};
18+
19+
let values: Vec<_> = s.collect().await;
20+
assert_eq!(1, values.len());
21+
assert_eq!(Err("hello"), values[0]);
22+
}
23+
24+
#[tokio::test]
25+
async fn yield_then_err() {
26+
let s = try_stream! {
27+
yield "hello";
28+
Err("world")?;
29+
unreachable!();
30+
};
31+
32+
let values: Vec<_> = s.collect().await;
33+
assert_eq!(2, values.len());
34+
assert_eq!(Ok("hello"), values[0]);
35+
assert_eq!(Err("world"), values[1]);
36+
}

0 commit comments

Comments
 (0)