1
1
use std:: marker:: PhantomData ;
2
2
use std:: pin:: Pin ;
3
+ use std:: mem;
3
4
4
5
use crate :: future:: Future ;
5
6
use crate :: stream:: Stream ;
6
- use crate :: task:: { Context , Poll } ;
7
+ use crate :: task:: { Context , Poll , ready } ;
7
8
8
- /// A stream that yields elements by calling an async closure with the previous value as an
9
- /// argument
10
- ///
11
- /// This stream is constructed by [`successor`] function
12
- ///
13
- /// [`successor`]: fn.successor.html
14
- #[ derive( Debug ) ]
15
- pub struct Successors < F , Fut , T >
16
- where
17
- Fut : Future < Output = Option < T > > ,
18
- {
19
- successor : F ,
20
- future : Option < Fut > ,
21
- next : Option < T > ,
22
- _marker : PhantomData < Fut > ,
9
+
10
+
11
+ pin_project_lite:: pin_project! {
12
+ /// A stream that yields elements by calling an async closure with the previous value as an
13
+ /// argument
14
+ ///
15
+ /// This stream is constructed by [`successor`] function
16
+ ///
17
+ /// [`successor`]: fn.successor.html
18
+ #[ derive( Debug ) ]
19
+ pub struct Successors <F , Fut , T >
20
+ where
21
+ Fut : Future <Output = Option <T >>,
22
+ {
23
+ successor: F ,
24
+ #[ pin]
25
+ future: Option <Fut >,
26
+ slot: Option <T >,
27
+ _marker: PhantomData <Fut >,
28
+ }
23
29
}
24
30
25
31
/// Creates a new stream where to produce each new element a closure is called with the previous
40
46
/// });
41
47
///
42
48
/// pin_utils::pin_mut!(s);
49
+ /// assert_eq!(s.next().await, Some(22));
43
50
/// assert_eq!(s.next().await, Some(23));
44
51
/// assert_eq!(s.next().await, Some(24));
45
52
/// assert_eq!(s.next().await, Some(25));
@@ -58,31 +65,20 @@ where
58
65
/// # }) }
59
66
///
60
67
/// ```
61
- pub fn successors < F , Fut , T > ( start : Option < T > , func : F ) -> Successors < F , Fut , T >
68
+ pub fn successors < F , Fut , T > ( first : Option < T > , succ : F ) -> Successors < F , Fut , T >
62
69
where
63
70
F : FnMut ( T ) -> Fut ,
64
71
Fut : Future < Output = Option < T > > ,
65
72
T : Copy ,
66
73
{
67
74
Successors {
68
- successor : func ,
75
+ successor : succ ,
69
76
future : None ,
70
- next : start ,
77
+ slot : first ,
71
78
_marker : PhantomData ,
72
79
}
73
80
}
74
81
75
- impl < F , Fut , T > Successors < F , Fut , T >
76
- where
77
- F : FnMut ( T ) -> Fut ,
78
- Fut : Future < Output = Option < T > > ,
79
- T : Copy ,
80
- {
81
- pin_utils:: unsafe_unpinned!( successor: F ) ;
82
- pin_utils:: unsafe_unpinned!( next: Option <T >) ;
83
- pin_utils:: unsafe_pinned!( future: Option <Fut >) ;
84
- }
85
-
86
82
impl < F , Fut , T > Stream for Successors < F , Fut , T >
87
83
where
88
84
Fut : Future < Output = Option < T > > ,
@@ -91,23 +87,23 @@ where
91
87
{
92
88
type Item = T ;
93
89
94
- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
95
- if self . next . is_none ( ) {
90
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
91
+ let mut this = self . project ( ) ;
92
+
93
+ if this. slot . is_none ( ) {
96
94
return Poll :: Ready ( None ) ;
97
95
}
98
96
99
- match & self . future {
100
- None => {
101
- let x = self . next . unwrap ( ) ;
102
- let fut = ( self . as_mut ( ) . successor ( ) ) ( x) ;
103
- self . as_mut ( ) . future ( ) . set ( Some ( fut) ) ;
104
- }
105
- _ => { }
97
+ if this. future . is_none ( ) {
98
+ let x = this. slot . unwrap ( ) ;
99
+ let fut = ( this. successor ) ( x) ;
100
+ this. future . set ( Some ( fut) ) ;
106
101
}
107
102
108
- let next = futures_core:: ready!( self . as_mut( ) . future( ) . as_pin_mut( ) . unwrap( ) . poll( cx) ) ;
109
- * self . as_mut ( ) . next ( ) = next;
110
- self . as_mut ( ) . future ( ) . set ( None ) ;
103
+ let mut next = ready ! ( this. future. as_mut( ) . as_pin_mut( ) . unwrap( ) . poll( cx) ) ;
104
+
105
+ this. future . set ( None ) ;
106
+ mem:: swap ( this. slot , & mut next) ;
111
107
Poll :: Ready ( next)
112
108
}
113
109
}
0 commit comments