1
1
//! Process HTTP connections on the server.
2
2
3
3
use async_std:: future:: { timeout, Future , TimeoutError } ;
4
- use async_std:: io:: { self , BufReader } ;
4
+ use async_std:: io:: { self , BufRead , BufReader } ;
5
5
use async_std:: io:: { Read , Write } ;
6
6
use async_std:: prelude:: * ;
7
7
use async_std:: task:: { Context , Poll } ;
8
8
use futures_core:: ready;
9
- use http_types:: { HttpVersion , Method , Request , Response } ;
10
- use std:: convert :: TryFrom ;
9
+ use http_types:: { Method , Request , Response } ;
10
+ use std:: str :: FromStr ;
11
11
use std:: time:: Duration ;
12
12
13
13
use std:: pin:: Pin ;
31
31
32
32
let req = decode ( reader) . await ?;
33
33
let mut num_requests = 0 ;
34
- if let Some ( mut req) = req {
34
+ if let Some ( ( mut req, stream) ) = req {
35
+ let mut stream: Option < Box < dyn BufRead + Unpin + Send + ' static > > = match stream {
36
+ Some ( s) => Some ( Box :: new ( s) ) ,
37
+ None => None ,
38
+ } ;
35
39
loop {
36
40
num_requests += 1 ;
37
41
if num_requests > MAX_REQUESTS {
@@ -40,13 +44,22 @@ where
40
44
41
45
// TODO: what to do when the callback returns Err
42
46
let mut res = encode ( callback ( & mut req) . await ?) . await ?;
43
- let stream = req. into_body ( ) ;
47
+ let to_decode = match stream {
48
+ None => req. into_body ( ) ,
49
+ Some ( s) => s,
50
+ } ;
44
51
io:: copy ( & mut res, & mut writer) . await ?;
45
- req = match timeout ( timeout_duration, decode ( stream) ) . await {
52
+ let ( new_request, new_stream) = match timeout ( timeout_duration, decode ( to_decode) ) . await
53
+ {
46
54
Ok ( Ok ( Some ( r) ) ) => r,
47
55
Ok ( Ok ( None ) ) | Err ( TimeoutError { .. } ) => break , /* EOF or timeout */
48
56
Ok ( Err ( e) ) => return Err ( e) ,
49
57
} ;
58
+ req = new_request;
59
+ stream = match new_stream {
60
+ Some ( s) => Some ( Box :: new ( s) ) ,
61
+ None => None ,
62
+ } ;
50
63
}
51
64
}
52
65
@@ -148,7 +161,7 @@ pub async fn encode(res: Response) -> io::Result<Encoder> {
148
161
}
149
162
150
163
/// Decode an HTTP request on the server.
151
- pub async fn decode < R > ( reader : R ) -> Result < Option < Request > , Exception >
164
+ pub async fn decode < R > ( reader : R ) -> Result < Option < ( Request , Option < BufReader < R > > ) > , Exception >
152
165
where
153
166
R : Read + Unpin + Send + ' static ,
154
167
{
@@ -184,11 +197,10 @@ where
184
197
let uri = httparse_req. path . ok_or_else ( || "No uri found" ) ?;
185
198
let uri = url:: Url :: parse ( uri) ?;
186
199
let version = httparse_req. version . ok_or_else ( || "No version found" ) ?;
187
- let version = match version {
188
- 1 => HttpVersion :: HTTP1_1 ,
189
- _ => return Err ( "Unsupported HTTP version" . into ( ) ) ,
190
- } ;
191
- let mut req = Request :: new ( version, Method :: try_from ( method) ?, uri) ;
200
+ if version != 1 {
201
+ return Err ( "Unsupported HTTP version" . into ( ) ) ;
202
+ }
203
+ let mut req = Request :: new ( Method :: from_str ( method) ?, uri) ;
192
204
for header in httparse_req. headers . iter ( ) {
193
205
req = req. set_header ( header. name , std:: str:: from_utf8 ( header. value ) ?) ?;
194
206
}
@@ -203,14 +215,16 @@ where
203
215
. ok ( )
204
216
. and_then ( |s| s. parse :: < usize > ( ) . ok ( ) ) ;
205
217
206
- if let Some ( _len) = length {
207
- // TODO: set len
218
+ if let Some ( len) = length {
208
219
req = req. set_body ( reader) ;
220
+ req = req. set_len ( len) ;
221
+
222
+ // Return the request.
223
+ Ok ( Some ( ( req, None ) ) )
209
224
} else {
210
225
return Err ( "Invalid value for Content-Length" . into ( ) ) ;
211
226
}
212
- } ;
213
-
214
- // Return the request.
215
- Ok ( Some ( req) )
227
+ } else {
228
+ Ok ( Some ( ( req, Some ( reader) ) ) )
229
+ }
216
230
}
0 commit comments