Skip to content

Commit 02b261d

Browse files
committed
It compiles! Store the future and poll it instead of creating multiple new ones
1 parent fe3c9ef commit 02b261d

File tree

4 files changed

+104
-60
lines changed

4 files changed

+104
-60
lines changed

src/stream/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@
303303
pub use empty::{empty, Empty};
304304
pub use from_fn::{from_fn, FromFn};
305305
pub use from_iter::{from_iter, FromIter};
306+
pub use successor::{successor, Successor};
306307
pub use once::{once, Once};
307308
pub use repeat::{repeat, Repeat};
308309
pub use repeat_with::{repeat_with, RepeatWith};
@@ -316,6 +317,7 @@ mod from_iter;
316317
mod once;
317318
mod repeat;
318319
mod repeat_with;
320+
mod successor;
319321

320322
cfg_unstable! {
321323
mod double_ended_stream;

src/stream/stream/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ mod partial_cmp;
5757
mod position;
5858
mod scan;
5959
mod skip;
60-
mod successor;
6160
mod skip_while;
6261
mod step_by;
6362
mod take;

src/stream/stream/successor.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.

src/stream/successor.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use std::pin::Pin;
2+
use std::marker::PhantomData;
3+
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
/// A stream that yields elements by calling an async closure with the previous value as an
9+
/// argument
10+
///
11+
/// This stream is constructed by [`successor`] function
12+
///
13+
/// [`successor`]: fn.successor.html
14+
#[derive(Debug)]
15+
pub struct Successor<F, Fut, T>
16+
where Fut: Future<Output=T>
17+
{
18+
successor: F,
19+
future: Option<Fut>,
20+
next: T,
21+
_marker: PhantomData<Fut>
22+
}
23+
24+
/// Creates a new stream where to produce each new element a clousre is called with the previous
25+
/// value.
26+
///
27+
/// #Examples
28+
///
29+
/// ```
30+
/// # fn main() { async_std::task::block_on(async {
31+
/// #
32+
/// use async_std::prelude::*;
33+
/// use async_std::stream;
34+
///
35+
/// let s = stream::successor(22, |val| {
36+
/// async move {
37+
/// val + 1
38+
/// }
39+
/// });
40+
///
41+
/// pin_utils::pin_mut!(s);
42+
/// assert_eq!(s.next().await, Some(1));
43+
/// assert_eq!(s.next().await, Some(2));
44+
/// assert_eq!(s.next().await, Some(3));
45+
/// #
46+
/// # }) }
47+
///
48+
/// ```
49+
pub fn successor<F, Fut, T>(start: T, func: F) -> Successor<F, Fut, T>
50+
where
51+
F: FnMut(T) -> Fut,
52+
Fut: Future<Output = T>,
53+
T: Copy,
54+
{
55+
Successor {
56+
successor: func,
57+
future: None,
58+
next: start,
59+
_marker: PhantomData,
60+
}
61+
}
62+
63+
impl <F, Fut, T> Successor<F, Fut, T>
64+
where
65+
F: FnMut(T) -> Fut,
66+
Fut: Future<Output = T>,
67+
T: Copy,
68+
69+
{
70+
pin_utils::unsafe_unpinned!(successor: F);
71+
pin_utils::unsafe_unpinned!(next: T);
72+
pin_utils::unsafe_pinned!(future: Option<Fut>);
73+
74+
}
75+
76+
impl <F, Fut, T> Stream for Successor<F, Fut, T>
77+
where
78+
Fut: Future<Output = T>,
79+
F: FnMut(T) -> Fut,
80+
T: Copy,
81+
{
82+
type Item = T;
83+
84+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85+
match &self.future {
86+
Some(_) => {
87+
let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
88+
self.as_mut().future().set(None);
89+
90+
Poll::Ready(Some(next))
91+
},
92+
None => {
93+
let x = self.next;
94+
let fut = (self.as_mut().successor())(x);
95+
self.as_mut().future().set(Some(fut));
96+
// Probably can poll the value here?
97+
Poll::Pending
98+
}
99+
}
100+
}
101+
}
102+

0 commit comments

Comments
 (0)