Skip to content

Commit b4b771b

Browse files
committed
Initial commit
0 parents  commit b4b771b

File tree

10 files changed

+207
-0
lines changed

10 files changed

+207
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/target
2+
**/*.rs.bk
3+
Cargo.lock

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[workspace]
2+
members = [
3+
"async-stream",
4+
"async-stream-impl",
5+
]

async-stream-impl/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "async-stream-impl"
3+
version = "0.1.0"
4+
authors = ["Carl Lerche <[email protected]>"]
5+
edition = "2018"
6+
7+
[lib]
8+
proc-macro = true
9+
10+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
11+
12+
[dependencies]
13+
proc-macro-hack = "0.5.8"
14+
syn = { version = "0.15.43", features = ["visit-mut"]}
15+
quote = "0.6.13"

async-stream-impl/src/lib.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
extern crate proc_macro;
2+
3+
use proc_macro::TokenStream;
4+
use proc_macro_hack::proc_macro_hack;
5+
use quote::quote;
6+
use syn::Token;
7+
use syn::parse::{Parse, ParseStream, Result};
8+
use syn::visit_mut::VisitMut;
9+
10+
struct AsyncStreamImpl {
11+
stmts: Vec<syn::Stmt>,
12+
}
13+
14+
struct Scrub {
15+
yielder: syn::Ident,
16+
unit: Box<syn::Expr>,
17+
}
18+
19+
impl Parse for AsyncStreamImpl {
20+
fn parse(input: ParseStream) -> Result<Self> {
21+
let yielder: syn::Ident = input.parse()?;
22+
input.parse::<Token![,]>()?;
23+
24+
let mut stmts = vec![];
25+
let mut scrub = Scrub {
26+
yielder,
27+
unit: syn::parse_quote!(()),
28+
};
29+
30+
while !input.is_empty() {
31+
let mut stmt = input.parse()?;
32+
scrub.visit_stmt_mut(&mut stmt);
33+
stmts.push(stmt);
34+
}
35+
36+
Ok(AsyncStreamImpl { stmts })
37+
}
38+
}
39+
40+
impl VisitMut for Scrub {
41+
fn visit_expr_mut(&mut self, i: &mut syn::Expr) {
42+
match i {
43+
syn::Expr::Yield(expr) => {
44+
let value_expr = if let Some(ref e) = expr.expr {
45+
e
46+
} else {
47+
&self.unit
48+
};
49+
50+
let ident = &self.yielder;
51+
*i = syn::parse_quote! { #ident.send(#value_expr).await };
52+
}
53+
expr => syn::visit_mut::visit_expr_mut(self, expr),
54+
}
55+
}
56+
}
57+
58+
#[proc_macro_hack]
59+
pub fn async_stream_impl(input: TokenStream) -> TokenStream {
60+
let AsyncStreamImpl {
61+
stmts,
62+
} = syn::parse_macro_input!(input as AsyncStreamImpl);
63+
64+
quote!(#(#stmts)*).into()
65+
// quote!(#input).into()
66+
}

async-stream/Cargo.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[package]
2+
name = "async-stream"
3+
version = "0.1.0"
4+
authors = ["Carl Lerche <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
async-stream-impl = { path = "../async-stream-impl" }
9+
futures-core-preview = "=0.3.0-alpha.17"
10+
proc-macro-hack = "0.5.8"
11+
12+
[dev-dependencies]
13+
tokio = "=0.2.0-alpha.1"
14+
futures-util-preview = "=0.3.0-alpha.17"

async-stream/examples/tcp_accept.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#![feature(async_await)]
2+
3+
use async_stream::stream;
4+
use futures_util::pin_mut;
5+
use futures_util::stream::StreamExt;
6+
use tokio::net::TcpListener;
7+
8+
#[tokio::main]
9+
async fn main() {
10+
let addr = "127.0.0.1:0".parse().unwrap();
11+
let mut listener = TcpListener::bind(&addr).unwrap();
12+
13+
let incoming = stream! {
14+
loop {
15+
let (socket, _) = listener.accept().await.unwrap();
16+
yield socket;
17+
}
18+
};
19+
pin_mut!(incoming);
20+
21+
while let Some(v) = incoming.next().await {
22+
println!("handle = {:?}", v);
23+
}
24+
}

async-stream/src/async_stream.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use crate::yielder::Receiver;
2+
3+
use futures_core::Stream;
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
pub struct AsyncStream<T, U> {
9+
rx: Receiver<T>,
10+
generator: U,
11+
}
12+
13+
impl<T, U> AsyncStream<T, U> {
14+
pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
15+
AsyncStream { rx, generator }
16+
}
17+
}
18+
19+
impl<T, U> Stream for AsyncStream<T, U>
20+
where U: Future<Output = ()>
21+
{
22+
type Item = T;
23+
24+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
25+
unimplemented!();
26+
}
27+
}

async-stream/src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#![feature(async_await)]
2+
3+
mod async_stream;
4+
#[doc(hidden)]
5+
pub mod yielder;
6+
7+
// Used by the macro, but not intended to be accessed publically.
8+
#[doc(hidden)]
9+
pub use crate::{
10+
async_stream::AsyncStream,
11+
};
12+
13+
use proc_macro_hack::proc_macro_hack;
14+
15+
#[doc(hidden)]
16+
#[proc_macro_hack]
17+
pub use async_stream_impl::async_stream_impl;
18+
19+
/// Asynchronous stream
20+
#[macro_export]
21+
macro_rules! stream {
22+
($($body:tt)*) => {{
23+
let (mut __yield_tx, __yield_rx) = $crate::yielder::pair();
24+
$crate::AsyncStream::new(__yield_rx, async move {
25+
$crate::async_stream_impl!(__yield_tx, $($body)*)
26+
})
27+
}}
28+
}

async-stream/src/yielder.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::marker::PhantomData;
2+
3+
pub struct Sender<T> {
4+
_p: PhantomData<T>,
5+
}
6+
7+
pub struct Receiver<T> {
8+
_p: PhantomData<T>,
9+
}
10+
11+
pub fn pair<T>() -> (Sender<T>, Receiver<T>) {
12+
unimplemented!();
13+
}
14+
15+
impl<T> Sender<T> {
16+
pub async fn send(&mut self, value: T) {
17+
unimplemented!();
18+
}
19+
}

async-stream/tests/basic.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
use async_stream::stream;
2+
3+
#[test]
4+
fn smoke() {
5+
stream!();
6+
}

0 commit comments

Comments
 (0)