Skip to content

Commit c1e896f

Browse files
committed
keep alive basics
1 parent 8f6cfec commit c1e896f

File tree

3 files changed

+113
-61
lines changed

3 files changed

+113
-61
lines changed

src/body.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ impl<R: AsyncRead> Body<R> {
4949
}
5050
}
5151

52+
impl<R: AsyncRead + Unpin> Body<R> {
53+
pub fn into_reader(self) -> Option<R> {
54+
self.reader
55+
}
56+
}
57+
5258
impl Body<io::Cursor<Vec<u8>>> {
5359
/// Create a new instance from a string.
5460
#[inline]

src/client.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ pub async fn encode<R: AsyncRead>(req: Request<Body<R>>) -> Result<Encoder<R>, s
6464
// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
6565
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
6666
}
67-
6867
for (header, value) in req.headers() {
6968
write!(
7069
&mut buf,

src/server.rs

Lines changed: 107 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,57 @@
11
//! Process HTTP connections on the server.
22
3+
use async_std::future::{timeout, Future};
34
use async_std::io::{self, BufReader};
5+
use async_std::io::{Read, Write};
46
use async_std::prelude::*;
57
use async_std::task::{Context, Poll};
68
use futures_core::ready;
7-
use futures_io::AsyncRead;
89
use http::{Request, Response, Version};
910

1011
use std::pin::Pin;
1112

1213
use crate::{Body, Exception, MAX_HEADERS};
1314

15+
pub async fn connect<'a, F, Fut, B>(
16+
reader: &'a mut B,
17+
writer: &'a mut B,
18+
callback: F,
19+
) -> Result<(), Exception>
20+
where
21+
F: Fn(&mut Request<Body<BufReader<&'a mut B>>>) -> Fut,
22+
Fut: Future<Output = Result<Response<Body<&'a mut B>>, Exception>>,
23+
B: Read + Write + Unpin + Send + 'static,
24+
{
25+
let decoder = Decoder::new(reader);
26+
let req = decoder.decode().await?;
27+
if let Some(mut req) = req {
28+
// TODO: parse Keep Alive header
29+
30+
let handle = async {
31+
loop {
32+
let mut res = encode(callback(&mut req).await?).await.unwrap();
33+
io::copy(&mut res, writer).await?;
34+
let reader = res.body.into_reader().unwrap();
35+
let decoder = Decoder::new(reader);
36+
match decoder.decode().await? {
37+
// TODO: no unwrap
38+
Some(r) => req = r,
39+
None => break,
40+
}
41+
}
42+
43+
Ok::<(), Exception>(())
44+
};
45+
}
46+
47+
Ok(())
48+
}
49+
1450
/// A streaming HTTP encoder.
1551
///
1652
/// This is returned from [`encode`].
1753
#[derive(Debug)]
18-
pub struct Encoder<R: AsyncRead> {
54+
pub struct Encoder<R: Read> {
1955
/// Keep track how far we've indexed into the headers + body.
2056
cursor: usize,
2157
/// HTTP headers to be sent.
@@ -30,7 +66,7 @@ pub struct Encoder<R: AsyncRead> {
3066
body_bytes_read: usize,
3167
}
3268

33-
impl<R: AsyncRead> Encoder<R> {
69+
impl<R: Read> Encoder<R> {
3470
/// Create a new instance.
3571
pub(crate) fn new(headers: Vec<u8>, body: Body<R>) -> Self {
3672
Self {
@@ -44,7 +80,7 @@ impl<R: AsyncRead> Encoder<R> {
4480
}
4581
}
4682

47-
impl<R: AsyncRead + Unpin> AsyncRead for Encoder<R> {
83+
impl<R: Read + Unpin> Read for Encoder<R> {
4884
fn poll_read(
4985
mut self: Pin<&mut Self>,
5086
cx: &mut Context<'_>,
@@ -81,7 +117,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for Encoder<R> {
81117
// TODO: return a reader in the response
82118
pub async fn encode<R>(res: Response<Body<R>>) -> io::Result<Encoder<R>>
83119
where
84-
R: AsyncRead,
120+
R: Read,
85121
{
86122
let mut buf: Vec<u8> = vec![];
87123

@@ -114,66 +150,77 @@ where
114150
Ok(Encoder::new(buf, res.into_body()))
115151
}
116152

117-
/// Decode an HTTP request on the server.
118-
pub async fn decode<R>(reader: R) -> Result<Option<Request<Body<BufReader<R>>>>, Exception>
119-
where
120-
R: AsyncRead + Unpin + Send,
121-
{
122-
let mut reader = BufReader::new(reader);
123-
let mut buf = Vec::new();
124-
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
125-
let mut httparse_req = httparse::Request::new(&mut headers);
126-
127-
// Keep reading bytes from the stream until we hit the end of the stream.
128-
loop {
129-
let bytes_read = reader.read_until(b'\n', &mut buf).await?;
130-
// No more bytes are yielded from the stream.
131-
if bytes_read == 0 {
132-
return Ok(None);
133-
}
153+
struct Decoder<R> {
154+
reader: BufReader<R>,
155+
}
134156

135-
// We've hit the end delimiter of the stream.
136-
let idx = buf.len() - 1;
137-
if idx >= 3 && &buf[idx - 3..=idx] == b"\r\n\r\n" {
138-
break;
157+
impl<R: Read + Unpin + Send> Decoder<R> {
158+
fn new(reader: R) -> Self {
159+
Decoder {
160+
reader: BufReader::new(reader),
139161
}
140162
}
141163

142-
// Convert our header buf into an httparse instance, and validate.
143-
let status = httparse_req.parse(&buf)?;
144-
if status.is_partial() {
145-
dbg!(String::from_utf8(buf).unwrap());
146-
return Err("Malformed HTTP head".into());
147-
}
164+
/// Decode an HTTP request on the server.
165+
pub async fn decode(mut self) -> Result<Option<Request<Body<BufReader<R>>>>, Exception>
166+
where
167+
R: Read + Unpin + Send,
168+
{
169+
let mut buf = Vec::new();
170+
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
171+
let mut httparse_req = httparse::Request::new(&mut headers);
172+
173+
// Keep reading bytes from the stream until we hit the end of the stream.
174+
loop {
175+
let bytes_read = self.reader.read_until(b'\n', &mut buf).await?;
176+
// No more bytes are yielded from the stream.
177+
if bytes_read == 0 {
178+
return Ok(None);
179+
}
148180

149-
// Convert httparse headers + body into a `http::Request` type.
150-
let mut req = Request::builder();
151-
for header in httparse_req.headers.iter() {
152-
req.header(header.name, header.value);
153-
}
154-
if let Some(method) = httparse_req.method {
155-
req.method(method);
156-
}
157-
if let Some(path) = httparse_req.path {
158-
req.uri(path);
159-
}
160-
if let Some(version) = httparse_req.version {
161-
req.version(match version {
162-
1 => Version::HTTP_11,
163-
_ => return Err("Unsupported HTTP version".into()),
164-
});
165-
}
181+
// We've hit the end delimiter of the stream.
182+
let idx = buf.len() - 1;
183+
if idx >= 3 && &buf[idx - 3..=idx] == b"\r\n\r\n" {
184+
break;
185+
}
186+
}
166187

167-
// Process the body if `Content-Length` was passed.
168-
let body = match httparse_req
169-
.headers
170-
.iter()
171-
.find(|h| h.name == "Content-Length")
172-
{
173-
Some(_header) => Body::new(reader), // TODO: use the header value
174-
None => Body::empty(),
175-
};
188+
// Convert our header buf into an httparse instance, and validate.
189+
let status = httparse_req.parse(&buf)?;
190+
if status.is_partial() {
191+
dbg!(String::from_utf8(buf).unwrap());
192+
return Err("Malformed HTTP head".into());
193+
}
176194

177-
// Return the request.
178-
Ok(Some(req.body(body)?))
195+
// Convert httparse headers + body into a `http::Request` type.
196+
let mut req = Request::builder();
197+
for header in httparse_req.headers.iter() {
198+
req.header(header.name, header.value);
199+
}
200+
if let Some(method) = httparse_req.method {
201+
req.method(method);
202+
}
203+
if let Some(path) = httparse_req.path {
204+
req.uri(path);
205+
}
206+
if let Some(version) = httparse_req.version {
207+
req.version(match version {
208+
1 => Version::HTTP_11,
209+
_ => return Err("Unsupported HTTP version".into()),
210+
});
211+
}
212+
213+
// Process the body if `Content-Length` was passed.
214+
let body = match httparse_req
215+
.headers
216+
.iter()
217+
.find(|h| h.name == "Content-Length")
218+
{
219+
Some(_header) => Body::new(self.reader), // TODO: use the header value
220+
None => Body::empty(),
221+
};
222+
223+
// Return the request.
224+
Ok(Some(req.body(body)?))
225+
}
179226
}

0 commit comments

Comments
 (0)