Skip to content

Commit 015f5b0

Browse files
committed
keep-alive
1 parent 2e269cc commit 015f5b0

File tree

3 files changed

+126
-111
lines changed

3 files changed

+126
-111
lines changed

examples/server.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,28 @@ fn main() -> Result<(), async_h1::Exception> {
99
let mut incoming = listener.incoming();
1010

1111
while let Some(stream) = incoming.next().await {
12-
task::spawn(async move {
12+
// task::spawn(async move {
13+
// let stream = stream?;
14+
// let (reader, writer) = &mut (&stream, &stream);
15+
// let req = server::decode(reader).await?;
16+
// if let server::OptionalRequest::Request(req) = req {
17+
// // dbg!(req);
18+
// let body = Body::from_string("hello chashu".to_owned());
19+
// let mut res = server::encode(http::Response::new(body)).await?;
20+
// io::copy(&mut res, writer).await?;
21+
// }
22+
// Ok::<(), async_h1::Exception>(())
23+
// });
24+
task::spawn(async {
1325
let stream = stream?;
1426
let (reader, writer) = &mut (&stream, &stream);
15-
let req = server::decode(reader).await?;
16-
if req.is_some() {
17-
// dbg!(req);
18-
let body = Body::from_string("hello chashu".to_owned());
19-
let mut res = server::encode(http::Response::new(body)).await?;
20-
io::copy(&mut res, writer).await?;
21-
}
22-
Ok::<(), async_h1::Exception>(())
27+
server::connect(reader, writer, |req| {
28+
async {
29+
let body = Body::from_string("hello chashu".to_owned());
30+
Ok(http::Response::new(body))
31+
}
32+
})
33+
.await
2334
});
2435
}
2536
Ok(())

src/server.rs

Lines changed: 105 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use async_std::future::Future;
44
use async_std::io::{self, BufReader};
55
use async_std::io::{Read, Write};
66
use async_std::prelude::*;
7-
use async_std::task::{sleep, Context, Poll};
7+
use async_std::task::{Context, Poll};
88
use futures_core::ready;
99
use http::{Request, Response, Version};
10+
use std::time::{Duration, Instant};
1011

1112
use std::pin::Pin;
1213

@@ -20,34 +21,43 @@ pub async fn connect<'a, F, Fut, B>(
2021
where
2122
F: Fn(&mut Request<Body<BufReader<&'a mut B>>>) -> Fut,
2223
Fut: Future<Output = Result<Response<Body<&'a mut B>>, Exception>>,
23-
B: Read + Write + Unpin + Send + 'static,
24+
B: Read + Write + Unpin + Send,
2425
{
25-
let decoder = Decoder::new(reader);
26-
let req = decoder.decode().await?;
27-
if let Some(mut req) = req {
28-
// TODO: parse Keep Alive header
29-
30-
let handle = async {
31-
loop {
32-
let mut res = encode(callback(&mut req).await?).await.unwrap();
33-
io::copy(&mut res, writer).await?;
34-
let reader = res.body.into_reader().unwrap();
35-
let decoder = Decoder::new(reader);
36-
match decoder.decode().await? {
37-
Some(r) => req = r,
38-
None => break,
39-
}
26+
let req = decode(reader).await?;
27+
if let OptionalRequest::Request(mut req) = req {
28+
let headers = req.headers();
29+
let timeout = match (headers.get("Connection"), headers.get("Keep-Alive")) {
30+
(Some(connection), Some(_v))
31+
if connection == http::header::HeaderValue::from_static("Keep-Alive") =>
32+
{
33+
// TODO: parse timeout
34+
Duration::from_secs(5)
4035
}
41-
42-
Ok::<(), Exception>(())
43-
};
44-
45-
let timer = async {
46-
sleep(std::time::Duration::from_secs(5)).await;
47-
Ok::<(), Exception>(())
36+
_ => Duration::from_secs(5),
4837
};
4938

50-
handle.race(timer).await?;
39+
let beginning = Instant::now();
40+
loop {
41+
println!("Handling request");
42+
let mut res = encode(callback(&mut req).await?).await.unwrap();
43+
io::copy(&mut res, writer).await?;
44+
let mut stream = res.body.into_reader().unwrap();
45+
req = loop {
46+
match decode(stream).await? {
47+
OptionalRequest::Request(r) => {
48+
break r;
49+
}
50+
OptionalRequest::Stream(r) => {
51+
let now = Instant::now();
52+
if now - beginning > timeout {
53+
return Ok(());
54+
} else {
55+
stream = r;
56+
}
57+
}
58+
}
59+
};
60+
}
5161
}
5262

5363
Ok(())
@@ -123,110 +133,105 @@ impl<R: Read + Unpin> Read for Encoder<R> {
123133
// TODO: return a reader in the response
124134
pub async fn encode<R>(res: Response<Body<R>>) -> io::Result<Encoder<R>>
125135
where
126-
R: Read,
136+
R: Read + Send,
127137
{
128138
let mut buf: Vec<u8> = vec![];
129139

130140
let reason = res.status().canonical_reason().unwrap();
131141
let status = res.status();
132-
write!(&mut buf, "HTTP/1.1 {} {}\r\n", status.as_str(), reason).await?;
142+
std::io::Write::write_fmt(
143+
&mut buf,
144+
format_args!("HTTP/1.1 {} {}\r\n", status.as_str(), reason),
145+
)?;
133146

134147
// If the body isn't streaming, we can set the content-length ahead of time. Else we need to
135148
// send all items in chunks.
136149
if let Some(len) = res.body().len() {
137-
write!(&mut buf, "Content-Length: {}\r\n", len).await?;
150+
std::io::Write::write_fmt(&mut buf, format_args!("Content-Length: {}\r\n", len))?;
138151
} else {
139-
write!(&mut buf, "Transfer-Encoding: chunked\r\n").await?;
152+
std::io::Write::write_fmt(&mut buf, format_args!("Transfer-Encoding: chunked\r\n"))?;
140153
panic!("chunked encoding is not implemented yet");
141154
// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
142155
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
143156
}
144157

145158
for (header, value) in res.headers() {
146-
write!(
159+
std::io::Write::write_fmt(
147160
&mut buf,
148-
"{}: {}\r\n",
149-
header.as_str(),
150-
value.to_str().unwrap()
151-
)
152-
.await?;
161+
format_args!("{}: {}\r\n", header.as_str(), value.to_str().unwrap()),
162+
)?
153163
}
154164

155-
write!(&mut buf, "\r\n").await?;
165+
std::io::Write::write_fmt(&mut buf, format_args!("\r\n"))?;
156166
Ok(Encoder::new(buf, res.into_body()))
157167
}
158168

159-
struct Decoder<R> {
160-
reader: BufReader<R>,
169+
#[derive(Debug)]
170+
pub enum OptionalRequest<R: Read> {
171+
Request(Request<Body<BufReader<R>>>),
172+
Stream(R),
161173
}
162174

163-
impl<R: Read + Unpin + Send> Decoder<R> {
164-
fn new(reader: R) -> Self {
165-
Decoder {
166-
reader: BufReader::new(reader),
175+
/// Decode an HTTP request on the server.
176+
pub async fn decode<R>(reader: R) -> Result<OptionalRequest<R>, Exception>
177+
where
178+
R: Read + Unpin + Send,
179+
{
180+
let mut reader = BufReader::new(reader);
181+
let mut buf = Vec::new();
182+
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
183+
let mut httparse_req = httparse::Request::new(&mut headers);
184+
185+
// Keep reading bytes from the stream until we hit the end of the stream.
186+
loop {
187+
let bytes_read = reader.read_until(b'\n', &mut buf).await?;
188+
// No more bytes are yielded from the stream.
189+
if bytes_read == 0 {
190+
return Ok(OptionalRequest::Stream(reader.into_inner()));
167191
}
168-
}
169-
170-
/// Decode an HTTP request on the server.
171-
pub async fn decode(mut self) -> Result<Option<Request<Body<BufReader<R>>>>, Exception>
172-
where
173-
R: Read + Unpin + Send,
174-
{
175-
let mut buf = Vec::new();
176-
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
177-
let mut httparse_req = httparse::Request::new(&mut headers);
178-
179-
// Keep reading bytes from the stream until we hit the end of the stream.
180-
loop {
181-
let bytes_read = self.reader.read_until(b'\n', &mut buf).await?;
182-
// No more bytes are yielded from the stream.
183-
if bytes_read == 0 {
184-
return Ok(None);
185-
}
186192

187-
// We've hit the end delimiter of the stream.
188-
let idx = buf.len() - 1;
189-
if idx >= 3 && &buf[idx - 3..=idx] == b"\r\n\r\n" {
190-
break;
191-
}
193+
// We've hit the end delimiter of the stream.
194+
let idx = buf.len() - 1;
195+
if idx >= 3 && &buf[idx - 3..=idx] == b"\r\n\r\n" {
196+
break;
192197
}
198+
}
193199

194-
// Convert our header buf into an httparse instance, and validate.
195-
let status = httparse_req.parse(&buf)?;
196-
if status.is_partial() {
197-
dbg!(String::from_utf8(buf).unwrap());
198-
return Err("Malformed HTTP head".into());
199-
}
200+
// Convert our header buf into an httparse instance, and validate.
201+
let status = httparse_req.parse(&buf)?;
202+
if status.is_partial() {
203+
dbg!(String::from_utf8(buf).unwrap());
204+
return Err("Malformed HTTP head".into());
205+
}
200206

201-
// Convert httparse headers + body into a `http::Request` type.
202-
let mut req = Request::builder();
203-
for header in httparse_req.headers.iter() {
204-
req.header(header.name, header.value);
205-
}
206-
if let Some(method) = httparse_req.method {
207-
req.method(method);
208-
}
209-
if let Some(path) = httparse_req.path {
210-
req.uri(path);
211-
}
212-
if let Some(version) = httparse_req.version {
213-
req.version(match version {
214-
1 => Version::HTTP_11,
215-
_ => return Err("Unsupported HTTP version".into()),
216-
});
217-
}
207+
// Convert httparse headers + body into a `http::Request` type.
208+
let mut req = Request::builder();
209+
for header in httparse_req.headers.iter() {
210+
req.header(header.name, header.value);
211+
}
212+
if let Some(method) = httparse_req.method {
213+
req.method(method);
214+
}
215+
if let Some(path) = httparse_req.path {
216+
req.uri(path);
217+
}
218+
if let Some(version) = httparse_req.version {
219+
req.version(match version {
220+
1 => Version::HTTP_11,
221+
_ => return Err("Unsupported HTTP version".into()),
222+
});
223+
}
218224

219-
// Process the body if `Content-Length` was passed.
220-
let body = match httparse_req
221-
.headers
222-
.iter()
223-
.find(|h| h.name == "Content-Length")
224-
{
225-
Some(_header) => Body::new(self.reader), // TODO: use the header value
226-
None => Body::empty(),
227-
};
225+
// Process the body if `Content-Length` was passed.
226+
let body = match httparse_req
227+
.headers
228+
.iter()
229+
.find(|h| h.name == "Content-Length")
230+
{
231+
Some(_header) => Body::new(reader), // TODO: use the header value
232+
None => Body::empty(),
233+
};
228234

229-
// Return the request.
230-
Ok(Some(req.body(body)?))
231-
}
235+
// Return the request.
236+
Ok(OptionalRequest::Request(req.body(body)?))
232237
}

tests/test.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ fn server() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
1818
let req = server::decode(reader).await?;
1919
dbg!(req);
2020
let body: async_h1::Body<&[u8]> = async_h1::Body::empty();
21-
let res = http::Response::new(body);
22-
let mut res = server::encode(res).await?;
21+
let mut res = server::encode(http::Response::new(body)).await?;
2322
io::copy(&mut res, writer).await?;
2423
}
2524

0 commit comments

Comments
 (0)