@@ -19,7 +19,7 @@ pub async fn connect<R, W, F, Fut>(
19
19
addr : & str ,
20
20
reader : R ,
21
21
mut writer : W ,
22
- callback : F ,
22
+ endpoint : F ,
23
23
) -> Result < ( ) , Exception >
24
24
where
25
25
R : Read + Unpin + Send + ' static ,
@@ -47,17 +47,20 @@ where
47
47
return Ok ( ( ) ) ;
48
48
}
49
49
50
- // Pass the request to the user defined request handler callback .
50
+ // Pass the request to the user defined request handler endpoint .
51
51
// Encode the response we get back.
52
- // TODO: what to do when the callback returns Err
53
- let mut res = encode ( callback ( decoded. mut_request ( ) ) . await ?) . await ?;
52
+ // TODO: what to do when the endpoint returns Err
53
+ let res = endpoint ( decoded. mut_request ( ) ) . await ?;
54
+ let mut encoder = Encoder :: encode ( res) ;
54
55
55
56
// If we have reference to the stream, unwrap it. Otherwise,
56
57
// get the underlying stream from the request
57
58
let to_decode = decoded. into_reader ( ) ;
58
59
59
60
// Copy the response into the writer
60
- io:: copy ( & mut res, & mut writer) . await ?;
61
+ // TODO: don't double wrap BufReaders, but instead write a version of
62
+ // io::copy that expects a BufReader.
63
+ io:: copy ( & mut encoder, & mut writer) . await ?;
61
64
62
65
// Decode a new request, timing out if this takes longer than the
63
66
// timeout duration.
@@ -81,11 +84,11 @@ pub struct Encoder {
81
84
/// Keep track how far we've indexed into the headers + body.
82
85
cursor : usize ,
83
86
/// HTTP headers to be sent.
84
- headers : Vec < u8 > ,
87
+ head : Option < Vec < u8 > > ,
85
88
/// Check whether we're done sending headers.
86
- headers_done : bool ,
89
+ head_done : bool ,
87
90
/// Response containing the HTTP body to be sent.
88
- response : Response ,
91
+ res : Response ,
89
92
/// Check whether we're done with the body.
90
93
body_done : bool ,
91
94
/// Keep track of how many bytes have been read from the body stream.
@@ -94,16 +97,44 @@ pub struct Encoder {
94
97
95
98
impl Encoder {
96
99
/// Create a new instance.
97
- pub ( crate ) fn new ( headers : Vec < u8 > , response : Response ) -> Self {
100
+ pub ( crate ) fn encode ( res : Response ) -> Self {
98
101
Self {
99
- response ,
100
- headers ,
102
+ res ,
103
+ head : None ,
101
104
cursor : 0 ,
102
- headers_done : false ,
105
+ head_done : false ,
103
106
body_done : false ,
104
107
body_bytes_read : 0 ,
105
108
}
106
109
}
110
+
111
+ fn encode_head ( & mut self ) -> io:: Result < ( ) > {
112
+ let mut head: Vec < u8 > = vec ! [ ] ;
113
+
114
+ let reason = self . res . status ( ) . canonical_reason ( ) ;
115
+ let status = self . res . status ( ) ;
116
+ std:: io:: Write :: write_fmt ( & mut head, format_args ! ( "HTTP/1.1 {} {}\r \n " , status, reason) ) ?;
117
+
118
+ // If the body isn't streaming, we can set the content-length ahead of time. Else we need to
119
+ // send all items in chunks.
120
+ if let Some ( len) = self . res . len ( ) {
121
+ std:: io:: Write :: write_fmt ( & mut head, format_args ! ( "Content-Length: {}\r \n " , len) ) ?;
122
+ } else {
123
+ std:: io:: Write :: write_fmt ( & mut head, format_args ! ( "Transfer-Encoding: chunked\r \n " ) ) ?;
124
+ panic ! ( "chunked encoding is not implemented yet" ) ;
125
+ // See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
126
+ // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
127
+ }
128
+
129
+ for ( header, value) in self . res . headers ( ) . iter ( ) {
130
+ std:: io:: Write :: write_fmt ( & mut head, format_args ! ( "{}: {}\r \n " , header. as_str( ) , value) ) ?
131
+ }
132
+
133
+ std:: io:: Write :: write_fmt ( & mut head, format_args ! ( "\r \n " ) ) ?;
134
+
135
+ self . head = Some ( head) ;
136
+ Ok ( ( ) )
137
+ }
107
138
}
108
139
109
140
impl Read for Encoder {
@@ -112,22 +143,32 @@ impl Read for Encoder {
112
143
cx : & mut Context < ' _ > ,
113
144
buf : & mut [ u8 ] ,
114
145
) -> Poll < io:: Result < usize > > {
146
+ // Encode the headers to a buffer, the first time we poll
147
+ if let None = self . head {
148
+ self . encode_head ( ) ?;
149
+ }
150
+
115
151
// Send the headers. As long as the headers aren't fully sent yet we
116
152
// keep sending more of the headers.
117
153
let mut bytes_read = 0 ;
118
- if !self . headers_done {
119
- let len = std:: cmp:: min ( self . headers . len ( ) - self . cursor , buf. len ( ) ) ;
154
+
155
+ // Read from the serialized headers, url and methods.
156
+ if !self . head_done {
157
+ let head = self . head . as_ref ( ) . unwrap ( ) ;
158
+ let head_len = head. len ( ) ;
159
+ let len = std:: cmp:: min ( head. len ( ) - self . cursor , buf. len ( ) ) ;
120
160
let range = self . cursor ..self . cursor + len;
121
- buf[ 0 ..len] . copy_from_slice ( & mut self . headers [ range] ) ;
161
+ buf[ 0 ..len] . copy_from_slice ( & head [ range] ) ;
122
162
self . cursor += len;
123
- if self . cursor == self . headers . len ( ) {
124
- self . headers_done = true ;
163
+ if self . cursor == head_len {
164
+ self . head_done = true ;
125
165
}
126
166
bytes_read += len;
127
167
}
128
168
169
+ // Read from the AsyncRead impl on the inner Response struct.
129
170
if !self . body_done {
130
- let n = ready ! ( Pin :: new( & mut self . response ) . poll_read( cx, & mut buf[ bytes_read..] ) ) ?;
171
+ let n = ready ! ( Pin :: new( & mut self . res ) . poll_read( cx, & mut buf[ bytes_read..] ) ) ?;
131
172
bytes_read += n;
132
173
self . body_bytes_read += n;
133
174
if bytes_read == 0 {
@@ -139,39 +180,11 @@ impl Read for Encoder {
139
180
}
140
181
}
141
182
142
- /// Encode an HTTP request on the server.
143
- // TODO: return a reader in the response
144
- pub async fn encode ( res : Response ) -> io:: Result < Encoder > {
145
- let mut buf: Vec < u8 > = vec ! [ ] ;
146
-
147
- let reason = res. status ( ) . canonical_reason ( ) ;
148
- let status = res. status ( ) ;
149
- std:: io:: Write :: write_fmt ( & mut buf, format_args ! ( "HTTP/1.1 {} {}\r \n " , status, reason) ) ?;
150
-
151
- // If the body isn't streaming, we can set the content-length ahead of time. Else we need to
152
- // send all items in chunks.
153
- if let Some ( len) = res. len ( ) {
154
- std:: io:: Write :: write_fmt ( & mut buf, format_args ! ( "Content-Length: {}\r \n " , len) ) ?;
155
- } else {
156
- std:: io:: Write :: write_fmt ( & mut buf, format_args ! ( "Transfer-Encoding: chunked\r \n " ) ) ?;
157
- panic ! ( "chunked encoding is not implemented yet" ) ;
158
- // See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
159
- // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
160
- }
161
-
162
- for ( header, value) in res. headers ( ) . iter ( ) {
163
- std:: io:: Write :: write_fmt ( & mut buf, format_args ! ( "{}: {}\r \n " , header. as_str( ) , value) ) ?
164
- }
165
-
166
- std:: io:: Write :: write_fmt ( & mut buf, format_args ! ( "\r \n " ) ) ?;
167
- Ok ( Encoder :: new ( buf, res) )
168
- }
169
-
170
183
/// The number returned from httparse when the request is HTTP 1.1
171
184
const HTTP_1_1_VERSION : u8 = 1 ;
172
185
173
186
/// Decode an HTTP request on the server.
174
- pub async fn decode < R > ( addr : & str , reader : R ) -> Result < Option < DecodedRequest > , Exception >
187
+ async fn decode < R > ( addr : & str , reader : R ) -> Result < Option < DecodedRequest > , Exception >
175
188
where
176
189
R : Read + Unpin + Send + ' static ,
177
190
{
@@ -215,35 +228,22 @@ where
215
228
req = req. set_header ( header. name , std:: str:: from_utf8 ( header. value ) ?) ?;
216
229
}
217
230
218
- // Process the body if `Content-Length` was passed.
219
- if let Some ( content_length) = httparse_req
220
- . headers
221
- . iter ( )
222
- . find ( |h| h. name . eq_ignore_ascii_case ( "Content-Length" ) )
223
- {
224
- let length = std:: str:: from_utf8 ( content_length. value )
225
- . ok ( )
226
- . and_then ( |s| s. parse :: < usize > ( ) . ok ( ) ) ;
227
-
228
- if let Some ( len) = length {
229
- req = req. set_body_reader ( reader) ;
230
- req = req. set_len ( len) ;
231
-
232
- Ok ( Some ( DecodedRequest :: WithBody ( req) ) )
233
- } else {
234
- return Err ( "Invalid value for Content-Length" . into ( ) ) ;
235
- }
236
- } else {
237
- Ok ( Some ( DecodedRequest :: WithoutBody ( req, Box :: new ( reader) ) ) )
238
- }
231
+ // Check for content-length, that determines determines whether we can parse
232
+ // it with a known length, or need to use chunked encoding.
233
+ let len = match req. header ( "Content-Length" ) {
234
+ Some ( len) => len. parse :: < usize > ( ) ?,
235
+ None => return Ok ( Some ( DecodedRequest :: WithoutBody ( req, Box :: new ( reader) ) ) ) ,
236
+ } ;
237
+ req = req. set_body_reader ( reader) . set_len ( len) ;
238
+ Ok ( Some ( DecodedRequest :: WithBody ( req) ) )
239
239
}
240
240
241
- /// A decoded response
242
- ///
243
- /// Either a request with body stream OR a request without a
244
- /// a body stream paired with the underlying stream
245
- pub enum DecodedRequest {
241
+ /// A decoded request
242
+ enum DecodedRequest {
243
+ /// The TCP connection is inside the request already, so the lifetimes match up.
246
244
WithBody ( Request ) ,
245
+ /// The TCP connection is *not* inside the request body, so we need to pass
246
+ /// it along with it to make the lifetimes match up.
247
247
WithoutBody ( Request , Box < dyn BufRead + Unpin + Send + ' static > ) ,
248
248
}
249
249
0 commit comments