Skip to content

Commit 88d8bba

Browse files
committed
Allow sending OutgoingRequest bodies
Signed-off-by: Ryan Levick <[email protected]>
1 parent dc9c8be commit 88d8bba

File tree

3 files changed

+113
-49
lines changed

3 files changed

+113
-49
lines changed

sdk/rust/src/http.rs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ use std::collections::HashMap;
55

66
#[doc(inline)]
77
pub use conversions::IntoResponse;
8-
9-
use self::conversions::TryFromIncomingResponse;
10-
11-
use super::wit::wasi::http::types;
128
#[doc(inline)]
139
pub use types::{
1410
Error, Fields, Headers, IncomingRequest, IncomingResponse, Method, OutgoingBody,
1511
OutgoingRequest, OutgoingResponse, Scheme, StatusCode, Trailers,
1612
};
1713

14+
use self::conversions::{TryFromIncomingResponse, TryIntoOutgoingRequest};
15+
use super::wit::wasi::http::types;
16+
use crate::wit::wasi::io::streams;
17+
use futures::SinkExt;
18+
1819
/// A unified request object that can represent both incoming and outgoing requests.
1920
///
2021
/// This should be used in favor of `IncomingRequest` and `OutgoingRequest` when there
@@ -438,12 +439,12 @@ impl IncomingRequest {
438439
/// # Panics
439440
///
440441
/// Panics if the body was already consumed.
441-
pub fn into_body_stream(self) -> impl futures::Stream<Item = anyhow::Result<Vec<u8>>> {
442+
pub fn into_body_stream(self) -> impl futures::Stream<Item = Result<Vec<u8>, streams::Error>> {
442443
executor::incoming_body(self.consume().expect("request body was already consumed"))
443444
}
444445

445446
/// Return a `Vec<u8>` of the body or fails
446-
pub async fn into_body(self) -> anyhow::Result<Vec<u8>> {
447+
pub async fn into_body(self) -> Result<Vec<u8>, streams::Error> {
447448
use futures::TryStreamExt;
448449
let mut stream = self.into_body_stream();
449450
let mut body = Vec::new();
@@ -460,12 +461,12 @@ impl IncomingResponse {
460461
/// # Panics
461462
///
462463
/// Panics if the body was already consumed.
463-
pub fn into_body_stream(self) -> impl futures::Stream<Item = anyhow::Result<Vec<u8>>> {
464+
pub fn into_body_stream(self) -> impl futures::Stream<Item = Result<Vec<u8>, streams::Error>> {
464465
executor::incoming_body(self.consume().expect("response body was already consumed"))
465466
}
466467

467468
/// Return a `Vec<u8>` of the body or fails
468-
pub async fn into_body(self) -> anyhow::Result<Vec<u8>> {
469+
pub async fn into_body(self) -> Result<Vec<u8>, streams::Error> {
469470
use futures::TryStreamExt;
470471
let mut stream = self.into_body_stream();
471472
let mut body = Vec::new();
@@ -487,6 +488,17 @@ impl OutgoingResponse {
487488
}
488489
}
489490

491+
impl OutgoingRequest {
492+
/// Construct a `Sink` which writes chunks to the body of the specified response.
493+
///
494+
/// # Panics
495+
///
496+
/// Panics if the body was already taken.
497+
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = Error> {
498+
executor::outgoing_body(self.write().expect("request body was already taken"))
499+
}
500+
}
501+
490502
/// The out param for setting an `OutgoingResponse`
491503
pub struct ResponseOutparam(types::ResponseOutparam);
492504

@@ -511,7 +523,6 @@ impl ResponseOutparam {
511523
response: OutgoingResponse,
512524
buffer: Vec<u8>,
513525
) -> Result<(), Error> {
514-
use futures::SinkExt;
515526
let mut body = response.take_body();
516527
self.set(response);
517528
body.send(buffer).await
@@ -526,18 +537,26 @@ impl ResponseOutparam {
526537
/// Send an outgoing request
527538
pub async fn send<I, O>(request: I) -> Result<O, SendError>
528539
where
529-
I: TryInto<OutgoingRequest>,
540+
I: TryIntoOutgoingRequest,
530541
I::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
531542
O: TryFromIncomingResponse,
532543
O::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
533544
{
534-
let response = executor::outgoing_request_send(
535-
request
536-
.try_into()
537-
.map_err(|e| SendError::RequestConversion(e.into()))?,
538-
)
539-
.await
540-
.map_err(SendError::Http)?;
545+
let (request, body_buffer) = I::try_into_outgoing_request(request)
546+
.map_err(|e| SendError::RequestConversion(e.into()))?;
547+
if let Some(body_buffer) = body_buffer {
548+
// It is part of the contract of the trait that implementors of `TryIntoOutgoingRequest`
549+
// do not call `OutgoingRequest::write`` if they return a buffered body.
550+
let mut body_sink = request.take_body();
551+
body_sink
552+
.send(body_buffer)
553+
.await
554+
.map_err(|e| SendError::Http(Error::UnexpectedError(e.to_string())))?;
555+
}
556+
let response = executor::outgoing_request_send(request)
557+
.await
558+
.map_err(SendError::Http)?;
559+
541560
TryFromIncomingResponse::try_from_incoming_response(response)
542561
.await
543562
.map_err(|e: O::Error| SendError::ResponseConversion(e.into()))
@@ -642,6 +661,14 @@ pub mod responses {
642661
}
643662
}
644663

664+
impl std::fmt::Display for streams::Error {
665+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
666+
f.write_str(&self.to_debug_string())
667+
}
668+
}
669+
670+
impl std::error::Error for streams::Error {}
671+
645672
#[cfg(test)]
646673
mod tests {
647674
use super::*;

sdk/rust/src/http/conversions.rs

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::collections::HashMap;
22

33
use async_trait::async_trait;
44

5+
use crate::wit::wasi::io::streams;
6+
57
use super::{Headers, IncomingRequest, IncomingResponse, OutgoingRequest, OutgoingResponse};
68

79
use super::{responses, NonUtf8BodyError, Request, Response};
@@ -61,12 +63,12 @@ impl TryFromIncomingRequest for Request {
6163
.method(request.method())
6264
.uri(request.uri())
6365
.headers(request.headers())
64-
.body(
65-
request
66-
.into_body()
67-
.await
68-
.map_err(IncomingRequestError::BodyConversionError)?,
69-
)
66+
.body(request.into_body().await.map_err(|e| {
67+
IncomingRequestError::BodyConversionError(anyhow::anyhow!(
68+
"{}",
69+
e.to_debug_string()
70+
))
71+
})?)
7072
.build())
7173
}
7274
}
@@ -482,37 +484,59 @@ where
482484
}
483485
}
484486

485-
impl TryFrom<Request> for OutgoingRequest {
487+
/// A trait for converting a type into an `OutgoingRequest`
488+
pub trait TryIntoOutgoingRequest {
489+
/// The error if the conversion fails
490+
type Error;
491+
492+
/// Turn the type into an `OutgoingRequest`
493+
///
494+
/// If the implementor can be sure that the `OutgoingRequest::write` has not been called they
495+
/// can return a buffer as the second element of the returned tuple and `send` will send
496+
/// that as the request body.
497+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error>;
498+
}
499+
500+
impl TryIntoOutgoingRequest for OutgoingRequest {
501+
type Error = std::convert::Infallible;
502+
503+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
504+
Ok((self, None))
505+
}
506+
}
507+
508+
impl TryIntoOutgoingRequest for Request {
486509
type Error = std::convert::Infallible;
487510

488-
fn try_from(req: Request) -> Result<Self, Self::Error> {
489-
let headers = req
511+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
512+
let headers = self
490513
.headers()
491514
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_owned()))
492515
.collect::<Vec<_>>();
493-
Ok(OutgoingRequest::new(
494-
req.method(),
495-
req.path_and_query(),
496-
Some(if req.is_https() {
516+
let request = OutgoingRequest::new(
517+
self.method(),
518+
self.path_and_query(),
519+
Some(if self.is_https() {
497520
&super::Scheme::Https
498521
} else {
499522
&super::Scheme::Http
500523
}),
501-
req.authority(),
524+
self.authority(),
502525
&Headers::new(&headers),
503-
))
526+
);
527+
Ok((request, Some(self.into_body())))
504528
}
505529
}
506530

507531
#[cfg(feature = "http")]
508-
impl<B> TryFrom<hyperium::Request<B>> for OutgoingRequest
532+
impl<B> TryIntoOutgoingRequest for hyperium::Request<B>
509533
where
510534
B: TryIntoBody,
511535
B::Error: std::error::Error + Send + Sync + 'static,
512536
{
513537
type Error = anyhow::Error;
514-
fn try_from(req: hyperium::Request<B>) -> Result<Self, Self::Error> {
515-
let method = match req.method() {
538+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
539+
let method = match self.method() {
516540
&hyperium::Method::GET => super::Method::Get,
517541
&hyperium::Method::POST => super::Method::Post,
518542
&hyperium::Method::PUT => super::Method::Put,
@@ -522,34 +546,36 @@ where
522546
&hyperium::Method::OPTIONS => super::Method::Options,
523547
m => anyhow::bail!("Unsupported method: {m}"),
524548
};
525-
let headers = req
549+
let headers = self
526550
.headers()
527551
.into_iter()
528552
.map(|(n, v)| (n.as_str().to_owned(), v.as_bytes().to_owned()))
529553
.collect::<Vec<_>>();
530-
Ok(OutgoingRequest::new(
554+
let request = OutgoingRequest::new(
531555
&method,
532-
req.uri().path_and_query().map(|p| p.as_str()),
533-
req.uri()
556+
self.uri().path_and_query().map(|p| p.as_str()),
557+
self.uri()
534558
.scheme()
535559
.map(|s| match s.as_str() {
536560
"http" => super::Scheme::Http,
537561
"https" => super::Scheme::Https,
538562
s => super::Scheme::Other(s.to_owned()),
539563
})
540564
.as_ref(),
541-
req.uri().authority().map(|a| a.as_str()),
565+
self.uri().authority().map(|a| a.as_str()),
542566
&Headers::new(&headers),
543-
))
567+
);
568+
let buffer = TryIntoBody::try_into_body(self.into_body())?;
569+
Ok((request, Some(buffer)))
544570
}
545571
}
546572

573+
/// A trait for converting from an `IncomingRequest`
547574
#[async_trait]
548-
/// TODO
549575
pub trait TryFromIncomingResponse {
550-
/// TODO
576+
/// The error if conversion fails
551577
type Error;
552-
/// TODO
578+
/// Turn the `IncomingResponse` into the type
553579
async fn try_from_incoming_response(resp: IncomingResponse) -> Result<Self, Self::Error>
554580
where
555581
Self: Sized;
@@ -563,6 +589,18 @@ impl TryFromIncomingResponse for IncomingResponse {
563589
}
564590
}
565591

592+
#[async_trait]
593+
impl TryFromIncomingResponse for Response {
594+
type Error = streams::Error;
595+
async fn try_from_incoming_response(resp: IncomingResponse) -> Result<Self, Self::Error> {
596+
Ok(Response::builder()
597+
.status(resp.status())
598+
.headers(resp.headers())
599+
.body(resp.into_body().await?)
600+
.build())
601+
}
602+
}
603+
566604
#[cfg(feature = "http")]
567605
#[async_trait]
568606
impl<B: TryFromBody> TryFromIncomingResponse for hyperium::Response<B> {

sdk/rust/src/http/executor.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::wit::wasi::http::types::{
55
use crate::wit::wasi::io;
66
use crate::wit::wasi::io::streams::{InputStream, OutputStream, StreamError};
77

8-
use anyhow::{anyhow, Result};
98
use futures::{future, sink, stream, Sink, Stream};
109

1110
use std::cell::RefCell;
@@ -165,7 +164,9 @@ pub(crate) fn outgoing_request_send(
165164
}
166165

167166
#[doc(hidden)]
168-
pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> {
167+
pub fn incoming_body(
168+
body: IncomingBody,
169+
) -> impl Stream<Item = Result<Vec<u8>, io::streams::Error>> {
169170
struct Incoming(Option<(InputStream, IncomingBody)>);
170171

171172
impl Drop for Incoming {
@@ -196,9 +197,7 @@ pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>>
196197
}
197198
}
198199
Err(StreamError::Closed) => Poll::Ready(None),
199-
Err(StreamError::LastOperationFailed(error)) => Poll::Ready(Some(Err(
200-
anyhow!("Last operation failed: {}", error.to_debug_string()),
201-
))),
200+
Err(StreamError::LastOperationFailed(error)) => Poll::Ready(Some(Err(error))),
202201
}
203202
} else {
204203
Poll::Ready(None)

0 commit comments

Comments
 (0)