Skip to content

Commit e84e447

Browse files
authored
Merge pull request #52 from yoshuawuyts/trailer-integration
Trailer integration
2 parents 2601cbb + 5cf5ff5 commit e84e447

File tree

4 files changed

+90
-56
lines changed

4 files changed

+90
-56
lines changed

src/chunked.rs

Lines changed: 85 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ use std::fmt;
22
use std::future::Future;
33
use std::ops::Range;
44
use std::pin::Pin;
5-
use std::str::FromStr;
65
use std::task::{Context, Poll};
76

87
use async_std::io::{self, Read};
9-
use async_std::sync::{channel, Arc, Receiver, Sender};
8+
use async_std::sync::Arc;
109
use byte_pool::{Block, BytePool};
11-
use http_types::headers::{HeaderName, HeaderValue};
10+
use http_types::trailers::{Trailers, TrailersSender};
1211

1312
const INITIAL_CAPACITY: usize = 1024 * 4;
1413
const MAX_CAPACITY: usize = 512 * 1024 * 1024; // 512 MiB
@@ -33,29 +32,20 @@ pub(crate) struct ChunkedDecoder<R: Read> {
3332
/// Current state.
3433
state: State,
3534
/// Trailer channel sender.
36-
trailer_sender: Sender<Vec<(HeaderName, HeaderValue)>>,
37-
/// Trailer channel receiver.
38-
trailer_receiver: Receiver<Vec<(HeaderName, HeaderValue)>>,
35+
trailer_sender: Option<TrailersSender>,
3936
}
4037

4138
impl<R: Read> ChunkedDecoder<R> {
42-
pub(crate) fn new(inner: R) -> Self {
43-
let (sender, receiver) = channel(1);
44-
39+
pub(crate) fn new(inner: R, trailer_sender: TrailersSender) -> Self {
4540
ChunkedDecoder {
4641
inner,
4742
buffer: POOL.alloc(INITIAL_CAPACITY),
4843
current: Range { start: 0, end: 0 },
4944
initial_decode: false, // buffer is empty initially, nothing to decode}
5045
state: State::Init,
51-
trailer_sender: sender,
52-
trailer_receiver: receiver,
46+
trailer_sender: Some(trailer_sender),
5347
}
5448
}
55-
56-
pub(crate) fn trailer(&self) -> Receiver<Vec<(HeaderName, HeaderValue)>> {
57-
self.trailer_receiver.clone()
58-
}
5949
}
6050

6151
impl<R: Read + Unpin> ChunkedDecoder<R> {
@@ -94,7 +84,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
9484

9585
return Ok(DecodeResult::Some {
9686
read,
97-
new_state,
87+
new_state: Some(new_state),
9888
new_pos,
9989
buffer,
10090
pending: false,
@@ -115,7 +105,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
115105

116106
Ok(DecodeResult::Some {
117107
read,
118-
new_state,
108+
new_state: Some(new_state),
119109
new_pos,
120110
buffer,
121111
pending: false,
@@ -124,7 +114,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
124114
Poll::Pending => {
125115
return Ok(DecodeResult::Some {
126116
read: 0,
127-
new_state: State::Chunk(new_current, len),
117+
new_state: Some(State::Chunk(new_current, len)),
128118
new_pos,
129119
buffer,
130120
pending: true,
@@ -155,14 +145,27 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
155145
decode_trailer(buffer, pos)
156146
}
157147
State::TrailerDone(ref mut headers) => {
158-
let headers = std::mem::replace(headers, Vec::new());
159-
let mut fut = Box::pin(self.trailer_sender.send(headers));
160-
match Pin::new(&mut fut).poll(cx) {
148+
let headers = std::mem::replace(headers, Trailers::new());
149+
let sender = self.trailer_sender.take();
150+
let sender =
151+
sender.expect("invalid chunked state, tried sending multiple trailers");
152+
153+
let fut = Box::pin(sender.send(Ok(headers)));
154+
Ok(DecodeResult::Some {
155+
read: 0,
156+
new_state: Some(State::TrailerSending(fut)),
157+
new_pos: pos.clone(),
158+
buffer,
159+
pending: false,
160+
})
161+
}
162+
State::TrailerSending(ref mut fut) => {
163+
match Pin::new(fut).poll(cx) {
161164
Poll::Ready(_) => {}
162165
Poll::Pending => {
163166
return Ok(DecodeResult::Some {
164167
read: 0,
165-
new_state: self.state.clone(),
168+
new_state: None,
166169
new_pos: pos.clone(),
167170
buffer,
168171
pending: true,
@@ -172,15 +175,15 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
172175

173176
Ok(DecodeResult::Some {
174177
read: 0,
175-
new_state: State::Done,
178+
new_state: Some(State::Done),
176179
new_pos: pos.clone(),
177180
buffer,
178181
pending: false,
179182
})
180183
}
181184
State::Done => Ok(DecodeResult::Some {
182185
read: 0,
183-
new_state: State::Done,
186+
new_state: Some(State::Done),
184187
new_pos: pos.clone(),
185188
buffer,
186189
pending: false,
@@ -217,15 +220,23 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
217220
pending,
218221
} => {
219222
this.current = new_pos.clone();
220-
this.state = new_state;
223+
if let Some(state) = new_state {
224+
this.state = state;
225+
}
221226

222227
if pending {
223228
// initial_decode is still true
224229
this.buffer = buffer;
225230
return Poll::Pending;
226231
}
227232

228-
if State::Done == this.state || read > 0 {
233+
if let State::Done = this.state {
234+
// initial_decode is still true
235+
this.buffer = buffer;
236+
return Poll::Ready(Ok(read));
237+
}
238+
239+
if read > 0 {
229240
// initial_decode is still true
230241
this.buffer = buffer;
231242
return Poll::Ready(Ok(read));
@@ -281,11 +292,18 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
281292
// current buffer might now contain more data inside, so we need to attempt
282293
// to decode it next time
283294
this.initial_decode = true;
284-
this.state = new_state;
295+
if let Some(state) = new_state {
296+
this.state = state;
297+
}
285298
this.current = new_pos.clone();
286299
n = new_pos;
287300

288-
if State::Done == this.state || read > 0 {
301+
if let State::Done = this.state {
302+
this.buffer = new_buffer;
303+
return Poll::Ready(Ok(read));
304+
}
305+
306+
if read > 0 {
289307
this.buffer = new_buffer;
290308
return Poll::Ready(Ok(read));
291309
}
@@ -329,7 +347,7 @@ enum DecodeResult {
329347
/// The new range of valid data in `buffer`.
330348
new_pos: Range<usize>,
331349
/// The new state.
332-
new_state: State,
350+
new_state: Option<State>,
333351
/// Should poll return `Pending`.
334352
pending: bool,
335353
},
@@ -338,7 +356,6 @@ enum DecodeResult {
338356
}
339357

340358
/// Decoder state.
341-
#[derive(Debug, PartialEq, Clone)]
342359
enum State {
343360
/// Initial state.
344361
Init,
@@ -349,10 +366,25 @@ enum State {
349366
/// Decoding trailers.
350367
Trailer,
351368
/// Trailers were decoded, are now set to the decoded trailers.
352-
TrailerDone(Vec<(HeaderName, HeaderValue)>),
369+
TrailerDone(Trailers),
370+
TrailerSending(Pin<Box<dyn Future<Output = ()> + 'static + Send + Sync>>),
353371
/// All is said and done.
354372
Done,
355373
}
374+
impl fmt::Debug for State {
375+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376+
use State::*;
377+
match self {
378+
Init => write!(f, "State::Init"),
379+
Chunk(a, b) => write!(f, "State::Chunk({}, {})", a, b),
380+
ChunkEnd => write!(f, "State::ChunkEnd"),
381+
Trailer => write!(f, "State::Trailer"),
382+
TrailerDone(trailers) => write!(f, "State::TrailerDone({:?})", &trailers),
383+
TrailerSending(_) => write!(f, "State::TrailerSending"),
384+
Done => write!(f, "State::Done"),
385+
}
386+
}
387+
}
356388

357389
impl fmt::Debug for DecodeResult {
358390
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -395,7 +427,7 @@ fn decode_init(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<DecodeR
395427
read: 0,
396428
buffer,
397429
new_pos,
398-
new_state,
430+
new_state: Some(new_state),
399431
pending: false,
400432
})
401433
}
@@ -418,7 +450,7 @@ fn decode_chunk_end(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<De
418450
start: pos.start + 2,
419451
end: pos.end,
420452
},
421-
new_state: State::Init,
453+
new_state: Some(State::Init),
422454
pending: false,
423455
});
424456
}
@@ -434,21 +466,16 @@ fn decode_trailer(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<Deco
434466

435467
match httparse::parse_headers(&buffer[pos.start..pos.end], &mut headers) {
436468
Ok(Status::Complete((used, headers))) => {
437-
let headers = headers
438-
.iter()
439-
.map(|header| {
440-
// TODO: error propagation
441-
let name = HeaderName::from_str(header.name).unwrap();
442-
let value =
443-
HeaderValue::from_str(&std::string::String::from_utf8_lossy(header.value))
444-
.unwrap();
445-
(name, value)
446-
})
447-
.collect();
469+
let mut trailers = Trailers::new();
470+
for header in headers {
471+
let value = std::string::String::from_utf8_lossy(header.value).to_string();
472+
trailers.insert(header.name, value).unwrap();
473+
}
474+
448475
Ok(DecodeResult::Some {
449476
read: 0,
450477
buffer,
451-
new_state: State::TrailerDone(headers),
478+
new_state: Some(State::TrailerDone(trailers)),
452479
new_pos: Range {
453480
start: pos.start + used,
454481
end: pos.end,
@@ -481,7 +508,10 @@ mod tests {
481508
\r\n"
482509
.as_bytes(),
483510
);
484-
let mut decoder = ChunkedDecoder::new(input);
511+
512+
let (s, _r) = async_std::sync::channel(1);
513+
let sender = TrailersSender::new(s);
514+
let mut decoder = ChunkedDecoder::new(input, sender);
485515

486516
let mut output = String::new();
487517
decoder.read_to_string(&mut output).await.unwrap();
@@ -509,19 +539,21 @@ mod tests {
509539
\r\n"
510540
.as_bytes(),
511541
);
512-
let mut decoder = ChunkedDecoder::new(input);
542+
let (s, r) = async_std::sync::channel(1);
543+
let sender = TrailersSender::new(s);
544+
let mut decoder = ChunkedDecoder::new(input, sender);
513545

514546
let mut output = String::new();
515547
decoder.read_to_string(&mut output).await.unwrap();
516548
assert_eq!(output, "MozillaDeveloperNetwork");
517549

518-
let trailer = decoder.trailer().recv().await;
550+
let trailer = r.recv().await.unwrap().unwrap();
519551
assert_eq!(
520-
trailer,
521-
Some(vec![(
522-
"Expires".parse().unwrap(),
523-
"Wed, 21 Oct 2015 07:28:00 GMT".parse().unwrap(),
524-
)])
552+
trailer.iter().collect::<Vec<_>>(),
553+
vec![(
554+
&"Expires".parse().unwrap(),
555+
&vec!["Wed, 21 Oct 2015 07:28:00 GMT".parse().unwrap()],
556+
)]
525557
);
526558
});
527559
}

src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,9 @@ where
183183
match transfer_encoding {
184184
Some(encoding) if !encoding.is_empty() => {
185185
if encoding.last().unwrap().as_str() == "chunked" {
186+
let trailers_sender = res.send_trailers();
186187
res.set_body(Body::from_reader(
187-
BufReader::new(ChunkedDecoder::new(reader)),
188+
BufReader::new(ChunkedDecoder::new(reader, trailers_sender)),
188189
None,
189190
));
190191
return Ok(res);

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
//! // tbi
2929
//! ```
3030
31-
#![forbid(unsafe_code, rust_2018_idioms)]
31+
// #![forbid(unsafe_code, rust_2018_idioms)]
3232
#![deny(missing_debug_implementations, nonstandard_style)]
3333
#![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]
3434
#![cfg_attr(test, deny(warnings))]

src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,9 @@ where
403403
match transfer_encoding {
404404
Some(encoding) if !encoding.is_empty() => {
405405
if encoding.last().unwrap().as_str() == "chunked" {
406+
let trailer_sender = req.send_trailers();
406407
req.set_body(Body::from_reader(
407-
BufReader::new(ChunkedDecoder::new(reader)),
408+
BufReader::new(ChunkedDecoder::new(reader, trailer_sender)),
408409
None,
409410
));
410411
return Ok(Some(req));

0 commit comments

Comments
 (0)