11use std:: {
2- future:: Future ,
2+ future:: { Future , poll_fn } ,
33 io,
44 pin:: Pin ,
5- task:: { Context , Poll , ready } ,
5+ task:: { Context , Poll } ,
66} ;
77
88use bytes:: Bytes ;
@@ -50,14 +50,18 @@ impl SendStream {
5050 ///
5151 /// This operation is cancel-safe.
5252 pub async fn write ( & mut self , buf : & [ u8 ] ) -> Result < usize , WriteError > {
53- Write { stream : self , buf } . await
53+ poll_fn ( |cx| self . execute_poll ( cx , |s| s . write ( buf ) ) ) . await
5454 }
5555
5656 /// Convenience method to write an entire buffer to the stream
5757 ///
5858 /// This operation is *not* cancel-safe.
59- pub async fn write_all ( & mut self , buf : & [ u8 ] ) -> Result < ( ) , WriteError > {
60- WriteAll { stream : self , buf } . await
59+ pub async fn write_all ( & mut self , mut buf : & [ u8 ] ) -> Result < ( ) , WriteError > {
60+ while !buf. is_empty ( ) {
61+ let written = self . write ( buf) . await ?;
62+ buf = & buf[ written..] ;
63+ }
64+ Ok ( ( ) )
6165 }
6266
6367 /// Write chunks to the stream
@@ -68,30 +72,26 @@ impl SendStream {
6872 ///
6973 /// This operation is cancel-safe.
7074 pub async fn write_chunks ( & mut self , bufs : & mut [ Bytes ] ) -> Result < Written , WriteError > {
71- WriteChunks { stream : self , bufs } . await
75+ poll_fn ( |cx| self . execute_poll ( cx , |s| s . write_chunks ( bufs ) ) ) . await
7276 }
7377
7478 /// Convenience method to write a single chunk in its entirety to the stream
7579 ///
7680 /// This operation is *not* cancel-safe.
7781 pub async fn write_chunk ( & mut self , buf : Bytes ) -> Result < ( ) , WriteError > {
78- WriteChunk {
79- stream : self ,
80- buf : [ buf] ,
81- }
82- . await
82+ self . write_all_chunks ( & mut [ buf] ) . await ?;
83+ Ok ( ( ) )
8384 }
8485
8586 /// Convenience method to write an entire list of chunks to the stream
8687 ///
8788 /// This operation is *not* cancel-safe.
88- pub async fn write_all_chunks ( & mut self , bufs : & mut [ Bytes ] ) -> Result < ( ) , WriteError > {
89- WriteAllChunks {
90- stream : self ,
91- bufs,
92- offset : 0 ,
89+ pub async fn write_all_chunks ( & mut self , mut bufs : & mut [ Bytes ] ) -> Result < ( ) , WriteError > {
90+ while !bufs. is_empty ( ) {
91+ let written = self . write_chunks ( bufs) . await ?;
92+ bufs = & mut bufs[ written. chunks ..] ;
9393 }
94- . await
94+ Ok ( ( ) )
9595 }
9696
9797 fn execute_poll < F , R > ( & mut self , cx : & mut Context , write_fn : F ) -> Poll < Result < R , WriteError > >
@@ -315,109 +315,6 @@ impl Future for Stopped<'_> {
315315 }
316316}
317317
318- /// Future produced by [`SendStream::write()`].
319- ///
320- /// [`SendStream::write()`]: crate::SendStream::write
321- struct Write < ' a > {
322- stream : & ' a mut SendStream ,
323- buf : & ' a [ u8 ] ,
324- }
325-
326- impl Future for Write < ' _ > {
327- type Output = Result < usize , WriteError > ;
328- fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
329- let this = self . get_mut ( ) ;
330- let buf = this. buf ;
331- this. stream . execute_poll ( cx, |s| s. write ( buf) )
332- }
333- }
334-
335- /// Future produced by [`SendStream::write_all()`].
336- ///
337- /// [`SendStream::write_all()`]: crate::SendStream::write_all
338- struct WriteAll < ' a > {
339- stream : & ' a mut SendStream ,
340- buf : & ' a [ u8 ] ,
341- }
342-
343- impl Future for WriteAll < ' _ > {
344- type Output = Result < ( ) , WriteError > ;
345- fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
346- let this = self . get_mut ( ) ;
347- loop {
348- if this. buf . is_empty ( ) {
349- return Poll :: Ready ( Ok ( ( ) ) ) ;
350- }
351- let buf = this. buf ;
352- let n = ready ! ( this. stream. execute_poll( cx, |s| s. write( buf) ) ) ?;
353- this. buf = & this. buf [ n..] ;
354- }
355- }
356- }
357-
358- /// Future produced by [`SendStream::write_chunks()`].
359- ///
360- /// [`SendStream::write_chunks()`]: crate::SendStream::write_chunks
361- struct WriteChunks < ' a > {
362- stream : & ' a mut SendStream ,
363- bufs : & ' a mut [ Bytes ] ,
364- }
365-
366- impl Future for WriteChunks < ' _ > {
367- type Output = Result < Written , WriteError > ;
368- fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
369- let this = self . get_mut ( ) ;
370- let bufs = & mut * this. bufs ;
371- this. stream . execute_poll ( cx, |s| s. write_chunks ( bufs) )
372- }
373- }
374-
375- /// Future produced by [`SendStream::write_chunk()`].
376- ///
377- /// [`SendStream::write_chunk()`]: crate::SendStream::write_chunk
378- struct WriteChunk < ' a > {
379- stream : & ' a mut SendStream ,
380- buf : [ Bytes ; 1 ] ,
381- }
382-
383- impl Future for WriteChunk < ' _ > {
384- type Output = Result < ( ) , WriteError > ;
385- fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
386- let this = self . get_mut ( ) ;
387- loop {
388- if this. buf [ 0 ] . is_empty ( ) {
389- return Poll :: Ready ( Ok ( ( ) ) ) ;
390- }
391- let bufs = & mut this. buf [ ..] ;
392- ready ! ( this. stream. execute_poll( cx, |s| s. write_chunks( bufs) ) ) ?;
393- }
394- }
395- }
396-
397- /// Future produced by [`SendStream::write_all_chunks()`].
398- ///
399- /// [`SendStream::write_all_chunks()`]: crate::SendStream::write_all_chunks
400- struct WriteAllChunks < ' a > {
401- stream : & ' a mut SendStream ,
402- bufs : & ' a mut [ Bytes ] ,
403- offset : usize ,
404- }
405-
406- impl Future for WriteAllChunks < ' _ > {
407- type Output = Result < ( ) , WriteError > ;
408- fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
409- let this = self . get_mut ( ) ;
410- loop {
411- if this. offset == this. bufs . len ( ) {
412- return Poll :: Ready ( Ok ( ( ) ) ) ;
413- }
414- let bufs = & mut this. bufs [ this. offset ..] ;
415- let written = ready ! ( this. stream. execute_poll( cx, |s| s. write_chunks( bufs) ) ) ?;
416- this. offset += written. chunks ;
417- }
418- }
419- }
420-
421318/// Errors that arise from writing to a stream
422319#[ derive( Debug , Error , Clone , PartialEq , Eq ) ]
423320pub enum WriteError {
0 commit comments