Skip to content

Commit 973ca73

Browse files
basics work
1 parent 55f2d4f commit 973ca73

File tree

2 files changed

+174
-20
lines changed

2 files changed

+174
-20
lines changed

src/chunked.rs

Lines changed: 173 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
use async_std::io::{self, BufRead, Read};
2-
use async_std::sync::Arc;
3-
use byte_pool::{Block, BytePool};
41
use std::fmt;
2+
use std::future::Future;
53
use std::pin::Pin;
4+
use std::str::FromStr;
65
use std::task::{Context, Poll};
76

7+
use async_std::io::{self, BufRead, Read};
8+
use async_std::sync::{channel, Arc, Receiver, Sender};
9+
use byte_pool::{Block, BytePool};
10+
use http_types::headers::{HeaderName, HeaderValue};
11+
812
const INITIAL_CAPACITY: usize = 1024 * 4;
913
const MAX_CAPACITY: usize = 512 * 1024 * 1024; // 512 MiB
1014

@@ -27,20 +31,33 @@ pub struct ChunkedDecoder<R: BufRead> {
2731
/// Whether we should attempt to decode whatever is currently inside the buffer.
2832
/// False indicates that we know for certain that the buffer is incomplete.
2933
initial_decode: bool,
34+
/// Current state.
3035
state: State,
36+
/// Trailer channel sender.
37+
trailer_sender: Sender<(HeaderName, HeaderValue)>,
38+
/// Trailer channel receiver.
39+
trailer_receiver: Receiver<(HeaderName, HeaderValue)>,
3140
}
3241

3342
impl<R: BufRead> ChunkedDecoder<R> {
3443
pub fn new(inner: R) -> Self {
44+
let (sender, receiver) = channel(1);
45+
3546
ChunkedDecoder {
3647
inner,
3748
buffer: POOL.alloc(INITIAL_CAPACITY),
3849
current: Position::default(),
3950
decode_needs: 0,
4051
initial_decode: false, // buffer is empty initially, nothing to decode}
4152
state: State::Init,
53+
trailer_sender: sender,
54+
trailer_receiver: receiver,
4255
}
4356
}
57+
58+
pub fn trailer(&self) -> Receiver<(HeaderName, HeaderValue)> {
59+
self.trailer_receiver.clone()
60+
}
4461
}
4562

4663
fn decode_init(buffer: Block<'static>, pos: &Position, buf: &mut [u8]) -> io::Result<DecodeResult> {
@@ -78,39 +95,79 @@ fn decode_chunk(
7895

7996
buf[..to_read].copy_from_slice(&buffer[pos.start..pos.start + to_read]);
8097

98+
dbg!(to_read);
99+
dbg!(std::str::from_utf8(&buf[..to_read]));
100+
101+
let new_pos = Position {
102+
start: pos.start + to_read,
103+
end: pos.end,
104+
};
81105
let new_state = if left_to_read - to_read > 0 {
82106
State::Chunk(current + to_read as u64, len)
83107
} else {
84-
State::Init
108+
// read one \r\n
109+
State::ChunkEnd
85110
};
86111

87112
Ok(DecodeResult::Some {
88113
read: to_read,
89114
buffer,
90115
new_state,
91-
new_pos: Position {
92-
start: pos.start + to_read,
93-
end: pos.end,
94-
},
116+
new_pos,
95117
})
96118
}
97119

120+
fn decode_chunk_end(
121+
buffer: Block<'static>,
122+
pos: &Position,
123+
buf: &mut [u8],
124+
) -> io::Result<DecodeResult> {
125+
if pos.len() < 2 {
126+
return Ok(DecodeResult::None(buffer));
127+
}
128+
129+
if &buffer[pos.start..pos.start + 2] == b"\r\n" {
130+
// valid chunk end move on to a new header
131+
return decode_init(
132+
buffer,
133+
&Position {
134+
start: pos.start + 2,
135+
end: pos.end,
136+
},
137+
buf,
138+
);
139+
}
140+
141+
dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
142+
143+
Err(io::Error::from(io::ErrorKind::InvalidData))
144+
}
145+
98146
fn decode_trailer(buffer: Block<'static>, pos: &Position) -> io::Result<DecodeResult> {
99147
use httparse::Status;
100148

101-
// TODO: find a way to emit the actual read headers
102-
103149
// read headers
104150
let mut headers = [httparse::EMPTY_HEADER; 16];
105151
dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
106152
match httparse::parse_headers(&buffer[pos.start..pos.end], &mut headers) {
107153
Ok(Status::Complete((used, headers))) => {
108154
dbg!(headers);
109-
155+
dbg!(used);
156+
let headers = headers
157+
.iter()
158+
.map(|header| {
159+
// TODO: error propagation
160+
let name = HeaderName::from_str(header.name).unwrap();
161+
let value =
162+
HeaderValue::from_str(std::str::from_utf8(header.value).unwrap()).unwrap();
163+
(name, value)
164+
})
165+
.collect();
166+
dbg!(&headers);
110167
Ok(DecodeResult::Some {
111-
read: used,
168+
read: 0,
112169
buffer,
113-
new_state: State::Done,
170+
new_state: State::TrailerDone(headers),
114171
new_pos: Position {
115172
start: pos.start + used,
116173
end: pos.end,
@@ -143,6 +200,14 @@ impl<R: BufRead + Unpin + Send + 'static> ChunkedDecoder<R> {
143200
})
144201
}
145202

203+
fn poll_read_chunk_end(
204+
self: Pin<&mut Self>,
205+
cx: &mut Context<'_>,
206+
buf: &mut [u8],
207+
) -> Poll<io::Result<usize>> {
208+
self.poll_read_inner(cx, buf, decode_chunk_end)
209+
}
210+
146211
fn poll_read_trailer(
147212
self: Pin<&mut Self>,
148213
cx: &mut Context<'_>,
@@ -178,7 +243,11 @@ impl<R: BufRead + Unpin + Send + 'static> ChunkedDecoder<R> {
178243
std::mem::replace(&mut this.buffer, buffer);
179244
std::mem::replace(&mut this.state, new_state);
180245
this.current = new_pos;
181-
return Poll::Ready(Ok(read));
246+
247+
if State::Done == this.state || read > 0 {
248+
return Poll::Ready(Ok(read));
249+
}
250+
return Poll::Pending;
182251
}
183252
DecodeResult::None(buffer) => buffer,
184253
}
@@ -217,7 +286,7 @@ impl<R: BufRead + Unpin + Send + 'static> ChunkedDecoder<R> {
217286
match f(buffer, &n, buf)? {
218287
DecodeResult::Some {
219288
read,
220-
buffer,
289+
buffer: new_buffer,
221290
new_pos,
222291
new_state,
223292
} => {
@@ -228,9 +297,14 @@ impl<R: BufRead + Unpin + Send + 'static> ChunkedDecoder<R> {
228297
// to decode it next time
229298
this.initial_decode = true;
230299
std::mem::replace(&mut this.state, new_state);
231-
std::mem::replace(&mut this.buffer, buffer);
232300
this.current = new_pos;
233-
return Poll::Ready(Ok(read));
301+
if State::Done == this.state || read > 0 {
302+
std::mem::replace(&mut this.buffer, new_buffer);
303+
return Poll::Ready(Ok(read));
304+
}
305+
306+
buffer = new_buffer;
307+
continue;
234308
}
235309
DecodeResult::None(buf) => {
236310
buffer = buf;
@@ -262,7 +336,7 @@ impl<R: BufRead + Unpin + Send + 'static> ChunkedDecoder<R> {
262336
impl<R: BufRead + Unpin + Send + 'static> Read for ChunkedDecoder<R> {
263337
#[allow(missing_doc_code_examples)]
264338
fn poll_read(
265-
self: Pin<&mut Self>,
339+
mut self: Pin<&mut Self>,
266340
cx: &mut Context<'_>,
267341
buf: &mut [u8],
268342
) -> Poll<io::Result<usize>> {
@@ -275,10 +349,22 @@ impl<R: BufRead + Unpin + Send + 'static> Read for ChunkedDecoder<R> {
275349
// reading a chunk
276350
self.poll_read_chunk(cx, buf, current, len)
277351
}
352+
State::ChunkEnd => self.poll_read_chunk_end(cx, buf),
278353
State::Trailer => {
279354
// reading the trailer headers
280355
self.poll_read_trailer(cx, buf)
281356
}
357+
State::TrailerDone(ref mut headers) => {
358+
dbg!("Done");
359+
let headers = std::mem::replace(headers, Vec::new());
360+
for (name, value) in headers.into_iter() {
361+
dbg!(&name);
362+
let mut fut = self.trailer_sender.send((name, value));
363+
async_std::task::ready!(unsafe { Pin::new_unchecked(&mut fut) }.poll(cx));
364+
}
365+
self.state = State::Done;
366+
Poll::Ready(Ok(0))
367+
}
282368
State::Done => Poll::Ready(Ok(0)),
283369
}
284370
}
@@ -317,11 +403,13 @@ enum DecodeResult {
317403
None(Block<'static>),
318404
}
319405

320-
#[derive(Debug)]
406+
#[derive(Debug, PartialEq)]
321407
enum State {
322408
Init,
323409
Chunk(u64, u64),
410+
ChunkEnd,
324411
Trailer,
412+
TrailerDone(Vec<(HeaderName, HeaderValue)>),
325413
Done,
326414
}
327415

@@ -344,3 +432,69 @@ impl fmt::Debug for DecodeResult {
344432
}
345433
}
346434
}
435+
436+
#[cfg(test)]
437+
mod tests {
438+
use super::*;
439+
use async_std::prelude::*;
440+
441+
#[test]
442+
fn test_chunked_wiki() {
443+
async_std::task::block_on(async move {
444+
let input = async_std::io::Cursor::new(
445+
"4\r\n\
446+
Wiki\r\n\
447+
5\r\n\
448+
pedia\r\n\
449+
E\r\n in\r\n\
450+
\r\n\
451+
chunks.\r\n\
452+
0\r\n\
453+
\r\n"
454+
.as_bytes(),
455+
);
456+
let mut decoder = ChunkedDecoder::new(input);
457+
458+
let mut output = String::new();
459+
decoder.read_to_string(&mut output).await.unwrap();
460+
assert_eq!(
461+
output,
462+
"Wikipedia in\r\n\
463+
\r\n\
464+
chunks."
465+
);
466+
});
467+
}
468+
469+
#[test]
470+
fn test_chunked_mdn() {
471+
async_std::task::block_on(async move {
472+
let input = async_std::io::Cursor::new(
473+
"7\r\n\
474+
Mozilla\r\n\
475+
9\r\n\
476+
Developer\r\n\
477+
7\r\n\
478+
Network\r\n\
479+
0\r\n\
480+
Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\n\
481+
\r\n"
482+
.as_bytes(),
483+
);
484+
let mut decoder = ChunkedDecoder::new(input);
485+
486+
let mut output = String::new();
487+
decoder.read_to_string(&mut output).await.unwrap();
488+
assert_eq!(output, "MozillaDeveloperNetwork");
489+
490+
let trailer = decoder.trailer().recv().await;
491+
assert_eq!(
492+
trailer,
493+
Some((
494+
"Expires".parse().unwrap(),
495+
"Wed, 21 Oct 2015 07:28:00 GMT".parse().unwrap(),
496+
))
497+
);
498+
});
499+
}
500+
}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
//! // tbi
2929
//! ```
3030
31-
#![forbid(unsafe_code, rust_2018_idioms)]
31+
// #![forbid(unsafe_code, rust_2018_idioms)]
3232
#![deny(missing_debug_implementations, nonstandard_style)]
3333
// #![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]
3434
// #![cfg_attr(test, deny(warnings))]

0 commit comments

Comments
 (0)