1
1
//! Process HTTP connections on the server.
2
2
3
- use async_std:: future:: Future ;
3
+ use async_std:: future:: { timeout , Future , TimeoutError } ;
4
4
use async_std:: io:: { self , BufReader } ;
5
5
use async_std:: io:: { Read , Write } ;
6
6
use async_std:: prelude:: * ;
7
7
use async_std:: task:: { Context , Poll } ;
8
8
use futures_core:: ready;
9
9
use http:: { Request , Response , Version } ;
10
- use std:: time:: { Duration , Instant } ;
10
+ use std:: time:: Duration ;
11
11
12
12
use std:: pin:: Pin ;
13
13
@@ -25,36 +25,27 @@ where
25
25
Fut : Future < Output = Result < Response < Body < O > > , Exception > > ,
26
26
O : Read + Unpin + Send ,
27
27
{
28
+ // TODO: make configurable
29
+ let timeout_duration = Duration :: from_secs ( 10 ) ;
30
+ const MAX_REQUESTS : usize = 200 ;
31
+
28
32
let req = decode ( reader) . await ?;
29
- if let RequestOrReader :: Request ( mut req) = req {
30
- let headers = req. headers ( ) ;
31
- let timeout = match ( headers. get ( "Connection" ) , headers. get ( "Keep-Alive" ) ) {
32
- ( Some ( connection) , Some ( _v) )
33
- if connection == http:: header:: HeaderValue :: from_static ( "Keep-Alive" ) =>
34
- {
35
- // TODO: parse timeout
36
- Duration :: from_secs ( 5 )
33
+ let mut num_requests = 0 ;
34
+ if let Some ( mut req) = req {
35
+ loop {
36
+ num_requests += 1 ;
37
+ if num_requests > MAX_REQUESTS {
38
+ return Ok ( ( ) ) ;
37
39
}
38
- _ => Duration :: from_secs ( 5 ) ,
39
- } ;
40
40
41
- let beginning = Instant :: now ( ) ;
42
- loop {
43
41
// TODO: what to do when the callback returns Err
44
42
let mut res = encode ( callback ( & mut req) . await ?) . await ?;
45
43
io:: copy ( & mut res, writer) . await ?;
46
- let mut stream = req. into_body ( ) . into_reader ( ) . into_inner ( ) ;
47
- req = loop {
48
- match decode ( stream) . await ? {
49
- RequestOrReader :: Request ( r) => break r,
50
- RequestOrReader :: Reader ( r) => {
51
- let now = Instant :: now ( ) ;
52
- if now - beginning > timeout {
53
- return Ok ( ( ) ) ;
54
- }
55
- stream = r;
56
- }
57
- }
44
+ let stream = req. into_body ( ) . into_reader ( ) . into_inner ( ) ;
45
+ req = match timeout ( timeout_duration, decode ( stream) ) . await {
46
+ Ok ( Ok ( Some ( r) ) ) => r,
47
+ Ok ( Ok ( None ) ) | Err ( TimeoutError { .. } ) => break , /* EOF or timeout */
48
+ Ok ( Err ( e) ) => return Err ( e) ,
58
49
} ;
59
50
}
60
51
}
@@ -165,14 +156,8 @@ where
165
156
Ok ( Encoder :: new ( buf, res. into_body ( ) ) )
166
157
}
167
158
168
- #[ derive( Debug ) ]
169
- pub enum RequestOrReader < R : Read > {
170
- Request ( Request < Body < BufReader < R > > > ) ,
171
- Reader ( R ) ,
172
- }
173
-
174
159
/// Decode an HTTP request on the server.
175
- pub async fn decode < R > ( reader : R ) -> Result < RequestOrReader < R > , Exception >
160
+ pub async fn decode < R > ( reader : R ) -> Result < Option < Request < Body < BufReader < R > > > > , Exception >
176
161
where
177
162
R : Read + Unpin + Send ,
178
163
{
@@ -186,7 +171,7 @@ where
186
171
let bytes_read = reader. read_until ( b'\n' , & mut buf) . await ?;
187
172
// No more bytes are yielded from the stream.
188
173
if bytes_read == 0 {
189
- return Ok ( RequestOrReader :: Reader ( reader . into_inner ( ) ) ) ;
174
+ return Ok ( None ) ;
190
175
}
191
176
192
177
// We've hit the end delimiter of the stream.
@@ -232,5 +217,5 @@ where
232
217
} ;
233
218
234
219
// Return the request.
235
- Ok ( RequestOrReader :: Request ( req. body ( body) ?) )
220
+ Ok ( Some ( req. body ( body) ?) )
236
221
}
0 commit comments