1
1
//! HTTP body types
2
2
3
- use crate :: io:: { AsyncInputStream , AsyncRead , Cursor , Empty } ;
3
+ use crate :: http:: fields:: header_map_from_wasi;
4
+ use crate :: io:: { AsyncInputStream , AsyncOutputStream , AsyncRead , AsyncWrite , Cursor , Empty } ;
5
+ use crate :: runtime:: AsyncPollable ;
4
6
use core:: fmt;
5
7
use http:: header:: { CONTENT_LENGTH , TRANSFER_ENCODING } ;
6
8
use wasi:: http:: types:: IncomingBody as WasiIncomingBody ;
@@ -116,9 +118,9 @@ impl Body for Empty {
116
118
pub struct IncomingBody {
117
119
kind : BodyKind ,
118
120
// IMPORTANT: the order of these fields here matters. `body_stream` must
119
- // be dropped before `_incoming_body `.
121
+ // be dropped before `incoming_body `.
120
122
body_stream : AsyncInputStream ,
121
- _incoming_body : WasiIncomingBody ,
123
+ incoming_body : WasiIncomingBody ,
122
124
}
123
125
124
126
impl IncomingBody {
@@ -130,9 +132,29 @@ impl IncomingBody {
130
132
Self {
131
133
kind,
132
134
body_stream,
133
- _incoming_body : incoming_body,
135
+ incoming_body,
134
136
}
135
137
}
138
+
139
+ /// Consume this `IncomingBody` and return the trailers, if present.
140
+ pub async fn finish ( self ) -> Result < Option < HeaderMap > , Error > {
141
+ // The stream is a child resource of the `IncomingBody`, so ensure that
142
+ // it's dropped first.
143
+ drop ( self . body_stream ) ;
144
+
145
+ let trailers = WasiIncomingBody :: finish ( self . incoming_body ) ;
146
+
147
+ AsyncPollable :: new ( trailers. subscribe ( ) ) . wait_for ( ) . await ;
148
+
149
+ let trailers = trailers. get ( ) . unwrap ( ) . unwrap ( ) ?;
150
+
151
+ let trailers = match trailers {
152
+ None => None ,
153
+ Some ( trailers) => Some ( header_map_from_wasi ( trailers) ?) ,
154
+ } ;
155
+
156
+ Ok ( trailers)
157
+ }
136
158
}
137
159
138
160
impl AsyncRead for IncomingBody {
@@ -177,3 +199,79 @@ impl From<InvalidContentLength> for Error {
177
199
ErrorVariant :: Other ( e. to_string ( ) ) . into ( )
178
200
}
179
201
}
202
+
203
+ /// The output stream for the body, implementing [`AsyncWrite`]. Call
204
+ /// [`Responder::start_response`] to obtain one. Once the body is complete,
205
+ /// it must be declared finished, using [`OutgoingBody::finish`].
206
+ #[ must_use]
207
+ pub struct OutgoingBody {
208
+ // IMPORTANT: the order of these fields here matters. `stream` must
209
+ // be dropped before `body`.
210
+ stream : AsyncOutputStream ,
211
+ body : wasi:: http:: types:: OutgoingBody ,
212
+ dontdrop : DontDropOutgoingBody ,
213
+ }
214
+
215
+ impl OutgoingBody {
216
+ pub ( crate ) fn new ( stream : AsyncOutputStream , body : wasi:: http:: types:: OutgoingBody ) -> Self {
217
+ Self {
218
+ stream,
219
+ body,
220
+ dontdrop : DontDropOutgoingBody ,
221
+ }
222
+ }
223
+
224
+ pub ( crate ) fn consume ( self ) -> ( AsyncOutputStream , wasi:: http:: types:: OutgoingBody ) {
225
+ let Self {
226
+ stream,
227
+ body,
228
+ dontdrop,
229
+ } = self ;
230
+
231
+ std:: mem:: forget ( dontdrop) ;
232
+
233
+ ( stream, body)
234
+ }
235
+
236
+ /// Return a reference to the underlying `AsyncOutputStream`.
237
+ ///
238
+ /// This usually isn't needed, as `OutgoingBody` implements `AsyncWrite`
239
+ /// too, however it is useful for code that expects to work with
240
+ /// `AsyncOutputStream` specifically.
241
+ pub fn stream ( & mut self ) -> & mut AsyncOutputStream {
242
+ & mut self . stream
243
+ }
244
+ }
245
+
246
+ impl AsyncWrite for OutgoingBody {
247
+ async fn write ( & mut self , buf : & [ u8 ] ) -> crate :: io:: Result < usize > {
248
+ self . stream . write ( buf) . await
249
+ }
250
+
251
+ async fn flush ( & mut self ) -> crate :: io:: Result < ( ) > {
252
+ self . stream . flush ( ) . await
253
+ }
254
+
255
+ fn as_async_output_stream ( & self ) -> Option < & AsyncOutputStream > {
256
+ Some ( & self . stream )
257
+ }
258
+ }
259
+
260
+ /// A utility to ensure that `OutgoingBody` is either finished or failed, and
261
+ /// not implicitly dropped.
262
+ struct DontDropOutgoingBody ;
263
+
264
+ impl Drop for DontDropOutgoingBody {
265
+ fn drop ( & mut self ) {
266
+ unreachable ! ( "`OutgoingBody::drop` called; `OutgoingBody`s should be consumed with `finish` or `fail`." ) ;
267
+ }
268
+ }
269
+
270
+ /// A placeholder for use as the type parameter to [`Response`] to indicate
271
+ /// that the body has not yet started. This is used with
272
+ /// [`Responder::start_response`], which has a `Response<BodyForthcoming>`
273
+ /// argument.
274
+ ///
275
+ /// To instead start the response and obtain the output stream for the body,
276
+ /// use [`Responder::respond`].
277
+ pub struct BodyForthcoming ;
0 commit comments