1
1
use crate :: stream:: Fuse ;
2
2
use alloc:: vec:: Vec ;
3
- use core:: mem;
4
3
use core:: pin:: Pin ;
5
4
use futures_core:: stream:: { FusedStream , Stream } ;
6
5
use futures_core:: task:: { Context , Poll } ;
@@ -15,7 +14,6 @@ pin_project! {
15
14
pub struct ReadyChunks <St : Stream > {
16
15
#[ pin]
17
16
stream: Fuse <St >,
18
- items: Vec <St :: Item >,
19
17
cap: usize , // https://github.com/rust-lang/futures-rs/issues/1475
20
18
}
21
19
}
@@ -24,11 +22,7 @@ impl<St: Stream> ReadyChunks<St> {
24
22
pub ( super ) fn new ( stream : St , capacity : usize ) -> Self {
25
23
assert ! ( capacity > 0 ) ;
26
24
27
- Self {
28
- stream : super :: Fuse :: new ( stream) ,
29
- items : Vec :: with_capacity ( capacity) ,
30
- cap : capacity,
31
- }
25
+ Self { stream : super :: Fuse :: new ( stream) , cap : capacity }
32
26
}
33
27
34
28
delegate_access_inner ! ( stream, St , ( . ) ) ;
@@ -40,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> {
40
34
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
41
35
let mut this = self . project ( ) ;
42
36
37
+ let mut items: Vec < St :: Item > = Vec :: new ( ) ;
38
+
43
39
loop {
44
40
match this. stream . as_mut ( ) . poll_next ( cx) {
45
41
// Flush all collected data if underlying stream doesn't contain
46
42
// more ready values
47
43
Poll :: Pending => {
48
- return if this. items . is_empty ( ) {
49
- Poll :: Pending
50
- } else {
51
- Poll :: Ready ( Some ( mem:: replace ( this. items , Vec :: with_capacity ( * this. cap ) ) ) )
52
- }
44
+ return if items. is_empty ( ) { Poll :: Pending } else { Poll :: Ready ( Some ( items) ) }
53
45
}
54
46
55
47
// Push the ready item into the buffer and check whether it is full.
56
48
// If so, replace our buffer with a new and empty one and return
57
49
// the full one.
58
50
Poll :: Ready ( Some ( item) ) => {
59
- this . items . push ( item ) ;
60
- if this . items . len ( ) >= * this. cap {
61
- return Poll :: Ready ( Some ( mem :: replace (
62
- this . items ,
63
- Vec :: with_capacity ( * this. cap ) ,
64
- ) ) ) ;
51
+ if items. is_empty ( ) {
52
+ items. reserve ( * this. cap ) ;
53
+ }
54
+ items. push ( item ) ;
55
+ if items . len ( ) >= * this. cap {
56
+ return Poll :: Ready ( Some ( items ) ) ;
65
57
}
66
58
}
67
59
68
60
// Since the underlying stream ran out of values, return what we
69
61
// have buffered, if we have anything.
70
62
Poll :: Ready ( None ) => {
71
- let last = if this. items . is_empty ( ) {
72
- None
73
- } else {
74
- let full_buf = mem:: take ( this. items ) ;
75
- Some ( full_buf)
76
- } ;
63
+ let last = if items. is_empty ( ) { None } else { Some ( items) } ;
77
64
78
65
return Poll :: Ready ( last) ;
79
66
}
@@ -82,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> {
82
69
}
83
70
84
71
fn size_hint ( & self ) -> ( usize , Option < usize > ) {
85
- let chunk_len = usize:: from ( !self . items . is_empty ( ) ) ;
86
72
let ( lower, upper) = self . stream . size_hint ( ) ;
87
- let lower = ( lower / self . cap ) . saturating_add ( chunk_len) ;
88
- let upper = match upper {
89
- Some ( x) => x. checked_add ( chunk_len) ,
90
- None => None ,
91
- } ;
73
+ let lower = lower / self . cap ;
92
74
( lower, upper)
93
75
}
94
76
}
95
77
96
78
impl < St : FusedStream > FusedStream for ReadyChunks < St > {
97
79
fn is_terminated ( & self ) -> bool {
98
- self . stream . is_terminated ( ) && self . items . is_empty ( )
80
+ self . stream . is_terminated ( )
99
81
}
100
82
}
101
83
0 commit comments