Skip to content

Commit a9e711a

Browse files
committed
Refactor a bit
1 parent 6e27563 commit a9e711a

File tree

1 file changed

+64
-22
lines changed

1 file changed

+64
-22
lines changed

src/server.rs

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use async_std::prelude::*;
77
use async_std::task::{Context, Poll};
88
use futures_core::ready;
99
use http_types::{Method, Request, Response};
10+
use std::fmt;
1011
use std::str::FromStr;
1112
use std::time::Duration;
1213

@@ -28,38 +29,43 @@ where
2829
// TODO: make configurable
2930
let timeout_duration = Duration::from_secs(10);
3031
const MAX_REQUESTS: usize = 200;
31-
32-
let req = decode(reader).await?;
3332
let mut num_requests = 0;
34-
if let Some((mut req, stream)) = req {
35-
let mut stream: Option<Box<dyn BufRead + Unpin + Send + 'static>> = match stream {
36-
Some(s) => Some(Box::new(s)),
37-
None => None,
38-
};
33+
34+
// Decode a request. This may be the first of many since
35+
// the connection is Keep-Alive by default
36+
let decoded = decode(reader).await?;
37+
// Decode returns one of three things;
38+
// * A request with its body reader set to the underlying TCP stream
39+
// * A request with an empty body AND the underlying stream
40+
// * No request (because of the stream closed) and no underlying stream
41+
if let Some(mut decoded) = decoded {
3942
loop {
4043
num_requests += 1;
4144
if num_requests > MAX_REQUESTS {
45+
// We've exceeded the max number of requests per connection
4246
return Ok(());
4347
}
4448

49+
// Pass the request to the user defined request handler callback.
50+
// Encode the response we get back.
4551
// TODO: what to do when the callback returns Err
46-
let mut res = encode(callback(&mut req).await?).await?;
47-
let to_decode = match stream {
48-
None => req.into_body(),
49-
Some(s) => s,
50-
};
52+
let mut res = encode(callback(decoded.mut_request()).await?).await?;
53+
54+
// If we have reference to the stream, unwrap it. Otherwise,
55+
// get the underlying stream from the request
56+
let to_decode = decoded.to_stream();
57+
58+
// Copy the response into the writer
5159
io::copy(&mut res, &mut writer).await?;
52-
let (new_request, new_stream) = match timeout(timeout_duration, decode(to_decode)).await
53-
{
60+
61+
// Decode a new request, timing out if this takes longer than the
62+
// timeout duration.
63+
decoded = match timeout(timeout_duration, decode(to_decode)).await {
5464
Ok(Ok(Some(r))) => r,
5565
Ok(Ok(None)) | Err(TimeoutError { .. }) => break, /* EOF or timeout */
5666
Ok(Err(e)) => return Err(e),
5767
};
58-
req = new_request;
59-
stream = match new_stream {
60-
Some(s) => Some(Box::new(s)),
61-
None => None,
62-
};
68+
// Loop back with the new request and stream and start again
6369
}
6470
}
6571

@@ -161,7 +167,7 @@ pub async fn encode(res: Response) -> io::Result<Encoder> {
161167
}
162168

163169
/// Decode an HTTP request on the server.
164-
pub async fn decode<R>(reader: R) -> Result<Option<(Request, Option<BufReader<R>>)>, Exception>
170+
pub async fn decode<R>(reader: R) -> Result<Option<DecodedRequest>, Exception>
165171
where
166172
R: Read + Unpin + Send + 'static,
167173
{
@@ -220,11 +226,47 @@ where
220226
req = req.set_len(len);
221227

222228
// Return the request.
223-
Ok(Some((req, None)))
229+
Ok(Some(DecodedRequest::WithBody(req)))
224230
} else {
225231
return Err("Invalid value for Content-Length".into());
226232
}
227233
} else {
228-
Ok(Some((req, Some(reader))))
234+
Ok(Some(DecodedRequest::WithoutBody(req, Box::new(reader))))
235+
}
236+
}
237+
238+
/// A decoded response
239+
///
240+
/// Either a request with body stream OR a request without a
241+
/// a body stream paired with the underlying stream
242+
pub enum DecodedRequest {
243+
WithBody(Request),
244+
WithoutBody(Request, Box<dyn BufRead + Unpin + Send + 'static>),
245+
}
246+
247+
impl fmt::Debug for DecodedRequest {
248+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249+
match self {
250+
DecodedRequest::WithBody(_) => write!(f, "WithBody"),
251+
DecodedRequest::WithoutBody(_, _) => write!(f, "WithoutBody"),
252+
}
253+
}
254+
}
255+
256+
impl DecodedRequest {
257+
/// Get a mutable reference to the request
258+
fn mut_request(&mut self) -> &mut Request {
259+
match self {
260+
DecodedRequest::WithBody(r) => r,
261+
DecodedRequest::WithoutBody(r, _) => r,
262+
}
263+
}
264+
265+
/// Consume self and get access to the underlying stream
266+
fn to_stream(self) -> Box<dyn BufRead + Unpin + Send + 'static> {
267+
match self {
268+
DecodedRequest::WithBody(r) => r.into_body(),
269+
DecodedRequest::WithoutBody(_, s) => s,
270+
}
229271
}
230272
}

0 commit comments

Comments
 (0)