@@ -22,3 +22,52 @@ impl<S, U> FlattenCompat<S, U> {
22
22
}
23
23
}
24
24
}
25
+
26
+ impl < S , U > Stream for FlattenCompat < S , U >
27
+ where
28
+ S : Stream < Item : IntoStream < IntoStream = U , Item = U :: Item > > + std:: marker:: Unpin ,
29
+ U : Stream + std:: marker:: Unpin ,
30
+ {
31
+ type Item = U :: Item ;
32
+
33
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
34
+ loop {
35
+ if let Some ( ref mut inner) = self . as_mut ( ) . frontiter ( ) {
36
+ if let item @ Some ( _) = futures_core:: ready!( Pin :: new( inner) . poll_next( cx) ) {
37
+ return Poll :: Ready ( item) ;
38
+ }
39
+ }
40
+
41
+ match futures_core:: ready!( Pin :: new( & mut self . stream) . poll_next( cx) ) {
42
+ None => return Poll :: Ready ( None ) ,
43
+ Some ( inner) => * self . as_mut ( ) . frontiter ( ) = Some ( inner. into_stream ( ) ) ,
44
+ }
45
+ }
46
+ }
47
+ }
48
+
49
+ #[ cfg( test) ]
50
+ mod tests {
51
+ use super :: FlattenCompat ;
52
+
53
+ use crate :: prelude:: * ;
54
+ use crate :: task;
55
+
56
+ use std:: collections:: VecDeque ;
57
+
58
+ #[ test]
59
+ fn test_poll_next ( ) -> std:: io:: Result < ( ) > {
60
+ let inner1: VecDeque < u8 > = vec ! [ 1 , 2 , 3 ] . into_iter ( ) . collect ( ) ;
61
+ let inner2: VecDeque < u8 > = vec ! [ 4 , 5 , 6 ] . into_iter ( ) . collect ( ) ;
62
+
63
+ let s: VecDeque < _ > = vec ! [ inner1, inner2] . into_iter ( ) . collect ( ) ;
64
+
65
+ task:: block_on ( async move {
66
+ let flat = FlattenCompat :: new ( s) ;
67
+ let v: Vec < u8 > = flat. collect ( ) . await ;
68
+
69
+ assert_eq ! ( v, vec![ 1 , 2 , 3 , 4 , 5 , 6 ] ) ;
70
+ Ok ( ( ) )
71
+ } )
72
+ }
73
+ }
0 commit comments