@@ -19,7 +19,7 @@ lazy_static::lazy_static! {
19
19
20
20
/// Decodes a chunked body according to
21
21
/// https://tools.ietf.org/html/rfc7230#section-4.1
22
- pub struct ChunkedDecoder < R : Read > {
22
+ pub ( crate ) struct ChunkedDecoder < R : Read > {
23
23
/// The underlying stream
24
24
inner : R ,
25
25
/// Buffer for the already read, but not yet parsed data.
@@ -38,7 +38,7 @@ pub struct ChunkedDecoder<R: Read> {
38
38
}
39
39
40
40
impl < R : Read > ChunkedDecoder < R > {
41
- pub fn new ( inner : R ) -> Self {
41
+ pub ( crate ) fn new ( inner : R ) -> Self {
42
42
let ( sender, receiver) = channel ( 1 ) ;
43
43
44
44
ChunkedDecoder {
@@ -52,96 +52,11 @@ impl<R: Read> ChunkedDecoder<R> {
52
52
}
53
53
}
54
54
55
- pub fn trailer ( & self ) -> Receiver < Vec < ( HeaderName , HeaderValue ) > > {
55
+ pub ( crate ) fn trailer ( & self ) -> Receiver < Vec < ( HeaderName , HeaderValue ) > > {
56
56
self . trailer_receiver . clone ( )
57
57
}
58
58
}
59
59
60
- fn decode_init ( buffer : Block < ' static > , pos : & Position ) -> io:: Result < DecodeResult > {
61
- use httparse:: Status ;
62
- match httparse:: parse_chunk_size ( & buffer[ pos. start ..pos. end ] ) {
63
- Ok ( Status :: Complete ( ( used, chunk_len) ) ) => {
64
- let new_pos = Position {
65
- start : pos. start + used,
66
- end : pos. end ,
67
- } ;
68
-
69
- let new_state = if chunk_len == 0 {
70
- State :: Trailer
71
- } else {
72
- State :: Chunk ( 0 , chunk_len)
73
- } ;
74
-
75
- Ok ( DecodeResult :: Some {
76
- read : 0 ,
77
- buffer,
78
- new_pos,
79
- new_state,
80
- pending : false ,
81
- } )
82
- }
83
- Ok ( Status :: Partial ) => Ok ( DecodeResult :: None ( buffer) ) ,
84
- Err ( err) => Err ( io:: Error :: new ( io:: ErrorKind :: Other , err. to_string ( ) ) ) ,
85
- }
86
- }
87
-
88
- fn decode_chunk_end ( buffer : Block < ' static > , pos : & Position ) -> io:: Result < DecodeResult > {
89
- if pos. len ( ) < 2 {
90
- return Ok ( DecodeResult :: None ( buffer) ) ;
91
- }
92
-
93
- if & buffer[ pos. start ..pos. start + 2 ] == b"\r \n " {
94
- // valid chunk end move on to a new header
95
- return Ok ( DecodeResult :: Some {
96
- read : 0 ,
97
- buffer,
98
- new_pos : Position {
99
- start : pos. start + 2 ,
100
- end : pos. end ,
101
- } ,
102
- new_state : State :: Init ,
103
- pending : false ,
104
- } ) ;
105
- }
106
-
107
- Err ( io:: Error :: from ( io:: ErrorKind :: InvalidData ) )
108
- }
109
-
110
- fn decode_trailer ( buffer : Block < ' static > , pos : & Position ) -> io:: Result < DecodeResult > {
111
- use httparse:: Status ;
112
-
113
- // read headers
114
- let mut headers = [ httparse:: EMPTY_HEADER ; 16 ] ;
115
-
116
- match httparse:: parse_headers ( & buffer[ pos. start ..pos. end ] , & mut headers) {
117
- Ok ( Status :: Complete ( ( used, headers) ) ) => {
118
- let headers = headers
119
- . iter ( )
120
- . map ( |header| {
121
- // TODO: error propagation
122
- let name = HeaderName :: from_str ( header. name ) . unwrap ( ) ;
123
- let value =
124
- HeaderValue :: from_str ( & std:: string:: String :: from_utf8_lossy ( header. value ) )
125
- . unwrap ( ) ;
126
- ( name, value)
127
- } )
128
- . collect ( ) ;
129
- Ok ( DecodeResult :: Some {
130
- read : 0 ,
131
- buffer,
132
- new_state : State :: TrailerDone ( headers) ,
133
- new_pos : Position {
134
- start : pos. start + used,
135
- end : pos. end ,
136
- } ,
137
- pending : false ,
138
- } )
139
- }
140
- Ok ( Status :: Partial ) => Ok ( DecodeResult :: None ( buffer) ) ,
141
- Err ( err) => Err ( io:: Error :: new ( io:: ErrorKind :: Other , err. to_string ( ) ) ) ,
142
- }
143
- }
144
-
145
60
impl < R : Read + Unpin > ChunkedDecoder < R > {
146
61
fn poll_read_chunk (
147
62
& mut self ,
@@ -299,17 +214,17 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
299
214
pending,
300
215
} => {
301
216
this. current = new_pos. clone ( ) ;
302
- std :: mem :: replace ( & mut this. state , new_state) ;
217
+ this. state = new_state;
303
218
304
219
if pending {
305
220
// initial_decode is still true
306
- std :: mem :: replace ( & mut this. buffer , buffer) ;
221
+ this. buffer = buffer;
307
222
return Poll :: Pending ;
308
223
}
309
224
310
225
if State :: Done == this. state || read > 0 {
311
226
// initial_decode is still true
312
- std :: mem :: replace ( & mut this. buffer , buffer) ;
227
+ this. buffer = buffer;
313
228
return Poll :: Ready ( Ok ( read) ) ;
314
229
}
315
230
@@ -328,7 +243,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
328
243
if buffer. capacity ( ) + 1024 <= MAX_CAPACITY {
329
244
buffer. realloc ( buffer. capacity ( ) + 1024 ) ;
330
245
} else {
331
- std :: mem :: replace ( & mut this. buffer , buffer) ;
246
+ this. buffer = buffer;
332
247
this. current = n;
333
248
return Poll :: Ready ( Err ( io:: Error :: new (
334
249
io:: ErrorKind :: Other ,
@@ -345,8 +260,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
345
260
// if we're here, it means that we need more data but there is none yet,
346
261
// so no decoding attempts are necessary until we get more data
347
262
this. initial_decode = false ;
348
-
349
- std:: mem:: replace ( & mut this. buffer , buffer) ;
263
+ this. buffer = buffer;
350
264
this. current = n;
351
265
return Poll :: Pending ;
352
266
}
@@ -364,17 +278,17 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
364
278
// current buffer might now contain more data inside, so we need to attempt
365
279
// to decode it next time
366
280
this. initial_decode = true ;
367
- std :: mem :: replace ( & mut this. state , new_state) ;
281
+ this. state = new_state;
368
282
this. current = new_pos;
369
283
n = new_pos;
370
284
371
285
if State :: Done == this. state || read > 0 {
372
- std :: mem :: replace ( & mut this. buffer , new_buffer) ;
286
+ this. buffer = new_buffer;
373
287
return Poll :: Ready ( Ok ( read) ) ;
374
288
}
375
289
376
290
if pending {
377
- std :: mem :: replace ( & mut this. buffer , new_buffer) ;
291
+ this. buffer = new_buffer;
378
292
return Poll :: Pending ;
379
293
}
380
294
@@ -388,8 +302,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
388
302
if this. buffer . is_empty ( ) || n. is_zero ( ) {
389
303
// "logical buffer" is empty, there is nothing to decode on the next step
390
304
this. initial_decode = false ;
391
-
392
- std:: mem:: replace ( & mut this. buffer , buffer) ;
305
+ this. buffer = buffer;
393
306
this. current = n;
394
307
395
308
return Poll :: Ready ( Ok ( 0 ) ) ;
@@ -464,6 +377,91 @@ impl fmt::Debug for DecodeResult {
464
377
}
465
378
}
466
379
380
+ fn decode_init ( buffer : Block < ' static > , pos : & Position ) -> io:: Result < DecodeResult > {
381
+ use httparse:: Status ;
382
+ match httparse:: parse_chunk_size ( & buffer[ pos. start ..pos. end ] ) {
383
+ Ok ( Status :: Complete ( ( used, chunk_len) ) ) => {
384
+ let new_pos = Position {
385
+ start : pos. start + used,
386
+ end : pos. end ,
387
+ } ;
388
+
389
+ let new_state = if chunk_len == 0 {
390
+ State :: Trailer
391
+ } else {
392
+ State :: Chunk ( 0 , chunk_len)
393
+ } ;
394
+
395
+ Ok ( DecodeResult :: Some {
396
+ read : 0 ,
397
+ buffer,
398
+ new_pos,
399
+ new_state,
400
+ pending : false ,
401
+ } )
402
+ }
403
+ Ok ( Status :: Partial ) => Ok ( DecodeResult :: None ( buffer) ) ,
404
+ Err ( err) => Err ( io:: Error :: new ( io:: ErrorKind :: Other , err. to_string ( ) ) ) ,
405
+ }
406
+ }
407
+
408
+ fn decode_chunk_end ( buffer : Block < ' static > , pos : & Position ) -> io:: Result < DecodeResult > {
409
+ if pos. len ( ) < 2 {
410
+ return Ok ( DecodeResult :: None ( buffer) ) ;
411
+ }
412
+
413
+ if & buffer[ pos. start ..pos. start + 2 ] == b"\r \n " {
414
+ // valid chunk end move on to a new header
415
+ return Ok ( DecodeResult :: Some {
416
+ read : 0 ,
417
+ buffer,
418
+ new_pos : Position {
419
+ start : pos. start + 2 ,
420
+ end : pos. end ,
421
+ } ,
422
+ new_state : State :: Init ,
423
+ pending : false ,
424
+ } ) ;
425
+ }
426
+
427
+ Err ( io:: Error :: from ( io:: ErrorKind :: InvalidData ) )
428
+ }
429
+
430
+ fn decode_trailer ( buffer : Block < ' static > , pos : & Position ) -> io:: Result < DecodeResult > {
431
+ use httparse:: Status ;
432
+
433
+ // read headers
434
+ let mut headers = [ httparse:: EMPTY_HEADER ; 16 ] ;
435
+
436
+ match httparse:: parse_headers ( & buffer[ pos. start ..pos. end ] , & mut headers) {
437
+ Ok ( Status :: Complete ( ( used, headers) ) ) => {
438
+ let headers = headers
439
+ . iter ( )
440
+ . map ( |header| {
441
+ // TODO: error propagation
442
+ let name = HeaderName :: from_str ( header. name ) . unwrap ( ) ;
443
+ let value =
444
+ HeaderValue :: from_str ( & std:: string:: String :: from_utf8_lossy ( header. value ) )
445
+ . unwrap ( ) ;
446
+ ( name, value)
447
+ } )
448
+ . collect ( ) ;
449
+ Ok ( DecodeResult :: Some {
450
+ read : 0 ,
451
+ buffer,
452
+ new_state : State :: TrailerDone ( headers) ,
453
+ new_pos : Position {
454
+ start : pos. start + used,
455
+ end : pos. end ,
456
+ } ,
457
+ pending : false ,
458
+ } )
459
+ }
460
+ Ok ( Status :: Partial ) => Ok ( DecodeResult :: None ( buffer) ) ,
461
+ Err ( err) => Err ( io:: Error :: new ( io:: ErrorKind :: Other , err. to_string ( ) ) ) ,
462
+ }
463
+ }
464
+
467
465
#[ cfg( test) ]
468
466
mod tests {
469
467
use super :: * ;
0 commit comments