Skip to content

Commit a71edb0

Browse files
committed
Split chunked logic into own dir
1 parent 1b4540c commit a71edb0

File tree

4 files changed

+136
-83
lines changed

4 files changed

+136
-83
lines changed
File renamed without changes.

src/chunked/encoder.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use std::pin::Pin;
2+
3+
use async_std::io;
4+
use async_std::io::prelude::*;
5+
use async_std::task::{Context, Poll};
6+
use http_types::Response;
7+
8+
const CR: u8 = b'\r';
9+
const LF: u8 = b'\n';
10+
11+
/// An encoder for chunked encoding.
12+
#[derive(Debug)]
13+
pub(crate) struct ChunkedEncoder {
14+
done: bool,
15+
}
16+
17+
impl ChunkedEncoder {
18+
/// Create a new instance.
19+
pub(crate) fn new() -> Self {
20+
Self { done: false }
21+
}
22+
/// Encode an AsyncBufRead using "chunked" framing. This is used for streams
23+
/// whose length is not known up front.
24+
///
25+
/// # Format
26+
///
27+
/// Each "chunk" uses the following encoding:
28+
///
29+
/// ```txt
30+
/// 1. {byte length of `data` as hex}\r\n
31+
/// 2. {data}\r\n
32+
/// ```
33+
///
34+
/// A chunk stream is finalized by appending the following:
35+
///
36+
/// ```txt
37+
/// 1. 0\r\n
38+
/// 2. {trailing header}\r\n (can be repeated)
39+
/// 3. \r\n
40+
/// ```
41+
pub(crate) fn encode(
42+
&mut self,
43+
mut res: &mut Response,
44+
cx: &mut Context<'_>,
45+
buf: &mut [u8],
46+
) -> Poll<io::Result<usize>> {
47+
let mut bytes_read = 0;
48+
49+
// Return early if we know we're done.
50+
if self.done {
51+
return Poll::Ready(Ok(0));
52+
}
53+
54+
// Get bytes from the underlying stream. If the stream is not ready yet,
55+
// return the header bytes if we have any.
56+
let src = match Pin::new(&mut res).poll_fill_buf(cx) {
57+
Poll::Ready(Ok(n)) => n,
58+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
59+
Poll::Pending => match bytes_read {
60+
0 => return Poll::Pending,
61+
n => return Poll::Ready(Ok(n)),
62+
},
63+
};
64+
65+
// If the stream doesn't have any more bytes left to read we're done.
66+
if src.len() == 0 {
67+
// Write out the final empty chunk
68+
let idx = bytes_read;
69+
buf[idx] = b'0';
70+
buf[idx + 1] = CR;
71+
buf[idx + 2] = LF;
72+
73+
// Write the final CRLF
74+
buf[idx + 3] = CR;
75+
buf[idx + 4] = LF;
76+
bytes_read += 5;
77+
78+
log::trace!("done sending bytes");
79+
self.done = true;
80+
return Poll::Ready(Ok(bytes_read));
81+
}
82+
83+
// Each chunk is prefixed with the length of the data in hex, then a
84+
// CRLF, then the content, then another CRLF. Calculate how many bytes
85+
// each part should be.
86+
let buf_len = buf.len().checked_sub(bytes_read).unwrap_or(0);
87+
let amt = src.len().min(buf_len);
88+
// Calculate the max char count encoding the `len_prefix` statement
89+
// as hex would take. This is done by rounding up `log16(amt + 1)`.
90+
let hex_len = ((amt + 1) as f64).log(16.0).ceil() as usize;
91+
let crlf_len = 2 * 2;
92+
let buf_upper = buf_len.checked_sub(hex_len + crlf_len).unwrap_or(0);
93+
let amt = amt.min(buf_upper);
94+
let len_prefix = format!("{:X}", amt).into_bytes();
95+
96+
// Write our frame header to the buffer.
97+
let lower = bytes_read;
98+
let upper = bytes_read + len_prefix.len();
99+
buf[lower..upper].copy_from_slice(&len_prefix);
100+
buf[upper] = CR;
101+
buf[upper + 1] = LF;
102+
bytes_read += len_prefix.len() + 2;
103+
104+
// Copy the bytes from our source into the output buffer.
105+
let lower = bytes_read;
106+
let upper = bytes_read + amt;
107+
buf[lower..upper].copy_from_slice(&src[0..amt]);
108+
Pin::new(&mut res).consume(amt);
109+
bytes_read += amt;
110+
111+
// Finalize the chunk with a final CRLF.
112+
let idx = bytes_read;
113+
buf[idx] = CR;
114+
buf[idx + 1] = LF;
115+
bytes_read += 2;
116+
117+
// Finally return how many bytes we've written to the buffer.
118+
log::trace!("sending {} bytes", bytes_read);
119+
Poll::Ready(Ok(bytes_read))
120+
}
121+
}

src/chunked/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod decoder;
2+
mod encoder;
3+
4+
pub(crate) use decoder::ChunkedDecoder;
5+
pub(crate) use encoder::ChunkedEncoder;

src/server/encode.rs

Lines changed: 10 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@ use std::pin::Pin;
44

55
use async_std::io;
66
use async_std::io::prelude::*;
7-
use async_std::task::{Context, Poll};
7+
use async_std::task::{ready, Context, Poll};
88
use http_types::Response;
99

10+
use crate::chunked::ChunkedEncoder;
1011
use crate::date::fmt_http_date;
1112

12-
const CR: u8 = b'\r';
13-
const LF: u8 = b'\n';
14-
1513
/// A streaming HTTP encoder.
1614
///
1715
/// This is returned from [`encode`].
@@ -33,6 +31,8 @@ pub(crate) struct Encoder {
3331
/// The amount of bytes read from the body.
3432
/// This is only used in the known-length body encoder.
3533
body_bytes_read: usize,
34+
/// An encoder for chunked encoding.
35+
chunked: ChunkedEncoder,
3636
}
3737

3838
#[derive(Debug)]
@@ -55,6 +55,7 @@ impl Encoder {
5555
head_bytes_read: 0,
5656
body_len: 0,
5757
body_bytes_read: 0,
58+
chunked: ChunkedEncoder::new(),
5859
}
5960
}
6061
}
@@ -190,93 +191,19 @@ impl Encoder {
190191

191192
/// Encode an AsyncBufRead using "chunked" framing. This is used for streams
192193
/// whose length is not known up front.
193-
///
194-
/// # Format
195-
///
196-
/// Each "chunk" uses the following encoding:
197-
///
198-
/// ```txt
199-
/// 1. {byte length of `data` as hex}\r\n
200-
/// 2. {data}\r\n
201-
/// ```
202-
///
203-
/// A chunk stream is finalized by appending the following:
204-
///
205-
/// ```txt
206-
/// 1. 0\r\n
207-
/// 2. {trailing header}\r\n (can be repeated)
208-
/// 3. \r\n
209-
/// ```
210194
fn encode_chunked_body(
211195
&mut self,
212196
cx: &mut Context<'_>,
213197
buf: &mut [u8],
214198
) -> Poll<io::Result<usize>> {
215-
// Get bytes from the underlying stream. If the stream is not ready yet,
216-
// return the header bytes if we have any.
217-
let src = match Pin::new(&mut self.res).poll_fill_buf(cx) {
218-
Poll::Ready(Ok(n)) => n,
219-
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
220-
Poll::Pending => match self.bytes_read {
221-
0 => return Poll::Pending,
222-
n => return Poll::Ready(Ok(n)),
223-
},
224-
};
199+
let buf = &mut buf[self.bytes_read..];
200+
let read = ready!(self.chunked.encode(&mut self.res, cx, buf))?;
225201

226-
// If the stream doesn't have any more bytes left to read we're done.
227-
if src.len() == 0 {
228-
// Write out the final empty chunk
229-
let idx = self.bytes_read;
230-
buf[idx] = b'0';
231-
buf[idx + 1] = CR;
232-
buf[idx + 2] = LF;
233-
234-
// Write the final CRLF
235-
buf[idx + 3] = CR;
236-
buf[idx + 4] = LF;
237-
self.bytes_read += 5;
238-
239-
log::trace!("done sending bytes");
240-
self.state = EncoderState::Done;
241-
return Poll::Ready(Ok(self.bytes_read));
202+
self.bytes_read += read;
203+
if self.bytes_read == 0 {
204+
self.state = EncoderState::Done
242205
}
243206

244-
// Each chunk is prefixed with the length of the data in hex, then a
245-
// CRLF, then the content, then another CRLF. Calculate how many bytes
246-
// each part should be.
247-
let buf_len = buf.len().checked_sub(self.bytes_read).unwrap_or(0);
248-
let amt = src.len().min(buf_len);
249-
// Calculate the max char count encoding the `len_prefix` statement
250-
// as hex would take. This is done by rounding up `log16(amt + 1)`.
251-
let hex_len = ((amt + 1) as f64).log(16.0).ceil() as usize;
252-
let crlf_len = 2 * 2;
253-
let buf_upper = buf_len.checked_sub(hex_len + crlf_len).unwrap_or(0);
254-
let amt = amt.min(buf_upper);
255-
let len_prefix = format!("{:X}", amt).into_bytes();
256-
257-
// Write our frame header to the buffer.
258-
let lower = self.bytes_read;
259-
let upper = self.bytes_read + len_prefix.len();
260-
buf[lower..upper].copy_from_slice(&len_prefix);
261-
buf[upper] = CR;
262-
buf[upper + 1] = LF;
263-
self.bytes_read += len_prefix.len() + 2;
264-
265-
// Copy the bytes from our source into the output buffer.
266-
let lower = self.bytes_read;
267-
let upper = self.bytes_read + amt;
268-
buf[lower..upper].copy_from_slice(&src[0..amt]);
269-
Pin::new(&mut self.res).consume(amt);
270-
self.bytes_read += amt;
271-
272-
// Finalize the chunk with a final CRLF.
273-
let idx = self.bytes_read;
274-
buf[idx] = CR;
275-
buf[idx + 1] = LF;
276-
self.bytes_read += 2;
277-
278-
// Finally return how many bytes we've written to the buffer.
279-
log::trace!("sending {} bytes", self.bytes_read);
280207
Poll::Ready(Ok(self.bytes_read))
281208
}
282209
}

0 commit comments

Comments
 (0)