Skip to content

Commit 3fed77b

Browse files
authored
Merge pull request #10 from yoshuawuyts/keep-alive-server
Basic KeepAlive Server Support
2 parents 8f6cfec + fda7ba9 commit 3fed77b

File tree

6 files changed

+101
-45
lines changed

6 files changed

+101
-45
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ edition = "2018"
1515
httparse = "1.3.3"
1616
http = "0.1.17"
1717
futures-io = "0.3.0"
18-
async-std = "0.99.12"
18+
async-std = { version = "0.99.12", features = ["unstable"] }
1919
futures-core-preview = "0.3.0-alpha.18"
2020

2121
[dev-dependencies]

examples/server.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use async_h1::{server, Body};
22
use async_std::prelude::*;
3-
use async_std::{io, net, task};
3+
use async_std::net;
4+
use async_std::task;
45

56
fn main() -> Result<(), async_h1::Exception> {
67
task::block_on(async {
@@ -9,17 +10,14 @@ fn main() -> Result<(), async_h1::Exception> {
910
let mut incoming = listener.incoming();
1011

1112
while let Some(stream) = incoming.next().await {
12-
task::spawn(async move {
13+
task::spawn(async {
1314
let stream = stream?;
1415
let (reader, writer) = &mut (&stream, &stream);
15-
let req = server::decode(reader).await?;
16-
if req.is_some() {
17-
// dbg!(req);
16+
server::connect(reader, writer, |_| async {
1817
let body = Body::from_string("hello chashu".to_owned());
19-
let mut res = server::encode(http::Response::new(body)).await?;
20-
io::copy(&mut res, writer).await?;
21-
}
22-
Ok::<(), async_h1::Exception>(())
18+
Ok(http::Response::new(body))
19+
})
20+
.await
2321
});
2422
}
2523
Ok(())

src/body.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::pin::Pin;
77

88
/// A streaming HTTP body.
99
pub struct Body<R: AsyncRead> {
10-
reader: Option<R>,
10+
reader: R,
1111
length: Option<usize>,
1212
}
1313

@@ -26,15 +26,15 @@ impl<R: AsyncRead> Body<R> {
2626
/// `Transfer-Encoding: Chunked`.
2727
pub fn new(reader: R) -> Self {
2828
Self {
29-
reader: Some(reader),
29+
reader,
3030
length: None,
3131
}
3232
}
3333

3434
/// Create a new empty body.
35-
pub fn empty() -> Self {
35+
pub fn empty(reader: R) -> Self {
3636
Self {
37-
reader: None,
37+
reader,
3838
length: Some(0),
3939
}
4040
}
@@ -49,13 +49,19 @@ impl<R: AsyncRead> Body<R> {
4949
}
5050
}
5151

52+
impl<R: AsyncRead + Unpin> Body<R> {
53+
pub fn into_reader(self) -> R {
54+
self.reader
55+
}
56+
}
57+
5258
impl Body<io::Cursor<Vec<u8>>> {
5359
/// Create a new instance from a string.
5460
#[inline]
5561
pub fn from_string(string: String) -> Self {
5662
Self {
5763
length: Some(string.len()),
58-
reader: Some(Cursor::new(string.into_bytes())),
64+
reader: Cursor::new(string.into_bytes()),
5965
}
6066
}
6167

@@ -64,7 +70,7 @@ impl Body<io::Cursor<Vec<u8>>> {
6470
pub fn from_bytes(bytes: Vec<u8>) -> Self {
6571
Self {
6672
length: Some(bytes.len()),
67-
reader: Some(Cursor::new(bytes)),
73+
reader: Cursor::new(bytes),
6874
}
6975
}
7076
}
@@ -75,9 +81,6 @@ impl<R: AsyncRead + Unpin> AsyncRead for Body<R> {
7581
cx: &mut Context<'_>,
7682
buf: &mut [u8],
7783
) -> Poll<io::Result<usize>> {
78-
match self.reader.as_mut() {
79-
None => Poll::Ready(Ok(0)),
80-
Some(reader) => Pin::new(&mut *reader).poll_read(cx, buf),
81-
}
84+
Pin::new(&mut self.reader).poll_read(cx, buf)
8285
}
8386
}

src/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ pub async fn encode<R: AsyncRead>(req: Request<Body<R>>) -> Result<Encoder<R>, s
6464
// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
6565
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
6666
}
67-
6867
for (header, value) in req.headers() {
6968
write!(
7069
&mut buf,
@@ -130,7 +129,7 @@ where
130129
.find(|h| h.name == "Content-Length")
131130
{
132131
Some(_header) => Body::new(reader), // TODO: use the header value
133-
None => Body::empty(),
132+
None => Body::empty(reader),
134133
};
135134

136135
// Return the response.

src/server.rs

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,72 @@
11
//! Process HTTP connections on the server.
22
3+
use async_std::future::Future;
34
use async_std::io::{self, BufReader};
5+
use async_std::io::{Read, Write};
46
use async_std::prelude::*;
57
use async_std::task::{Context, Poll};
68
use futures_core::ready;
7-
use futures_io::AsyncRead;
89
use http::{Request, Response, Version};
10+
use std::time::{Duration, Instant};
911

1012
use std::pin::Pin;
1113

1214
use crate::{Body, Exception, MAX_HEADERS};
1315

16+
pub async fn connect<'a, F, Fut, R, W, O: 'a>(
17+
reader: &'a mut R,
18+
writer: &'a mut W,
19+
callback: F,
20+
) -> Result<(), Exception>
21+
where
22+
R: Read + Unpin + Send,
23+
W: Write + Unpin,
24+
F: Fn(&mut Request<Body<BufReader<&'a mut R>>>) -> Fut,
25+
Fut: Future<Output = Result<Response<Body<O>>, Exception>>,
26+
O: Read + Unpin + Send,
27+
{
28+
let req = decode(reader).await?;
29+
if let RequestOrReader::Request(mut req) = req {
30+
let headers = req.headers();
31+
let timeout = match (headers.get("Connection"), headers.get("Keep-Alive")) {
32+
(Some(connection), Some(_v))
33+
if connection == http::header::HeaderValue::from_static("Keep-Alive") =>
34+
{
35+
// TODO: parse timeout
36+
Duration::from_secs(5)
37+
}
38+
_ => Duration::from_secs(5),
39+
};
40+
41+
let beginning = Instant::now();
42+
loop {
43+
// TODO: what to do when the callback returns Err
44+
let mut res = encode(callback(&mut req).await?).await?;
45+
io::copy(&mut res, writer).await?;
46+
let mut stream = req.into_body().into_reader().into_inner();
47+
req = loop {
48+
match decode(stream).await? {
49+
RequestOrReader::Request(r) => break r,
50+
RequestOrReader::Reader(r) => {
51+
let now = Instant::now();
52+
if now - beginning > timeout {
53+
return Ok(());
54+
}
55+
stream = r;
56+
}
57+
}
58+
};
59+
}
60+
}
61+
62+
Ok(())
63+
}
64+
1465
/// A streaming HTTP encoder.
1566
///
1667
/// This is returned from [`encode`].
1768
#[derive(Debug)]
18-
pub struct Encoder<R: AsyncRead> {
69+
pub struct Encoder<R: Read> {
1970
/// Keep track how far we've indexed into the headers + body.
2071
cursor: usize,
2172
/// HTTP headers to be sent.
@@ -30,7 +81,7 @@ pub struct Encoder<R: AsyncRead> {
3081
body_bytes_read: usize,
3182
}
3283

33-
impl<R: AsyncRead> Encoder<R> {
84+
impl<R: Read> Encoder<R> {
3485
/// Create a new instance.
3586
pub(crate) fn new(headers: Vec<u8>, body: Body<R>) -> Self {
3687
Self {
@@ -44,7 +95,7 @@ impl<R: AsyncRead> Encoder<R> {
4495
}
4596
}
4697

47-
impl<R: AsyncRead + Unpin> AsyncRead for Encoder<R> {
98+
impl<R: Read + Unpin> Read for Encoder<R> {
4899
fn poll_read(
49100
mut self: Pin<&mut Self>,
50101
cx: &mut Context<'_>,
@@ -81,43 +132,49 @@ impl<R: AsyncRead + Unpin> AsyncRead for Encoder<R> {
81132
// TODO: return a reader in the response
82133
pub async fn encode<R>(res: Response<Body<R>>) -> io::Result<Encoder<R>>
83134
where
84-
R: AsyncRead,
135+
R: Read + Send,
85136
{
86137
let mut buf: Vec<u8> = vec![];
87138

88139
let reason = res.status().canonical_reason().unwrap();
89140
let status = res.status();
90-
write!(&mut buf, "HTTP/1.1 {} {}\r\n", status.as_str(), reason).await?;
141+
std::io::Write::write_fmt(
142+
&mut buf,
143+
format_args!("HTTP/1.1 {} {}\r\n", status.as_str(), reason),
144+
)?;
91145

92146
// If the body isn't streaming, we can set the content-length ahead of time. Else we need to
93147
// send all items in chunks.
94148
if let Some(len) = res.body().len() {
95-
write!(&mut buf, "Content-Length: {}\r\n", len).await?;
149+
std::io::Write::write_fmt(&mut buf, format_args!("Content-Length: {}\r\n", len))?;
96150
} else {
97-
write!(&mut buf, "Transfer-Encoding: chunked\r\n").await?;
151+
std::io::Write::write_fmt(&mut buf, format_args!("Transfer-Encoding: chunked\r\n"))?;
98152
panic!("chunked encoding is not implemented yet");
99153
// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
100154
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
101155
}
102156

103157
for (header, value) in res.headers() {
104-
write!(
158+
std::io::Write::write_fmt(
105159
&mut buf,
106-
"{}: {}\r\n",
107-
header.as_str(),
108-
value.to_str().unwrap()
109-
)
110-
.await?;
160+
format_args!("{}: {}\r\n", header.as_str(), value.to_str().unwrap()),
161+
)?
111162
}
112163

113-
write!(&mut buf, "\r\n").await?;
164+
std::io::Write::write_fmt(&mut buf, format_args!("\r\n"))?;
114165
Ok(Encoder::new(buf, res.into_body()))
115166
}
116167

168+
#[derive(Debug)]
169+
pub enum RequestOrReader<R: Read> {
170+
Request(Request<Body<BufReader<R>>>),
171+
Reader(R),
172+
}
173+
117174
/// Decode an HTTP request on the server.
118-
pub async fn decode<R>(reader: R) -> Result<Option<Request<Body<BufReader<R>>>>, Exception>
175+
pub async fn decode<R>(reader: R) -> Result<RequestOrReader<R>, Exception>
119176
where
120-
R: AsyncRead + Unpin + Send,
177+
R: Read + Unpin + Send,
121178
{
122179
let mut reader = BufReader::new(reader);
123180
let mut buf = Vec::new();
@@ -129,7 +186,7 @@ where
129186
let bytes_read = reader.read_until(b'\n', &mut buf).await?;
130187
// No more bytes are yielded from the stream.
131188
if bytes_read == 0 {
132-
return Ok(None);
189+
return Ok(RequestOrReader::Reader(reader.into_inner()));
133190
}
134191

135192
// We've hit the end delimiter of the stream.
@@ -171,9 +228,9 @@ where
171228
.find(|h| h.name == "Content-Length")
172229
{
173230
Some(_header) => Body::new(reader), // TODO: use the header value
174-
None => Body::empty(),
231+
None => Body::empty(reader),
175232
};
176233

177234
// Return the request.
178-
Ok(Some(req.body(body)?))
235+
Ok(RequestOrReader::Request(req.body(body)?))
179236
}

tests/test.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ fn server() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
1717
let (reader, writer) = &mut (&stream, &stream);
1818
let req = server::decode(reader).await?;
1919
dbg!(req);
20-
let body: async_h1::Body<&[u8]> = async_h1::Body::empty();
21-
let res = http::Response::new(body);
22-
let mut res = server::encode(res).await?;
20+
let body: async_h1::Body<&[u8]> = async_h1::Body::empty(&[]);
21+
let mut res = server::encode(http::Response::new(body)).await?;
2322
io::copy(&mut res, writer).await?;
2423
}
2524

0 commit comments

Comments
 (0)