@@ -2,13 +2,12 @@ use std::fmt;
2
2
use std:: future:: Future ;
3
3
use std:: ops:: Range ;
4
4
use std:: pin:: Pin ;
5
- use std:: str:: FromStr ;
6
5
use std:: task:: { Context , Poll } ;
7
6
8
7
use async_std:: io:: { self , Read } ;
9
- use async_std:: sync:: { channel , Arc , Receiver , Sender } ;
8
+ use async_std:: sync:: Arc ;
10
9
use byte_pool:: { Block , BytePool } ;
11
- use http_types:: headers :: { HeaderName , HeaderValue } ;
10
+ use http_types:: trailers :: { Trailers , TrailersSender } ;
12
11
13
12
const INITIAL_CAPACITY : usize = 1024 * 4 ;
14
13
const MAX_CAPACITY : usize = 512 * 1024 * 1024 ; // 512 MiB
@@ -33,29 +32,20 @@ pub(crate) struct ChunkedDecoder<R: Read> {
33
32
/// Current state.
34
33
state : State ,
35
34
/// Trailer channel sender.
36
- trailer_sender : Sender < Vec < ( HeaderName , HeaderValue ) > > ,
37
- /// Trailer channel receiver.
38
- trailer_receiver : Receiver < Vec < ( HeaderName , HeaderValue ) > > ,
35
+ trailer_sender : Option < TrailersSender > ,
39
36
}
40
37
41
38
impl < R : Read > ChunkedDecoder < R > {
42
- pub ( crate ) fn new ( inner : R ) -> Self {
43
- let ( sender, receiver) = channel ( 1 ) ;
44
-
39
+ pub ( crate ) fn new ( inner : R , trailer_sender : TrailersSender ) -> Self {
45
40
ChunkedDecoder {
46
41
inner,
47
42
buffer : POOL . alloc ( INITIAL_CAPACITY ) ,
48
43
current : Range { start : 0 , end : 0 } ,
49
44
initial_decode : false , // buffer is empty initially, nothing to decode}
50
45
state : State :: Init ,
51
- trailer_sender : sender,
52
- trailer_receiver : receiver,
46
+ trailer_sender : Some ( trailer_sender) ,
53
47
}
54
48
}
55
-
56
- pub ( crate ) fn trailer ( & self ) -> Receiver < Vec < ( HeaderName , HeaderValue ) > > {
57
- self . trailer_receiver . clone ( )
58
- }
59
49
}
60
50
61
51
impl < R : Read + Unpin > ChunkedDecoder < R > {
@@ -94,7 +84,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
94
84
95
85
return Ok ( DecodeResult :: Some {
96
86
read,
97
- new_state,
87
+ new_state : Some ( new_state ) ,
98
88
new_pos,
99
89
buffer,
100
90
pending : false ,
@@ -115,7 +105,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
115
105
116
106
Ok ( DecodeResult :: Some {
117
107
read,
118
- new_state,
108
+ new_state : Some ( new_state ) ,
119
109
new_pos,
120
110
buffer,
121
111
pending : false ,
@@ -124,7 +114,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
124
114
Poll :: Pending => {
125
115
return Ok ( DecodeResult :: Some {
126
116
read : 0 ,
127
- new_state : State :: Chunk ( new_current, len) ,
117
+ new_state : Some ( State :: Chunk ( new_current, len) ) ,
128
118
new_pos,
129
119
buffer,
130
120
pending : true ,
@@ -155,14 +145,27 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
155
145
decode_trailer ( buffer, pos)
156
146
}
157
147
State :: TrailerDone ( ref mut headers) => {
158
- let headers = std:: mem:: replace ( headers, Vec :: new ( ) ) ;
159
- let mut fut = Box :: pin ( self . trailer_sender . send ( headers) ) ;
160
- match Pin :: new ( & mut fut) . poll ( cx) {
148
+ let headers = std:: mem:: replace ( headers, Trailers :: new ( ) ) ;
149
+ let sender = self . trailer_sender . take ( ) ;
150
+ let sender =
151
+ sender. expect ( "invalid chunked state, tried sending multiple trailers" ) ;
152
+
153
+ let fut = Box :: pin ( sender. send ( Ok ( headers) ) ) ;
154
+ Ok ( DecodeResult :: Some {
155
+ read : 0 ,
156
+ new_state : Some ( State :: TrailerSending ( fut) ) ,
157
+ new_pos : pos. clone ( ) ,
158
+ buffer,
159
+ pending : false ,
160
+ } )
161
+ }
162
+ State :: TrailerSending ( ref mut fut) => {
163
+ match Pin :: new ( fut) . poll ( cx) {
161
164
Poll :: Ready ( _) => { }
162
165
Poll :: Pending => {
163
166
return Ok ( DecodeResult :: Some {
164
167
read : 0 ,
165
- new_state : self . state . clone ( ) ,
168
+ new_state : None ,
166
169
new_pos : pos. clone ( ) ,
167
170
buffer,
168
171
pending : true ,
@@ -172,15 +175,15 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
172
175
173
176
Ok ( DecodeResult :: Some {
174
177
read : 0 ,
175
- new_state : State :: Done ,
178
+ new_state : Some ( State :: Done ) ,
176
179
new_pos : pos. clone ( ) ,
177
180
buffer,
178
181
pending : false ,
179
182
} )
180
183
}
181
184
State :: Done => Ok ( DecodeResult :: Some {
182
185
read : 0 ,
183
- new_state : State :: Done ,
186
+ new_state : Some ( State :: Done ) ,
184
187
new_pos : pos. clone ( ) ,
185
188
buffer,
186
189
pending : false ,
@@ -217,15 +220,23 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
217
220
pending,
218
221
} => {
219
222
this. current = new_pos. clone ( ) ;
220
- this. state = new_state;
223
+ if let Some ( state) = new_state {
224
+ this. state = state;
225
+ }
221
226
222
227
if pending {
223
228
// initial_decode is still true
224
229
this. buffer = buffer;
225
230
return Poll :: Pending ;
226
231
}
227
232
228
- if State :: Done == this. state || read > 0 {
233
+ if let State :: Done = this. state {
234
+ // initial_decode is still true
235
+ this. buffer = buffer;
236
+ return Poll :: Ready ( Ok ( read) ) ;
237
+ }
238
+
239
+ if read > 0 {
229
240
// initial_decode is still true
230
241
this. buffer = buffer;
231
242
return Poll :: Ready ( Ok ( read) ) ;
@@ -281,11 +292,18 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
281
292
// current buffer might now contain more data inside, so we need to attempt
282
293
// to decode it next time
283
294
this. initial_decode = true ;
284
- this. state = new_state;
295
+ if let Some ( state) = new_state {
296
+ this. state = state;
297
+ }
285
298
this. current = new_pos. clone ( ) ;
286
299
n = new_pos;
287
300
288
- if State :: Done == this. state || read > 0 {
301
+ if let State :: Done = this. state {
302
+ this. buffer = new_buffer;
303
+ return Poll :: Ready ( Ok ( read) ) ;
304
+ }
305
+
306
+ if read > 0 {
289
307
this. buffer = new_buffer;
290
308
return Poll :: Ready ( Ok ( read) ) ;
291
309
}
@@ -329,7 +347,7 @@ enum DecodeResult {
329
347
/// The new range of valid data in `buffer`.
330
348
new_pos : Range < usize > ,
331
349
/// The new state.
332
- new_state : State ,
350
+ new_state : Option < State > ,
333
351
/// Should poll return `Pending`.
334
352
pending : bool ,
335
353
} ,
@@ -338,7 +356,6 @@ enum DecodeResult {
338
356
}
339
357
340
358
/// Decoder state.
341
- #[ derive( Debug , PartialEq , Clone ) ]
342
359
enum State {
343
360
/// Initial state.
344
361
Init ,
@@ -349,10 +366,25 @@ enum State {
349
366
/// Decoding trailers.
350
367
Trailer ,
351
368
/// Trailers were decoded, are now set to the decoded trailers.
352
- TrailerDone ( Vec < ( HeaderName , HeaderValue ) > ) ,
369
+ TrailerDone ( Trailers ) ,
370
+ TrailerSending ( Pin < Box < dyn Future < Output = ( ) > + ' static + Send + Sync > > ) ,
353
371
/// All is said and done.
354
372
Done ,
355
373
}
374
+ impl fmt:: Debug for State {
375
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
376
+ use State :: * ;
377
+ match self {
378
+ Init => write ! ( f, "State::Init" ) ,
379
+ Chunk ( a, b) => write ! ( f, "State::Chunk({}, {})" , a, b) ,
380
+ ChunkEnd => write ! ( f, "State::ChunkEnd" ) ,
381
+ Trailer => write ! ( f, "State::Trailer" ) ,
382
+ TrailerDone ( trailers) => write ! ( f, "State::TrailerDone({:?})" , & trailers) ,
383
+ TrailerSending ( _) => write ! ( f, "State::TrailerSending" ) ,
384
+ Done => write ! ( f, "State::Done" ) ,
385
+ }
386
+ }
387
+ }
356
388
357
389
impl fmt:: Debug for DecodeResult {
358
390
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
@@ -395,7 +427,7 @@ fn decode_init(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<DecodeR
395
427
read : 0 ,
396
428
buffer,
397
429
new_pos,
398
- new_state,
430
+ new_state : Some ( new_state ) ,
399
431
pending : false ,
400
432
} )
401
433
}
@@ -418,7 +450,7 @@ fn decode_chunk_end(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<De
418
450
start : pos. start + 2 ,
419
451
end : pos. end ,
420
452
} ,
421
- new_state : State :: Init ,
453
+ new_state : Some ( State :: Init ) ,
422
454
pending : false ,
423
455
} ) ;
424
456
}
@@ -434,21 +466,16 @@ fn decode_trailer(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<Deco
434
466
435
467
match httparse:: parse_headers ( & buffer[ pos. start ..pos. end ] , & mut headers) {
436
468
Ok ( Status :: Complete ( ( used, headers) ) ) => {
437
- let headers = headers
438
- . iter ( )
439
- . map ( |header| {
440
- // TODO: error propagation
441
- let name = HeaderName :: from_str ( header. name ) . unwrap ( ) ;
442
- let value =
443
- HeaderValue :: from_str ( & std:: string:: String :: from_utf8_lossy ( header. value ) )
444
- . unwrap ( ) ;
445
- ( name, value)
446
- } )
447
- . collect ( ) ;
469
+ let mut trailers = Trailers :: new ( ) ;
470
+ for header in headers {
471
+ let value = std:: string:: String :: from_utf8_lossy ( header. value ) . to_string ( ) ;
472
+ trailers. insert ( header. name , value) . unwrap ( ) ;
473
+ }
474
+
448
475
Ok ( DecodeResult :: Some {
449
476
read : 0 ,
450
477
buffer,
451
- new_state : State :: TrailerDone ( headers ) ,
478
+ new_state : Some ( State :: TrailerDone ( trailers ) ) ,
452
479
new_pos : Range {
453
480
start : pos. start + used,
454
481
end : pos. end ,
@@ -481,7 +508,10 @@ mod tests {
481
508
\r \n "
482
509
. as_bytes ( ) ,
483
510
) ;
484
- let mut decoder = ChunkedDecoder :: new ( input) ;
511
+
512
+ let ( s, _r) = async_std:: sync:: channel ( 1 ) ;
513
+ let sender = TrailersSender :: new ( s) ;
514
+ let mut decoder = ChunkedDecoder :: new ( input, sender) ;
485
515
486
516
let mut output = String :: new ( ) ;
487
517
decoder. read_to_string ( & mut output) . await . unwrap ( ) ;
@@ -509,19 +539,21 @@ mod tests {
509
539
\r \n "
510
540
. as_bytes ( ) ,
511
541
) ;
512
- let mut decoder = ChunkedDecoder :: new ( input) ;
542
+ let ( s, r) = async_std:: sync:: channel ( 1 ) ;
543
+ let sender = TrailersSender :: new ( s) ;
544
+ let mut decoder = ChunkedDecoder :: new ( input, sender) ;
513
545
514
546
let mut output = String :: new ( ) ;
515
547
decoder. read_to_string ( & mut output) . await . unwrap ( ) ;
516
548
assert_eq ! ( output, "MozillaDeveloperNetwork" ) ;
517
549
518
- let trailer = decoder . trailer ( ) . recv ( ) . await ;
550
+ let trailer = r . recv ( ) . await . unwrap ( ) . unwrap ( ) ;
519
551
assert_eq ! (
520
- trailer,
521
- Some ( vec![ (
522
- "Expires" . parse( ) . unwrap( ) ,
523
- "Wed, 21 Oct 2015 07:28:00 GMT" . parse( ) . unwrap( ) ,
524
- ) ] )
552
+ trailer. iter ( ) . collect :: < Vec <_>> ( ) ,
553
+ vec![ (
554
+ & "Expires" . parse( ) . unwrap( ) ,
555
+ & vec! [ "Wed, 21 Oct 2015 07:28:00 GMT" . parse( ) . unwrap( ) ] ,
556
+ ) ]
525
557
) ;
526
558
} ) ;
527
559
}
0 commit comments