Skip to content

Commit 09c7e5c

Browse files
authored
Merge pull request #1975 from fermyon/http-fix
Fix outbound http in the Rust SDK (take 2)
2 parents 8bafcb2 + 2ac71e4 commit 09c7e5c

File tree

6 files changed

+177
-49
lines changed

6 files changed

+177
-49
lines changed

examples/http-rust-outbound-http/outbound-http-to-same-app/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use spin_sdk::{
77
/// Send an HTTP request and return the response.
88
#[http_component]
99
async fn send_outbound(_req: Request) -> Result<impl IntoResponse> {
10-
let mut res: http::Response<()> = spin_sdk::http::send(
10+
let mut res: http::Response<String> = spin_sdk::http::send(
1111
http::Request::builder()
1212
.method("GET")
1313
.uri("/hello")

examples/http-rust-outbound-http/outbound-http/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use spin_sdk::{
77
/// Send an HTTP request and return the response.
88
#[http_component]
99
async fn send_outbound(_req: Request) -> Result<impl IntoResponse> {
10-
let mut res: http::Response<()> = spin_sdk::http::send(
10+
let mut res: http::Response<String> = spin_sdk::http::send(
1111
http::Request::builder()
1212
.method("GET")
1313
.uri("https://random-data-api.fermyon.app/animals/json")

sdk/rust/readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ server, modifies the result, then returns it:
4141
```rust
4242
#[http_component]
4343
async fn hello_world(_req: Request) -> Result<Response> {
44-
let mut res: http::Response<()> = spin_sdk::http::send(
44+
let mut res: http::Response<String> = spin_sdk::http::send(
4545
http::Request::builder()
4646
.method("GET")
4747
.uri("https://fermyon.com")

sdk/rust/src/http.rs

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ use std::collections::HashMap;
55

66
#[doc(inline)]
77
pub use conversions::IntoResponse;
8-
9-
use self::conversions::TryFromIncomingResponse;
10-
11-
use super::wit::wasi::http::types;
128
#[doc(inline)]
139
pub use types::{
1410
Error, Fields, Headers, IncomingRequest, IncomingResponse, Method, OutgoingBody,
1511
OutgoingRequest, OutgoingResponse, Scheme, StatusCode, Trailers,
1612
};
1713

14+
use self::conversions::{TryFromIncomingResponse, TryIntoOutgoingRequest};
15+
use super::wit::wasi::http::types;
16+
use crate::wit::wasi::io::streams;
17+
use futures::SinkExt;
18+
1819
/// A unified request object that can represent both incoming and outgoing requests.
1920
///
2021
/// This should be used in favor of `IncomingRequest` and `OutgoingRequest` when there
@@ -107,6 +108,34 @@ impl Request {
107108
uri,
108109
)
109110
}
111+
112+
/// Whether the request is an HTTPS request
113+
fn is_https(&self) -> bool {
114+
self.uri
115+
.0
116+
.as_ref()
117+
.and_then(|u| u.scheme())
118+
.map(|s| s == &hyperium::uri::Scheme::HTTPS)
119+
.unwrap_or(true)
120+
}
121+
122+
/// The URI's authority
123+
fn authority(&self) -> Option<&str> {
124+
self.uri
125+
.0
126+
.as_ref()
127+
.and_then(|u| u.authority())
128+
.map(|a| a.as_str())
129+
}
130+
131+
/// The request path and query combined
132+
pub fn path_and_query(&self) -> Option<&str> {
133+
self.uri
134+
.0
135+
.as_ref()
136+
.and_then(|u| u.path_and_query())
137+
.map(|s| s.as_str())
138+
}
110139
}
111140

112141
/// A request builder
@@ -410,12 +439,12 @@ impl IncomingRequest {
410439
/// # Panics
411440
///
412441
/// Panics if the body was already consumed.
413-
pub fn into_body_stream(self) -> impl futures::Stream<Item = anyhow::Result<Vec<u8>>> {
442+
pub fn into_body_stream(self) -> impl futures::Stream<Item = Result<Vec<u8>, streams::Error>> {
414443
executor::incoming_body(self.consume().expect("request body was already consumed"))
415444
}
416445

417446
/// Return a `Vec<u8>` of the body or fails
418-
pub async fn into_body(self) -> anyhow::Result<Vec<u8>> {
447+
pub async fn into_body(self) -> Result<Vec<u8>, streams::Error> {
419448
use futures::TryStreamExt;
420449
let mut stream = self.into_body_stream();
421450
let mut body = Vec::new();
@@ -432,12 +461,12 @@ impl IncomingResponse {
432461
/// # Panics
433462
///
434463
/// Panics if the body was already consumed.
435-
pub fn into_body_stream(self) -> impl futures::Stream<Item = anyhow::Result<Vec<u8>>> {
464+
pub fn into_body_stream(self) -> impl futures::Stream<Item = Result<Vec<u8>, streams::Error>> {
436465
executor::incoming_body(self.consume().expect("response body was already consumed"))
437466
}
438467

439468
/// Return a `Vec<u8>` of the body or fails
440-
pub async fn into_body(self) -> anyhow::Result<Vec<u8>> {
469+
pub async fn into_body(self) -> Result<Vec<u8>, streams::Error> {
441470
use futures::TryStreamExt;
442471
let mut stream = self.into_body_stream();
443472
let mut body = Vec::new();
@@ -454,11 +483,22 @@ impl OutgoingResponse {
454483
/// # Panics
455484
///
456485
/// Panics if the body was already taken.
457-
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = anyhow::Error> {
486+
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = Error> {
458487
executor::outgoing_body(self.write().expect("response body was already taken"))
459488
}
460489
}
461490

491+
impl OutgoingRequest {
492+
/// Construct a `Sink` which writes chunks to the body of the specified response.
493+
///
494+
/// # Panics
495+
///
496+
/// Panics if the body was already taken.
497+
pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = Error> {
498+
executor::outgoing_body(self.write().expect("request body was already taken"))
499+
}
500+
}
501+
462502
/// The out param for setting an `OutgoingResponse`
463503
pub struct ResponseOutparam(types::ResponseOutparam);
464504

@@ -482,8 +522,7 @@ impl ResponseOutparam {
482522
self,
483523
response: OutgoingResponse,
484524
buffer: Vec<u8>,
485-
) -> anyhow::Result<()> {
486-
use futures::SinkExt;
525+
) -> Result<(), Error> {
487526
let mut body = response.take_body();
488527
self.set(response);
489528
body.send(buffer).await
@@ -496,20 +535,35 @@ impl ResponseOutparam {
496535
}
497536

498537
/// Send an outgoing request
538+
///
539+
/// If `request`` is an `OutgoingRequest` and you are streaming the body to the
540+
/// outgoing request body sink, you need to ensure it is dropped before awaiting this function.
499541
pub async fn send<I, O>(request: I) -> Result<O, SendError>
500542
where
501-
I: TryInto<OutgoingRequest>,
543+
I: TryIntoOutgoingRequest,
502544
I::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
503545
O: TryFromIncomingResponse,
504546
O::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
505547
{
506-
let response = executor::outgoing_request_send(
507-
request
508-
.try_into()
509-
.map_err(|e| SendError::RequestConversion(e.into()))?,
510-
)
511-
.await
548+
let (request, body_buffer) = I::try_into_outgoing_request(request)
549+
.map_err(|e| SendError::RequestConversion(e.into()))?;
550+
let response = if let Some(body_buffer) = body_buffer {
551+
// It is part of the contract of the trait that implementors of `TryIntoOutgoingRequest`
552+
// do not call `OutgoingRequest::write`` if they return a buffered body.
553+
let mut body_sink = request.take_body();
554+
let response = executor::outgoing_request_send(request);
555+
body_sink
556+
.send(body_buffer)
557+
.await
558+
.map_err(|e| SendError::Http(Error::UnexpectedError(e.to_string())))?;
559+
// The body sink needs to be dropped before we await the response, otherwise we deadlock
560+
drop(body_sink);
561+
response.await
562+
} else {
563+
executor::outgoing_request_send(request).await
564+
}
512565
.map_err(SendError::Http)?;
566+
513567
TryFromIncomingResponse::try_from_incoming_response(response)
514568
.await
515569
.map_err(|e: O::Error| SendError::ResponseConversion(e.into()))
@@ -614,6 +668,14 @@ pub mod responses {
614668
}
615669
}
616670

671+
impl std::fmt::Display for streams::Error {
672+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673+
f.write_str(&self.to_debug_string())
674+
}
675+
}
676+
677+
impl std::error::Error for streams::Error {}
678+
617679
#[cfg(test)]
618680
mod tests {
619681
use super::*;

sdk/rust/src/http/conversions.rs

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::collections::HashMap;
22

33
use async_trait::async_trait;
44

5+
use crate::wit::wasi::io::streams;
6+
57
use super::{Headers, IncomingRequest, IncomingResponse, OutgoingRequest, OutgoingResponse};
68

79
use super::{responses, NonUtf8BodyError, Request, Response};
@@ -61,12 +63,12 @@ impl TryFromIncomingRequest for Request {
6163
.method(request.method())
6264
.uri(request.uri())
6365
.headers(request.headers())
64-
.body(
65-
request
66-
.into_body()
67-
.await
68-
.map_err(IncomingRequestError::BodyConversionError)?,
69-
)
66+
.body(request.into_body().await.map_err(|e| {
67+
IncomingRequestError::BodyConversionError(anyhow::anyhow!(
68+
"{}",
69+
e.to_debug_string()
70+
))
71+
})?)
7072
.build())
7173
}
7274
}
@@ -482,15 +484,59 @@ where
482484
}
483485
}
484486

487+
/// A trait for converting a type into an `OutgoingRequest`
488+
pub trait TryIntoOutgoingRequest {
489+
/// The error if the conversion fails
490+
type Error;
491+
492+
/// Turn the type into an `OutgoingRequest`
493+
///
494+
/// If the implementor can be sure that the `OutgoingRequest::write` has not been called they
495+
/// can return a buffer as the second element of the returned tuple and `send` will send
496+
/// that as the request body.
497+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error>;
498+
}
499+
500+
impl TryIntoOutgoingRequest for OutgoingRequest {
501+
type Error = std::convert::Infallible;
502+
503+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
504+
Ok((self, None))
505+
}
506+
}
507+
508+
impl TryIntoOutgoingRequest for Request {
509+
type Error = std::convert::Infallible;
510+
511+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
512+
let headers = self
513+
.headers()
514+
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_owned()))
515+
.collect::<Vec<_>>();
516+
let request = OutgoingRequest::new(
517+
self.method(),
518+
self.path_and_query(),
519+
Some(if self.is_https() {
520+
&super::Scheme::Https
521+
} else {
522+
&super::Scheme::Http
523+
}),
524+
self.authority(),
525+
&Headers::new(&headers),
526+
);
527+
Ok((request, Some(self.into_body())))
528+
}
529+
}
530+
485531
#[cfg(feature = "http")]
486-
impl<B> TryFrom<hyperium::Request<B>> for OutgoingRequest
532+
impl<B> TryIntoOutgoingRequest for hyperium::Request<B>
487533
where
488534
B: TryIntoBody,
489535
B::Error: std::error::Error + Send + Sync + 'static,
490536
{
491537
type Error = anyhow::Error;
492-
fn try_from(req: hyperium::Request<B>) -> Result<Self, Self::Error> {
493-
let method = match req.method() {
538+
fn try_into_outgoing_request(self) -> Result<(OutgoingRequest, Option<Vec<u8>>), Self::Error> {
539+
let method = match self.method() {
494540
&hyperium::Method::GET => super::Method::Get,
495541
&hyperium::Method::POST => super::Method::Post,
496542
&hyperium::Method::PUT => super::Method::Put,
@@ -500,34 +546,36 @@ where
500546
&hyperium::Method::OPTIONS => super::Method::Options,
501547
m => anyhow::bail!("Unsupported method: {m}"),
502548
};
503-
let headers = req
549+
let headers = self
504550
.headers()
505551
.into_iter()
506552
.map(|(n, v)| (n.as_str().to_owned(), v.as_bytes().to_owned()))
507553
.collect::<Vec<_>>();
508-
Ok(OutgoingRequest::new(
554+
let request = OutgoingRequest::new(
509555
&method,
510-
req.uri().path_and_query().map(|p| p.as_str()),
511-
req.uri()
556+
self.uri().path_and_query().map(|p| p.as_str()),
557+
self.uri()
512558
.scheme()
513559
.map(|s| match s.as_str() {
514560
"http" => super::Scheme::Http,
515561
"https" => super::Scheme::Https,
516562
s => super::Scheme::Other(s.to_owned()),
517563
})
518564
.as_ref(),
519-
req.uri().authority().map(|a| a.as_str()),
565+
self.uri().authority().map(|a| a.as_str()),
520566
&Headers::new(&headers),
521-
))
567+
);
568+
let buffer = TryIntoBody::try_into_body(self.into_body())?;
569+
Ok((request, Some(buffer)))
522570
}
523571
}
524572

573+
/// A trait for converting from an `IncomingRequest`
525574
#[async_trait]
526-
/// TODO
527575
pub trait TryFromIncomingResponse {
528-
/// TODO
576+
/// The error if conversion fails
529577
type Error;
530-
/// TODO
578+
/// Turn the `IncomingResponse` into the type
531579
async fn try_from_incoming_response(resp: IncomingResponse) -> Result<Self, Self::Error>
532580
where
533581
Self: Sized;
@@ -541,6 +589,18 @@ impl TryFromIncomingResponse for IncomingResponse {
541589
}
542590
}
543591

592+
#[async_trait]
593+
impl TryFromIncomingResponse for Response {
594+
type Error = streams::Error;
595+
async fn try_from_incoming_response(resp: IncomingResponse) -> Result<Self, Self::Error> {
596+
Ok(Response::builder()
597+
.status(resp.status())
598+
.headers(resp.headers())
599+
.body(resp.into_body().await?)
600+
.build())
601+
}
602+
}
603+
544604
#[cfg(feature = "http")]
545605
#[async_trait]
546606
impl<B: TryFromBody> TryFromIncomingResponse for hyperium::Response<B> {

0 commit comments

Comments
 (0)