Skip to content

Commit d4cf159

Browse files
committed
Switch from FutureIncomingResponse to impl Future.
1 parent 0b2476d commit d4cf159

File tree

1 file changed

+30
-21
lines changed

1 file changed

+30
-21
lines changed

src/http/client.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ use super::{
66
use crate::http::request::try_into_outgoing;
77
use crate::http::response::try_from_incoming;
88
use crate::io::{self, AsyncOutputStream, AsyncPollable};
9+
use crate::runtime::WaitFor;
910
use crate::time::Duration;
11+
use std::future::Future;
12+
use std::pin::Pin;
13+
use std::task::{Context, Poll};
1014
use wasi::http::types::{
1115
FutureIncomingResponse as WasiFutureIncomingResponse, OutgoingBody as WasiOutgoingBody,
1216
RequestOptions as WasiRequestOptions,
@@ -66,7 +70,10 @@ impl Client {
6670
pub async fn start_request(
6771
&self,
6872
req: Request<BodyForthcoming>,
69-
) -> Result<(OutgoingBody, FutureIncomingResponse)> {
73+
) -> Result<(
74+
OutgoingBody,
75+
impl Future<Output = Result<Response<IncomingBody>>>,
76+
)> {
7077
let (wasi_req, _body_forthcoming) = try_into_outgoing(req)?;
7178
let wasi_body = wasi_req.body().unwrap();
7279
let wasi_stream = wasi_body.write().unwrap();
@@ -76,7 +83,28 @@ impl Client {
7683

7784
let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body);
7885

79-
Ok((outgoing_body, FutureIncomingResponse(res)))
86+
struct IncomingResponseFuture {
87+
subscription: WaitFor,
88+
wasi: WasiFutureIncomingResponse,
89+
}
90+
impl Future for IncomingResponseFuture {
91+
type Output = Result<Response<IncomingBody>>;
92+
93+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
94+
match pin_project(self.subscription).poll(cx) {
95+
Poll::Pending => Poll::Pending,
96+
Poll::Ready(response) => Poll::Ready(try_from_incoming(response)),
97+
}
98+
}
99+
}
100+
101+
let subscription = AsyncPollable::new(res.subscribe()).wait_for();
102+
let future = IncomingResponseFuture {
103+
subscription,
104+
wasi: res,
105+
};
106+
107+
Ok((outgoing_body, future))
80108
}
81109

82110
/// Finish the body, optionally with trailers.
@@ -137,25 +165,6 @@ impl Client {
137165
}
138166
}
139167

140-
/// Returned from [`Client::start_request`], this represents a handle to a
141-
/// response which has not arrived yet. Call [`FutureIncomingResponse::get`]
142-
/// to wait for the response.
143-
pub struct FutureIncomingResponse(WasiFutureIncomingResponse);
144-
145-
impl FutureIncomingResponse {
146-
/// Consume this `FutureIncomingResponse`, wait, and return the `Response`.
147-
pub async fn get(self) -> Result<Response<IncomingBody>> {
148-
// Wait for the response.
149-
AsyncPollable::new(self.0.subscribe()).wait_for().await;
150-
151-
// NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
152-
// is to trap if we try and get the response more than once. The final
153-
// `?` is to raise the actual error if there is one.
154-
let res = self.0.get().unwrap().unwrap()?;
155-
try_from_incoming(res)
156-
}
157-
}
158-
159168
#[derive(Default, Debug)]
160169
struct RequestOptions {
161170
connect_timeout: Option<Duration>,

0 commit comments

Comments
 (0)