Skip to content

Commit 0d1ed82

Browse files
authored
Merge pull request #84 from http-rs/rework-chunked-encoder
Rework chunked encoder
2 parents eb4ebf2 + 1c4753d commit 0d1ed82

File tree

9 files changed

+310
-175
lines changed

9 files changed

+310
-175
lines changed

examples/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async fn accept(addr: String, stream: TcpStream) -> http_types::Result<()> {
3030
async_h1::accept(&addr, stream.clone(), |_req| async move {
3131
let mut res = Response::new(StatusCode::Ok);
3232
res.insert_header("Content-Type", "text/plain")?;
33-
res.set_body("Hello");
33+
res.set_body("Hello world");
3434
Ok(res)
3535
})
3636
.await?;
File renamed without changes.

src/chunked/encoder.rs

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
use std::pin::Pin;
2+
3+
use async_std::io;
4+
use async_std::io::prelude::*;
5+
use async_std::task::{Context, Poll};
6+
use http_types::Response;
7+
8+
/// Carriage return byte (`\r`), the first byte of a CRLF line ending.
const CR: u8 = b'\r';
/// Line feed byte (`\n`), the second byte of a CRLF line ending.
const LF: u8 = b'\n';
/// Number of bytes occupied by one CRLF sequence.
const CRLF_LEN: usize = 2;
11+
12+
/// Phases of the chunked encoder's state machine.
#[derive(Debug)]
enum State {
    /// Initial state; nothing has been encoded yet.
    Start,
    /// Body data is being streamed out as chunks.
    EncodeChunks,
    /// The body is exhausted; the terminating chunk still has to be written.
    EndOfChunks,
    /// Waiting for trailers to arrive over a channel.
    ReceiveTrailers,
    /// Writing out any trailers that were received.
    EncodeTrailers,
    /// Writing the final CRLF that closes the stream.
    EndOfStream,
    /// Encoding is complete.
    End,
}
30+
31+
/// An encoder for chunked encoding.
32+
#[derive(Debug)]
33+
pub(crate) struct ChunkedEncoder {
34+
/// How many bytes we've written to the buffer so far.
35+
bytes_written: usize,
36+
/// The internal encoder state.
37+
state: State,
38+
}
39+
40+
impl ChunkedEncoder {
41+
/// Create a new instance.
42+
pub(crate) fn new() -> Self {
43+
Self {
44+
state: State::Start,
45+
bytes_written: 0,
46+
}
47+
}
48+
49+
/// Encode an AsyncBufRead using "chunked" framing. This is used for streams
50+
/// whose length is not known up front.
51+
///
52+
/// # Format
53+
///
54+
/// Each "chunk" uses the following encoding:
55+
///
56+
/// ```txt
57+
/// 1. {byte length of `data` as hex}\r\n
58+
/// 2. {data}\r\n
59+
/// ```
60+
///
61+
/// A chunk stream is finalized by appending the following:
62+
///
63+
/// ```txt
64+
/// 1. 0\r\n
65+
/// 2. {trailing header}\r\n (can be repeated)
66+
/// 3. \r\n
67+
/// ```
68+
pub(crate) fn encode(
69+
&mut self,
70+
res: &mut Response,
71+
cx: &mut Context<'_>,
72+
buf: &mut [u8],
73+
) -> Poll<io::Result<usize>> {
74+
self.bytes_written = 0;
75+
match self.state {
76+
State::Start => self.init(res, cx, buf),
77+
State::EncodeChunks => self.encode_chunks(res, cx, buf),
78+
State::EndOfChunks => self.encode_chunks_eos(res, cx, buf),
79+
State::ReceiveTrailers => self.encode_trailers(res, cx, buf),
80+
State::EncodeTrailers => self.encode_trailers(res, cx, buf),
81+
State::EndOfStream => self.encode_eos(cx, buf),
82+
State::End => Poll::Ready(Ok(0)),
83+
}
84+
}
85+
86+
/// Switch the internal state to a new state.
87+
fn set_state(&mut self, state: State) {
88+
use State::*;
89+
log::trace!("ChunkedEncoder state: {:?} -> {:?}", self.state, state);
90+
91+
#[cfg(debug_assertions)]
92+
match self.state {
93+
Start => assert!(matches!(state, EncodeChunks)),
94+
EncodeChunks => assert!(matches!(state, EndOfChunks)),
95+
EndOfChunks => assert!(matches!(state, ReceiveTrailers)),
96+
ReceiveTrailers => assert!(matches!(state, EncodeTrailers | EndOfStream)),
97+
EncodeTrailers => assert!(matches!(state, EndOfStream)),
98+
EndOfStream => assert!(matches!(state, End)),
99+
End => panic!("No state transitions allowed after the stream has ended"),
100+
}
101+
102+
self.state = state;
103+
}
104+
105+
/// Init encoding.
106+
fn init(
107+
&mut self,
108+
res: &mut Response,
109+
cx: &mut Context<'_>,
110+
buf: &mut [u8],
111+
) -> Poll<io::Result<usize>> {
112+
self.set_state(State::EncodeChunks);
113+
self.encode_chunks(res, cx, buf)
114+
}
115+
116+
/// Stream out data using chunked encoding.
117+
fn encode_chunks(
118+
&mut self,
119+
mut res: &mut Response,
120+
cx: &mut Context<'_>,
121+
buf: &mut [u8],
122+
) -> Poll<io::Result<usize>> {
123+
// Get bytes from the underlying stream. If the stream is not ready yet,
124+
// return the header bytes if we have any.
125+
let src = match Pin::new(&mut res).poll_fill_buf(cx) {
126+
Poll::Ready(Ok(n)) => n,
127+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
128+
Poll::Pending => match self.bytes_written {
129+
0 => return Poll::Pending,
130+
n => return Poll::Ready(Ok(n)),
131+
},
132+
};
133+
134+
// If the stream doesn't have any more bytes left to read we're done
135+
// sending chunks and it's time to move on.
136+
if src.len() == 0 {
137+
self.set_state(State::EndOfChunks);
138+
return self.encode_chunks_eos(res, cx, buf);
139+
}
140+
141+
// Each chunk is prefixed with the length of the data in hex, then a
142+
// CRLF, then the content, then another CRLF. Calculate how many bytes
143+
// each part should be.
144+
let buf_len = buf.len().checked_sub(self.bytes_written).unwrap_or(0);
145+
let msg_len = src.len().min(buf_len);
146+
// Calculate the max char count encoding the `len_prefix` statement
147+
// as hex would take. This is done by rounding up `log16(amt + 1)`.
148+
let hex_len = ((msg_len + 1) as f64).log(16.0).ceil() as usize;
149+
let framing_len = hex_len + CRLF_LEN * 2;
150+
let buf_upper = buf_len.checked_sub(framing_len).unwrap_or(0);
151+
let msg_len = msg_len.min(buf_upper);
152+
let len_prefix = format!("{:X}", msg_len).into_bytes();
153+
154+
// Request a new buf if the current buf is too small to write any data
155+
// into. Empty frames should only be sent to mark the end of a stream.
156+
if buf.len() <= framing_len {
157+
cx.waker().wake_by_ref();
158+
return Poll::Ready(Ok(self.bytes_written));
159+
}
160+
161+
// Write our frame header to the buffer.
162+
let lower = self.bytes_written;
163+
let upper = self.bytes_written + len_prefix.len();
164+
buf[lower..upper].copy_from_slice(&len_prefix);
165+
buf[upper] = CR;
166+
buf[upper + 1] = LF;
167+
self.bytes_written += len_prefix.len() + 2;
168+
169+
// Copy the bytes from our source into the output buffer.
170+
let lower = self.bytes_written;
171+
let upper = self.bytes_written + msg_len;
172+
buf[lower..upper].copy_from_slice(&src[0..msg_len]);
173+
Pin::new(&mut res).consume(msg_len);
174+
self.bytes_written += msg_len;
175+
176+
// Finalize the chunk with a closing CRLF.
177+
let idx = self.bytes_written;
178+
buf[idx] = CR;
179+
buf[idx + 1] = LF;
180+
self.bytes_written += CRLF_LEN;
181+
182+
// Finally return how many bytes we've written to the buffer.
183+
log::trace!("sending {} bytes", self.bytes_written);
184+
Poll::Ready(Ok(self.bytes_written))
185+
}
186+
187+
fn encode_chunks_eos(
188+
&mut self,
189+
res: &mut Response,
190+
cx: &mut Context<'_>,
191+
buf: &mut [u8],
192+
) -> Poll<io::Result<usize>> {
193+
// Request a new buf if the current buf is too small to write into.
194+
if buf.len() < 3 {
195+
cx.waker().wake_by_ref();
196+
return Poll::Ready(Ok(self.bytes_written));
197+
}
198+
199+
// Write out the final empty chunk
200+
let idx = self.bytes_written;
201+
buf[idx] = b'0';
202+
buf[idx + 1] = CR;
203+
buf[idx + 2] = LF;
204+
self.bytes_written += 1 + CRLF_LEN;
205+
206+
self.set_state(State::ReceiveTrailers);
207+
return self.receive_trailers(res, cx, buf);
208+
}
209+
210+
/// Receive trailers sent to the response, and store them in an internal
211+
/// buffer.
212+
fn receive_trailers(
213+
&mut self,
214+
res: &mut Response,
215+
cx: &mut Context<'_>,
216+
buf: &mut [u8],
217+
) -> Poll<io::Result<usize>> {
218+
// TODO: actually wait for trailers to be received.
219+
self.set_state(State::EncodeTrailers);
220+
self.encode_trailers(res, cx, buf)
221+
}
222+
223+
/// Send trailers to the buffer.
224+
fn encode_trailers(
225+
&mut self,
226+
_res: &mut Response,
227+
cx: &mut Context<'_>,
228+
buf: &mut [u8],
229+
) -> Poll<io::Result<usize>> {
230+
// TODO: actually encode trailers here.
231+
self.set_state(State::EndOfStream);
232+
self.encode_eos(cx, buf)
233+
}
234+
235+
/// Encode the end of the stream.
236+
fn encode_eos(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
237+
let idx = self.bytes_written;
238+
// Write the final CRLF
239+
buf[idx] = CR;
240+
buf[idx + 1] = LF;
241+
self.bytes_written += CRLF_LEN;
242+
243+
self.set_state(State::End);
244+
return Poll::Ready(Ok(self.bytes_written));
245+
}
246+
}

src/chunked/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod decoder;
2+
mod encoder;
3+
4+
pub(crate) use decoder::ChunkedDecoder;
5+
pub(crate) use encoder::ChunkedEncoder;

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
//! for i in 0usize..2 {
4141
//! println!("making request {}/2", i + 1);
4242
//! let url = Url::parse(&format!("http://{}/foo", peer_addr)).unwrap();
43-
//! let req = Request::new(Method::Get, dbg!(url));
43+
//! let req = Request::new(Method::Get, url);
4444
//! let res = client::connect(stream.clone(), req).await?;
4545
//! println!("{:?}", res);
4646
//! }

0 commit comments

Comments
 (0)