Skip to content

Commit a6ec7f6

Browse files
committed
support nested streams
1 parent bdda988 commit a6ec7f6

File tree

4 files changed

+218
-79
lines changed

4 files changed

+218
-79
lines changed

async-stream-impl/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ proc-macro = true
1010
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1111

1212
[dependencies]
13-
proc-macro-hack = "0.5.8"
14-
syn = { version = "0.15.43", features = ["visit-mut"]}
15-
quote = "0.6.13"
13+
proc-macro2 = "1"
14+
syn = { version = "1", features = ["extra-traits", "full", "visit-mut"]}
15+
quote = "1"

async-stream-impl/src/lib.rs

Lines changed: 93 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,54 @@
11
extern crate proc_macro;
22

3-
use proc_macro::TokenStream;
4-
use proc_macro_hack::proc_macro_hack;
3+
use proc_macro::{TokenStream, TokenTree};
4+
use proc_macro2::Span;
55
use quote::quote;
6-
use syn::Token;
7-
use syn::parse::{Parse, ParseStream, Result};
86
use syn::visit_mut::VisitMut;
97

10-
struct AsyncStreamImpl {
11-
yielder: syn::Ident,
12-
stmts: Vec<syn::Stmt>,
13-
}
14-
158
struct Scrub {
169
is_xforming: bool,
1710
is_try: bool,
18-
yielder: syn::Ident,
1911
unit: Box<syn::Expr>,
2012
num_yield: u32,
2113
}
2214

23-
impl Parse for AsyncStreamImpl {
24-
fn parse(input: ParseStream) -> Result<Self> {
25-
let yielder: syn::Ident = input.parse()?;
26-
input.parse::<Token![,]>()?;
27-
28-
let mut stmts = vec![];
15+
#[derive(Debug)]
16+
struct AsyncStreamEnumHack {
17+
macro_ident: syn::Ident,
18+
stmts: Vec<syn::Stmt>,
19+
}
2920

30-
while !input.is_empty() {
31-
stmts.push(input.parse()?);
21+
impl AsyncStreamEnumHack {
22+
fn parse(input: TokenStream) -> Self {
23+
macro_rules! n {
24+
($i:ident) => { $i.next().unwrap() };
3225
}
3326

34-
Ok(AsyncStreamImpl {
35-
yielder,
36-
stmts,
37-
})
27+
let mut input = input.into_iter();
28+
n!(input); // enum
29+
n!(input); // ident
30+
31+
let mut braces = match n!(input) {
32+
TokenTree::Group(group) => group.stream().into_iter(),
33+
_ => unreachable!(),
34+
};
35+
36+
n!(braces); // Dummy
37+
n!(braces); // =
38+
n!(braces); // $crate
39+
n!(braces); // :
40+
n!(braces); // :
41+
n!(braces); // scrub
42+
n!(braces); // !
43+
44+
let inner = n!(braces);
45+
let syn::Block { stmts, .. } = syn::parse(inner.clone().into()).unwrap();
46+
47+
let macro_ident = syn::Ident::new(
48+
&format!("stream_{}", count_bangs(inner.into())),
49+
Span::call_site());
50+
51+
AsyncStreamEnumHack { stmts, macro_ident }
3852
}
3953
}
4054

@@ -55,23 +69,23 @@ impl VisitMut for Scrub {
5569
&self.unit
5670
};
5771

58-
let ident = &self.yielder;
72+
// let ident = &self.yielder;
5973

6074
*i = if self.is_try {
61-
syn::parse_quote! { #ident.send(Ok(#value_expr)).await }
75+
syn::parse_quote! { __yield_tx.send(Ok(#value_expr)).await }
6276
} else {
63-
syn::parse_quote! { #ident.send(#value_expr).await }
77+
syn::parse_quote! { __yield_tx.send(#value_expr).await }
6478
};
6579
}
6680
syn::Expr::Try(try_expr) => {
67-
let ident = &self.yielder;
81+
// let ident = &self.yielder;
6882
let e = &try_expr.expr;
6983

7084
*i = syn::parse_quote! {
7185
match #e {
7286
Ok(v) => v,
7387
Err(e) => {
74-
#ident.send(Err(e)).await;
88+
__yield_tx.send(Err(e)).await;
7589
return;
7690
}
7791
}
@@ -96,74 +110,92 @@ impl VisitMut for Scrub {
96110
}
97111
}
98112

99-
#[proc_macro_hack]
113+
#[proc_macro_derive(AsyncStreamHack)]
100114
pub fn async_stream_impl(input: TokenStream) -> TokenStream {
101-
let AsyncStreamImpl {
102-
yielder,
103-
mut stmts,
104-
} = syn::parse_macro_input!(input as AsyncStreamImpl);
115+
let AsyncStreamEnumHack { macro_ident, mut stmts } =
116+
AsyncStreamEnumHack::parse(input);
105117

106118
let mut scrub = Scrub {
107119
is_xforming: true,
108120
is_try: false,
109-
yielder,
110121
unit: syn::parse_quote!(()),
111122
num_yield: 0,
112123
};
113124

114-
for mut stmt in &mut stmts {
125+
for mut stmt in &mut stmts[..] {
115126
scrub.visit_stmt_mut(&mut stmt);
116127
}
117128

118129
if scrub.num_yield == 0 {
119-
let yielder = &scrub.yielder;
120-
121-
quote!({
122-
if false {
123-
#yielder.send(()).await;
124-
}
125-
126-
#(#stmts)*
130+
quote!(macro_rules! #macro_ident {
131+
() => {{
132+
if false {
133+
__yield_tx.send(()).await;
134+
}
135+
136+
#(#stmts)*
137+
}};
127138
}).into()
128139
} else {
129-
quote!({
130-
#(#stmts)*
140+
quote!(macro_rules! #macro_ident {
141+
() => {{
142+
#(#stmts)*
143+
}};
131144
}).into()
132145
}
133146
}
134147

135-
#[proc_macro_hack]
148+
#[proc_macro_derive(AsyncTryStreamHack)]
136149
pub fn async_try_stream_impl(input: TokenStream) -> TokenStream {
137-
let AsyncStreamImpl {
138-
yielder,
139-
mut stmts,
140-
} = syn::parse_macro_input!(input as AsyncStreamImpl);
150+
let AsyncStreamEnumHack { macro_ident, mut stmts } =
151+
AsyncStreamEnumHack::parse(input);
141152

142153
let mut scrub = Scrub {
143154
is_xforming: true,
144155
is_try: true,
145-
yielder,
146156
unit: syn::parse_quote!(()),
147157
num_yield: 0,
148158
};
149159

150-
for mut stmt in &mut stmts {
160+
for mut stmt in &mut stmts[..] {
151161
scrub.visit_stmt_mut(&mut stmt);
152162
}
153163

154164
if scrub.num_yield == 0 {
155-
let yielder = &scrub.yielder;
156-
157-
quote!({
158-
if false {
159-
#yielder.send(()).await;
160-
}
161-
162-
#(#stmts)*
165+
quote!(macro_rules! #macro_ident {
166+
() => {{
167+
if false {
168+
__yield_tx.send(()).await;
169+
}
170+
171+
#(#stmts)*
172+
}};
163173
}).into()
164174
} else {
165-
quote!({
166-
#(#stmts)*
175+
quote!(macro_rules! #macro_ident {
176+
() => {{
177+
#(#stmts)*
178+
}};
167179
}).into()
168180
}
169181
}
182+
183+
fn count_bangs(input: TokenStream) -> usize {
184+
let mut count = 0;
185+
186+
for token in input {
187+
match token {
188+
TokenTree::Punct(punct) => {
189+
if punct.as_char() == '!' {
190+
count += 1;
191+
}
192+
}
193+
TokenTree::Group(group) => {
194+
count += count_bangs(group.stream());
195+
}
196+
_ => {}
197+
}
198+
}
199+
200+
count
201+
}

async-stream/src/lib.rs

Lines changed: 116 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,21 @@ pub use crate::{
1010
async_stream::AsyncStream,
1111
};
1212

13-
use proc_macro_hack::proc_macro_hack;
14-
15-
#[doc(hidden)]
16-
#[proc_macro_hack]
17-
pub use async_stream_impl::async_stream_impl;
18-
1913
#[doc(hidden)]
20-
#[proc_macro_hack]
21-
pub use async_stream_impl::async_try_stream_impl;
14+
pub use async_stream_impl::{AsyncStreamHack, AsyncTryStreamHack};
2215

2316
/// Asynchronous stream
2417
#[macro_export]
2518
macro_rules! stream {
2619
($($body:tt)*) => {{
2720
let (mut __yield_tx, __yield_rx) = $crate::yielder::pair();
2821
$crate::AsyncStream::new(__yield_rx, async move {
29-
$crate::async_stream_impl!(__yield_tx, $($body)*)
22+
#[derive($crate::AsyncStreamHack)]
23+
enum Dummy {
24+
Value = $crate::scrub! { $($body)* }
25+
}
26+
27+
$crate::dispatch!(($($body)*))
3028
})
3129
}}
3230
}
@@ -37,7 +35,115 @@ macro_rules! try_stream {
3735
($($body:tt)*) => {{
3836
let (mut __yield_tx, __yield_rx) = $crate::yielder::pair();
3937
$crate::AsyncStream::new(__yield_rx, async move {
40-
$crate::async_try_stream_impl!(__yield_tx, $($body)*)
38+
#[derive($crate::AsyncTryStreamHack)]
39+
enum Dummy {
40+
Value = $crate::scrub! { $($body)* }
41+
}
42+
43+
$crate::dispatch!(($($body)*))
4144
})
4245
}}
4346
}
47+
48+
#[macro_export]
49+
macro_rules! scrub {
50+
($($body:tt)*) => {{
51+
0
52+
}};
53+
}
54+
55+
#[doc(hidden)]
56+
#[macro_export]
57+
macro_rules! dispatch {
58+
(() $($bang:tt)*) => {
59+
$crate::count!($($bang)*)
60+
};
61+
((($($first:tt)*) $($rest:tt)*) $($bang:tt)*) => {
62+
$crate::dispatch!(($($first)* $($rest)*) $($bang)*)
63+
};
64+
(([$($first:tt)*] $($rest:tt)*) $($bang:tt)*) => {
65+
$crate::dispatch!(($($first)* $($rest)*) $($bang)*)
66+
};
67+
(({$($first:tt)*} $($rest:tt)*) $($bang:tt)*) => {
68+
$crate::dispatch!(($($first)* $($rest)*) $($bang)*)
69+
};
70+
((! $($rest:tt)*) $($bang:tt)*) => {
71+
$crate::dispatch!(($($rest)*) $($bang)* !)
72+
};
73+
((!= $($rest:tt)*) $($bang:tt)*) => {
74+
$crate::dispatch!(($($rest)*) $($bang)* !)
75+
};
76+
(($first:tt $($rest:tt)*) $($bang:tt)*) => {
77+
$crate::dispatch!(($($rest)*) $($bang)*)
78+
};
79+
}
80+
81+
#[doc(hidden)]
82+
#[macro_export]
83+
macro_rules! count {
84+
() => { stream_0!() };
85+
(!) => { stream_1!() };
86+
(!!) => { stream_2!() };
87+
(!!!) => { stream_3!() };
88+
(!!!!) => { stream_4!() };
89+
(!!!!!) => { stream_5!() };
90+
(!!!!!!) => { stream_6!() };
91+
(!!!!!!!) => { stream_7!() };
92+
(!!!!!!!!) => { stream_8!() };
93+
(!!!!!!!!!) => { stream_9!() };
94+
(!!!!!!!!!!) => { stream_10!() };
95+
(!!!!!!!!!!!) => { stream_11!() };
96+
(!!!!!!!!!!!!) => { stream_12!() };
97+
(!!!!!!!!!!!!!) => { stream_13!() };
98+
(!!!!!!!!!!!!!!) => { stream_14!() };
99+
(!!!!!!!!!!!!!!!) => { stream_15!() };
100+
(!!!!!!!!!!!!!!!!) => { stream_16!() };
101+
(!!!!!!!!!!!!!!!!!) => { stream_17!() };
102+
(!!!!!!!!!!!!!!!!!!) => { stream_18!() };
103+
(!!!!!!!!!!!!!!!!!!!) => { stream_19!() };
104+
(!!!!!!!!!!!!!!!!!!!!) => { stream_20!() };
105+
(!!!!!!!!!!!!!!!!!!!!!) => { stream_21!() };
106+
(!!!!!!!!!!!!!!!!!!!!!!) => { stream_22!() };
107+
(!!!!!!!!!!!!!!!!!!!!!!!) => { stream_23!() };
108+
(!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_24!() };
109+
(!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_25!() };
110+
(!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_26!() };
111+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_27!() };
112+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_28!() };
113+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_29!() };
114+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_30!() };
115+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_31!() };
116+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_32!() };
117+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_33!() };
118+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_34!() };
119+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_35!() };
120+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_36!() };
121+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_37!() };
122+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_38!() };
123+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_39!() };
124+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_40!() };
125+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_41!() };
126+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_42!() };
127+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_43!() };
128+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_44!() };
129+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_45!() };
130+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_46!() };
131+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_47!() };
132+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_48!() };
133+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_49!() };
134+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_50!() };
135+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_51!() };
136+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_52!() };
137+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_53!() };
138+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_54!() };
139+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_55!() };
140+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_56!() };
141+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_57!() };
142+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_58!() };
143+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_59!() };
144+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_60!() };
145+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_61!() };
146+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_62!() };
147+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_63!() };
148+
(!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!) => { stream_64!() };
149+
}

0 commit comments

Comments
 (0)