1
1
//! Process HTTP connections on the server.
2
2
3
3
use async_std:: future:: { timeout, Future , TimeoutError } ;
4
- use async_std:: io:: { self , BufRead , BufReader } ;
4
+ use async_std:: io:: { self , BufReader } ;
5
5
use async_std:: io:: { Read , Write } ;
6
6
use async_std:: prelude:: * ;
7
7
use async_std:: task:: { Context , Poll } ;
8
8
use futures_core:: ready;
9
- use http_types:: {
10
- headers:: { HeaderName , HeaderValue , CONTENT_TYPE } ,
11
- Body , Method , Request , Response ,
12
- } ;
13
- use std:: fmt;
9
+ use http_types:: headers:: { HeaderName , HeaderValue , CONTENT_TYPE } ;
10
+ use http_types:: { Body , Method , Request , Response } ;
14
11
use std:: str:: FromStr ;
15
12
use std:: time:: Duration ;
16
13
@@ -29,49 +26,34 @@ pub async fn accept<R, W, F, Fut>(
29
26
endpoint : F ,
30
27
) -> Result < ( ) , Exception >
31
28
where
32
- R : Read + Unpin + Send + ' static ,
29
+ R : Read + Unpin + Send + Sync + ' static + Clone ,
33
30
W : Write + Unpin ,
34
- F : for < ' a > Fn ( & ' a mut Request ) -> Fut ,
31
+ F : for < ' a > Fn ( Request ) -> Fut ,
35
32
Fut : Future < Output = Result < Response , Exception > > ,
36
33
{
37
34
// TODO: make configurable
38
35
let timeout_duration = Duration :: from_secs ( 10 ) ;
39
36
const MAX_REQUESTS : usize = 200 ;
40
37
let mut num_requests = 0 ;
41
38
42
- // Decode a request. This may be the first of many since
43
- // the connection is Keep-Alive by default
44
- let decoded = decode ( addr, reader) . await ?;
45
- // Decode returns one of three things;
46
- // * A request with its body reader set to the underlying TCP stream
47
- // * A request with an empty body AND the underlying stream
48
- // * No request (because of the stream closed) and no underlying stream
49
- if let Some ( mut decoded) = decoded {
39
+ // Decode a request. This may be the first of many since the connection is Keep-Alive by default.
40
+ let r = reader. clone ( ) ;
41
+ let req = decode ( addr, r) . await ?;
42
+ if let Some ( mut req) = req {
50
43
loop {
51
- num_requests += 1 ;
52
- if num_requests > MAX_REQUESTS {
53
- // We've exceeded the max number of requests per connection
54
- return Ok ( ( ) ) ;
55
- }
44
+ match num_requests {
45
+ MAX_REQUESTS => return Ok ( ( ) ) ,
46
+ _ => num_requests += 1 ,
47
+ } ;
56
48
57
- // Pass the request to the user defined request handler endpoint.
58
- // Encode the response we get back.
59
49
// TODO: what to do when the endpoint returns Err
60
- let res = endpoint ( decoded . mut_request ( ) ) . await ?;
50
+ let res = endpoint ( req ) . await ?;
61
51
let mut encoder = Encoder :: encode ( res) ;
62
-
63
- // If we have reference to the stream, unwrap it. Otherwise,
64
- // get the underlying stream from the request
65
- let to_decode = decoded. into_reader ( ) ;
66
-
67
- // Copy the response into the writer
68
- // TODO: don't double wrap BufReaders, but instead write a version of
69
- // io::copy that expects a BufReader.
70
52
io:: copy ( & mut encoder, & mut writer) . await ?;
71
53
72
54
// Decode a new request, timing out if this takes longer than the
73
55
// timeout duration.
74
- decoded = match timeout ( timeout_duration, decode ( addr, to_decode ) ) . await {
56
+ req = match timeout ( timeout_duration, decode ( addr, reader . clone ( ) ) ) . await {
75
57
Ok ( Ok ( Some ( r) ) ) => r,
76
58
Ok ( Ok ( None ) ) | Err ( TimeoutError { .. } ) => break , /* EOF or timeout */
77
59
Ok ( Err ( e) ) => return Err ( e) ,
@@ -352,7 +334,7 @@ impl Read for Encoder {
352
334
const HTTP_1_1_VERSION : u8 = 1 ;
353
335
354
336
/// Decode an HTTP request on the server.
355
- async fn decode < R > ( addr : & str , reader : R ) -> Result < Option < DecodedRequest > , Exception >
337
+ async fn decode < R > ( addr : & str , reader : R ) -> Result < Option < Request > , Exception >
356
338
where
357
339
R : Read + Unpin + Send + ' static ,
358
340
{
@@ -403,49 +385,10 @@ where
403
385
// it with a known length, or need to use chunked encoding.
404
386
let len = match req. header ( & CONTENT_TYPE ) {
405
387
Some ( len) => len. last ( ) . unwrap ( ) . as_str ( ) . parse :: < usize > ( ) ?,
406
- None => return Ok ( Some ( DecodedRequest :: WithoutBody ( req, Box :: new ( reader ) ) ) ) ,
388
+ None => return Ok ( Some ( req) ) ,
407
389
} ;
408
390
req. set_body ( Body :: from_reader ( reader) ) ;
409
391
req. set_len ( len) ;
410
392
411
- Ok ( Some ( DecodedRequest :: WithBody ( req) ) )
412
- }
413
-
414
- /// A decoded request
415
- enum DecodedRequest {
416
- /// The TCP connection is inside the request already, so the lifetimes match up.
417
- WithBody ( Request ) ,
418
- /// The TCP connection is *not* inside the request body, so we need to pass
419
- /// it along with it to make the lifetimes match up.
420
- WithoutBody ( Request , Box < dyn BufRead + Unpin + Send + ' static > ) ,
421
- }
422
-
423
- impl fmt:: Debug for DecodedRequest {
424
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
425
- match self {
426
- DecodedRequest :: WithBody ( _) => write ! ( f, "WithBody" ) ,
427
- DecodedRequest :: WithoutBody ( _, _) => write ! ( f, "WithoutBody" ) ,
428
- }
429
- }
430
- }
431
-
432
- impl DecodedRequest {
433
- /// Get a mutable reference to the request
434
- fn mut_request ( & mut self ) -> & mut Request {
435
- match self {
436
- DecodedRequest :: WithBody ( r) => r,
437
- DecodedRequest :: WithoutBody ( r, _) => r,
438
- }
439
- }
440
-
441
- /// Consume self and get access to the underlying reader
442
- ///
443
- /// When the request has a body, the underlying reader is the body.
444
- /// When it does not, the underlying body has been passed alongside the request.
445
- fn into_reader ( self ) -> Box < dyn BufRead + Unpin + Send + ' static > {
446
- match self {
447
- DecodedRequest :: WithBody ( r) => r. into_body ( ) . into_reader ( ) ,
448
- DecodedRequest :: WithoutBody ( _, s) => s,
449
- }
450
- }
393
+ Ok ( Some ( req) )
451
394
}
0 commit comments