Skip to content

Commit 55f2d4f

Browse files
start decoding
1 parent 1d1ad29 commit 55f2d4f

File tree

4 files changed

+330
-22
lines changed

4 files changed

+330
-22
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ futures-core-preview = "0.3.0-alpha.18"
2020
http-types = { path = '../http-types' }
2121
thiserror = "1.0.9"
2222
pin-project-lite = "0.1.1"
23+
byte-pool = "0.2.1"
24+
lazy_static = "1.4.0"
2325

2426
[dev-dependencies]
2527
futures-util = "0.3.0"

src/chunked.rs

Lines changed: 321 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,261 @@
11
use async_std::io::{self, BufRead, Read};
2+
use async_std::sync::Arc;
3+
use byte_pool::{Block, BytePool};
4+
use std::fmt;
25
use std::pin::Pin;
36
use std::task::{Context, Poll};
47

5-
pin_project_lite::pin_project! {
6-
pub struct ChunkedDecoder<R> {
7-
#[pin]
8-
reader: R,
9-
}
8+
const INITIAL_CAPACITY: usize = 1024 * 4;
9+
const MAX_CAPACITY: usize = 512 * 1024 * 1024; // 512 MiB
10+
11+
lazy_static::lazy_static! {
12+
/// The global buffer pool we use for storing incoming data.
13+
pub(crate) static ref POOL: Arc<BytePool> = Arc::new(BytePool::new());
1014
}
1115

1216
/// Decodes a chunked body according to
1317
/// https://tools.ietf.org/html/rfc7230#section-4.1
14-
impl<R> ChunkedDecoder<R> {
15-
pub fn new(reader: R) -> Self {
16-
ChunkedDecoder { reader }
18+
pub struct ChunkedDecoder<R: BufRead> {
19+
/// The underlying stream
20+
inner: R,
21+
/// Buffer for the already read, but not yet parsed data.
22+
buffer: Block<'static>,
23+
/// Position of valid read data into buffer.
24+
current: Position,
25+
/// How many bytes do we need to finishe the currrent element that is being decoded.
26+
decode_needs: usize,
27+
/// Whether we should attempt to decode whatever is currently inside the buffer.
28+
/// False indicates that we know for certain that the buffer is incomplete.
29+
initial_decode: bool,
30+
state: State,
31+
}
32+
33+
impl<R: BufRead> ChunkedDecoder<R> {
34+
pub fn new(inner: R) -> Self {
35+
ChunkedDecoder {
36+
inner,
37+
buffer: POOL.alloc(INITIAL_CAPACITY),
38+
current: Position::default(),
39+
decode_needs: 0,
40+
initial_decode: false, // buffer is empty initially, nothing to decode}
41+
state: State::Init,
42+
}
43+
}
44+
}
45+
46+
fn decode_init(buffer: Block<'static>, pos: &Position, buf: &mut [u8]) -> io::Result<DecodeResult> {
47+
use httparse::Status;
48+
49+
match httparse::parse_chunk_size(&buffer[pos.start..pos.end]) {
50+
Ok(Status::Complete((used, chunk_len))) => {
51+
let new_pos = Position {
52+
start: pos.start + used,
53+
end: pos.end,
54+
};
55+
56+
if chunk_len == 0 {
57+
// TODO: decode last_chunk
58+
decode_trailer(buffer, &new_pos)
59+
} else {
60+
decode_chunk(buffer, &new_pos, buf, 0, chunk_len)
61+
}
62+
}
63+
Ok(Status::Partial) => Ok(DecodeResult::None(buffer)),
64+
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
65+
}
66+
}
67+
68+
fn decode_chunk(
69+
buffer: Block<'static>,
70+
pos: &Position,
71+
buf: &mut [u8],
72+
current: u64,
73+
len: u64,
74+
) -> io::Result<DecodeResult> {
75+
let left_to_read = (len - current) as usize;
76+
let available = std::cmp::min(pos.len(), buf.len());
77+
let to_read = std::cmp::min(left_to_read, available);
78+
79+
buf[..to_read].copy_from_slice(&buffer[pos.start..pos.start + to_read]);
80+
81+
let new_state = if left_to_read - to_read > 0 {
82+
State::Chunk(current + to_read as u64, len)
83+
} else {
84+
State::Init
85+
};
86+
87+
Ok(DecodeResult::Some {
88+
read: to_read,
89+
buffer,
90+
new_state,
91+
new_pos: Position {
92+
start: pos.start + to_read,
93+
end: pos.end,
94+
},
95+
})
96+
}
97+
98+
fn decode_trailer(buffer: Block<'static>, pos: &Position) -> io::Result<DecodeResult> {
99+
use httparse::Status;
100+
101+
// TODO: find a way to emit the actual read headers
102+
103+
// read headers
104+
let mut headers = [httparse::EMPTY_HEADER; 16];
105+
dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
106+
match httparse::parse_headers(&buffer[pos.start..pos.end], &mut headers) {
107+
Ok(Status::Complete((used, headers))) => {
108+
dbg!(headers);
109+
110+
Ok(DecodeResult::Some {
111+
read: used,
112+
buffer,
113+
new_state: State::Done,
114+
new_pos: Position {
115+
start: pos.start + used,
116+
end: pos.end,
117+
},
118+
})
119+
}
120+
Ok(Status::Partial) => Ok(DecodeResult::None(buffer)),
121+
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
122+
}
123+
}
124+
125+
impl<R: BufRead + Unpin + Send + 'static> ChunkedDecoder<R> {
126+
fn poll_read_init(
127+
self: Pin<&mut Self>,
128+
cx: &mut Context<'_>,
129+
buf: &mut [u8],
130+
) -> Poll<io::Result<usize>> {
131+
self.poll_read_inner(cx, buf, decode_init)
132+
}
133+
134+
fn poll_read_chunk(
135+
self: Pin<&mut Self>,
136+
cx: &mut Context<'_>,
137+
buf: &mut [u8],
138+
current: u64,
139+
len: u64,
140+
) -> Poll<io::Result<usize>> {
141+
self.poll_read_inner(cx, buf, |buffer, pos, buf| {
142+
decode_chunk(buffer, pos, buf, current, len)
143+
})
144+
}
145+
146+
fn poll_read_trailer(
147+
self: Pin<&mut Self>,
148+
cx: &mut Context<'_>,
149+
buf: &mut [u8],
150+
) -> Poll<io::Result<usize>> {
151+
self.poll_read_inner(cx, buf, |buffer, pos, _| decode_trailer(buffer, pos))
152+
}
153+
154+
fn poll_read_inner<F>(
155+
mut self: Pin<&mut Self>,
156+
cx: &mut Context<'_>,
157+
buf: &mut [u8],
158+
mut f: F,
159+
) -> Poll<io::Result<usize>>
160+
where
161+
F: FnMut(Block<'static>, &Position, &mut [u8]) -> io::Result<DecodeResult>,
162+
{
163+
let this = &mut *self;
164+
165+
let mut n = std::mem::replace(&mut this.current, Position::default());
166+
let buffer = std::mem::replace(&mut this.buffer, POOL.alloc(INITIAL_CAPACITY));
167+
168+
let mut buffer = if n.len() > 0 && this.initial_decode {
169+
match f(buffer, &n, buf)? {
170+
DecodeResult::Some {
171+
read,
172+
buffer,
173+
new_pos,
174+
new_state,
175+
} => {
176+
dbg!(std::str::from_utf8(&buf[..read]));
177+
// initial_decode is still true
178+
std::mem::replace(&mut this.buffer, buffer);
179+
std::mem::replace(&mut this.state, new_state);
180+
this.current = new_pos;
181+
return Poll::Ready(Ok(read));
182+
}
183+
DecodeResult::None(buffer) => buffer,
184+
}
185+
} else {
186+
buffer
187+
};
188+
189+
loop {
190+
if n.len() + this.decode_needs >= buffer.capacity() {
191+
if buffer.capacity() + this.decode_needs < MAX_CAPACITY {
192+
buffer.realloc(buffer.capacity() + this.decode_needs);
193+
} else {
194+
std::mem::replace(&mut this.buffer, buffer);
195+
this.current = n;
196+
return Poll::Ready(Err(io::Error::new(
197+
io::ErrorKind::Other,
198+
"incoming data too large",
199+
)));
200+
}
201+
}
202+
203+
let bytes_read = match Pin::new(&mut this.inner).poll_read(cx, &mut buffer[n.end..]) {
204+
Poll::Ready(result) => result?,
205+
Poll::Pending => {
206+
// if we're here, it means that we need more data but there is none yet,
207+
// so no decoding attempts are necessary until we get more data
208+
this.initial_decode = false;
209+
210+
std::mem::replace(&mut this.buffer, buffer);
211+
this.current = n;
212+
return Poll::Pending;
213+
}
214+
};
215+
n.end += bytes_read;
216+
217+
match f(buffer, &n, buf)? {
218+
DecodeResult::Some {
219+
read,
220+
buffer,
221+
new_pos,
222+
new_state,
223+
} => {
224+
dbg!(read);
225+
dbg!(&new_state);
226+
dbg!(std::str::from_utf8(&buf[..read]));
227+
// current buffer might now contain more data inside, so we need to attempt
228+
// to decode it next time
229+
this.initial_decode = true;
230+
std::mem::replace(&mut this.state, new_state);
231+
std::mem::replace(&mut this.buffer, buffer);
232+
this.current = new_pos;
233+
return Poll::Ready(Ok(read));
234+
}
235+
DecodeResult::None(buf) => {
236+
buffer = buf;
237+
238+
if this.buffer.is_empty() || n.is_zero() {
239+
// "logical buffer" is empty, there is nothing to decode on the next step
240+
this.initial_decode = false;
241+
242+
std::mem::replace(&mut this.buffer, buffer);
243+
this.current = n;
244+
return Poll::Ready(Ok(0));
245+
} else if n.len() == 0 {
246+
// "logical buffer" is empty, there is nothing to decode on the next step
247+
this.initial_decode = false;
248+
249+
std::mem::replace(&mut this.buffer, buffer);
250+
this.current = n;
251+
return Poll::Ready(Err(io::Error::new(
252+
io::ErrorKind::UnexpectedEof,
253+
"bytes remaining in stream",
254+
)));
255+
}
256+
}
257+
}
258+
}
17259
}
18260
}
19261

@@ -24,20 +266,81 @@ impl<R: BufRead + Unpin + Send + 'static> Read for ChunkedDecoder<R> {
24266
cx: &mut Context<'_>,
25267
buf: &mut [u8],
26268
) -> Poll<io::Result<usize>> {
27-
let this = self.project();
28-
this.reader.poll_read(cx, buf)
269+
match self.state {
270+
State::Init => {
271+
// Initial read
272+
self.poll_read_init(cx, buf)
273+
}
274+
State::Chunk(current, len) => {
275+
// reading a chunk
276+
self.poll_read_chunk(cx, buf, current, len)
277+
}
278+
State::Trailer => {
279+
// reading the trailer headers
280+
self.poll_read_trailer(cx, buf)
281+
}
282+
State::Done => Poll::Ready(Ok(0)),
283+
}
29284
}
30285
}
31286

32-
impl<R: BufRead + Unpin + Send + 'static> BufRead for ChunkedDecoder<R> {
33-
#[allow(missing_doc_code_examples)]
34-
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'_ [u8]>> {
35-
let this = self.project();
36-
this.reader.poll_fill_buf(cx)
287+
/// A semantically explicit slice of a buffer.
288+
#[derive(Eq, PartialEq, Debug, Copy, Clone, Default)]
289+
struct Position {
290+
start: usize,
291+
end: usize,
292+
}
293+
294+
impl Position {
295+
const fn new(start: usize, end: usize) -> Position {
296+
Position { start, end }
297+
}
298+
299+
fn len(&self) -> usize {
300+
self.end - self.start
301+
}
302+
303+
fn is_zero(&self) -> bool {
304+
self.start == 0 && self.end == 0
37305
}
306+
}
307+
308+
enum DecodeResult {
309+
Some {
310+
/// How much was read
311+
read: usize,
312+
/// Remaining data.
313+
buffer: Block<'static>,
314+
new_pos: Position,
315+
new_state: State,
316+
},
317+
None(Block<'static>),
318+
}
319+
320+
#[derive(Debug)]
321+
enum State {
322+
Init,
323+
Chunk(u64, u64),
324+
Trailer,
325+
Done,
326+
}
38327

39-
fn consume(self: Pin<&mut Self>, amt: usize) {
40-
let this = self.project();
41-
this.reader.consume(amt);
328+
impl fmt::Debug for DecodeResult {
329+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330+
match self {
331+
DecodeResult::Some {
332+
read,
333+
buffer,
334+
new_pos,
335+
new_state,
336+
} => f
337+
.debug_struct("DecodeResult::Some")
338+
.field("read", read)
339+
.field("block", &buffer.len())
340+
.field("new_pos", new_pos)
341+
.field("new_state", new_state)
342+
.finish(),
343+
DecodeResult::None(block) => write!(f, "DecodeResult::None({})", block.len()),
344+
}
42345
}
43346
}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#![forbid(unsafe_code, rust_2018_idioms)]
3232
#![deny(missing_debug_implementations, nonstandard_style)]
3333
// #![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]
34-
#![cfg_attr(test, deny(warnings))]
34+
// #![cfg_attr(test, deny(warnings))]
3535

3636
/// The maximum amount of headers parsed on the server.
3737
const MAX_HEADERS: usize = 128;

0 commit comments

Comments
 (0)