@@ -7,20 +7,25 @@ use http_types::Response;
77
88const CR : u8 = b'\r' ;
99const LF : u8 = b'\n' ;
10+ const CRLF_LEN : usize = 2 ;
1011
1112/// The encoder state.
1213#[ derive( Debug ) ]
1314enum State {
15+ /// Starting state.
16+ Start ,
1417 /// Streaming out chunks.
15- Streaming ,
18+ EncodeChunks ,
19+ /// No more chunks to stream, mark the end.
20+ EndOfChunks ,
1621 /// Receiving trailers from a channel.
1722 ReceiveTrailers ,
18- /// Streaming out trailers.
23+ /// Streaming out trailers, if we received any .
1924 EncodeTrailers ,
20- /// Writing the final CRLF
25+ /// Writing out the final CRLF.
2126 EndOfStream ,
2227 /// The stream has finished.
23- Done ,
28+ End ,
2429}
2530
2631/// An encoder for chunked encoding.
@@ -36,10 +41,11 @@ impl ChunkedEncoder {
3641 /// Create a new instance.
3742 pub ( crate ) fn new ( ) -> Self {
3843 Self {
39- state : State :: Streaming ,
44+ state : State :: Start ,
4045 bytes_written : 0 ,
4146 }
4247 }
48+
4349 /// Encode an AsyncBufRead using "chunked" framing. This is used for streams
4450 /// whose length is not known up front.
4551 ///
@@ -67,16 +73,48 @@ impl ChunkedEncoder {
6773 ) -> Poll < io:: Result < usize > > {
6874 self . bytes_written = 0 ;
6975 match self . state {
70- State :: Streaming => self . encode_stream ( res, cx, buf) ,
76+ State :: Start => self . init ( res, cx, buf) ,
77+ State :: EncodeChunks => self . encode_chunks ( res, cx, buf) ,
78+ State :: EndOfChunks => self . encode_chunks_eos ( res, cx, buf) ,
7179 State :: ReceiveTrailers => self . encode_trailers ( res, cx, buf) ,
7280 State :: EncodeTrailers => self . encode_trailers ( res, cx, buf) ,
7381 State :: EndOfStream => self . encode_eos ( cx, buf) ,
74- State :: Done => Poll :: Ready ( Ok ( 0 ) ) ,
82+ State :: End => Poll :: Ready ( Ok ( 0 ) ) ,
7583 }
7684 }
7785
86+ /// Switch the internal state to a new state.
87+ fn set_state ( & mut self , state : State ) {
88+ use State :: * ;
89+ log:: trace!( "ChunkedEncoder state: {:?} -> {:?}" , self . state, state) ;
90+
91+ #[ cfg( debug_assertions) ]
92+ match self . state {
93+ Start => assert ! ( matches!( state, EncodeChunks ) ) ,
94+ EncodeChunks => assert ! ( matches!( state, EndOfChunks ) ) ,
95+ EndOfChunks => assert ! ( matches!( state, ReceiveTrailers ) ) ,
96+ ReceiveTrailers => assert ! ( matches!( state, EncodeTrailers | EndOfStream ) ) ,
97+ EncodeTrailers => assert ! ( matches!( state, EndOfStream ) ) ,
98+ EndOfStream => assert ! ( matches!( state, End ) ) ,
99+ End => panic ! ( "No state transitions allowed after the stream has ended" ) ,
100+ }
101+
102+ self . state = state;
103+ }
104+
105+ /// Init encoding.
106+ fn init (
107+ & mut self ,
108+ res : & mut Response ,
109+ cx : & mut Context < ' _ > ,
110+ buf : & mut [ u8 ] ,
111+ ) -> Poll < io:: Result < usize > > {
112+ self . set_state ( State :: EncodeChunks ) ;
113+ self . encode_chunks ( res, cx, buf)
114+ }
115+
78116 /// Stream out data using chunked encoding.
79- fn encode_stream (
117+ fn encode_chunks (
80118 & mut self ,
81119 mut res : & mut Response ,
82120 cx : & mut Context < ' _ > ,
@@ -93,17 +131,11 @@ impl ChunkedEncoder {
93131 } ,
94132 } ;
95133
96- // If the stream doesn't have any more bytes left to read we're done.
134+ // If the stream doesn't have any more bytes left to read we're done
135+ // sending chunks and it's time to move on.
97136 if src. len ( ) == 0 {
98- // Write out the final empty chunk
99- let idx = self . bytes_written ;
100- buf[ idx] = b'0' ;
101- buf[ idx + 1 ] = CR ;
102- buf[ idx + 2 ] = LF ;
103- self . bytes_written += 3 ;
104-
105- self . state = State :: ReceiveTrailers ;
106- return self . receive_trailers ( res, cx, buf) ;
137+ self . set_state ( State :: EndOfChunks ) ;
138+ return self . encode_chunks_eos ( res, cx, buf) ;
107139 }
108140
109141 // Each chunk is prefixed with the length of the data in hex, then a
@@ -114,11 +146,18 @@ impl ChunkedEncoder {
114146 // Calculate the max char count encoding the `len_prefix` statement
115147 // as hex would take. This is done by rounding up `log16(amt + 1)`.
116148 let hex_len = ( ( amt + 1 ) as f64 ) . log ( 16.0 ) . ceil ( ) as usize ;
117- let crlf_len = 2 * 2 ;
118- let buf_upper = buf_len. checked_sub ( hex_len + crlf_len ) . unwrap_or ( 0 ) ;
149+ let framing_len = hex_len + CRLF_LEN * 2 ;
150+ let buf_upper = buf_len. checked_sub ( framing_len ) . unwrap_or ( 0 ) ;
119151 let amt = amt. min ( buf_upper) ;
120152 let len_prefix = format ! ( "{:X}" , amt) . into_bytes ( ) ;
121153
154+ // Request a new buf if the current buf is too small to write any data
155+ // into. Empty frames should only be sent to mark the end of a stream.
156+ if buf. len ( ) <= framing_len {
157+ cx. waker ( ) . wake_by_ref ( ) ;
158+ return Poll :: Ready ( Ok ( self . bytes_written ) ) ;
159+ }
160+
122161 // Write our frame header to the buffer.
123162 let lower = self . bytes_written ;
124163 let upper = self . bytes_written + len_prefix. len ( ) ;
@@ -134,7 +173,7 @@ impl ChunkedEncoder {
134173 Pin :: new ( & mut res) . consume ( amt) ;
135174 self . bytes_written += amt;
136175
137- // Finalize the chunk with a final CRLF.
176+ // Finalize the chunk with a closing CRLF.
138177 let idx = self . bytes_written ;
139178 buf[ idx] = CR ;
140179 buf[ idx + 1 ] = LF ;
@@ -145,6 +184,29 @@ impl ChunkedEncoder {
145184 Poll :: Ready ( Ok ( self . bytes_written ) )
146185 }
147186
187+ fn encode_chunks_eos (
188+ & mut self ,
189+ res : & mut Response ,
190+ cx : & mut Context < ' _ > ,
191+ buf : & mut [ u8 ] ,
192+ ) -> Poll < io:: Result < usize > > {
193+ // Request a new buf if the current buf is too small to write into.
194+ if buf. len ( ) < 3 {
195+ cx. waker ( ) . wake_by_ref ( ) ;
196+ return Poll :: Ready ( Ok ( self . bytes_written ) ) ;
197+ }
198+
199+ // Write out the final empty chunk
200+ let idx = self . bytes_written ;
201+ buf[ idx] = b'0' ;
202+ buf[ idx + 1 ] = CR ;
203+ buf[ idx + 2 ] = LF ;
204+ self . bytes_written += 3 ;
205+
206+ self . set_state ( State :: ReceiveTrailers ) ;
207+ return self . receive_trailers ( res, cx, buf) ;
208+ }
209+
148210 /// Receive trailers sent to the response, and store them in an internal
149211 /// buffer.
150212 fn receive_trailers (
@@ -154,7 +216,7 @@ impl ChunkedEncoder {
154216 buf : & mut [ u8 ] ,
155217 ) -> Poll < io:: Result < usize > > {
156218 // TODO: actually wait for trailers to be received.
157- self . state = State :: EncodeTrailers ;
219+ self . set_state ( State :: EncodeTrailers ) ;
158220 self . encode_trailers ( res, cx, buf)
159221 }
160222
@@ -166,7 +228,7 @@ impl ChunkedEncoder {
166228 buf : & mut [ u8 ] ,
167229 ) -> Poll < io:: Result < usize > > {
168230 // TODO: actually encode trailers here.
169- self . state = State :: EndOfStream ;
231+ self . set_state ( State :: EndOfStream ) ;
170232 self . encode_eos ( cx, buf)
171233 }
172234
@@ -178,8 +240,7 @@ impl ChunkedEncoder {
178240 buf[ idx + 1 ] = LF ;
179241 self . bytes_written += 2 ;
180242
181- log:: trace!( "finished encoding chunked stream" ) ;
182- self . state = State :: Done ;
243+ self . set_state ( State :: End ) ;
183244 return Poll :: Ready ( Ok ( self . bytes_written ) ) ;
184245 }
185246}
0 commit comments