Skip to content

Commit 1cb8294

Browse files
committed
Keep alive working
1 parent 015f5b0 commit 1cb8294

File tree

4 files changed

+31
-35
lines changed

4 files changed

+31
-35
lines changed

src/body.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::pin::Pin;
77

88
/// A streaming HTTP body.
99
pub struct Body<R: AsyncRead> {
10-
reader: Option<R>,
10+
reader: R,
1111
length: Option<usize>,
1212
}
1313

@@ -26,15 +26,15 @@ impl<R: AsyncRead> Body<R> {
2626
/// `Transfer-Encoding: Chunked`.
2727
pub fn new(reader: R) -> Self {
2828
Self {
29-
reader: Some(reader),
29+
reader,
3030
length: None,
3131
}
3232
}
3333

3434
/// Create a new empty body.
35-
pub fn empty() -> Self {
35+
pub fn empty(reader: R) -> Self {
3636
Self {
37-
reader: None,
37+
reader,
3838
length: Some(0),
3939
}
4040
}
@@ -50,7 +50,7 @@ impl<R: AsyncRead> Body<R> {
5050
}
5151

5252
impl<R: AsyncRead + Unpin> Body<R> {
53-
pub fn into_reader(self) -> Option<R> {
53+
pub fn into_reader(self) -> R {
5454
self.reader
5555
}
5656
}
@@ -61,7 +61,7 @@ impl Body<io::Cursor<Vec<u8>>> {
6161
pub fn from_string(string: String) -> Self {
6262
Self {
6363
length: Some(string.len()),
64-
reader: Some(Cursor::new(string.into_bytes())),
64+
reader: Cursor::new(string.into_bytes()),
6565
}
6666
}
6767

@@ -70,7 +70,7 @@ impl Body<io::Cursor<Vec<u8>>> {
7070
pub fn from_bytes(bytes: Vec<u8>) -> Self {
7171
Self {
7272
length: Some(bytes.len()),
73-
reader: Some(Cursor::new(bytes)),
73+
reader: Cursor::new(bytes),
7474
}
7575
}
7676
}
@@ -81,9 +81,6 @@ impl<R: AsyncRead + Unpin> AsyncRead for Body<R> {
8181
cx: &mut Context<'_>,
8282
buf: &mut [u8],
8383
) -> Poll<io::Result<usize>> {
84-
match self.reader.as_mut() {
85-
None => Poll::Ready(Ok(0)),
86-
Some(reader) => Pin::new(&mut *reader).poll_read(cx, buf),
87-
}
84+
Pin::new(&mut self.reader).poll_read(cx, buf)
8885
}
8986
}

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ where
129129
.find(|h| h.name == "Content-Length")
130130
{
131131
Some(_header) => Body::new(reader), // TODO: use the header value
132-
None => Body::empty(),
132+
None => Body::empty(reader),
133133
};
134134

135135
// Return the response.

src/server.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@ use std::pin::Pin;
1313

1414
use crate::{Body, Exception, MAX_HEADERS};
1515

16-
pub async fn connect<'a, F, Fut, B>(
17-
reader: &'a mut B,
18-
writer: &'a mut B,
16+
pub async fn connect<'a, F, Fut, R, W, O: 'a>(
17+
reader: &'a mut R,
18+
writer: &'a mut W,
1919
callback: F,
2020
) -> Result<(), Exception>
2121
where
22-
F: Fn(&mut Request<Body<BufReader<&'a mut B>>>) -> Fut,
23-
Fut: Future<Output = Result<Response<Body<&'a mut B>>, Exception>>,
24-
B: Read + Write + Unpin + Send,
22+
R: Read + Unpin + Send,
23+
W: Write + Unpin,
24+
F: Fn(&mut Request<Body<BufReader<&'a mut R>>>) -> Fut,
25+
Fut: Future<Output = Result<Response<Body<O>>, Exception>>,
26+
O: Read + Unpin + Send,
2527
{
2628
let req = decode(reader).await?;
27-
if let OptionalRequest::Request(mut req) = req {
29+
if let RequestOrReader::Request(mut req) = req {
2830
let headers = req.headers();
2931
let timeout = match (headers.get("Connection"), headers.get("Keep-Alive")) {
3032
(Some(connection), Some(_v))
@@ -38,22 +40,19 @@ where
3840

3941
let beginning = Instant::now();
4042
loop {
41-
println!("Handling request");
42-
let mut res = encode(callback(&mut req).await?).await.unwrap();
43+
// TODO: what to do when the callback returns Err
44+
let mut res = encode(callback(&mut req).await?).await?;
4345
io::copy(&mut res, writer).await?;
44-
let mut stream = res.body.into_reader().unwrap();
46+
let mut stream = req.into_body().into_reader().into_inner();
4547
req = loop {
4648
match decode(stream).await? {
47-
OptionalRequest::Request(r) => {
48-
break r;
49-
}
50-
OptionalRequest::Stream(r) => {
49+
RequestOrReader::Request(r) => break r,
50+
RequestOrReader::Reader(r) => {
5151
let now = Instant::now();
5252
if now - beginning > timeout {
5353
return Ok(());
54-
} else {
55-
stream = r;
5654
}
55+
stream = r;
5756
}
5857
}
5958
};
@@ -167,13 +166,13 @@ where
167166
}
168167

169168
#[derive(Debug)]
170-
pub enum OptionalRequest<R: Read> {
169+
pub enum RequestOrReader<R: Read> {
171170
Request(Request<Body<BufReader<R>>>),
172-
Stream(R),
171+
Reader(R),
173172
}
174173

175174
/// Decode an HTTP request on the server.
176-
pub async fn decode<R>(reader: R) -> Result<OptionalRequest<R>, Exception>
175+
pub async fn decode<R>(reader: R) -> Result<RequestOrReader<R>, Exception>
177176
where
178177
R: Read + Unpin + Send,
179178
{
@@ -187,7 +186,7 @@ where
187186
let bytes_read = reader.read_until(b'\n', &mut buf).await?;
188187
// No more bytes are yielded from the stream.
189188
if bytes_read == 0 {
190-
return Ok(OptionalRequest::Stream(reader.into_inner()));
189+
return Ok(RequestOrReader::Reader(reader.into_inner()));
191190
}
192191

193192
// We've hit the end delimiter of the stream.
@@ -229,9 +228,9 @@ where
229228
.find(|h| h.name == "Content-Length")
230229
{
231230
Some(_header) => Body::new(reader), // TODO: use the header value
232-
None => Body::empty(),
231+
None => Body::empty(reader),
233232
};
234233

235234
// Return the request.
236-
Ok(OptionalRequest::Request(req.body(body)?))
235+
Ok(RequestOrReader::Request(req.body(body)?))
237236
}

tests/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ fn server() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
1717
let (reader, writer) = &mut (&stream, &stream);
1818
let req = server::decode(reader).await?;
1919
dbg!(req);
20-
let body: async_h1::Body<&[u8]> = async_h1::Body::empty();
20+
let body: async_h1::Body<&[u8]> = async_h1::Body::empty(&[]);
2121
let mut res = server::encode(http::Response::new(body)).await?;
2222
io::copy(&mut res, writer).await?;
2323
}

0 commit comments

Comments
 (0)