1
1
//! Process HTTP connections on the server.
2
2
3
- use async_std:: future:: Future ;
3
+ use async_std:: future:: { timeout , Future , TimeoutError } ;
4
4
use async_std:: io:: { self , BufReader } ;
5
5
use async_std:: io:: { Read , Write } ;
6
6
use async_std:: prelude:: * ;
7
7
use async_std:: task:: { Context , Poll } ;
8
8
use futures_core:: ready;
9
9
use http:: { Request , Response , Version } ;
10
- use std:: time:: { Duration , Instant } ;
10
+ use std:: time:: Duration ;
11
11
12
12
use std:: pin:: Pin ;
13
13
@@ -26,35 +26,27 @@ where
26
26
O : Read + Unpin + Send ,
27
27
{
28
28
let req = decode ( reader) . await ?;
29
- if let RequestOrReader :: Request ( mut req) = req {
29
+ if let Some ( mut req) = req {
30
30
let headers = req. headers ( ) ;
31
- let timeout = match ( headers. get ( "Connection" ) , headers. get ( "Keep-Alive" ) ) {
31
+ let timeout_duration = match ( headers. get ( "Connection" ) , headers. get ( "Keep-Alive" ) ) {
32
32
( Some ( connection) , Some ( _v) )
33
33
if connection == http:: header:: HeaderValue :: from_static ( "Keep-Alive" ) =>
34
34
{
35
35
// TODO: parse timeout
36
- Duration :: from_secs ( 5 )
36
+ Duration :: from_secs ( 10 )
37
37
}
38
- _ => Duration :: from_secs ( 5 ) ,
38
+ _ => Duration :: from_secs ( 10 ) ,
39
39
} ;
40
40
41
- let beginning = Instant :: now ( ) ;
42
41
loop {
43
42
// TODO: what to do when the callback returns Err
44
43
let mut res = encode ( callback ( & mut req) . await ?) . await ?;
45
44
io:: copy ( & mut res, writer) . await ?;
46
- let mut stream = req. into_body ( ) . into_reader ( ) . into_inner ( ) ;
47
- req = loop {
48
- match decode ( stream) . await ? {
49
- RequestOrReader :: Request ( r) => break r,
50
- RequestOrReader :: Reader ( r) => {
51
- let now = Instant :: now ( ) ;
52
- if now - beginning > timeout {
53
- return Ok ( ( ) ) ;
54
- }
55
- stream = r;
56
- }
57
- }
45
+ let stream = req. into_body ( ) . into_reader ( ) . into_inner ( ) ;
46
+ req = match timeout ( timeout_duration, decode ( stream) ) . await {
47
+ Ok ( Ok ( Some ( r) ) ) => r,
48
+ Ok ( Ok ( None ) ) | Err ( TimeoutError { .. } ) => break , /* EOF or timeout */
49
+ Ok ( Err ( e) ) => return Err ( e) ,
58
50
} ;
59
51
}
60
52
}
@@ -165,14 +157,8 @@ where
165
157
Ok ( Encoder :: new ( buf, res. into_body ( ) ) )
166
158
}
167
159
168
- #[ derive( Debug ) ]
169
- pub enum RequestOrReader < R : Read > {
170
- Request ( Request < Body < BufReader < R > > > ) ,
171
- Reader ( R ) ,
172
- }
173
-
174
160
/// Decode an HTTP request on the server.
175
- pub async fn decode < R > ( reader : R ) -> Result < RequestOrReader < R > , Exception >
161
+ pub async fn decode < R > ( reader : R ) -> Result < Option < Request < Body < BufReader < R > > > > , Exception >
176
162
where
177
163
R : Read + Unpin + Send ,
178
164
{
@@ -186,7 +172,7 @@ where
186
172
let bytes_read = reader. read_until ( b'\n' , & mut buf) . await ?;
187
173
// No more bytes are yielded from the stream.
188
174
if bytes_read == 0 {
189
- return Ok ( RequestOrReader :: Reader ( reader . into_inner ( ) ) ) ;
175
+ return Ok ( None ) ;
190
176
}
191
177
192
178
// We've hit the end delimiter of the stream.
@@ -232,5 +218,5 @@ where
232
218
} ;
233
219
234
220
// Return the request.
235
- Ok ( RequestOrReader :: Request ( req. body ( body) ?) )
221
+ Ok ( Some ( req. body ( body) ?) )
236
222
}
0 commit comments