1- use bytes:: Bytes ;
21use thiserror:: Error ;
32
43use crate :: { VarInt , connection:: send_buffer:: SendBuffer , frame} ;
@@ -49,11 +48,7 @@ impl Send {
4948 }
5049 }
5150
52- pub ( super ) fn write < S : BytesSource > (
53- & mut self ,
54- source : & mut S ,
55- limit : u64 ,
56- ) -> Result < Written , WriteError > {
51+ pub ( super ) fn write_limit ( & self , limit : u64 ) -> Result < usize , WriteError > {
5752 if !self . is_writable ( ) {
5853 return Err ( WriteError :: ClosedStream ) ;
5954 }
@@ -64,23 +59,7 @@ impl Send {
6459 if budget == 0 {
6560 return Err ( WriteError :: Blocked ) ;
6661 }
67- let mut limit = limit. min ( budget) as usize ;
68-
69- let mut result = Written :: default ( ) ;
70- loop {
71- let ( chunk, chunks_consumed) = source. pop_chunk ( limit) ;
72- result. chunks += chunks_consumed;
73- result. bytes += chunk. len ( ) ;
74-
75- if chunk. is_empty ( ) {
76- break ;
77- }
78-
79- limit -= chunk. len ( ) ;
80- self . pending . write ( chunk) ;
81- }
82-
83- Ok ( result)
62+ Ok ( limit. min ( budget) as usize )
8463 }
8564
8665 /// Update stream state due to a reset sent by the local application
@@ -143,106 +122,6 @@ impl Send {
143122 }
144123}
145124
146- /// A [`BytesSource`] implementation for `&'a mut [Bytes]`
147- ///
148- /// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
149- /// a configured limit.
150- pub ( crate ) struct BytesArray < ' a > {
151- /// The wrapped slice of `Bytes`
152- chunks : & ' a mut [ Bytes ] ,
153- /// The amount of chunks consumed from this source
154- consumed : usize ,
155- }
156-
157- impl < ' a > BytesArray < ' a > {
158- pub ( crate ) fn from_chunks ( chunks : & ' a mut [ Bytes ] ) -> Self {
159- Self {
160- chunks,
161- consumed : 0 ,
162- }
163- }
164- }
165-
166- impl BytesSource for BytesArray < ' _ > {
167- fn pop_chunk ( & mut self , limit : usize ) -> ( Bytes , usize ) {
168- // The loop exists to skip empty chunks while still marking them as
169- // consumed
170- let mut chunks_consumed = 0 ;
171-
172- while self . consumed < self . chunks . len ( ) {
173- let chunk = & mut self . chunks [ self . consumed ] ;
174-
175- if chunk. len ( ) <= limit {
176- let chunk = std:: mem:: take ( chunk) ;
177- self . consumed += 1 ;
178- chunks_consumed += 1 ;
179- if chunk. is_empty ( ) {
180- continue ;
181- }
182- return ( chunk, chunks_consumed) ;
183- } else if limit > 0 {
184- let chunk = chunk. split_to ( limit) ;
185- return ( chunk, chunks_consumed) ;
186- } else {
187- break ;
188- }
189- }
190-
191- ( Bytes :: new ( ) , chunks_consumed)
192- }
193- }
194-
195- /// A [`BytesSource`] implementation for `&[u8]`
196- ///
197- /// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
198- /// created from a reference. This allows to defer the allocation until it is
199- /// known how much data needs to be copied.
200- pub ( crate ) struct ByteSlice < ' a > {
201- /// The wrapped byte slice
202- data : & ' a [ u8 ] ,
203- }
204-
205- impl < ' a > ByteSlice < ' a > {
206- pub ( crate ) fn from_slice ( data : & ' a [ u8 ] ) -> Self {
207- Self { data }
208- }
209- }
210-
211- impl BytesSource for ByteSlice < ' _ > {
212- fn pop_chunk ( & mut self , limit : usize ) -> ( Bytes , usize ) {
213- let limit = limit. min ( self . data . len ( ) ) ;
214- if limit == 0 {
215- return ( Bytes :: new ( ) , 0 ) ;
216- }
217-
218- let chunk = Bytes :: from ( self . data [ ..limit] . to_owned ( ) ) ;
219- self . data = & self . data [ chunk. len ( ) ..] ;
220-
221- let chunks_consumed = usize:: from ( self . data . is_empty ( ) ) ;
222- ( chunk, chunks_consumed)
223- }
224- }
225-
226- /// A source of one or more buffers which can be converted into `Bytes` buffers on demand
227- ///
228- /// The purpose of this data type is to defer conversion as long as possible,
229- /// so that no heap allocation is required in case no data is writable.
230- pub ( super ) trait BytesSource {
231- /// Returns the next chunk from the source of owned chunks.
232- ///
233- /// This method will consume parts of the source.
234- /// Calling it will yield `Bytes` elements up to the configured `limit`.
235- ///
236- /// The method returns a tuple:
237- /// - The first item is the yielded `Bytes` element. The element will be
238- /// empty if the limit is zero or no more data is available.
239- /// - The second item returns how many complete chunks inside the source had
240- /// had been consumed. This can be less than 1, if a chunk inside the
241- /// source had been truncated in order to adhere to the limit. It can also
242- /// be more than 1, if zero-length chunks had been skipped.
243- fn pop_chunk ( & mut self , limit : usize ) -> ( Bytes , usize ) ;
244- }
245-
246125/// Indicates how many bytes and chunks had been transferred in a write operation
247126#[ derive( Debug , Default , PartialEq , Eq , Clone , Copy ) ]
248127pub struct Written {
@@ -303,100 +182,3 @@ pub enum FinishError {
303182 #[ error( "closed stream" ) ]
304183 ClosedStream ,
305184}
306-
307- #[ cfg( test) ]
308- mod tests {
309- use super :: * ;
310-
311- #[ test]
312- fn bytes_array ( ) {
313- let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ" . to_owned ( ) ;
314- for limit in 0 ..full. len ( ) {
315- let mut chunks = [
316- Bytes :: from_static ( b"" ) ,
317- Bytes :: from_static ( b"Hello " ) ,
318- Bytes :: from_static ( b"Wo" ) ,
319- Bytes :: from_static ( b"" ) ,
320- Bytes :: from_static ( b"r" ) ,
321- Bytes :: from_static ( b"ld" ) ,
322- Bytes :: from_static ( b"" ) ,
323- Bytes :: from_static ( b" 12345678" ) ,
324- Bytes :: from_static ( b"9 ABCDE" ) ,
325- Bytes :: from_static ( b"F" ) ,
326- Bytes :: from_static ( b"GHJIJKLMNOPQRSTUVWXYZ" ) ,
327- ] ;
328- let num_chunks = chunks. len ( ) ;
329- let last_chunk_len = chunks[ chunks. len ( ) - 1 ] . len ( ) ;
330-
331- let mut array = BytesArray :: from_chunks ( & mut chunks) ;
332-
333- let mut buf = Vec :: new ( ) ;
334- let mut chunks_popped = 0 ;
335- let mut chunks_consumed = 0 ;
336- let mut remaining = limit;
337- loop {
338- let ( chunk, consumed) = array. pop_chunk ( remaining) ;
339- chunks_consumed += consumed;
340-
341- if !chunk. is_empty ( ) {
342- buf. extend_from_slice ( & chunk) ;
343- remaining -= chunk. len ( ) ;
344- chunks_popped += 1 ;
345- } else {
346- break ;
347- }
348- }
349-
350- assert_eq ! ( & buf[ ..] , & full[ ..limit] ) ;
351-
352- if limit == full. len ( ) {
353- // Full consumption of the last chunk
354- assert_eq ! ( chunks_consumed, num_chunks) ;
355- // Since there are empty chunks, we consume more than there are popped
356- assert_eq ! ( chunks_consumed, chunks_popped + 3 ) ;
357- } else if limit > full. len ( ) - last_chunk_len {
358- // Partial consumption of the last chunk
359- assert_eq ! ( chunks_consumed, num_chunks - 1 ) ;
360- assert_eq ! ( chunks_consumed, chunks_popped + 2 ) ;
361- }
362- }
363- }
364-
365- #[ test]
366- fn byte_slice ( ) {
367- let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ" . to_owned ( ) ;
368- for limit in 0 ..full. len ( ) {
369- let mut array = ByteSlice :: from_slice ( & full[ ..] ) ;
370-
371- let mut buf = Vec :: new ( ) ;
372- let mut chunks_popped = 0 ;
373- let mut chunks_consumed = 0 ;
374- let mut remaining = limit;
375- loop {
376- let ( chunk, consumed) = array. pop_chunk ( remaining) ;
377- chunks_consumed += consumed;
378-
379- if !chunk. is_empty ( ) {
380- buf. extend_from_slice ( & chunk) ;
381- remaining -= chunk. len ( ) ;
382- chunks_popped += 1 ;
383- } else {
384- break ;
385- }
386- }
387-
388- assert_eq ! ( & buf[ ..] , & full[ ..limit] ) ;
389- if limit != 0 {
390- assert_eq ! ( chunks_popped, 1 ) ;
391- } else {
392- assert_eq ! ( chunks_popped, 0 ) ;
393- }
394-
395- if limit == full. len ( ) {
396- assert_eq ! ( chunks_consumed, 1 ) ;
397- } else {
398- assert_eq ! ( chunks_consumed, 0 ) ;
399- }
400- }
401- }
402- }
0 commit comments