Skip to content

Commit 2bdae95

Browse files
committed
Setup trailers encoding
1 parent a71edb0 commit 2bdae95

File tree

1 file changed

+93
-29
lines changed

1 file changed

+93
-29
lines changed

src/chunked/encoder.rs

Lines changed: 93 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,37 @@ use http_types::Response;
88
const CR: u8 = b'\r';
99
const LF: u8 = b'\n';
1010

11+
/// The encoder state.
12+
#[derive(Debug)]
13+
enum State {
14+
/// Streaming out chunks.
15+
Streaming,
16+
/// Receiving trailers from a channel.
17+
ReceiveTrailers,
18+
/// Streaming out trailers.
19+
EncodeTrailers,
20+
/// Writing the final CRLF
21+
EndOfStream,
22+
/// The stream has finished.
23+
Done,
24+
}
25+
1126
/// An encoder for chunked encoding.
1227
#[derive(Debug)]
1328
pub(crate) struct ChunkedEncoder {
14-
done: bool,
29+
/// How many bytes we've written to the buffer so far.
30+
bytes_written: usize,
31+
/// The internal encoder state.
32+
state: State,
1533
}
1634

1735
impl ChunkedEncoder {
1836
/// Create a new instance.
1937
pub(crate) fn new() -> Self {
20-
Self { done: false }
38+
Self {
39+
state: State::Streaming,
40+
bytes_written: 0,
41+
}
2142
}
2243
/// Encode an AsyncBufRead using "chunked" framing. This is used for streams
2344
/// whose length is not known up front.
@@ -40,23 +61,33 @@ impl ChunkedEncoder {
4061
/// ```
4162
pub(crate) fn encode(
4263
&mut self,
43-
mut res: &mut Response,
64+
res: &mut Response,
4465
cx: &mut Context<'_>,
4566
buf: &mut [u8],
4667
) -> Poll<io::Result<usize>> {
47-
let mut bytes_read = 0;
48-
49-
// Return early if we know we're done.
50-
if self.done {
51-
return Poll::Ready(Ok(0));
68+
self.bytes_written = 0;
69+
match self.state {
70+
State::Streaming => self.encode_stream(res, cx, buf),
71+
State::ReceiveTrailers => self.encode_trailers(res, cx, buf),
72+
State::EncodeTrailers => self.encode_trailers(res, cx, buf),
73+
State::EndOfStream => self.encode_eos(cx, buf),
74+
State::Done => Poll::Ready(Ok(0)),
5275
}
76+
}
5377

78+
/// Stream out data using chunked encoding.
79+
fn encode_stream(
80+
&mut self,
81+
mut res: &mut Response,
82+
cx: &mut Context<'_>,
83+
buf: &mut [u8],
84+
) -> Poll<io::Result<usize>> {
5485
// Get bytes from the underlying stream. If the stream is not ready yet,
5586
// return the header bytes if we have any.
5687
let src = match Pin::new(&mut res).poll_fill_buf(cx) {
5788
Poll::Ready(Ok(n)) => n,
5889
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
59-
Poll::Pending => match bytes_read {
90+
Poll::Pending => match self.bytes_written {
6091
0 => return Poll::Pending,
6192
n => return Poll::Ready(Ok(n)),
6293
},
@@ -65,25 +96,20 @@ impl ChunkedEncoder {
6596
// If the stream doesn't have any more bytes left to read we're done.
6697
if src.len() == 0 {
6798
// Write out the final empty chunk
68-
let idx = bytes_read;
99+
let idx = self.bytes_written;
69100
buf[idx] = b'0';
70101
buf[idx + 1] = CR;
71102
buf[idx + 2] = LF;
103+
self.bytes_written += 3;
72104

73-
// Write the final CRLF
74-
buf[idx + 3] = CR;
75-
buf[idx + 4] = LF;
76-
bytes_read += 5;
77-
78-
log::trace!("done sending bytes");
79-
self.done = true;
80-
return Poll::Ready(Ok(bytes_read));
105+
self.state = State::ReceiveTrailers;
106+
return self.receive_trailers(res, cx, buf);
81107
}
82108

83109
// Each chunk is prefixed with the length of the data in hex, then a
84110
// CRLF, then the content, then another CRLF. Calculate how many bytes
85111
// each part should be.
86-
let buf_len = buf.len().checked_sub(bytes_read).unwrap_or(0);
112+
let buf_len = buf.len().checked_sub(self.bytes_written).unwrap_or(0);
87113
let amt = src.len().min(buf_len);
88114
// Calculate the max char count encoding the `len_prefix` statement
89115
// as hex would take. This is done by rounding up `log16(amt + 1)`.
@@ -94,28 +120,66 @@ impl ChunkedEncoder {
94120
let len_prefix = format!("{:X}", amt).into_bytes();
95121

96122
// Write our frame header to the buffer.
97-
let lower = bytes_read;
98-
let upper = bytes_read + len_prefix.len();
123+
let lower = self.bytes_written;
124+
let upper = self.bytes_written + len_prefix.len();
99125
buf[lower..upper].copy_from_slice(&len_prefix);
100126
buf[upper] = CR;
101127
buf[upper + 1] = LF;
102-
bytes_read += len_prefix.len() + 2;
128+
self.bytes_written += len_prefix.len() + 2;
103129

104130
// Copy the bytes from our source into the output buffer.
105-
let lower = bytes_read;
106-
let upper = bytes_read + amt;
131+
let lower = self.bytes_written;
132+
let upper = self.bytes_written + amt;
107133
buf[lower..upper].copy_from_slice(&src[0..amt]);
108134
Pin::new(&mut res).consume(amt);
109-
bytes_read += amt;
135+
self.bytes_written += amt;
110136

111137
// Finalize the chunk with a final CRLF.
112-
let idx = bytes_read;
138+
let idx = self.bytes_written;
113139
buf[idx] = CR;
114140
buf[idx + 1] = LF;
115-
bytes_read += 2;
141+
self.bytes_written += 2;
116142

117143
// Finally return how many bytes we've written to the buffer.
118-
log::trace!("sending {} bytes", bytes_read);
119-
Poll::Ready(Ok(bytes_read))
144+
log::trace!("sending {} bytes", self.bytes_written);
145+
Poll::Ready(Ok(self.bytes_written))
146+
}
147+
148+
/// Receive trailers sent to the response, and store them in an internal
149+
/// buffer.
150+
fn receive_trailers(
151+
&mut self,
152+
res: &mut Response,
153+
cx: &mut Context<'_>,
154+
buf: &mut [u8],
155+
) -> Poll<io::Result<usize>> {
156+
// TODO: actually wait for trailers to be received.
157+
self.state = State::EncodeTrailers;
158+
self.encode_trailers(res, cx, buf)
159+
}
160+
161+
/// Send trailers to the buffer.
162+
fn encode_trailers(
163+
&mut self,
164+
_res: &mut Response,
165+
cx: &mut Context<'_>,
166+
buf: &mut [u8],
167+
) -> Poll<io::Result<usize>> {
168+
// TODO: actually encode trailers here.
169+
self.state = State::EndOfStream;
170+
self.encode_eos(cx, buf)
171+
}
172+
173+
/// Encode the end of the stream.
174+
fn encode_eos(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
175+
let idx = self.bytes_written;
176+
// Write the final CRLF
177+
buf[idx] = CR;
178+
buf[idx + 1] = LF;
179+
self.bytes_written += 2;
180+
181+
log::trace!("finished encoding chunked stream");
182+
self.state = State::Done;
183+
return Poll::Ready(Ok(self.bytes_written));
120184
}
121185
}

0 commit comments

Comments
 (0)