Skip to content

Commit 32b8bea

Browse files
committed
new encoder for both client and server
1 parent 3cbf6ea commit 32b8bea

File tree

11 files changed

+262
-595
lines changed

11 files changed

+262
-595
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ edition = "2018"
1515
httparse = "1.3.3"
1616
async-std = { version = "1.6.0", features = ["unstable"] }
1717
http-types = "2.0.0"
18-
pin-project-lite = "0.1.1"
1918
byte-pool = "0.2.1"
2019
lazy_static = "1.4.0"
2120
futures-core = "0.3.1"
2221
log = "0.4"
22+
pin-project = "1.0.2"
2323

2424
[dev-dependencies]
2525
pretty_assertions = "0.6.1"

src/body_encoder.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use std::io;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use async_std::io::Read;
6+
use http_types::Body;
7+
use pin_project::pin_project;
8+
9+
use crate::chunked::ChunkedEncoder;
10+
11+
#[pin_project(project=BodyEncoderProjection)]
12+
#[derive(Debug)]
13+
pub(crate) enum BodyEncoder {
14+
Chunked(#[pin] ChunkedEncoder<Body>),
15+
Fixed(#[pin] Body),
16+
}
17+
18+
impl BodyEncoder {
19+
pub(crate) fn new(body: Body) -> Self {
20+
match body.len() {
21+
Some(_) => Self::Fixed(body),
22+
None => Self::Chunked(ChunkedEncoder::new(body)),
23+
}
24+
}
25+
}
26+
27+
impl Read for BodyEncoder {
28+
fn poll_read(
29+
self: Pin<&mut Self>,
30+
cx: &mut Context<'_>,
31+
buf: &mut [u8],
32+
) -> Poll<io::Result<usize>> {
33+
match self.project() {
34+
BodyEncoderProjection::Chunked(encoder) => encoder.poll_read(cx, buf),
35+
BodyEncoderProjection::Fixed(body) => body.poll_read(cx, buf),
36+
}
37+
}
38+
}

src/chunked/encoder.rs

Lines changed: 33 additions & 226 deletions
Original file line numberDiff line numberDiff line change
@@ -3,250 +3,57 @@ use std::pin::Pin;
33
use async_std::io;
44
use async_std::io::prelude::*;
55
use async_std::task::{Context, Poll};
6-
use http_types::Response;
7-
8-
const CR: u8 = b'\r';
9-
const LF: u8 = b'\n';
10-
const CRLF_LEN: usize = 2;
11-
12-
/// The encoder state.
13-
#[derive(Debug)]
14-
enum State {
15-
/// Starting state.
16-
Start,
17-
/// Streaming out chunks.
18-
EncodeChunks,
19-
/// No more chunks to stream, mark the end.
20-
EndOfChunks,
21-
/// Receiving trailers from a channel.
22-
ReceiveTrailers,
23-
/// Streaming out trailers, if we received any.
24-
EncodeTrailers,
25-
/// Writing out the final CRLF.
26-
EndOfStream,
27-
/// The stream has finished.
28-
End,
29-
}
6+
use futures_core::ready;
307

318
/// An encoder for chunked encoding.
329
#[derive(Debug)]
33-
pub(crate) struct ChunkedEncoder {
34-
/// How many bytes we've written to the buffer so far.
35-
bytes_written: usize,
36-
/// The internal encoder state.
37-
state: State,
10+
pub(crate) struct ChunkedEncoder<R> {
11+
reader: R,
12+
done: bool,
3813
}
3914

40-
impl ChunkedEncoder {
15+
impl<R: Read + Unpin> ChunkedEncoder<R> {
4116
/// Create a new instance.
42-
pub(crate) fn new() -> Self {
17+
pub(crate) fn new(reader: R) -> Self {
4318
Self {
44-
state: State::Start,
45-
bytes_written: 0,
46-
}
47-
}
48-
49-
/// Encode an AsyncBufRead using "chunked" framing. This is used for streams
50-
/// whose length is not known up front.
51-
///
52-
/// # Format
53-
///
54-
/// Each "chunk" uses the following encoding:
55-
///
56-
/// ```txt
57-
/// 1. {byte length of `data` as hex}\r\n
58-
/// 2. {data}\r\n
59-
/// ```
60-
///
61-
/// A chunk stream is finalized by appending the following:
62-
///
63-
/// ```txt
64-
/// 1. 0\r\n
65-
/// 2. {trailing header}\r\n (can be repeated)
66-
/// 3. \r\n
67-
/// ```
68-
pub(crate) fn encode(
69-
&mut self,
70-
res: &mut Response,
71-
cx: &mut Context<'_>,
72-
buf: &mut [u8],
73-
) -> Poll<io::Result<usize>> {
74-
self.bytes_written = 0;
75-
let res = self.run(res, cx, buf);
76-
log::trace!("ChunkedEncoder {} bytes written", self.bytes_written);
77-
res
78-
}
79-
80-
/// Execute the right method for the current state.
81-
fn run(
82-
&mut self,
83-
res: &mut Response,
84-
cx: &mut Context<'_>,
85-
buf: &mut [u8],
86-
) -> Poll<io::Result<usize>> {
87-
match self.state {
88-
State::Start => self.dispatch(State::EncodeChunks, res, cx, buf),
89-
State::EncodeChunks => self.encode_chunks(res, cx, buf),
90-
State::EndOfChunks => self.encode_chunks_eos(res, cx, buf),
91-
State::ReceiveTrailers => self.receive_trailers(res, cx, buf),
92-
State::EncodeTrailers => self.encode_trailers(res, cx, buf),
93-
State::EndOfStream => self.encode_eos(res, cx, buf),
94-
State::End => Poll::Ready(Ok(self.bytes_written)),
19+
reader,
20+
done: false,
9521
}
9622
}
23+
}
9724

98-
/// Switch the internal state to a new state.
99-
fn dispatch(
100-
&mut self,
101-
state: State,
102-
res: &mut Response,
103-
cx: &mut Context<'_>,
104-
buf: &mut [u8],
105-
) -> Poll<io::Result<usize>> {
106-
use State::*;
107-
log::trace!("ChunkedEncoder state: {:?} -> {:?}", self.state, state);
108-
109-
#[cfg(debug_assertions)]
110-
match self.state {
111-
Start => assert!(matches!(state, EncodeChunks)),
112-
EncodeChunks => assert!(matches!(state, EndOfChunks)),
113-
EndOfChunks => assert!(matches!(state, ReceiveTrailers)),
114-
ReceiveTrailers => assert!(matches!(state, EncodeTrailers | EndOfStream)),
115-
EncodeTrailers => assert!(matches!(state, EndOfStream)),
116-
EndOfStream => assert!(matches!(state, End)),
117-
End => panic!("No state transitions allowed after the ChunkedEncoder has ended"),
118-
}
119-
120-
self.state = state;
121-
self.run(res, cx, buf)
25+
fn max_bytes_to_read(buf_len: usize) -> usize {
26+
if buf_len < 6 {
27+
panic!("buffers of length {} are too small for this implementation. if this is a problem for you, please open an issue", buf_len);
12228
}
29+
let max_bytes_of_hex_framing = // the maximum number of bytes that the hex representation of remaining bytes might take
30+
(((buf_len - 5) as f64).log2() / 4f64).floor() as usize;
31+
buf_len - 5 - max_bytes_of_hex_framing
32+
}
12333

124-
/// Stream out data using chunked encoding.
125-
fn encode_chunks(
126-
&mut self,
127-
mut res: &mut Response,
34+
impl<R: Read + Unpin> Read for ChunkedEncoder<R> {
35+
fn poll_read(
36+
mut self: Pin<&mut Self>,
12837
cx: &mut Context<'_>,
12938
buf: &mut [u8],
13039
) -> Poll<io::Result<usize>> {
131-
// Get bytes from the underlying stream. If the stream is not ready yet,
132-
// return the header bytes if we have any.
133-
let src = match Pin::new(&mut res).poll_fill_buf(cx) {
134-
Poll::Ready(Ok(n)) => n,
135-
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
136-
Poll::Pending => match self.bytes_written {
137-
0 => return Poll::Pending,
138-
n => return Poll::Ready(Ok(n)),
139-
},
140-
};
141-
142-
// If the stream doesn't have any more bytes left to read we're done
143-
// sending chunks and it's time to move on.
144-
if src.len() == 0 {
145-
return self.dispatch(State::EndOfChunks, res, cx, buf);
40+
if self.done {
41+
return Poll::Ready(Ok(0));
14642
}
43+
let mut reader = &mut self.reader;
14744

148-
// Each chunk is prefixed with the length of the data in hex, then a
149-
// CRLF, then the content, then another CRLF. Calculate how many bytes
150-
// each part should be.
151-
let buf_len = buf.len().saturating_sub(self.bytes_written);
152-
let msg_len = src.len().min(buf_len);
153-
// Calculate the max char count encoding the `len_prefix` statement
154-
// as hex would take. This is done by rounding up `log16(amt + 1)`.
155-
let hex_len = ((msg_len + 1) as f64).log(16.0).ceil() as usize;
156-
let framing_len = hex_len + CRLF_LEN * 2;
157-
let buf_upper = buf_len.saturating_sub(framing_len);
158-
let msg_len = msg_len.min(buf_upper);
159-
let len_prefix = format!("{:X}", msg_len).into_bytes();
45+
let max_bytes_to_read = max_bytes_to_read(buf.len());
16046

161-
// Request a new buf if the current buf is too small to write any data
162-
// into. Empty frames should only be sent to mark the end of a stream.
163-
if buf.len() <= framing_len {
164-
cx.waker().wake_by_ref();
165-
return Poll::Ready(Ok(self.bytes_written));
47+
let bytes = ready!(Pin::new(&mut reader).poll_read(cx, &mut buf[..max_bytes_to_read]))?;
48+
if bytes == 0 {
49+
self.done = true;
16650
}
167-
168-
// Write our frame header to the buffer.
169-
let lower = self.bytes_written;
170-
let upper = self.bytes_written + len_prefix.len();
171-
buf[lower..upper].copy_from_slice(&len_prefix);
172-
buf[upper] = CR;
173-
buf[upper + 1] = LF;
174-
self.bytes_written += len_prefix.len() + 2;
175-
176-
// Copy the bytes from our source into the output buffer.
177-
let lower = self.bytes_written;
178-
let upper = self.bytes_written + msg_len;
179-
buf[lower..upper].copy_from_slice(&src[0..msg_len]);
180-
Pin::new(&mut res).consume(msg_len);
181-
self.bytes_written += msg_len;
182-
183-
// Finalize the chunk with a closing CRLF.
184-
let idx = self.bytes_written;
185-
buf[idx] = CR;
186-
buf[idx + 1] = LF;
187-
self.bytes_written += CRLF_LEN;
188-
189-
// Finally return how many bytes we've written to the buffer.
190-
Poll::Ready(Ok(self.bytes_written))
191-
}
192-
193-
fn encode_chunks_eos(
194-
&mut self,
195-
res: &mut Response,
196-
cx: &mut Context<'_>,
197-
buf: &mut [u8],
198-
) -> Poll<io::Result<usize>> {
199-
// Request a new buf if the current buf is too small to write into.
200-
if buf.len() < 3 {
201-
cx.waker().wake_by_ref();
202-
return Poll::Ready(Ok(self.bytes_written));
203-
}
204-
205-
// Write out the final empty chunk
206-
let idx = self.bytes_written;
207-
buf[idx] = b'0';
208-
buf[idx + 1] = CR;
209-
buf[idx + 2] = LF;
210-
self.bytes_written += 1 + CRLF_LEN;
211-
212-
self.dispatch(State::ReceiveTrailers, res, cx, buf)
213-
}
214-
215-
/// Receive trailers sent to the response, and store them in an internal
216-
/// buffer.
217-
fn receive_trailers(
218-
&mut self,
219-
res: &mut Response,
220-
cx: &mut Context<'_>,
221-
buf: &mut [u8],
222-
) -> Poll<io::Result<usize>> {
223-
// TODO: actually wait for trailers to be received.
224-
self.dispatch(State::EncodeTrailers, res, cx, buf)
225-
}
226-
227-
/// Send trailers to the buffer.
228-
fn encode_trailers(
229-
&mut self,
230-
res: &mut Response,
231-
cx: &mut Context<'_>,
232-
buf: &mut [u8],
233-
) -> Poll<io::Result<usize>> {
234-
// TODO: actually encode trailers here.
235-
self.dispatch(State::EndOfStream, res, cx, buf)
236-
}
237-
238-
/// Encode the end of the stream.
239-
fn encode_eos(
240-
&mut self,
241-
res: &mut Response,
242-
cx: &mut Context<'_>,
243-
buf: &mut [u8],
244-
) -> Poll<io::Result<usize>> {
245-
let idx = self.bytes_written;
246-
// Write the final CRLF
247-
buf[idx] = CR;
248-
buf[idx + 1] = LF;
249-
self.bytes_written += CRLF_LEN;
250-
self.dispatch(State::End, res, cx, buf)
51+
let start = format!("{:X}\r\n", bytes);
52+
let start_length = start.as_bytes().len();
53+
let total = bytes + start_length + 2;
54+
buf.copy_within(..bytes, start_length);
55+
buf[..start_length].copy_from_slice(start.as_bytes());
56+
buf[total - 2..total].copy_from_slice(b"\r\n");
57+
Poll::Ready(Ok(total))
25158
}
25259
}

0 commit comments

Comments
 (0)