
Commit a2afd7c

handle chunked decoding in the client and fix some issues with it
1 parent e4d2911 commit a2afd7c

6 files changed (+187, -105 lines)


Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ pin-project-lite = "0.1.1"
 byte-pool = "0.2.1"
 lazy_static = "1.4.0"
 futures-core = "0.3.1"
+log = "0.4"
 
 [dev-dependencies]
 pretty_assertions = "0.6.1"
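The new log dependency backs the log::trace! calls added to src/client.rs below. As a rough sketch (not part of this commit), those trace lines only produce output once the application installs a logger implementation; env_logger here is an illustrative choice, not a dependency of this crate:

    // Hypothetical application-side setup: log::trace! is a no-op until a
    // logger implementation is registered with the `log` facade.
    fn main() {
        env_logger::init(); // e.g. run with RUST_LOG=trace to see the "> " request lines
        log::trace!("tracing enabled");
    }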

src/chunked.rs

Lines changed: 107 additions & 62 deletions
@@ -59,7 +59,7 @@ impl<R: Read> ChunkedDecoder<R> {
 
 fn decode_init(buffer: Block<'static>, pos: &Position) -> io::Result<DecodeResult> {
     dbg!(pos);
-    dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]).unwrap());
+    dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
 
     use httparse::Status;
     match httparse::parse_chunk_size(&buffer[pos.start..pos.end]) {
@@ -91,7 +91,7 @@ fn decode_init(buffer: Block<'static>, pos: &Position) -> io::Result<DecodeResul
 
 fn decode_chunk_end(buffer: Block<'static>, pos: &Position) -> io::Result<DecodeResult> {
     dbg!(pos);
-    dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]).unwrap());
+    dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
 
     if pos.len() < 2 {
         return Ok(DecodeResult::None(buffer));
@@ -120,7 +120,7 @@ fn decode_trailer(buffer: Block<'static>, pos: &Position) -> io::Result<DecodeRe
     // read headers
     let mut headers = [httparse::EMPTY_HEADER; 16];
     dbg!(pos);
-    dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]).unwrap());
+    dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
 
     match httparse::parse_headers(&buffer[pos.start..pos.end], &mut headers) {
         Ok(Status::Complete((used, headers))) => {
@@ -162,61 +162,100 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
         current: u64,
         len: u64,
     ) -> io::Result<DecodeResult> {
+        dbg!("poll_read_chunk");
         dbg!(pos);
-        dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]).unwrap());
+        dbg!(std::str::from_utf8(&buffer[pos.start..pos.end]));
+        dbg!(len);
+        dbg!(current);
 
         let mut new_pos = pos.clone();
+        let remaining = (len - current) as usize;
+        let mut to_read = std::cmp::min(remaining, buf.len());
+
+        let mut new_current = current;
+        let mut start = 0;
+
+        // first drain the buffer
+        if new_pos.len() > 0 {
+            let to_read_buf = std::cmp::min(to_read, pos.len());
+            dbg!(to_read_buf);
+            dbg!(to_read);
+            dbg!(new_pos);
+            dbg!(std::str::from_utf8(
+                &buffer[new_pos.start..new_pos.start + to_read_buf]
+            ));
+            buf[..to_read_buf].copy_from_slice(&buffer[new_pos.start..new_pos.start + to_read_buf]);
+
+            // to_read -= to_read_buf;
+            new_pos.start += to_read_buf;
+            new_current += to_read_buf as u64;
+            // start += to_read_buf;
+
+            let read = new_current as usize - current as usize;
+            let new_state = if new_current == len {
+                State::ChunkEnd
+            } else {
+                State::Chunk(new_current, len)
+            };
 
-        loop {
-            let remaining = (len - current) as usize;
-            let mut to_read = std::cmp::min(remaining, buf.len());
-            let mut new_current = current;
-            let mut start = 0;
-
-            // first drain the buffer
-            if pos.len() > 0 {
-                let to_read_buf = std::cmp::min(to_read, pos.len());
-                dbg!(std::str::from_utf8(&buffer[pos.start..pos.start + to_read_buf]).unwrap());
-                buf[..to_read_buf].copy_from_slice(&buffer[pos.start..pos.start + to_read_buf]);
-                to_read -= to_read_buf;
-                new_pos.start += to_read_buf;
-                new_current += to_read_buf as u64;
-                start += to_read_buf;
-            }
-
-            if to_read > 0 {
-                dbg!("reading");
-                dbg!(to_read);
-                dbg!(start);
-                let n = match Pin::new(&mut self.inner)
-                    .poll_read(cx, &mut buf[start..start + to_read])
-                {
-                    Poll::Ready(val) => val?,
-                    Poll::Pending => {
-                        return Ok(DecodeResult::Some {
-                            read: 0,
-                            new_state: self.state.clone(),
-                            new_pos,
-                            buffer,
-                            pending: true,
-                        })
-                    }
-                };
+            return Ok(DecodeResult::Some {
+                read,
+                new_state,
+                new_pos,
+                buffer,
+                pending: false,
+            });
+        }
 
-                new_current += n as u64;
-            }
+        while to_read > 0 {
+            dbg!("poll_read_chunk_loop");
+
+            dbg!(to_read);
+            dbg!(start);
+            dbg!(new_pos);
+            dbg!(new_current);
+
+            dbg!("reading");
+            let n = match Pin::new(&mut self.inner).poll_read(cx, &mut buf[start..start + to_read])
+            {
+                Poll::Ready(val) => val?,
+                Poll::Pending => {
+                    dbg!("pending");
+                    return Ok(DecodeResult::Some {
+                        read: 0,
+                        new_state: State::Chunk(new_current, len),
+                        new_pos,
+                        buffer,
+                        pending: true,
+                    });
+                }
+            };
 
-            if new_current == len {
-                return Ok(DecodeResult::Some {
-                    read: new_current as usize - current as usize,
-                    new_state: State::ChunkEnd,
-                    new_pos,
-                    buffer,
-                    pending: false,
-                });
-            }
-            self.state = State::Chunk(new_current, len);
+            dbg!(n);
+            to_read -= n;
+            new_current += n as u64;
+            start += n;
         }
+
+        let read = new_current as usize - current as usize;
+        let new_state = if new_current == len {
+            State::ChunkEnd
+        } else {
+            State::Chunk(new_current, len)
+        };
+        dbg!(read);
+        dbg!(&new_state);
+        dbg!(new_current);
+        dbg!(len);
+        dbg!(current);
+
+        Ok(DecodeResult::Some {
+            read,
+            new_state,
+            new_pos,
+            buffer,
+            pending: false,
+        })
     }
 
     fn poll_read_inner(
@@ -226,6 +265,8 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
         pos: &Position,
         buf: &mut [u8],
     ) -> io::Result<DecodeResult> {
+        dbg!(&self.state);
+
        match self.state {
            State::Init => {
                // Initial read
@@ -289,8 +330,13 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
 
         let mut n = std::mem::replace(&mut this.current, Position::default());
         let buffer = std::mem::replace(&mut this.buffer, POOL.alloc(INITIAL_CAPACITY));
-        let mut needs_read = true;
+        let mut needs_read = if let State::Chunk(_, _) = this.state {
+            false // Do not attempt to fill the buffer when we are reading a chunk
+        } else {
+            true
+        };
 
+        dbg!(n);
         let mut buffer = if n.len() > 0 && this.initial_decode {
             match this.poll_read_inner(cx, buffer, &n, buf)? {
                 DecodeResult::Some {
@@ -329,6 +375,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
             dbg!("loop");
             if n.len() >= buffer.capacity() {
                 if buffer.capacity() + 1024 <= MAX_CAPACITY {
+                    dbg!("resizing buffer");
                     buffer.realloc(buffer.capacity() + 1024);
                 } else {
                     std::mem::replace(&mut this.buffer, buffer);
@@ -341,6 +388,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
             }
 
             if needs_read {
+                dbg!("reading");
                 let bytes_read = match Pin::new(&mut this.inner).poll_read(cx, &mut buffer[n.end..])
                 {
                     Poll::Ready(result) => result?,
@@ -354,6 +402,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
                         return Poll::Pending;
                     }
                 };
+                dbg!(bytes_read);
                 n.end += bytes_read;
             }
             match this.poll_read_inner(cx, buffer, &n, buf)? {
@@ -364,6 +413,7 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
                     new_state,
                     pending,
                 } => {
+                    dbg!("some");
                     // current buffer might now contain more data inside, so we need to attempt
                     // to decode it next time
                     this.initial_decode = true;
@@ -386,6 +436,8 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
                     continue;
                 }
                 DecodeResult::None(buf) => {
+                    dbg!("none");
+
                     buffer = buf;
 
                     if this.buffer.is_empty() || n.is_zero() {
@@ -394,17 +446,10 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
 
                         std::mem::replace(&mut this.buffer, buffer);
                         this.current = n;
-                        return Poll::Ready(Ok(0));
-                    } else if n.len() == 0 {
-                        // "logical buffer" is empty, there is nothing to decode on the next step
-                        this.initial_decode = false;
 
-                        std::mem::replace(&mut this.buffer, buffer);
-                        this.current = n;
-                        return Poll::Ready(Err(io::Error::new(
-                            io::ErrorKind::UnexpectedEof,
-                            "bytes remaining in stream",
-                        )));
+                        return Poll::Ready(Ok(0));
+                    } else {
+                        needs_read = true;
                     }
                 }
             }
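For context on the format this decoder walks through: decode_init above hands the size line of each chunk to httparse::parse_chunk_size. A minimal sketch of the wire format (not part of this commit; the literal body and assertions are illustrative only):

    // A chunked body is a sequence of "<hex size>\r\n<data>\r\n" frames,
    // terminated by a zero-size chunk and optional trailer headers.
    fn main() {
        let body = b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n";
        match httparse::parse_chunk_size(&body[..]) {
            // `used` is how many bytes the size line took, `size` is the chunk length.
            Ok(httparse::Status::Complete((used, size))) => {
                assert_eq!((used, size), (3, 4));
                assert_eq!(&body[used..used + size as usize], b"Wiki");
            }
            Ok(httparse::Status::Partial) => println!("need more data"),
            Err(_) => eprintln!("invalid chunk size"),
        }
    }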

src/client.rs

Lines changed: 47 additions & 15 deletions
@@ -5,13 +5,15 @@ use async_std::prelude::*;
 use async_std::task::{Context, Poll};
 use futures_core::ready;
 use http_types::{
-    headers::{HeaderName, HeaderValue, CONTENT_LENGTH},
+    headers::{HeaderName, HeaderValue, CONTENT_LENGTH, TRANSFER_ENCODING},
     Body, Request, Response, StatusCode,
 };
 
 use std::pin::Pin;
 use std::str::FromStr;
 
+use crate::chunked::ChunkedDecoder;
+use crate::error::HttpError;
 use crate::{Exception, MAX_HEADERS};
 
 /// An HTTP encoder.
@@ -60,12 +62,23 @@ pub async fn encode(req: Request) -> Result<Encoder, std::io::Error> {
     }
 
     let val = format!("{} {} HTTP/1.1\r\n", req.method(), url);
+    log::trace!("> {}", &val);
+    buf.write_all(val.as_bytes()).await?;
+
+    let val = format!(
+        "Host: {}\r\n",
+        req.url()
+            .host_str()
+            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Missing hostname"))?
+    );
+    log::trace!("> {}", &val);
     buf.write_all(val.as_bytes()).await?;
 
     // If the body isn't streaming, we can set the content-length ahead of time. Else we need to
     // send all items in chunks.
     if let Some(len) = req.len() {
         let val = format!("Content-Length: {}\r\n", len);
+        log::trace!("> {}", &val);
         buf.write_all(val.as_bytes()).await?;
     } else {
         // write!(&mut buf, "Transfer-Encoding: chunked\r\n")?;
@@ -76,6 +89,7 @@ pub async fn encode(req: Request) -> Result<Encoder, std::io::Error> {
     for (header, values) in req.iter() {
         for value in values.iter() {
             let val = format!("{}: {}\r\n", header, value);
+            log::trace!("> {}", &val);
             buf.write_all(val.as_bytes()).await?;
         }
     }
@@ -131,21 +145,39 @@ where
         res.insert_header(name, value)?;
     }
 
-    // Process the body if `Content-Length` was passed.
-    if let Some(content_length) = res.header(&CONTENT_LENGTH) {
-        let length = content_length
-            .last()
-            .unwrap()
-            .as_str()
-            .parse::<usize>()
-            .ok();
-
-        if let Some(len) = length {
-            res.set_body(Body::from_reader(reader, Some(len)));
-        } else {
-            return Err("Invalid value for Content-Length".into());
+    let content_length = res.header(&CONTENT_LENGTH);
+    let transfer_encoding = res.header(&TRANSFER_ENCODING);
+
+    if content_length.is_some() && transfer_encoding.is_some() {
+        // This is always an error.
+        return Err(HttpError::UnexpectedContentLengthHeader.into());
+    }
+
+    // Check for Transfer-Encoding
+    match transfer_encoding {
+        Some(encoding) if !encoding.is_empty() => {
+            if encoding.last().unwrap().as_str() == "chunked" {
+                res.set_body(Body::from_reader(
+                    BufReader::new(ChunkedDecoder::new(reader)),
+                    None,
+                ));
+                return Ok(res);
+            }
+            // Fall through to Content-Length
         }
-    };
+        _ => {
+            // Fall through to Content-Length
+        }
+    }
+
+    // Check for Content-Length.
+    match content_length {
+        Some(len) => {
+            let len = len.last().unwrap().as_str().parse::<usize>()?;
+            res.set_body(Body::from_reader(reader.take(len as u64), Some(len)));
+        }
+        None => {}
+    }
 
     // Return the response.
     Ok(res)
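A rough end-to-end sketch of the new response path (not from this commit): when the response carries Transfer-Encoding: chunked, the body reader is wrapped in ChunkedDecoder, which de-chunks the stream as it is read. The cursor-based setup, the exact Read/Unpin bounds, and the expected output below are assumptions for illustration only:

    use async_std::io::Cursor;
    use async_std::prelude::*;

    // Hypothetical crate-internal usage of the decoder that client.rs now
    // wraps responses in; assumes ChunkedDecoder accepts any async Read + Unpin.
    async fn decode_example() -> std::io::Result<()> {
        let wire = Cursor::new(&b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"[..]);
        let mut decoder = crate::chunked::ChunkedDecoder::new(wire);

        let mut decoded = String::new();
        decoder.read_to_string(&mut decoded).await?;
        assert_eq!(decoded, "Wikipedia");
        Ok(())
    }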
