Skip to content

Commit 9c0d2ef

Browse files
committed
Add a start_request function for requests with streaming output bodies.
1 parent cf36c89 commit 9c0d2ef

File tree

1 file changed

+85
-5
lines changed

1 file changed

+85
-5
lines changed

src/http/client.rs

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1-
use super::{body::IncomingBody, Body, Error, Request, Response, Result};
1+
use super::{
2+
body::{BodyForthcoming, IncomingBody, OutgoingBody},
3+
fields::header_map_to_wasi,
4+
Body, Error, HeaderMap, Request, Response, Result,
5+
};
26
use crate::http::request::try_into_outgoing;
37
use crate::http::response::try_from_incoming;
48
use crate::io::{self, AsyncOutputStream, AsyncPollable};
59
use crate::time::Duration;
6-
use wasi::http::types::{OutgoingBody, RequestOptions as WasiRequestOptions};
10+
use wasi::http::types::{
11+
FutureIncomingResponse as WasiFutureIncomingResponse, OutgoingBody as WasiOutgoingBody,
12+
RequestOptions as WasiRequestOptions,
13+
};
714

815
/// An HTTP client.
916
// Empty for now, but permits adding support for RequestOptions soon:
@@ -19,22 +26,27 @@ impl Client {
1926
}
2027

2128
/// Send an HTTP request.
29+
///
30+
/// TODO: Should this automatically add a "Content-Length" header if the
31+
/// body size is known?
32+
///
33+
/// To respond with trailers, use [`Client::start_request`] instead.
2234
pub async fn send<B: Body>(&self, req: Request<B>) -> Result<Response<IncomingBody>> {
2335
// We don't use `body::OutputBody` here because we can report I/O
2436
// errors from the `copy` directly.
2537
let (wasi_req, body) = try_into_outgoing(req)?;
2638
let wasi_body = wasi_req.body().unwrap();
27-
let body_stream = wasi_body.write().unwrap();
39+
let wasi_stream = wasi_body.write().unwrap();
2840

2941
// 1. Start sending the request head
3042
let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap();
3143

3244
// 2. Start sending the request body
33-
io::copy(body, AsyncOutputStream::new(body_stream)).await?;
45+
io::copy(body, AsyncOutputStream::new(wasi_stream)).await?;
3446

3547
// 3. Finish sending the request body
3648
let trailers = None;
37-
OutgoingBody::finish(wasi_body, trailers).unwrap();
49+
WasiOutgoingBody::finish(wasi_body, trailers).unwrap();
3850

3951
// 4. Receive the response
4052
AsyncPollable::new(res.subscribe()).wait_for().await;
@@ -46,6 +58,55 @@ impl Client {
4658
try_from_incoming(res)
4759
}
4860

61+
/// Start sending an HTTP request, and return an `OutgoingBody` stream to
62+
/// write the body to.
63+
///
64+
/// The returned `OutgoingBody` must be consumed by [`Client::finish`] or
65+
/// [`Client::fail`].
66+
pub async fn start_request(
67+
&self,
68+
req: Request<BodyForthcoming>,
69+
) -> Result<(OutgoingBody, FutureIncomingResponse)> {
70+
let (wasi_req, _body_forthcoming) = try_into_outgoing(req)?;
71+
let wasi_body = wasi_req.body().unwrap();
72+
let wasi_stream = wasi_body.write().unwrap();
73+
74+
// Start sending the request head.
75+
let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap();
76+
77+
let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body);
78+
79+
Ok((outgoing_body, FutureIncomingResponse(res)))
80+
}
81+
82+
/// Finish the body, optionally with trailers.
83+
///
84+
/// This is used with [`Client::start_request`].
85+
pub fn finish(body: OutgoingBody, trailers: Option<HeaderMap>) -> Result<()> {
86+
let (stream, body) = body.consume();
87+
88+
// The stream is a child resource of the `OutgoingBody`, so ensure that
89+
// it's dropped first.
90+
drop(stream);
91+
92+
let wasi_trailers = match trailers {
93+
Some(trailers) => Some(header_map_to_wasi(&trailers)?),
94+
None => None,
95+
};
96+
97+
wasi::http::types::OutgoingBody::finish(body, wasi_trailers)
98+
.expect("body length did not match Content-Length header value");
99+
Ok(())
100+
}
101+
102+
/// Consume the `OutgoingBody` and indicate that the body was not
103+
/// completed.
104+
///
105+
/// This is used with [`Client::start_request`].
106+
pub fn fail(body: OutgoingBody) {
107+
let (_stream, _body) = body.consume();
108+
}
109+
49110
/// Set timeout on connecting to HTTP server
50111
pub fn set_connect_timeout(&mut self, d: impl Into<Duration>) {
51112
self.options_mut().connect_timeout = Some(d.into());
@@ -76,6 +137,25 @@ impl Client {
76137
}
77138
}
78139

140+
/// Returned from [`Client::start_request`], this represents a handle to a
141+
/// response which has not arrived yet. Call [`FutureIncomingResponse::get`]
142+
/// to wait for the response.
143+
pub struct FutureIncomingResponse(WasiFutureIncomingResponse);
144+
145+
impl FutureIncomingResponse {
146+
/// Consume this `FutureIncomingResponse`, wait, and return the `Response`.
147+
pub async fn get(self) -> Result<Response<IncomingBody>> {
148+
// Wait for the response.
149+
AsyncPollable::new(self.0.subscribe()).wait_for().await;
150+
151+
// NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
152+
// is to trap if we try and get the response more than once. The final
153+
// `?` is to raise the actual error if there is one.
154+
let res = self.0.get().unwrap().unwrap()?;
155+
try_from_incoming(res)
156+
}
157+
}
158+
79159
#[derive(Default, Debug)]
80160
struct RequestOptions {
81161
connect_timeout: Option<Duration>,

0 commit comments

Comments
 (0)