Skip to content

Commit b31b917

Browse files
committed
spike implementation
1 parent b4b771b commit b31b917

File tree

2 files changed

+98
-6
lines changed

2 files changed

+98
-6
lines changed

async-stream/src/async_stream.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ use std::task::{Context, Poll};
77

88
pub struct AsyncStream<T, U> {
99
rx: Receiver<T>,
10+
done: bool,
1011
generator: U,
1112
}
1213

1314
impl<T, U> AsyncStream<T, U> {
1415
pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
15-
AsyncStream { rx, generator }
16+
AsyncStream { rx, done: false, generator }
1617
}
1718
}
1819

@@ -22,6 +23,30 @@ where U: Future<Output = ()>
2223
type Item = T;
2324

2425
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
25-
unimplemented!();
26+
unsafe {
27+
let me = Pin::get_unchecked_mut(self);
28+
29+
if me.done {
30+
panic!("poll after done");
31+
}
32+
33+
let mut dst = None;
34+
let res = {
35+
let _enter = me.rx.enter(&mut dst);
36+
Pin::new_unchecked(&mut me.generator).poll(cx)
37+
};
38+
39+
me.done = res.is_ready();
40+
41+
if dst.is_some() {
42+
return Poll::Ready(dst.take());
43+
}
44+
45+
if me.done {
46+
Poll::Ready(None)
47+
} else {
48+
Poll::Pending
49+
}
50+
}
2651
}
2752
}

async-stream/src/yielder.rs

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1+
use std::cell::Cell;
2+
use std::future::Future;
13
use std::marker::PhantomData;
4+
use std::pin::Pin;
5+
use std::ptr;
6+
use std::task::{Context, Poll};
27

38
pub struct Sender<T> {
49
_p: PhantomData<T>,
@@ -8,12 +13,74 @@ pub struct Receiver<T> {
813
_p: PhantomData<T>,
914
}
1015

16+
pub(crate) struct Enter<'a, T> {
17+
_rx: &'a mut Receiver<T>,
18+
}
19+
1120
pub fn pair<T>() -> (Sender<T>, Receiver<T>) {
12-
unimplemented!();
21+
let tx = Sender { _p: PhantomData };
22+
let rx = Receiver { _p: PhantomData };
23+
(tx, rx)
24+
}
25+
26+
// Tracks the pointer to `Option<T>`.
27+
//
28+
// TODO: Ensure wakers match?
29+
thread_local!(static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut()));
30+
31+
// ===== impl Sender =====
32+
33+
impl<T: Unpin + 'static> Sender<T> {
34+
pub fn send(&mut self, value: T) -> impl Future<Output = ()> {
35+
Send { value: Some(value) }
36+
}
37+
}
38+
39+
struct Send<T> {
40+
value: Option<T>,
1341
}
1442

15-
impl<T> Sender<T> {
16-
pub async fn send(&mut self, value: T) {
17-
unimplemented!();
43+
impl<T: Unpin + 'static> Future for Send<T> {
44+
type Output = ();
45+
46+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
47+
if self.value.is_none() {
48+
return Poll::Ready(());
49+
}
50+
51+
STORE.with(|cell| {
52+
unsafe {
53+
let ptr = cell.get() as *mut Option<T>;
54+
let option_ref = ptr.as_mut().expect("invalid usage");
55+
56+
if option_ref.is_none() {
57+
*option_ref = self.value.take();
58+
}
59+
60+
Poll::Pending
61+
}
62+
})
63+
}
64+
}
65+
66+
// ===== impl Receiver =====
67+
68+
impl<T> Receiver<T> {
69+
pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
70+
STORE.with(|cell| {
71+
assert!(cell.get().is_null());
72+
73+
cell.set(dst as *mut _ as *mut ());
74+
});
75+
76+
Enter { _rx: self }
77+
}
78+
}
79+
80+
// ===== impl Enter =====
81+
82+
impl<'a, T> Drop for Enter<'a, T> {
83+
fn drop(&mut self) {
84+
STORE.with(|cell| cell.set(ptr::null_mut()))
1885
}
1986
}

0 commit comments

Comments
 (0)