diff --git a/Cargo.toml b/Cargo.toml index fa68f45..63c11be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,14 @@ default = ["json"] json = ["dep:serde", "dep:serde_json"] [dependencies] +anyhow.workspace = true async-task.workspace = true +async-trait.workspace = true +bytes.workspace = true +futures-lite.workspace = true http.workspace = true +http-body.workspace = true +http-body-util.workspace = true itoa.workspace = true pin-project-lite.workspace = true slab.workspace = true @@ -32,7 +38,7 @@ serde_json = { workspace = true, optional = true } [dev-dependencies] anyhow.workspace = true clap.workspace = true -futures-lite.workspace = true +http-body-util.workspace = true futures-concurrency.workspace = true humantime.workspace = true serde = { workspace = true, features = ["derive"] } @@ -64,6 +70,8 @@ authors = [ [workspace.dependencies] anyhow = "1" async-task = "4.7" +async-trait = "*" +bytes = "1.10.1" cargo_metadata = "0.22" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" @@ -72,6 +80,8 @@ futures-concurrency = "7.6" humantime = "2.1.0" heck = "0.5" http = "1.1" +http-body = "1.0.1" +http-body-util = "0.1.3" itoa = "1" pin-project-lite = "0.2.8" quote = "1.0" diff --git a/examples/complex_http_client.rs b/examples/complex_http_client.rs index 8a7b2b6..3754d60 100644 --- a/examples/complex_http_client.rs +++ b/examples/complex_http_client.rs @@ -1,9 +1,8 @@ use anyhow::{anyhow, Result}; use clap::{ArgAction, Parser}; use std::str::FromStr; -use wstd::http::{ - body::BodyForthcoming, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, -}; +use wstd::http::{Body, BodyExt, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri}; +use wstd::io::AsyncWrite; /// Complex HTTP client /// @@ -86,23 +85,29 @@ async fn main() -> Result<()> { trailers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?); } - // Send the request. - - let request = request.body(BodyForthcoming)?; + let body = if args.body { + Body::from_input_stream(wstd::io::stdin().into_inner()).into_boxed_body() + } else { + Body::empty().into_boxed_body() + }; + let t = trailers.clone(); + let body = body.with_trailers(async move { + if t.is_empty() { + None + } else { + Some(Ok(t)) + } + }); + let request = request.body(body)?; + // Send the request. eprintln!("> {} / {:?}", request.method(), request.version()); for (key, value) in request.headers().iter() { let value = String::from_utf8_lossy(value.as_bytes()); eprintln!("> {key}: {value}"); } - let (mut outgoing_body, response) = client.start_request(request).await?; - - if args.body { - wstd::io::copy(wstd::io::stdin(), &mut outgoing_body).await?; - } else { - wstd::io::copy(wstd::io::empty(), &mut outgoing_body).await?; - } + let response = client.send(request).await?; if !trailers.is_empty() { eprintln!("..."); @@ -112,10 +117,6 @@ async fn main() -> Result<()> { eprintln!("> {key}: {value}"); } - Client::finish(outgoing_body, Some(trailers))?; - - let response = response.await?; - // Print the response. eprintln!("< {:?} {}", response.version(), response.status()); @@ -124,10 +125,12 @@ async fn main() -> Result<()> { eprintln!("< {key}: {value}"); } - let mut body = response.into_body(); - wstd::io::copy(&mut body, wstd::io::stdout()).await?; + let body = response.into_body().into_http_body().collect().await?; + let trailers = body.trailers().cloned(); + wstd::io::stdout() + .write_all(body.to_bytes().as_ref()) + .await?; - let trailers = body.finish().await?; if let Some(trailers) = trailers { for (key, value) in trailers.iter() { let value = String::from_utf8_lossy(value.as_bytes()); diff --git a/examples/http_client.rs b/examples/http_client.rs index 12bc685..2153f41 100644 --- a/examples/http_client.rs +++ b/examples/http_client.rs @@ -1,10 +1,7 @@ use anyhow::{anyhow, Result}; use clap::{ArgAction, Parser}; -use wstd::http::{ - body::{IncomingBody, StreamedBody}, - request::Builder, - Body, Client, Method, Request, Response, Uri, -}; +use wstd::http::{Body, BodyExt, Client, Method, Request, Uri}; +use wstd::io::AsyncWrite; /// Simple HTTP client /// @@ -75,39 +72,35 @@ async fn main() -> Result<()> { // Send the request. - async fn send_request( - client: &Client, - request: Builder, - body: B, - ) -> Result> { - let request = request.body(body)?; + let body = if args.body { + Body::from_input_stream(wstd::io::stdin().into_inner()) + } else { + Body::empty() + }; - eprintln!("> {} / {:?}", request.method(), request.version()); - for (key, value) in request.headers().iter() { - let value = String::from_utf8_lossy(value.as_bytes()); - eprintln!("> {key}: {value}"); - } + let request = request.body(body)?; - Ok(client.send(request).await?) + eprintln!("> {} / {:?}", request.method(), request.version()); + for (key, value) in request.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); } - let response = if args.body { - send_request(&client, request, StreamedBody::new(wstd::io::stdin())).await - } else { - send_request(&client, request, wstd::io::empty()).await - }?; - // Print the response. + let response = client.send(request).await?; + // Print the response. eprintln!("< {:?} {}", response.version(), response.status()); for (key, value) in response.headers().iter() { let value = String::from_utf8_lossy(value.as_bytes()); eprintln!("< {key}: {value}"); } - let mut body = response.into_body(); - wstd::io::copy(&mut body, wstd::io::stdout()).await?; + let body = response.into_body().into_http_body().collect().await?; + let trailers = body.trailers().cloned(); + wstd::io::stdout() + .write_all(body.to_bytes().as_ref()) + .await?; - let trailers = body.finish().await?; if let Some(trailers) = trailers { for (key, value) in trailers.iter() { let value = String::from_utf8_lossy(value.as_bytes()); diff --git a/examples/http_server.rs b/examples/http_server.rs index b21eda4..d4df71c 100644 --- a/examples/http_server.rs +++ b/examples/http_server.rs @@ -1,31 +1,36 @@ -use wstd::http::body::{BodyForthcoming, IncomingBody, OutgoingBody}; -use wstd::http::server::{Finished, Responder}; -use wstd::http::{IntoBody, Request, Response, StatusCode}; -use wstd::io::{copy, empty, AsyncWrite}; +use anyhow::{Context, Result}; +use futures_lite::stream::once_future; +use http_body_util::{BodyExt, StreamBody}; +use wstd::http::body::{Body, Bytes, Frame, Incoming}; +use wstd::http::{Error, HeaderMap, Request, Response, StatusCode}; use wstd::time::{Duration, Instant}; #[wstd::http_server] -async fn main(request: Request, responder: Responder) -> Finished { - match request.uri().path_and_query().unwrap().as_str() { - "/wait" => http_wait(request, responder).await, - "/echo" => http_echo(request, responder).await, - "/echo-headers" => http_echo_headers(request, responder).await, - "/echo-trailers" => http_echo_trailers(request, responder).await, - "/fail" => http_fail(request, responder).await, - "/bigfail" => http_bigfail(request, responder).await, - "/" => http_home(request, responder).await, - _ => http_not_found(request, responder).await, +async fn main(request: Request) -> Result, Error> { + let path = request.uri().path_and_query().unwrap().as_str(); + println!("serving {path}"); + match path { + "/" => http_home(request).await, + "/wait-response" => http_wait_response(request).await, + "/wait-body" => http_wait_body(request).await, + "/echo" => http_echo(request).await, + "/echo-headers" => http_echo_headers(request).await, + "/echo-trailers" => http_echo_trailers(request).await, + "/response-status" => http_response_status(request).await, + "/response-fail" => http_response_fail(request).await, + "/response-body-fail" => http_body_fail(request).await, + _ => http_not_found(request).await, } } -async fn http_home(_request: Request, responder: Responder) -> Finished { +async fn http_home(_request: Request) -> Result> { // To send a single string as the response body, use `Responder::respond`. - responder - .respond(Response::new("Hello, wasi:http/proxy world!\n".into_body())) - .await + Ok(Response::new( + "Hello, wasi:http/proxy world!\n".to_owned().into(), + )) } -async fn http_wait(_request: Request, responder: Responder) -> Finished { +async fn http_wait_response(_request: Request) -> Result> { // Get the time now let now = Instant::now(); @@ -35,60 +40,85 @@ async fn http_wait(_request: Request, responder: Responder) -> Fin // Compute how long we slept for. let elapsed = Instant::now().duration_since(now).as_millis(); - // To stream data to the response body, use `Responder::start_response`. - let mut body = responder.start_response(Response::new(BodyForthcoming)); - let result = body - .write_all(format!("slept for {elapsed} millis\n").as_bytes()) - .await; - Finished::finish(body, result, None) + Ok(Response::new( + format!("slept for {elapsed} millis\n").into(), + )) } -async fn http_echo(mut request: Request, responder: Responder) -> Finished { - // Stream data from the request body to the response body. - let mut body = responder.start_response(Response::new(BodyForthcoming)); - let result = copy(request.body_mut(), &mut body).await; - Finished::finish(body, result, None) -} +async fn http_wait_body(_request: Request) -> Result> { + // Get the time now + let now = Instant::now(); -async fn http_fail(_request: Request, responder: Responder) -> Finished { - let body = responder.start_response(Response::new(BodyForthcoming)); - Finished::fail(body) -} + let body = StreamBody::new(once_future(async move { + // Sleep for one second. + wstd::task::sleep(Duration::from_secs(1)).await; -async fn http_bigfail(_request: Request, responder: Responder) -> Finished { - async fn write_body(body: &mut OutgoingBody) -> wstd::io::Result<()> { - for _ in 0..0x10 { - body.write_all("big big big big\n".as_bytes()).await?; - } - body.flush().await?; - Ok(()) - } + // Compute how long we slept for. + let elapsed = Instant::now().duration_since(now).as_millis(); + anyhow::Ok(Frame::data(Bytes::from(format!( + "slept for {elapsed} millis\n" + )))) + })); + + Ok(Response::new(body.into())) +} - let mut body = responder.start_response(Response::new(BodyForthcoming)); - let _ = write_body(&mut body).await; - Finished::fail(body) +async fn http_echo(request: Request) -> Result> { + let (_parts, body) = request.into_parts(); + Ok(Response::new(body.into())) } -async fn http_echo_headers(request: Request, responder: Responder) -> Finished { +async fn http_echo_headers(request: Request) -> Result> { let mut response = Response::builder(); *response.headers_mut().unwrap() = request.into_parts().0.headers; - let response = response.body(empty()).unwrap(); - responder.respond(response).await + Ok(response.body("".to_owned().into())?) } -async fn http_echo_trailers(request: Request, responder: Responder) -> Finished { - let body = responder.start_response(Response::new(BodyForthcoming)); - let (trailers, result) = match request.into_body().finish().await { - Ok(trailers) => (trailers, Ok(())), - Err(err) => (Default::default(), Err(std::io::Error::other(err))), +async fn http_echo_trailers(request: Request) -> Result> { + let collected = request.into_body().into_http_body().collect().await?; + let trailers = collected.trailers().cloned().unwrap_or_else(|| { + let mut trailers = HeaderMap::new(); + trailers.insert("x-no-trailers", "1".parse().unwrap()); + trailers + }); + + let body = StreamBody::new(once_future(async move { + anyhow::Ok(Frame::::trailers(trailers)) + })); + Ok(Response::new(body.into())) +} + +async fn http_response_status(request: Request) -> Result> { + let status = if let Some(header_val) = request.headers().get("x-response-status") { + header_val + .to_str() + .context("contents of x-response-status")? + .parse::() + .context("u16 value from x-response-status")? + } else { + 500 }; - Finished::finish(body, result, trailers) + Ok(Response::builder() + .status(status) + .body(String::new().into())?) +} + +async fn http_response_fail(_request: Request) -> Result> { + Err(anyhow::anyhow!("error creating response")) +} + +async fn http_body_fail(_request: Request) -> Result> { + let body = StreamBody::new(once_future(async move { + Err::, _>(anyhow::anyhow!("error creating body")) + })); + + Ok(Response::new(body.into())) } -async fn http_not_found(_request: Request, responder: Responder) -> Finished { +async fn http_not_found(_request: Request) -> Result> { let response = Response::builder() .status(StatusCode::NOT_FOUND) - .body(empty()) + .body(Body::empty()) .unwrap(); - responder.respond(response).await + Ok(response) } diff --git a/macro/src/lib.rs b/macro/src/lib.rs index 2c6c6e5..956613f 100644 --- a/macro/src/lib.rs +++ b/macro/src/lib.rs @@ -92,10 +92,8 @@ pub fn attr_macro_test(_attr: TokenStream, item: TokenStream) -> TokenStream { /// /// ```ignore /// #[wstd::http_server] -/// async fn main(request: Request, responder: Responder) -> Finished { -/// responder -/// .respond(Response::new("Hello!\n".into_body())) -/// .await +/// async fn main(request: Request) -> Result> { +/// Ok(Response::new("Hello!\n".into_body())) /// } /// ``` #[proc_macro_attribute] @@ -137,12 +135,15 @@ pub fn attr_macro_http_server(_attr: TokenStream, item: TokenStream) -> TokenStr } let responder = ::wstd::http::server::Responder::new(response_out); - let _finished: ::wstd::http::server::Finished = - match ::wstd::http::request::try_from_incoming(request) - { - Ok(request) => ::wstd::runtime::block_on(async { __run(request, responder).await }), - Err(err) => responder.fail(err), - }; + ::wstd::runtime::block_on(async move { + match ::wstd::http::request::try_from_incoming(request) { + Ok(request) => match __run(request).await { + Ok(response) => { responder.respond(response).await.unwrap() }, + Err(err) => responder.fail(err).unwrap(), + } + Err(err) => responder.fail(err).unwrap(), + } + }) } } diff --git a/src/http/body.rs b/src/http/body.rs index 9eecfae..2e6b434 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -1,353 +1,500 @@ -//! HTTP body types +use crate::http::{ + error::Context as _, + fields::{header_map_from_wasi, header_map_to_wasi}, + Error, HeaderMap, +}; +use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncWrite}; +use crate::runtime::{AsyncPollable, Reactor, WaitFor}; + +pub use ::http_body::{Body as HttpBody, Frame, SizeHint}; +pub use bytes::Bytes; -use crate::http::fields::header_map_from_wasi; -use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Cursor, Empty}; -use crate::runtime::AsyncPollable; -use core::fmt; use http::header::CONTENT_LENGTH; -use wasip2::http::types::IncomingBody as WasiIncomingBody; +use http_body_util::{combinators::BoxBody, BodyExt}; +use std::fmt; +use std::future::{poll_fn, Future}; +use std::pin::{pin, Pin}; +use std::task::{Context, Poll}; +use wasip2::http::types::{ + FutureTrailers, IncomingBody as WasiIncomingBody, OutgoingBody as WasiOutgoingBody, +}; +use wasip2::io::streams::{InputStream as WasiInputStream, StreamError}; -#[cfg(feature = "json")] -use serde::de::DeserializeOwned; -#[cfg(feature = "json")] -use serde_json; +pub mod util { + pub use http_body_util::*; +} -pub use super::{ - error::{Error, ErrorVariant}, - HeaderMap, -}; +#[derive(Debug)] +pub struct Body(pub(crate) BodyInner); #[derive(Debug)] -pub(crate) enum BodyKind { - Fixed(u64), - Chunked, +pub(crate) enum BodyInner { + Boxed(BoxBody), + Incoming(Incoming), + Complete(Bytes), } -impl BodyKind { - pub(crate) fn from_headers(headers: &HeaderMap) -> Result { - if let Some(value) = headers.get(CONTENT_LENGTH) { - let content_length = std::str::from_utf8(value.as_ref()) - .unwrap() - .parse::() - .map_err(|_| InvalidContentLength)?; - Ok(BodyKind::Fixed(content_length)) - } else { - Ok(BodyKind::Chunked) +impl Body { + pub async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> { + match self.0 { + BodyInner::Incoming(incoming) => { + let in_body = incoming.into_inner(); + let mut in_stream = + AsyncInputStream::new(in_body.stream().expect("incoming body already read")); + let mut out_stream = AsyncOutputStream::new( + outgoing_body + .write() + .expect("outgoing body already written"), + ); + crate::io::copy(&mut in_stream, &mut out_stream) + .await + .map_err(|e| { + Error::from(e) + .context("copying incoming body stream to outgoing body stream") + })?; + drop(in_stream); + drop(out_stream); + let future_in_trailers = WasiIncomingBody::finish(in_body); + Reactor::current() + .schedule(future_in_trailers.subscribe()) + .wait_for() + .await; + let in_trailers: Option = future_in_trailers + .get() + .expect("pollable ready") + .expect("got once") + .map_err(|e| Error::from(e).context("recieving incoming trailers"))?; + WasiOutgoingBody::finish(outgoing_body, in_trailers) + .map_err(|e| Error::from(e).context("finishing outgoing body"))?; + Ok(()) + } + BodyInner::Boxed(box_body) => { + let mut out_stream = AsyncOutputStream::new( + outgoing_body + .write() + .expect("outgoing body already written"), + ); + let mut body = pin!(box_body); + let mut trailers = None; + loop { + match poll_fn(|cx| body.as_mut().poll_frame(cx)).await { + Some(Ok(frame)) if frame.is_data() => { + let data = frame.data_ref().unwrap(); + out_stream.write_all(data).await?; + } + Some(Ok(frame)) if frame.is_trailers() => { + trailers = + Some(header_map_to_wasi(frame.trailers_ref().unwrap()).map_err( + |e| Error::from(e).context("outoging trailers to wasi"), + )?); + } + Some(Err(err)) => break Err(err.context("sending outgoing body")), + None => { + drop(out_stream); + WasiOutgoingBody::finish(outgoing_body, trailers) + .map_err(|e| Error::from(e).context("finishing outgoing body"))?; + break Ok(()); + } + _ => unreachable!(), + } + } + } + BodyInner::Complete(bytes) => { + let mut out_stream = AsyncOutputStream::new( + outgoing_body + .write() + .expect("outgoing body already written"), + ); + out_stream.write_all(&bytes).await?; + drop(out_stream); + WasiOutgoingBody::finish(outgoing_body, None) + .map_err(|e| Error::from(e).context("finishing outgoing body"))?; + Ok(()) + } } } -} -/// A trait representing an HTTP body. -#[doc(hidden)] -pub trait Body: AsyncRead { - /// Returns the exact remaining length of the iterator, if known. - fn len(&self) -> Option; - - /// Returns `true` if the body is known to be empty. - fn is_empty(&self) -> bool { - matches!(self.len(), Some(0)) + pub fn into_boxed_body(self) -> BoxBody { + match self.0 { + BodyInner::Incoming(i) => i.into_http_body().boxed(), + BodyInner::Complete(bytes) => http_body_util::Full::new(bytes) + .map_err(annotate_err) + .boxed(), + BodyInner::Boxed(b) => b, + } } -} -/// Conversion into a `Body`. -#[doc(hidden)] -pub trait IntoBody { - /// What type of `Body` are we turning this into? - type IntoBody: Body; - /// Convert into `Body`. - fn into_body(self) -> Self::IntoBody; -} -impl IntoBody for T -where - T: Body, -{ - type IntoBody = T; - fn into_body(self) -> Self::IntoBody { - self + pub fn as_boxed_body(&mut self) -> &mut BoxBody { + let mut prev = Self::empty(); + std::mem::swap(self, &mut prev); + self.0 = BodyInner::Boxed(prev.into_boxed_body()); + + match &mut self.0 { + BodyInner::Boxed(ref mut b) => b, + _ => unreachable!(), + } } -} -impl IntoBody for String { - type IntoBody = BoundedBody>; - fn into_body(self) -> Self::IntoBody { - BoundedBody(Cursor::new(self.into_bytes())) + pub async fn contents(&mut self) -> Result<&[u8], Error> { + match &mut self.0 { + BodyInner::Complete(ref bs) => Ok(bs.as_ref()), + inner => { + let mut prev = BodyInner::Complete(Bytes::new()); + std::mem::swap(inner, &mut prev); + let boxed_body = match prev { + BodyInner::Incoming(i) => i.into_http_body().boxed(), + BodyInner::Boxed(b) => b, + BodyInner::Complete(_) => unreachable!(), + }; + let collected = boxed_body.collect().await?; + *inner = BodyInner::Complete(collected.to_bytes()); + Ok(match inner { + BodyInner::Complete(ref bs) => bs.as_ref(), + _ => unreachable!(), + }) + } + } } -} -impl IntoBody for &str { - type IntoBody = BoundedBody>; - fn into_body(self) -> Self::IntoBody { - BoundedBody(Cursor::new(self.to_owned().into_bytes())) + pub fn content_length(&self) -> Option { + match &self.0 { + BodyInner::Boxed(b) => b.size_hint().exact(), + BodyInner::Complete(bs) => Some(bs.len() as u64), + BodyInner::Incoming(i) => i.size_hint.content_length(), + } } -} -impl IntoBody for Vec { - type IntoBody = BoundedBody>; - fn into_body(self) -> Self::IntoBody { - BoundedBody(Cursor::new(self)) + pub fn empty() -> Self { + Body(BodyInner::Complete(Bytes::new())) } -} -impl IntoBody for &[u8] { - type IntoBody = BoundedBody>; - fn into_body(self) -> Self::IntoBody { - BoundedBody(Cursor::new(self.to_owned())) + pub fn from_string(s: impl Into) -> Self { + let s = s.into(); + Body(BodyInner::Complete(Bytes::from_owner(s.into_bytes()))) } -} -/// An HTTP body with a known length -#[derive(Debug)] -pub struct BoundedBody(Cursor); + pub async fn str_contents(&mut self) -> Result<&str, Error> { + let bs = self.contents().await?; + std::str::from_utf8(bs).context("decoding body contents as string") + } -impl> AsyncRead for BoundedBody { - async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result { - self.0.read(buf).await + pub fn from_bytes(b: impl Into) -> Self { + let b = b.into(); + Body::from(http_body_util::Full::new(b)) } -} -impl> Body for BoundedBody { - fn len(&self) -> Option { - Some(self.0.get_ref().as_ref().len()) + + #[cfg(feature = "json")] + pub fn from_json(data: &T) -> Result { + Ok(Self::from_string(serde_json::to_string(data)?)) } -} -/// An HTTP body with an unknown length -#[derive(Debug)] -pub struct StreamedBody(S); + #[cfg(feature = "json")] + pub async fn json serde::Deserialize<'a>>(&mut self) -> Result { + let str = self.str_contents().await?; + serde_json::from_str(str).context("decoding body contents as json") + } -impl StreamedBody { - /// Wrap an `AsyncRead` impl in a type that provides a [`Body`] implementation. - pub fn new(s: S) -> Self { - Self(s) + pub fn from_input_stream(r: crate::io::AsyncInputStream) -> Self { + use futures_lite::stream::StreamExt; + Body(BodyInner::Boxed(http_body_util::BodyExt::boxed( + http_body_util::StreamBody::new(r.into_stream().map(|res| { + res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec))) + .map_err(Into::into) + })), + ))) } } -impl AsyncRead for StreamedBody { - async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result { - self.0.read(buf).await - } + +fn annotate_err(_: E) -> Error { + unreachable!() } -impl Body for StreamedBody { - fn len(&self) -> Option { - None + +impl From for Body +where + B: HttpBody + Send + Sync + 'static, + ::Data: Into, + ::Error: Into, +{ + fn from(http_body: B) -> Body { + use util::BodyExt; + Body(BodyInner::Boxed( + http_body + .map_frame(|f| f.map_data(Into::into)) + .map_err(Into::into) + .boxed(), + )) } } -impl Body for Empty { - fn len(&self) -> Option { - Some(0) +impl From for Body { + fn from(incoming: Incoming) -> Body { + Body(BodyInner::Incoming(incoming)) } } -/// An incoming HTTP body #[derive(Debug)] -pub struct IncomingBody { - kind: BodyKind, - // IMPORTANT: the order of these fields here matters. `body_stream` must - // be dropped before `incoming_body`. - body_stream: AsyncInputStream, - incoming_body: WasiIncomingBody, +pub struct Incoming { + body: WasiIncomingBody, + size_hint: BodyHint, } -impl IncomingBody { - pub(crate) fn new( - kind: BodyKind, - body_stream: AsyncInputStream, - incoming_body: WasiIncomingBody, - ) -> Self { - Self { - kind, - body_stream, - incoming_body, - } +impl Incoming { + pub(crate) fn new(body: WasiIncomingBody, size_hint: BodyHint) -> Self { + Self { body, size_hint } } - - /// Consume this `IncomingBody` and return the trailers, if present. - pub async fn finish(self) -> Result, Error> { - // The stream is a child resource of the `IncomingBody`, so ensure that - // it's dropped first. - drop(self.body_stream); - - let trailers = WasiIncomingBody::finish(self.incoming_body); - - AsyncPollable::new(trailers.subscribe()).wait_for().await; - - let trailers = trailers.get().unwrap().unwrap()?; - - let trailers = match trailers { - None => None, - Some(trailers) => Some(header_map_from_wasi(trailers)?), - }; - - Ok(trailers) + /// Use with `http_body::Body` trait + pub fn into_http_body(self) -> IncomingBody { + IncomingBody::new(self.body, self.size_hint) } - - /// Try to deserialize the incoming body as JSON. The optional - /// `json` feature is required. - /// - /// Fails whenever the response body is not in JSON format, - /// or it cannot be properly deserialized to target type `T`. For more - /// details please see [`serde_json::from_reader`]. - /// - /// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html - #[cfg(feature = "json")] - pub async fn json(&mut self) -> Result { - let buf = self.bytes().await?; - serde_json::from_slice(&buf).map_err(|e| ErrorVariant::Other(e.to_string()).into()) + pub fn into_body(self) -> Body { + self.into() } - - /// Get the full response body as `Vec`. - pub async fn bytes(&mut self) -> Result, Error> { - let mut buf = match self.kind { - BodyKind::Fixed(l) => { - if l > (usize::MAX as u64) { - return Err(ErrorVariant::Other( - "incoming body is too large to allocate and buffer in memory".to_string(), - ) - .into()); - } else { - Vec::with_capacity(l as usize) - } - } - BodyKind::Chunked => Vec::with_capacity(4096), - }; - self.read_to_end(&mut buf).await?; - Ok(buf) + pub fn into_inner(self) -> WasiIncomingBody { + self.body } } -impl AsyncRead for IncomingBody { - async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result { - self.body_stream.read(out_buf).await - } - - fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { - Some(&self.body_stream) - } +#[derive(Clone, Copy, Debug)] +pub enum BodyHint { + ContentLength(u64), + Unknown, } -impl Body for IncomingBody { - fn len(&self) -> Option { - match self.kind { - BodyKind::Fixed(l) => { - if l > (usize::MAX as u64) { - None - } else { - Some(l as usize) - } - } - BodyKind::Chunked => None, +impl BodyHint { + pub fn from_headers(headers: &HeaderMap) -> Result { + if let Some(val) = headers.get(CONTENT_LENGTH) { + let len = std::str::from_utf8(val.as_ref()) + .map_err(|_| InvalidContentLength)? + .parse::() + .map_err(|_| InvalidContentLength)?; + Ok(BodyHint::ContentLength(len)) + } else { + Ok(BodyHint::Unknown) + } + } + fn content_length(&self) -> Option { + match self { + BodyHint::ContentLength(l) => Some(*l), + _ => None, } } } - #[derive(Debug)] pub struct InvalidContentLength; - impl fmt::Display for InvalidContentLength { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "incoming content-length should be a u64; violates HTTP/1.1".fmt(f) + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Invalid Content-Length header") } } - impl std::error::Error for InvalidContentLength {} -impl From for Error { - fn from(e: InvalidContentLength) -> Self { - // TODO: What's the right error code here? - ErrorVariant::Other(e.to_string()).into() - } -} - -/// The output stream for the body, implementing [`AsyncWrite`]. Call -/// [`Responder::start_response`] or [`Client::start_request`] to obtain -/// one. Once the body is complete, it must be declared finished, using -/// [`Finished::finish`], [`Finished::fail`], [`Client::finish`], or -/// [`Client::fail`]. -/// -/// [`Responder::start_response`]: crate::http::server::Responder::start_response -/// [`Client::start_request`]: crate::http::client::Client::start_request -/// [`Finished::finish`]: crate::http::server::Finished::finish -/// [`Finished::fail`]: crate::http::server::Finished::fail -/// [`Client::finish`]: crate::http::client::Client::finish -/// [`Client::fail`]: crate::http::client::Client::fail -#[must_use] -pub struct OutgoingBody { - // IMPORTANT: the order of these fields here matters. `stream` must - // be dropped before `body`. - stream: AsyncOutputStream, - body: wasip2::http::types::OutgoingBody, - dontdrop: DontDropOutgoingBody, +#[derive(Debug)] +pub struct IncomingBody { + state: Option>>, + size_hint: BodyHint, } -impl OutgoingBody { - pub(crate) fn new(stream: AsyncOutputStream, body: wasip2::http::types::OutgoingBody) -> Self { +impl IncomingBody { + fn new(body: WasiIncomingBody, size_hint: BodyHint) -> Self { Self { - stream, - body, - dontdrop: DontDropOutgoingBody, + state: Some(Box::pin(IncomingBodyState::Body { + read_state: BodyState { + wait: None, + subscription: None, + stream: body + .stream() + .expect("wasi incoming-body stream should not yet be taken"), + }, + body: Some(body), + })), + size_hint, } } +} - pub(crate) fn consume(self) -> (AsyncOutputStream, wasip2::http::types::OutgoingBody) { - let Self { - stream, - body, - dontdrop, - } = self; - - std::mem::forget(dontdrop); - - (stream, body) +impl HttpBody for IncomingBody { + type Data = Bytes; + type Error = Error; + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + loop { + let state = self.as_mut().state.take(); + if state.is_none() { + return Poll::Ready(None); + } + let mut state = state.unwrap(); + match state.as_mut().project() { + IBSProj::Body { read_state, body } => match read_state.poll_frame(cx) { + Poll::Pending => { + self.as_mut().state = Some(state); + return Poll::Pending; + } + Poll::Ready(Some(r)) => { + self.as_mut().state = Some(state); + return Poll::Ready(Some(r)); + } + Poll::Ready(None) => { + // state contains children of the incoming-body. Must drop it + // in order to finish + let body = body.take().expect("finishing Body state"); + drop(state); + let trailers_state = TrailersState::new(WasiIncomingBody::finish(body)); + self.as_mut().state = + Some(Box::pin(IncomingBodyState::Trailers { trailers_state })); + continue; + } + }, + IBSProj::Trailers { trailers_state } => match trailers_state.poll_frame(cx) { + Poll::Pending => { + self.as_mut().state = Some(state); + return Poll::Pending; + } + Poll::Ready(r) => return Poll::Ready(r), + }, + } + } } - - /// Return a reference to the underlying `AsyncOutputStream`. - /// - /// This usually isn't needed, as `OutgoingBody` implements `AsyncWrite` - /// too, however it is useful for code that expects to work with - /// `AsyncOutputStream` specifically. - pub fn stream(&mut self) -> &mut AsyncOutputStream { - &mut self.stream + fn is_end_stream(&self) -> bool { + self.state.is_none() + } + fn size_hint(&self) -> SizeHint { + match self.size_hint { + BodyHint::ContentLength(l) => SizeHint::with_exact(l), + _ => Default::default(), + } } } -impl AsyncWrite for OutgoingBody { - async fn write(&mut self, buf: &[u8]) -> crate::io::Result { - self.stream.write(buf).await +pin_project_lite::pin_project! { + #[project = IBSProj] + #[derive(Debug)] + enum IncomingBodyState { + Body { + #[pin] + read_state: BodyState, + // body is Some until we need to remove it from a projection + // during a state transition + body: Option + }, + Trailers { + #[pin] + trailers_state: TrailersState + }, } +} - async fn flush(&mut self) -> crate::io::Result<()> { - self.stream.flush().await - } +#[derive(Debug)] +struct BodyState { + wait: Option>>, + subscription: Option, + stream: WasiInputStream, +} - fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { - Some(&self.stream) +const MAX_FRAME_SIZE: u64 = 64 * 1024; + +impl BodyState { + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>>> { + loop { + match self.stream.read(MAX_FRAME_SIZE) { + Ok(bs) if !bs.is_empty() => { + return Poll::Ready(Some(Ok(Frame::data(Bytes::from(bs))))) + } + Err(StreamError::Closed) => return Poll::Ready(None), + Err(StreamError::LastOperationFailed(err)) => { + return Poll::Ready(Some(Err( + Error::msg(err.to_debug_string()).context("reading incoming body stream") + ))) + } + Ok(_empty) => { + if self.subscription.is_none() { + self.as_mut().subscription = + Some(Reactor::current().schedule(self.stream.subscribe())); + } + if self.wait.is_none() { + let wait = self.as_ref().subscription.as_ref().unwrap().wait_for(); + self.as_mut().wait = Some(Box::pin(wait)); + } + let mut taken_wait = self.as_mut().wait.take().unwrap(); + match taken_wait.as_mut().poll(cx) { + Poll::Pending => { + self.as_mut().wait = Some(taken_wait); + return Poll::Pending; + } + // Its possible that, after returning ready, the + // stream does not actually provide any input. This + // behavior should only occur once. + Poll::Ready(()) => { + continue; + } + } + } + } + } } } -/// A utility to ensure that `OutgoingBody` is either finished or failed, and -/// not implicitly dropped. -struct DontDropOutgoingBody; +#[derive(Debug)] +struct TrailersState { + wait: Option>>, + subscription: Option, + future_trailers: FutureTrailers, +} -impl Drop for DontDropOutgoingBody { - fn drop(&mut self) { - unreachable!("`OutgoingBody::drop` called; `OutgoingBody`s should be consumed with `finish` or `fail`."); +impl TrailersState { + fn new(future_trailers: FutureTrailers) -> Self { + Self { + wait: None, + subscription: None, + future_trailers, + } } -} -/// A placeholder for use as the type parameter to [`Request`] and [`Response`] -/// to indicate that the body has not yet started. This is used with -/// [`Client::start_request`] and [`Responder::start_response`], which have -/// `Requeset` and `Response` arguments, -/// respectively. -/// -/// To instead start the response and obtain the output stream for the body, -/// use [`Responder::respond`]. -/// To instead send a request or response with an input stream for the body, -/// use [`Client::send`] or [`Responder::respond`]. -/// -/// [`Request`]: crate::http::Request -/// [`Response`]: crate::http::Response -/// [`Client::start_request`]: crate::http::Client::start_request -/// [`Responder::start_response`]: crate::http::server::Responder::start_response -/// [`Client::send`]: crate::http::Client::send -/// [`Responder::respond`]: crate::http::server::Responder::respond -pub struct BodyForthcoming; + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>>> { + loop { + if let Some(ready) = self.future_trailers.get() { + return match ready { + Ok(Ok(Some(trailers))) => match header_map_from_wasi(trailers) { + Ok(header_map) => Poll::Ready(Some(Ok(Frame::trailers(header_map)))), + Err(e) => { + Poll::Ready(Some(Err(e.context("decoding incoming body trailers")))) + } + }, + Ok(Ok(None)) => Poll::Ready(None), + Ok(Err(e)) => Poll::Ready(Some(Err( + Error::from(e).context("reading incoming body trailers") + ))), + Err(()) => unreachable!("future_trailers.get with some called at most once"), + }; + } + if self.subscription.is_none() { + self.as_mut().subscription = + Some(Reactor::current().schedule(self.future_trailers.subscribe())); + } + if self.wait.is_none() { + let wait = self.as_ref().subscription.as_ref().unwrap().wait_for(); + self.as_mut().wait = Some(Box::pin(wait)); + } + let mut taken_wait = self.as_mut().wait.take().unwrap(); + match taken_wait.as_mut().poll(cx) { + Poll::Pending => { + self.as_mut().wait = Some(taken_wait); + return Poll::Pending; + } + // Its possible that, after returning ready, the + // future_trailers.get() does not actually provide any input. This + // behavior should only occur once. + Poll::Ready(()) => { + continue; + } + } + } + } +} diff --git a/src/http/client.rs b/src/http/client.rs index fd251d6..49613b9 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -1,24 +1,11 @@ -use super::{ - body::{BodyForthcoming, IncomingBody, OutgoingBody}, - fields::header_map_to_wasi, - Body, Error, HeaderMap, Request, Response, Result, -}; +use super::{body::Incoming, Body, Error, Request, Response}; use crate::http::request::try_into_outgoing; use crate::http::response::try_from_incoming; -use crate::io::{self, AsyncOutputStream, AsyncPollable}; -use crate::runtime::WaitFor; +use crate::io::AsyncPollable; use crate::time::Duration; -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use wasip2::http::types::{ - FutureIncomingResponse as WasiFutureIncomingResponse, OutgoingBody as WasiOutgoingBody, - RequestOptions as WasiRequestOptions, -}; +use wasip2::http::types::RequestOptions as WasiRequestOptions; /// An HTTP client. -// Empty for now, but permits adding support for RequestOptions soon: #[derive(Debug)] pub struct Client { options: Option, @@ -37,121 +24,32 @@ impl Client { } /// Send an HTTP request. - /// - /// TODO: Should this automatically add a "Content-Length" header if the - /// body size is known? - /// - /// To respond with trailers, use [`Client::start_request`] instead. - pub async fn send(&self, req: Request) -> Result> { - // We don't use `body::OutputBody` here because we can report I/O - // errors from the `copy` directly. + pub async fn send>(&self, req: Request) -> Result, Error> { let (wasi_req, body) = try_into_outgoing(req)?; + let body = body.into(); let wasi_body = wasi_req.body().unwrap(); - let wasi_stream = wasi_body.write().unwrap(); // 1. Start sending the request head let res = wasip2::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap(); - // 2. Start sending the request body - io::copy(body, AsyncOutputStream::new(wasi_stream)).await?; - - // 3. Finish sending the request body - let trailers = None; - WasiOutgoingBody::finish(wasi_body, trailers).unwrap(); - - // 4. Receive the response - AsyncPollable::new(res.subscribe()).wait_for().await; - - // NOTE: the first `unwrap` is to ensure readiness, the second `unwrap` - // is to trap if we try and get the response more than once. The final - // `?` is to raise the actual error if there is one. - let res = res.get().unwrap().unwrap()?; - try_from_incoming(res) - } - - /// Start sending an HTTP request, and return an `OutgoingBody` stream to - /// write the body to. - /// - /// The returned `OutgoingBody` must be consumed by [`Client::finish`] or - /// [`Client::fail`]. - pub async fn start_request( - &self, - req: Request, - ) -> Result<( - OutgoingBody, - impl Future>>, - )> { - let (wasi_req, _body_forthcoming) = try_into_outgoing(req)?; - let wasi_body = wasi_req.body().unwrap(); - let wasi_stream = wasi_body.write().unwrap(); - - // Start sending the request head. - let res = wasip2::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap(); - - let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); - - pin_project! { - #[must_use = "futures do nothing unless polled or .awaited"] - struct IncomingResponseFuture { - #[pin] - subscription: WaitFor, - wasi: WasiFutureIncomingResponse, - } - } - impl Future for IncomingResponseFuture { - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.subscription.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready( - this.wasi - .get() - .unwrap() - .unwrap() - .map_err(Error::from) - .and_then(try_from_incoming), - ), - } - } - } - - let subscription = AsyncPollable::new(res.subscribe()).wait_for(); - let future = IncomingResponseFuture { - subscription, - wasi: res, - }; - - Ok((outgoing_body, future)) - } - - /// Finish the body, optionally with trailers. - /// - /// This is used with [`Client::start_request`]. - pub fn finish(body: OutgoingBody, trailers: Option) -> Result<()> { - let (stream, body) = body.consume(); - - // The stream is a child resource of the `OutgoingBody`, so ensure that - // it's dropped first. - drop(stream); - - let wasi_trailers = match trailers { - Some(trailers) => Some(header_map_to_wasi(&trailers)?), - None => None, - }; - - wasip2::http::types::OutgoingBody::finish(body, wasi_trailers) - .expect("body length did not match Content-Length header value"); - Ok(()) - } - - /// Consume the `OutgoingBody` and indicate that the body was not - /// completed. - /// - /// This is used with [`Client::start_request`]. - pub fn fail(body: OutgoingBody) { - let (_stream, _body) = body.consume(); + let ((), body) = futures_lite::future::try_zip( + async move { + // 3. send the body: + body.send(wasi_body).await + }, + async move { + // 4. Receive the response + AsyncPollable::new(res.subscribe()).wait_for().await; + + // NOTE: the first `unwrap` is to ensure readiness, the second `unwrap` + // is to trap if we try and get the response more than once. The final + // `?` is to raise the actual error if there is one. + let res = res.get().unwrap().unwrap()?; + try_from_incoming(res) + }, + ) + .await?; + Ok(body) } /// Set timeout on connecting to HTTP server @@ -173,13 +71,13 @@ impl Client { match &mut self.options { Some(o) => o, uninit => { - *uninit = Some(Default::default()); + *uninit = Some(RequestOptions::default()); uninit.as_mut().unwrap() } } } - fn wasi_options(&self) -> Result> { + fn wasi_options(&self) -> Result, crate::http::Error> { self.options .as_ref() .map(RequestOptions::to_wasi) @@ -195,22 +93,26 @@ struct RequestOptions { } impl RequestOptions { - fn to_wasi(&self) -> Result { + fn to_wasi(&self) -> Result { let wasi = WasiRequestOptions::new(); if let Some(timeout) = self.connect_timeout { wasi.set_connect_timeout(Some(timeout.0)).map_err(|()| { - Error::other("wasi-http implementation does not support connect timeout option") + anyhow::Error::msg( + "wasi-http implementation does not support connect timeout option", + ) })?; } if let Some(timeout) = self.first_byte_timeout { wasi.set_first_byte_timeout(Some(timeout.0)).map_err(|()| { - Error::other("wasi-http implementation does not support first byte timeout option") + anyhow::Error::msg( + "wasi-http implementation does not support first byte timeout option", + ) })?; } if let Some(timeout) = self.between_bytes_timeout { wasi.set_between_bytes_timeout(Some(timeout.0)) .map_err(|()| { - Error::other( + anyhow::Error::msg( "wasi-http implementation does not support between byte timeout option", ) })?; diff --git a/src/http/error.rs b/src/http/error.rs index c3c540b..a4f22b0 100644 --- a/src/http/error.rs +++ b/src/http/error.rs @@ -1,124 +1,13 @@ -use crate::http::fields::ToWasiHeaderError; -use std::fmt; - -/// The `http` result type. -pub type Result = std::result::Result; - -/// The `http` error type. -pub struct Error { - variant: ErrorVariant, - context: Vec, -} +//! The http portion of wstd uses `anyhow::Error` as its `Error` type. +//! +//! There are various concrete error types +pub use crate::http::body::InvalidContentLength; +pub use anyhow::Context; pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; pub use http::method::InvalidMethod; -pub use wasip2::http::types::{ErrorCode as WasiHttpErrorCode, HeaderError as WasiHttpHeaderError}; - -impl fmt::Debug for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for c in self.context.iter() { - writeln!(f, "in {c}:")?; - } - match &self.variant { - ErrorVariant::WasiHttp(e) => write!(f, "wasi http error: {e:?}"), - ErrorVariant::WasiHeader(e) => write!(f, "wasi header error: {e:?}"), - ErrorVariant::HeaderName(e) => write!(f, "header name error: {e:?}"), - ErrorVariant::HeaderValue(e) => write!(f, "header value error: {e:?}"), - ErrorVariant::Method(e) => write!(f, "method error: {e:?}"), - ErrorVariant::BodyIo(e) => write!(f, "body error: {e:?}"), - ErrorVariant::Other(e) => write!(f, "{e}"), - } - } -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.variant { - ErrorVariant::WasiHttp(e) => write!(f, "wasi http error: {e}"), - ErrorVariant::WasiHeader(e) => write!(f, "wasi header error: {e}"), - ErrorVariant::HeaderName(e) => write!(f, "header name error: {e}"), - ErrorVariant::HeaderValue(e) => write!(f, "header value error: {e}"), - ErrorVariant::Method(e) => write!(f, "method error: {e}"), - ErrorVariant::BodyIo(e) => write!(f, "body error: {e}"), - ErrorVariant::Other(e) => write!(f, "{e}"), - } - } -} - -impl std::error::Error for Error {} - -impl Error { - pub fn variant(&self) -> &ErrorVariant { - &self.variant - } - pub(crate) fn other(s: impl Into) -> Self { - ErrorVariant::Other(s.into()).into() - } - pub(crate) fn context(self, s: impl Into) -> Self { - let mut context = self.context; - context.push(s.into()); - Self { - variant: self.variant, - context, - } - } -} - -impl From for Error { - fn from(variant: ErrorVariant) -> Error { - Error { - variant, - context: Vec::new(), - } - } -} +pub use wasip2::http::types::{ErrorCode, HeaderError}; -impl From for Error { - fn from(e: WasiHttpErrorCode) -> Error { - ErrorVariant::WasiHttp(e).into() - } -} - -impl From for Error { - fn from(error: ToWasiHeaderError) -> Error { - Error { - variant: ErrorVariant::WasiHeader(error.error), - context: vec![error.context], - } - } -} - -impl From for Error { - fn from(e: InvalidHeaderValue) -> Error { - ErrorVariant::HeaderValue(e).into() - } -} - -impl From for Error { - fn from(e: InvalidHeaderName) -> Error { - ErrorVariant::HeaderName(e).into() - } -} - -impl From for Error { - fn from(e: InvalidMethod) -> Error { - ErrorVariant::Method(e).into() - } -} - -impl From for Error { - fn from(e: std::io::Error) -> Error { - ErrorVariant::BodyIo(e).into() - } -} - -#[derive(Debug)] -pub enum ErrorVariant { - WasiHttp(WasiHttpErrorCode), - WasiHeader(WasiHttpHeaderError), - HeaderName(InvalidHeaderName), - HeaderValue(InvalidHeaderValue), - Method(InvalidMethod), - BodyIo(std::io::Error), - Other(String), -} +pub type Error = anyhow::Error; +/// The `http` result type. +pub type Result = std::result::Result; diff --git a/src/http/fields.rs b/src/http/fields.rs index 34452f5..35a9be7 100644 --- a/src/http/fields.rs +++ b/src/http/fields.rs @@ -1,36 +1,27 @@ pub use http::header::{HeaderMap, HeaderName, HeaderValue}; -use super::Error; -use wasip2::http::types::{Fields, HeaderError as WasiHttpHeaderError}; +use super::{error::Context, Error}; +use wasip2::http::types::Fields; pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { let mut output = HeaderMap::new(); for (key, value) in wasi_fields.entries() { - let key = HeaderName::from_bytes(key.as_bytes()) - .map_err(|e| Error::from(e).context("header name {key}"))?; - let value = HeaderValue::from_bytes(&value) - .map_err(|e| Error::from(e).context("header value for {key}"))?; + let key = + HeaderName::from_bytes(key.as_bytes()).with_context(|| format!("header name {key}"))?; + let value = + HeaderValue::from_bytes(&value).with_context(|| format!("header value for {key}"))?; output.append(key, value); } Ok(output) } -pub(crate) fn header_map_to_wasi(header_map: &HeaderMap) -> Result { +pub(crate) fn header_map_to_wasi(header_map: &HeaderMap) -> Result { let wasi_fields = Fields::new(); for (key, value) in header_map { // Unwrap because `HeaderMap` has already validated the headers. wasi_fields .append(key.as_str(), value.as_bytes()) - .map_err(|error| ToWasiHeaderError { - error, - context: format!("header {key}: {value:?}"), - })?; + .with_context(|| format!("wasi rejected header `{key}: {value:?}`"))? } Ok(wasi_fields) } - -#[derive(Debug)] -pub(crate) struct ToWasiHeaderError { - pub(crate) error: WasiHttpHeaderError, - pub(crate) context: String, -} diff --git a/src/http/mod.rs b/src/http/mod.rs index ef16d13..fd607d8 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -4,9 +4,9 @@ pub use http::status::StatusCode; pub use http::uri::{Authority, PathAndQuery, Uri}; #[doc(inline)] -pub use body::{Body, IntoBody}; +pub use body::{util::BodyExt, Body, Incoming}; pub use client::Client; -pub use error::{Error, Result}; +pub use error::{Error, ErrorCode, Result}; pub use fields::{HeaderMap, HeaderName, HeaderValue}; pub use method::Method; pub use request::Request; diff --git a/src/http/request.rs b/src/http/request.rs index d150367..aeec352 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -1,59 +1,17 @@ use super::{ - body::{BodyKind, IncomingBody}, - error::WasiHttpErrorCode, + body::{BodyHint, Incoming}, + error::{Context, Error, ErrorCode}, fields::{header_map_from_wasi, header_map_to_wasi}, method::{from_wasi_method, to_wasi_method}, scheme::{from_wasi_scheme, to_wasi_scheme}, - Authority, Error, HeaderMap, PathAndQuery, Uri, + Authority, HeaderMap, PathAndQuery, Uri, }; -use crate::io::AsyncInputStream; use wasip2::http::outgoing_handler::OutgoingRequest; use wasip2::http::types::IncomingRequest; pub use http::request::{Builder, Request}; -#[cfg(feature = "json")] -use super::{ - body::{BoundedBody, IntoBody}, - error::ErrorVariant, -}; -#[cfg(feature = "json")] -use http::header::{HeaderValue, CONTENT_TYPE}; -#[cfg(feature = "json")] -use serde::Serialize; -#[cfg(feature = "json")] -use serde_json; - -#[cfg(feature = "json")] -pub trait JsonRequest { - fn json(self, json: &T) -> Result>>, Error>; -} - -#[cfg(feature = "json")] -impl JsonRequest for Builder { - /// Send a JSON body. Requires optional `json` feature. - /// - /// Serialization can fail if `T`'s implementation of `Serialize` decides to - /// fail. - #[cfg(feature = "json")] - fn json(self, json: &T) -> Result>>, Error> { - let encoded = serde_json::to_vec(json).map_err(|e| ErrorVariant::Other(e.to_string()))?; - let builder = if !self - .headers_ref() - .is_some_and(|headers| headers.contains_key(CONTENT_TYPE)) - { - self.header( - CONTENT_TYPE, - HeaderValue::from_static("application/json; charset=utf-8"), - ) - } else { - self - }; - builder - .body(encoded.into_body()) - .map_err(|e| ErrorVariant::Other(e.to_string()).into()) - } -} +// TODO: go back and add json stuff??? pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingRequest, T), Error> { let wasi_req = OutgoingRequest::new(header_map_to_wasi(request.headers())?); @@ -64,7 +22,7 @@ pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingReque let method = to_wasi_method(parts.method); wasi_req .set_method(&method) - .map_err(|()| Error::other(format!("method rejected by wasi-http: {method:?}",)))?; + .map_err(|()| anyhow::anyhow!("method rejected by wasi-http: {method:?}"))?; // Set the url scheme let scheme = parts @@ -74,21 +32,19 @@ pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingReque .unwrap_or(wasip2::http::types::Scheme::Https); wasi_req .set_scheme(Some(&scheme)) - .map_err(|()| Error::other(format!("scheme rejected by wasi-http: {scheme:?}")))?; + .map_err(|()| anyhow::anyhow!("scheme rejected by wasi-http: {scheme:?}"))?; // Set authority let authority = parts.uri.authority().map(Authority::as_str); wasi_req .set_authority(authority) - .map_err(|()| Error::other(format!("authority rejected by wasi-http {authority:?}")))?; + .map_err(|()| anyhow::anyhow!("authority rejected by wasi-http {authority:?}"))?; // Set the url path + query string if let Some(p_and_q) = parts.uri.path_and_query() { wasi_req .set_path_with_query(Some(p_and_q.as_str())) - .map_err(|()| { - Error::other(format!("path and query rejected by wasi-http {p_and_q:?}")) - })?; + .map_err(|()| anyhow::anyhow!("path and query rejected by wasi-http {p_and_q:?}"))?; } // All done; request is ready for send-off @@ -97,41 +53,41 @@ pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingReque /// This is used by the `http_server` macro. #[doc(hidden)] -pub fn try_from_incoming( - incoming: IncomingRequest, -) -> Result, WasiHttpErrorCode> { - // TODO: What's the right error code to use for invalid headers? +pub fn try_from_incoming(incoming: IncomingRequest) -> Result, Error> { let headers: HeaderMap = header_map_from_wasi(incoming.headers()) - .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string())))?; + .context("headers provided by wasi rejected by http::HeaderMap")?; - let method = from_wasi_method(incoming.method()) - .map_err(|_| WasiHttpErrorCode::HttpRequestMethodInvalid)?; - let scheme = incoming.scheme().map(|scheme| { - from_wasi_scheme(scheme).expect("TODO: what shall we do with an invalid uri here?") - }); - let authority = incoming.authority().map(|authority| { - Authority::from_maybe_shared(authority) - .expect("TODO: what shall we do with an invalid uri authority here?") - }); - let path_and_query = incoming.path_with_query().map(|path_and_query| { - PathAndQuery::from_maybe_shared(path_and_query) - .expect("TODO: what shall we do with an invalid uri path-and-query here?") - }); + let method = + from_wasi_method(incoming.method()).map_err(|_| ErrorCode::HttpRequestMethodInvalid)?; + let scheme = incoming + .scheme() + .map(|scheme| { + from_wasi_scheme(scheme).context("scheme provided by wasi rejected by http::Scheme") + }) + .transpose()?; + let authority = incoming + .authority() + .map(|authority| { + Authority::from_maybe_shared(authority) + .context("authority provided by wasi rejected by http::Authority") + }) + .transpose()?; + let path_and_query = incoming + .path_with_query() + .map(|path_and_query| { + PathAndQuery::from_maybe_shared(path_and_query) + .context("path and query provided by wasi rejected by http::PathAndQuery") + }) + .transpose()?; + + let hint = BodyHint::from_headers(&headers)?; - // TODO: What's the right error code to use for invalid headers? - let kind = BodyKind::from_headers(&headers) - .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string())))?; // `body_stream` is a child of `incoming_body` which means we cannot // drop the parent before we drop the child let incoming_body = incoming .consume() - .expect("cannot call `consume` twice on incoming request"); - let body_stream = incoming_body - .stream() - .expect("cannot call `stream` twice on an incoming body"); - let body_stream = AsyncInputStream::new(body_stream); - - let body = IncomingBody::new(kind, body_stream, incoming_body); + .expect("`consume` should not have been called previously on this incoming-request"); + let body = Incoming::new(incoming_body, hint); let mut uri = Uri::builder(); if let Some(scheme) = scheme { @@ -143,17 +99,11 @@ pub fn try_from_incoming( if let Some(path_and_query) = path_and_query { uri = uri.path_and_query(path_and_query); } - // TODO: What's the right error code to use for an invalid uri? - let uri = uri - .build() - .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string())))?; + let uri = uri.build().context("building uri from wasi")?; let mut request = Request::builder().method(method).uri(uri); if let Some(headers_mut) = request.headers_mut() { *headers_mut = headers; } - // TODO: What's the right error code to use for an invalid request? - request - .body(body) - .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string()))) + request.body(body).context("building request from wasi") } diff --git a/src/http/response.rs b/src/http/response.rs index 0c80ff0..d7142e3 100644 --- a/src/http/response.rs +++ b/src/http/response.rs @@ -1,34 +1,25 @@ +use http::StatusCode; use wasip2::http::types::IncomingResponse; -use super::{ - body::{BodyKind, IncomingBody}, - fields::header_map_from_wasi, - Error, HeaderMap, -}; -use crate::io::AsyncInputStream; -use http::StatusCode; +use crate::http::body::{BodyHint, Incoming}; +use crate::http::error::{Context, Error}; +use crate::http::fields::{header_map_from_wasi, HeaderMap}; pub use http::response::{Builder, Response}; -pub(crate) fn try_from_incoming( - incoming: IncomingResponse, -) -> Result, Error> { +pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result, Error> { let headers: HeaderMap = header_map_from_wasi(incoming.headers())?; // TODO: Does WASI guarantee that the incoming status is valid? - let status = - StatusCode::from_u16(incoming.status()).map_err(|err| Error::other(err.to_string()))?; + let status = StatusCode::from_u16(incoming.status()) + .map_err(|err| anyhow::anyhow!("wasi provided invalid status code ({err})"))?; - let kind = BodyKind::from_headers(&headers)?; + let hint = BodyHint::from_headers(&headers)?; // `body_stream` is a child of `incoming_body` which means we cannot // drop the parent before we drop the child let incoming_body = incoming .consume() .expect("cannot call `consume` twice on incoming response"); - let body_stream = incoming_body - .stream() - .expect("cannot call `stream` twice on an incoming body"); - - let body = IncomingBody::new(kind, AsyncInputStream::new(body_stream), incoming_body); + let body = Incoming::new(incoming_body, hint); let mut builder = Response::builder().status(status); @@ -36,7 +27,5 @@ pub(crate) fn try_from_incoming( *headers_mut = headers; } - builder - .body(body) - .map_err(|err| Error::other(err.to_string())) + builder.body(body).context("building response") } diff --git a/src/http/server.rs b/src/http/server.rs index 7a9117d..3763151 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -1,16 +1,14 @@ //! HTTP servers //! -//! The WASI HTTP server API uses the [typed main] idiom, with a `main` function -//! that takes a [`Request`] and a [`Responder`], and responds with a [`Response`], -//! using the [`http_server`] macro: +//! The WASI HTTP server uses the [typed main] idiom, with a `main` function +//! that takes a [`Request`] and succeeds with a [`Response`], using the +//! [`http_server`] macro: //! //! ```no_run -//! # use wstd::http::{Request, Response, IntoBody, server::{Finished, Responder}, body::IncomingBody}; +//! use wstd::http::{Request, Response, Incoming, Body, Error}; //! #[wstd::http_server] -//! async fn main(request: Request, responder: Responder) -> Finished { -//! responder -//! .respond(Response::new("Hello!\n".into_body())) -//! .await +//! async fn main(_request: Request) -> Result, Error> { +//! Ok(Response::new("Hello!\n".to_string().into())) //! } //! ``` //! @@ -20,22 +18,12 @@ //! [`Response`]: crate::http::Response //! [`http_server`]: crate::http_server -use super::{ - body::{BodyForthcoming, OutgoingBody}, - error::WasiHttpErrorCode, - fields::header_map_to_wasi, - Body, HeaderMap, Response, -}; -use crate::io::{copy, AsyncOutputStream}; +use super::{error::ErrorCode, fields::header_map_to_wasi, Body, Error, Response}; use http::header::CONTENT_LENGTH; use wasip2::exports::http::incoming_handler::ResponseOutparam; use wasip2::http::types::OutgoingResponse; -/// This is passed into the [`http_server`] `main` function and holds the state -/// needed for a handler to produce a response, or fail. There are two ways to -/// respond, with [`Responder::start_response`] to stream the body in, or -/// [`Responder::respond`] to give the body as a string, byte array, or input -/// stream. See those functions for examples. +/// For use by the [`http_server`] macro only. /// /// [`http_server`]: crate::http_server #[must_use] @@ -44,72 +32,19 @@ pub struct Responder { } impl Responder { - /// Start responding with the given `Response` and return an `OutgoingBody` - /// stream to write the body to. - /// - /// # Example - /// - /// ``` - /// # use wstd::http::{body::{IncomingBody, BodyForthcoming}, Response, Request}; - /// # use wstd::http::server::{Finished, Responder}; - /// # use crate::wstd::io::AsyncWrite; - /// # async fn example(responder: Responder) -> Finished { - /// let mut body = responder.start_response(Response::new(BodyForthcoming)); - /// let result = body - /// .write_all("Hello!\n".as_bytes()) - /// .await; - /// Finished::finish(body, result, None) - /// # } - /// # fn main() {} - /// ``` - pub fn start_response(self, response: Response) -> OutgoingBody { - let wasi_headers = header_map_to_wasi(response.headers()).expect("header error"); - let wasi_response = OutgoingResponse::new(wasi_headers); - let wasi_status = response.status().as_u16(); - - // Unwrap because `StatusCode` has already validated the status. - wasi_response.set_status_code(wasi_status).unwrap(); - - // Unwrap because we can be sure we only call these once. - let wasi_body = wasi_response.body().unwrap(); - let wasi_stream = wasi_body.write().unwrap(); - - // Tell WASI to start the show. - ResponseOutparam::set(self.outparam, Ok(wasi_response)); - - OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body) - } - - /// Respond with the given `Response` which contains the body. - /// - /// If the body has a known length, a Content-Length header is automatically added. - /// - /// To respond with trailers, use [`Responder::start_response`] instead. - /// - /// # Example - /// - /// ``` - /// # use wstd::http::{body::{IncomingBody, BodyForthcoming}, IntoBody, Response, Request}; - /// # use wstd::http::server::{Finished, Responder}; - /// # - /// # async fn example(responder: Responder) -> Finished { - /// responder - /// .respond(Response::new("Hello!\n".into_body())) - /// .await - /// # } - /// # fn main() {} - /// ``` - pub async fn respond(self, response: Response) -> Finished { + /// This is used by the `http_server` macro. + #[doc(hidden)] + pub async fn respond>(self, response: Response) -> Result<(), Error> { let headers = response.headers(); let status = response.status().as_u16(); let wasi_headers = header_map_to_wasi(headers).expect("header error"); // Consume the `response` and prepare to write the body. - let mut body = response.into_body(); + let body = response.into_body().into(); // Automatically add a Content-Length header. - if let Some(len) = body.len() { + if let Some(len) = body.content_length() { let mut buffer = itoa::Buffer::new(); wasi_headers .append(CONTENT_LENGTH.as_str(), buffer.format(len).as_bytes()) @@ -123,16 +58,14 @@ impl Responder { // Unwrap because we can be sure we only call these once. let wasi_body = wasi_response.body().unwrap(); - let wasi_stream = wasi_body.write().unwrap(); - // Tell WASI to start the show. + // Set the outparam to the response, which allows wasi-http to send + // the response status and headers. ResponseOutparam::set(self.outparam, Ok(wasi_response)); - let mut outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); - - let result = copy(&mut body, &mut outgoing_body).await; - let trailers = None; - Finished::finish(outgoing_body, result, trailers) + // Then send the body. The response will be fully sent once this + // future is ready. + body.send(wasi_body).await } /// This is used by the `http_server` macro. @@ -143,60 +76,12 @@ impl Responder { /// This is used by the `http_server` macro. #[doc(hidden)] - pub fn fail(self, err: WasiHttpErrorCode) -> Finished { - ResponseOutparam::set(self.outparam, Err(err)); - Finished(()) - } -} - -/// An opaque value returned from a handler indicating that the body is -/// finished, either by [`Finished::finish`] or [`Finished::fail`]. -pub struct Finished(pub(crate) ()); - -impl Finished { - /// Finish the body, optionally with trailers, and return a `Finished` - /// token to be returned from the [`http_server`] `main` function to indicate - /// that the response is finished. - /// - /// `result` is a `std::io::Result` for reporting any I/O errors that - /// occur while writing to the body stream. - /// - /// [`http_server`]: crate::http_server - pub fn finish( - body: OutgoingBody, - result: std::io::Result<()>, - trailers: Option, - ) -> Self { - let (stream, body) = body.consume(); - - // The stream is a child resource of the `OutgoingBody`, so ensure that - // it's dropped first. - drop(stream); - - // If there was an I/O error, panic and don't call `OutgoingBody::finish`. - result.expect("I/O error while writing the body"); - - let wasi_trailers = - trailers.map(|trailers| header_map_to_wasi(&trailers).expect("header error")); - - wasip2::http::types::OutgoingBody::finish(body, wasi_trailers) - .expect("body length did not match Content-Length header value"); - - Self(()) - } - - /// Return a `Finished` token that can be returned from a handler to - /// indicate that the body is not finished and should be considered - /// corrupted. - pub fn fail(body: OutgoingBody) -> Self { - let (stream, _body) = body.consume(); - - // The stream is a child resource of the `OutgoingBody`, so ensure that - // it's dropped first. - drop(stream); - - // No need to do anything else; omitting the call to `finish` achieves - // the desired effect. - Self(()) + pub fn fail(self, err: Error) -> Result<(), Error> { + let e = match err.downcast_ref::() { + Some(e) => e.clone(), + None => ErrorCode::InternalError(Some(format!("{err:?}"))), + }; + ResponseOutparam::set(self.outparam, Err(e)); + Err(err) } } diff --git a/src/io/cursor.rs b/src/io/cursor.rs index f05c284..51e4f7b 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -57,6 +57,7 @@ where } } +#[async_trait::async_trait(?Send)] impl AsyncRead for Cursor where T: AsRef<[u8]>, @@ -66,6 +67,7 @@ where } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for Cursor<&mut [u8]> { async fn write(&mut self, buf: &[u8]) -> io::Result { std::io::Write::write(&mut self.inner, buf) @@ -75,6 +77,7 @@ impl AsyncWrite for Cursor<&mut [u8]> { } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for Cursor<&mut Vec> { async fn write(&mut self, buf: &[u8]) -> io::Result { std::io::Write::write(&mut self.inner, buf) @@ -84,6 +87,7 @@ impl AsyncWrite for Cursor<&mut Vec> { } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for Cursor> { async fn write(&mut self, buf: &[u8]) -> io::Result { std::io::Write::write(&mut self.inner, buf) diff --git a/src/io/empty.rs b/src/io/empty.rs index 9fbf873..386160c 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -2,12 +2,15 @@ use super::{AsyncRead, AsyncWrite}; #[non_exhaustive] pub struct Empty; + +#[async_trait::async_trait(?Send)] impl AsyncRead for Empty { async fn read(&mut self, _buf: &mut [u8]) -> super::Result { Ok(0) } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for Empty { async fn write(&mut self, buf: &[u8]) -> super::Result { Ok(buf.len()) diff --git a/src/io/read.rs b/src/io/read.rs index a6a95da..a3b8ad4 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -2,6 +2,7 @@ use crate::io; const CHUNK_SIZE: usize = 2048; +#[async_trait::async_trait(?Send)] /// Read bytes from a source. pub trait AsyncRead { async fn read(&mut self, buf: &mut [u8]) -> io::Result; @@ -33,6 +34,7 @@ pub trait AsyncRead { } } +#[async_trait::async_trait(?Send)] impl AsyncRead for &mut R { #[inline] async fn read(&mut self, buf: &mut [u8]) -> io::Result { diff --git a/src/io/stdio.rs b/src/io/stdio.rs index 0a37e2a..af8a0ae 100644 --- a/src/io/stdio.rs +++ b/src/io/stdio.rs @@ -24,8 +24,14 @@ impl Stdin { pub fn is_terminal(&self) -> bool { LazyCell::force(&self.terminput).is_some() } + + /// Get the `AsyncInputStream` used to implement `Stdin` + pub fn into_inner(self) -> AsyncInputStream { + self.stream + } } +#[async_trait::async_trait(?Send)] impl AsyncRead for Stdin { #[inline] async fn read(&mut self, buf: &mut [u8]) -> Result { @@ -64,8 +70,14 @@ impl Stdout { pub fn is_terminal(&self) -> bool { LazyCell::force(&self.termoutput).is_some() } + + /// Get the `AsyncOutputStream` used to implement `Stdout` + pub fn into_inner(self) -> AsyncOutputStream { + self.stream + } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for Stdout { #[inline] async fn write(&mut self, buf: &[u8]) -> Result { @@ -109,8 +121,14 @@ impl Stderr { pub fn is_terminal(&self) -> bool { LazyCell::force(&self.termoutput).is_some() } + + /// Get the `AsyncOutputStream` used to implement `Stderr` + pub fn into_inner(self) -> AsyncOutputStream { + self.stream + } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for Stderr { #[inline] async fn write(&mut self, buf: &[u8]) -> Result { diff --git a/src/io/streams.rs b/src/io/streams.rs index d8b35ec..fd97063 100644 --- a/src/io/streams.rs +++ b/src/io/streams.rs @@ -1,6 +1,8 @@ use super::{AsyncPollable, AsyncRead, AsyncWrite}; -use std::cell::OnceCell; -use std::io::Result; +use std::future::{poll_fn, Future}; +use std::pin::Pin; +use std::sync::OnceLock; +use std::task::{Context, Poll}; use wasip2::io::streams::{InputStream, OutputStream, StreamError}; /// A wrapper for WASI's `InputStream` resource that provides implementations of `AsyncRead` and @@ -9,7 +11,7 @@ use wasip2::io::streams::{InputStream, OutputStream, StreamError}; pub struct AsyncInputStream { // Lazily initialized pollable, used for lifetime of stream to check readiness. // Field ordering matters: this child must be dropped before stream - subscription: OnceCell, + subscription: OnceLock, stream: InputStream, } @@ -17,22 +19,27 @@ impl AsyncInputStream { /// Construct an `AsyncInputStream` from a WASI `InputStream` resource. pub fn new(stream: InputStream) -> Self { Self { - subscription: OnceCell::new(), + subscription: OnceLock::new(), stream, } } - /// Await for read readiness. - async fn ready(&self) { + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { // Lazily initialize the AsyncPollable let subscription = self .subscription .get_or_init(|| AsyncPollable::new(self.stream.subscribe())); // Wait on readiness - subscription.wait_for().await; + let wait_for = subscription.wait_for(); + let mut pinned = std::pin::pin!(wait_for); + pinned.as_mut().poll(cx) + } + /// Await for read readiness. + async fn ready(&self) { + poll_fn(|cx| self.poll_ready(cx)).await } /// Asynchronously read from the input stream. /// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`. - pub async fn read(&self, buf: &mut [u8]) -> Result { + pub async fn read(&self, buf: &mut [u8]) -> std::io::Result { let read = loop { self.ready().await; // Ideally, the ABI would be able to read directly into buf. @@ -56,10 +63,41 @@ impl AsyncInputStream { buf[0..len].copy_from_slice(&read); Ok(len) } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result, std::io::Error>`. The returned byte vectors + /// will be at most 8k. If you want to control chunk size, use + /// `Self::into_stream_of`. + pub fn into_stream(self) -> AsyncInputChunkStream { + AsyncInputChunkStream { + stream: self, + chunk_size: 8 * 1024, + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result, std::io::Error>`. The returned byte vectors + /// will be at most the `chunk_size` argument specified. + pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream { + AsyncInputChunkStream { + stream: self, + chunk_size, + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result`. + pub fn into_bytestream(self) -> AsyncInputByteStream { + AsyncInputByteStream { + stream: self.into_stream(), + buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())), + } + } } +#[async_trait::async_trait(?Send)] impl AsyncRead for AsyncInputStream { - async fn read(&mut self, buf: &mut [u8]) -> Result { + async fn read(&mut self, buf: &mut [u8]) -> std::io::Result { Self::read(self, buf).await } @@ -69,13 +107,94 @@ impl AsyncRead for AsyncInputStream { } } +/// Wrapper of `AsyncInputStream` that impls `futures_lite::stream::Stream` +/// with an item of `Result, std::io::Error>` +pub struct AsyncInputChunkStream { + stream: AsyncInputStream, + chunk_size: usize, +} + +impl AsyncInputChunkStream { + /// Extract the `AsyncInputStream` which backs this stream. + pub fn into_inner(self) -> AsyncInputStream { + self.stream + } +} + +impl futures_lite::stream::Stream for AsyncInputChunkStream { + type Item = Result, std::io::Error>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.stream.poll_ready(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => match self.stream.stream.read(self.chunk_size as u64) { + Ok(r) if r.is_empty() => Poll::Pending, + Ok(r) => Poll::Ready(Some(Ok(r))), + Err(StreamError::LastOperationFailed(err)) => { + Poll::Ready(Some(Err(std::io::Error::other(err.to_debug_string())))) + } + Err(StreamError::Closed) => Poll::Ready(None), + }, + } + } +} + +pin_project_lite::pin_project! { + /// Wrapper of `AsyncInputStream` that impls + /// `futures_lite::stream::Stream` with item `Result`. + pub struct AsyncInputByteStream { + #[pin] + stream: AsyncInputChunkStream, + buffer: std::io::Bytes>>, + } +} + +impl AsyncInputByteStream { + /// Extract the `AsyncInputStream` which backs this stream, and any bytes + /// read from the `AsyncInputStream` which have not yet been yielded by + /// the byte stream. + pub fn into_inner(self) -> (AsyncInputStream, Vec) { + ( + self.stream.into_inner(), + self.buffer + .collect::, std::io::Error>>() + .expect("read of Cursor> is infallible"), + ) + } +} + +impl futures_lite::stream::Stream for AsyncInputByteStream { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.buffer.next() { + Some(byte) => Poll::Ready(Some(Ok(byte.expect("cursor on Vec is infallible")))), + None => match futures_lite::stream::Stream::poll_next(this.stream, cx) { + Poll::Ready(Some(Ok(bytes))) => { + let mut bytes = std::io::Read::bytes(std::io::Cursor::new(bytes)); + match bytes.next() { + Some(Ok(byte)) => { + *this.buffer = bytes; + Poll::Ready(Some(Ok(byte))) + } + Some(Err(err)) => Poll::Ready(Some(Err(err))), + None => Poll::Ready(None), + } + } + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, + } + } +} + /// A wrapper for WASI's `output-stream` resource that provides implementations of `AsyncWrite` and /// `AsyncPollable`. #[derive(Debug)] pub struct AsyncOutputStream { // Lazily initialized pollable, used for lifetime of stream to check readiness. // Field ordering matters: this child must be dropped before stream - subscription: OnceCell, + subscription: OnceLock, stream: OutputStream, } @@ -83,7 +202,7 @@ impl AsyncOutputStream { /// Construct an `AsyncOutputStream` from a WASI `OutputStream` resource. pub fn new(stream: OutputStream) -> Self { Self { - subscription: OnceCell::new(), + subscription: OnceLock::new(), stream, } } @@ -104,7 +223,7 @@ impl AsyncOutputStream { /// a `std::io::Error` indicating either an error returned by the stream write /// using the debug string provided by the WASI error, or else that the, /// indicated by `std::io::ErrorKind::ConnectionReset`. - pub async fn write(&self, buf: &[u8]) -> Result { + pub async fn write(&self, buf: &[u8]) -> std::io::Result { // Loops at most twice. loop { match self.stream.check_write() { @@ -145,7 +264,7 @@ impl AsyncOutputStream { /// the stream flush, using the debug string provided by the WASI error, /// or else that the stream is closed, indicated by /// `std::io::ErrorKind::ConnectionReset`. - pub async fn flush(&self) -> Result<()> { + pub async fn flush(&self) -> std::io::Result<()> { match self.stream.flush() { Ok(()) => { self.ready().await; @@ -160,12 +279,14 @@ impl AsyncOutputStream { } } } + +#[async_trait::async_trait(?Send)] impl AsyncWrite for AsyncOutputStream { // Required methods - async fn write(&mut self, buf: &[u8]) -> Result { + async fn write(&mut self, buf: &[u8]) -> std::io::Result { Self::write(self, buf).await } - async fn flush(&mut self) -> Result<()> { + async fn flush(&mut self) -> std::io::Result<()> { Self::flush(self).await } @@ -180,7 +301,7 @@ pub(crate) async fn splice( reader: &AsyncInputStream, writer: &AsyncOutputStream, len: u64, -) -> core::result::Result { +) -> Result { // Wait for both streams to be ready. let r = reader.ready(); writer.ready().await; diff --git a/src/io/write.rs b/src/io/write.rs index 79cf0d9..ce45121 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,6 +1,7 @@ use crate::io; /// Write bytes to a sink. +#[async_trait::async_trait(?Send)] pub trait AsyncWrite { // Required methods async fn write(&mut self, buf: &[u8]) -> io::Result; @@ -25,6 +26,7 @@ pub trait AsyncWrite { } } +#[async_trait::async_trait(?Send)] impl AsyncWrite for &mut W { #[inline] async fn write(&mut self, buf: &[u8]) -> io::Result { diff --git a/src/lib.rs b/src/lib.rs index ccbc2c1..c996866 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,7 +76,6 @@ pub use wasip2; pub mod prelude { pub use crate::future::FutureExt as _; - pub use crate::http::Body as _; pub use crate::io::AsyncRead as _; pub use crate::io::AsyncWrite as _; } diff --git a/src/net/tcp_stream.rs b/src/net/tcp_stream.rs index fc6ef99..6bee699 100644 --- a/src/net/tcp_stream.rs +++ b/src/net/tcp_stream.rs @@ -42,6 +42,7 @@ impl Drop for TcpStream { } } +#[async_trait::async_trait(?Send)] impl io::AsyncRead for TcpStream { async fn read(&mut self, buf: &mut [u8]) -> io::Result { self.input.read(buf).await @@ -52,6 +53,7 @@ impl io::AsyncRead for TcpStream { } } +#[async_trait::async_trait(?Send)] impl io::AsyncRead for &TcpStream { async fn read(&mut self, buf: &mut [u8]) -> io::Result { self.input.read(buf).await @@ -62,6 +64,7 @@ impl io::AsyncRead for &TcpStream { } } +#[async_trait::async_trait(?Send)] impl io::AsyncWrite for TcpStream { async fn write(&mut self, buf: &[u8]) -> io::Result { self.output.write(buf).await @@ -76,6 +79,7 @@ impl io::AsyncWrite for TcpStream { } } +#[async_trait::async_trait(?Send)] impl io::AsyncWrite for &TcpStream { async fn write(&mut self, buf: &[u8]) -> io::Result { self.output.write(buf).await @@ -91,6 +95,7 @@ impl io::AsyncWrite for &TcpStream { } pub struct ReadHalf<'a>(&'a TcpStream); +#[async_trait::async_trait(?Send)] impl<'a> io::AsyncRead for ReadHalf<'a> { async fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf).await @@ -111,6 +116,7 @@ impl<'a> Drop for ReadHalf<'a> { } pub struct WriteHalf<'a>(&'a TcpStream); +#[async_trait::async_trait(?Send)] impl<'a> io::AsyncWrite for WriteHalf<'a> { async fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf).await diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 67d1de2..407485d 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -1,13 +1,12 @@ use super::REACTOR; use async_task::{Runnable, Task}; -use core::cell::RefCell; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll, Waker}; use slab::Slab; use std::collections::{HashMap, VecDeque}; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; use wasip2::io::poll::Pollable; /// A key for a `Pollable`, which is an index into the `Slab` in `Reactor`. @@ -31,7 +30,7 @@ impl Drop for Registration { /// An AsyncPollable is a reference counted Registration. It can be cloned, and used to create /// as many WaitFor futures on a Pollable that the user needs. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct AsyncPollable(Rc); +pub struct AsyncPollable(Arc); impl AsyncPollable { /// Create an `AsyncPollable` from a Wasi `Pollable`. Schedules the `Pollable` with the current @@ -39,6 +38,8 @@ impl AsyncPollable { pub fn new(pollable: Pollable) -> Self { Reactor::current().schedule(pollable) } + // TODO: can I instead return a Pin<&mut WaitFor> here? so we dont keep + // recreating this. /// Create a Future that waits for the Pollable's readiness. pub fn wait_for(&self) -> WaitFor { use std::sync::atomic::{AtomicU64, Ordering}; @@ -92,16 +93,16 @@ impl Drop for WaitFor { /// Manage async system resources for WASI 0.2 #[derive(Debug, Clone)] pub struct Reactor { - inner: Rc, + inner: Arc, } /// The private, internal `Reactor` implementation - factored out so we can take /// a lock of the whole. #[derive(Debug)] struct InnerReactor { - pollables: RefCell>, - wakers: RefCell>, - ready_list: RefCell>, + pollables: Mutex>, + wakers: Mutex>, + ready_list: Mutex>, } impl Reactor { @@ -121,10 +122,10 @@ impl Reactor { /// Create a new instance of `Reactor` pub(crate) fn new() -> Self { Self { - inner: Rc::new(InnerReactor { - pollables: RefCell::new(Slab::new()), - wakers: RefCell::new(HashMap::new()), - ready_list: RefCell::new(VecDeque::new()), + inner: Arc::new(InnerReactor { + pollables: Mutex::new(Slab::new()), + wakers: Mutex::new(HashMap::new()), + ready_list: Mutex::new(VecDeque::new()), }), } } @@ -133,7 +134,7 @@ impl Reactor { /// Future pending on their readiness. This function returns indicating /// that set of pollables is not empty. pub(crate) fn pending_pollables_is_empty(&self) -> bool { - self.inner.wakers.borrow().is_empty() + self.inner.wakers.lock().unwrap().is_empty() } /// Block until at least one pending pollable is ready, waking a pending future. @@ -189,8 +190,8 @@ impl Reactor { where F: FnOnce(&[&Pollable]) -> Vec, { - let wakers = self.inner.wakers.borrow(); - let pollables = self.inner.pollables.borrow(); + let wakers = self.inner.wakers.lock().unwrap(); + let pollables = self.inner.pollables.lock().unwrap(); // We're about to wait for a number of pollables. When they wake we get // the *indexes* back for the pollables whose events were available - so @@ -225,18 +226,18 @@ impl Reactor { /// Turn a Wasi [`Pollable`] into an [`AsyncPollable`] pub fn schedule(&self, pollable: Pollable) -> AsyncPollable { - let mut pollables = self.inner.pollables.borrow_mut(); + let mut pollables = self.inner.pollables.lock().unwrap(); let key = EventKey(pollables.insert(pollable)); - AsyncPollable(Rc::new(Registration { key })) + AsyncPollable(Arc::new(Registration { key })) } fn deregister_event(&self, key: EventKey) { - let mut pollables = self.inner.pollables.borrow_mut(); + let mut pollables = self.inner.pollables.lock().unwrap(); pollables.remove(key.0); } fn deregister_waitee(&self, waitee: &Waitee) { - let mut wakers = self.inner.wakers.borrow_mut(); + let mut wakers = self.inner.wakers.lock().unwrap(); wakers.remove(waitee); } @@ -244,14 +245,16 @@ impl Reactor { let ready = self .inner .pollables - .borrow() + .lock() + .unwrap() .get(waitee.pollable.0.key.0) .expect("only live EventKey can be checked for readiness") .ready(); if !ready { self.inner .wakers - .borrow_mut() + .lock() + .unwrap() .insert(waitee.clone(), waker.clone()); } ready @@ -264,7 +267,7 @@ impl Reactor { T: 'static, { let this = self.clone(); - let schedule = move |runnable| this.inner.ready_list.borrow_mut().push_back(runnable); + let schedule = move |runnable| this.inner.ready_list.lock().unwrap().push_back(runnable); // SAFETY: // we're using this exactly like async_task::spawn_local, except that @@ -273,16 +276,16 @@ impl Reactor { // single-threaded. #[allow(unsafe_code)] let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) }; - self.inner.ready_list.borrow_mut().push_back(runnable); + self.inner.ready_list.lock().unwrap().push_back(runnable); task } pub(super) fn pop_ready_list(&self) -> Option { - self.inner.ready_list.borrow_mut().pop_front() + self.inner.ready_list.lock().unwrap().pop_front() } pub(super) fn ready_list_is_empty(&self) -> bool { - self.inner.ready_list.borrow().is_empty() + self.inner.ready_list.lock().unwrap().is_empty() } } diff --git a/test-programs/Cargo.toml b/test-programs/Cargo.toml index 84610da..94eaa2a 100644 --- a/test-programs/Cargo.toml +++ b/test-programs/Cargo.toml @@ -7,6 +7,8 @@ edition.workspace = true rust-version.workspace = true [dependencies] +anyhow.workspace = true futures-lite.workspace = true +http-body-util.workspace = true serde_json.workspace = true wstd.workspace = true diff --git a/test-programs/artifacts/tests/http_server.rs b/test-programs/artifacts/tests/http_server.rs index 995298b..7e60465 100644 --- a/test-programs/artifacts/tests/http_server.rs +++ b/test-programs/artifacts/tests/http_server.rs @@ -1,21 +1,31 @@ use anyhow::Result; -use std::process::Command; +use std::net::TcpStream; +use std::process::{Child, Command}; +use std::thread::sleep; +use std::time::{Duration, Instant}; + +// Wasmtime serve will run until killed. Kill it in a drop impl so the process +// isnt orphaned when the test suite ends (successfully, or unsuccessfully) +struct DontOrphan(Child); +impl Drop for DontOrphan { + fn drop(&mut self) { + let _ = self.0.kill(); + } +} #[test_log::test] fn http_server() -> Result<()> { - use std::net::TcpStream; - use std::thread::sleep; - use std::time::Duration; - // Run wasmtime serve. // Enable -Scli because we currently don't have a way to build with the // proxy adapter, so we build with the default adapter. - let mut wasmtime_process = Command::new("wasmtime") - .arg("serve") - .arg("-Scli") - .arg("--addr=127.0.0.1:8081") - .arg(test_programs_artifacts::HTTP_SERVER) - .spawn()?; + let _wasmtime_process = DontOrphan( + Command::new("wasmtime") + .arg("serve") + .arg("-Scli") + .arg("--addr=127.0.0.1:8081") + .arg(test_programs_artifacts::HTTP_SERVER) + .spawn()?, + ); // Clumsily wait for the server to accept connections. 'wait: loop { @@ -25,28 +35,63 @@ fn http_server() -> Result<()> { } } - // Do some tests! + // Test each path in the server: + // TEST / http_home + // Response body is the hard-coded default let body: String = ureq::get("http://127.0.0.1:8081").call()?.into_string()?; assert_eq!(body, "Hello, wasi:http/proxy world!\n"); - match ureq::get("http://127.0.0.1:8081/fail").call() { - Ok(body) => { - unreachable!("unexpected success from /fail: {:?}", body); - } - Err(ureq::Error::Transport(_transport)) => {} - Err(other) => { - unreachable!("unexpected error: {:?}", other); - } - } + // TEST /wait-response http_wait_response + // Sleeps for 1 second, then sends a response with body containing + // internally measured sleep time. + let start = Instant::now(); + let body: String = ureq::get("http://127.0.0.1:8081/wait-response") + .call()? + .into_string()?; + let duration = start.elapsed(); + let sleep_report = body + .split(' ') + .find_map(|s| s.parse::().ok()) + .expect("body should print 'slept for 10xx millis'"); + assert!( + sleep_report >= 1000, + "should have slept for 1000 or more millis, got {sleep_report}" + ); + assert!(duration >= Duration::from_secs(1)); - const MESSAGE: &[u8] = b"hello, echoserver!\n"; + // TEST /wait-body http_wait_body + // Sends response status and headers, then sleeps for 1 second, then sends + // body with internally measured sleep time. + // With ureq we can't tell that the response status and headers were sent + // with a delay in the body. Additionally, the implementation MAY buffer up the + // entire response and body before sending it, though wasmtime does not. + let start = Instant::now(); + let body: String = ureq::get("http://127.0.0.1:8081/wait-body") + .call()? + .into_string()?; + let duration = start.elapsed(); + let sleep_report = body + .split(' ') + .find_map(|s| s.parse::().ok()) + .expect("body should print 'slept for 10xx millis'"); + assert!( + sleep_report >= 1000, + "should have slept for 1000 or more millis, got {sleep_report}" + ); + assert!(duration >= Duration::from_secs(1)); + // TEST /echo htto_echo + // Send a request body, see that we got the same back in response body. + const MESSAGE: &[u8] = b"hello, echoserver!\n"; let body: String = ureq::get("http://127.0.0.1:8081/echo") .send(MESSAGE)? .into_string()?; assert_eq!(body.as_bytes(), MESSAGE); + // TEST /echo-headers htto_echo_headers + // Send request with headers, see that all of those headers are present in + // response headers let test_headers = [ ("Red", "Rhubarb"), ("Orange", "Carrots"), @@ -55,19 +100,57 @@ fn http_server() -> Result<()> { ("Blue", "Blueberries"), ("Purple", "Beets"), ]; - - let mut response = ureq::get("http://127.0.0.1:8081/echo-headers"); + let mut request = ureq::get("http://127.0.0.1:8081/echo-headers"); for (name, value) in test_headers { - response = response.set(name, value); + request = request.set(name, value); } - let response = response.call()?; - + let response = request.call()?; assert!(response.headers_names().len() >= test_headers.len()); for (name, value) in test_headers { assert_eq!(response.header(name), Some(value)); } - wasmtime_process.kill()?; + // NOT TESTED /echo-trailers htto_echo_trailers + // ureq doesn't support trailers + + // TEST /response-code http_response_code + // Send request with `X-Request-Code: `. Should get back that + // status. + let response = ureq::get("http://127.0.0.1:8081/response-status") + .set("X-Response-Status", "302") + .call()?; + assert_eq!(response.status(), 302); + + let response = ureq::get("http://127.0.0.1:8081/response-status") + .set("X-Response-Status", "401") + .call(); + // ureq interprets some statuses as OK, some as Err: + match response { + Err(ureq::Error::Status(401, _)) => {} + result => { + panic!("/response-code expected status 302, got: {result:?}"); + } + } + + // TEST /response-fail http_response_fail + // Wasmtime gives a 500 error when wasi-http guest gives error instead of + // response + match ureq::get("http://127.0.0.1:8081/response-fail").call() { + Err(ureq::Error::Status(500, _)) => {} + result => { + panic!("/response-fail expected status 500 error, got: {result:?}"); + } + } + + // TEST /response-body-fail http_body_fail + // Response status and headers sent off, then error in body will close + // connection + match ureq::get("http://127.0.0.1:8081/response-body-fail").call() { + Err(ureq::Error::Transport(_transport)) => {} + result => { + panic!("/response-body-fail expected transport error, got: {result:?}") + } + } Ok(()) } diff --git a/tests/http_first_byte_timeout.rs b/tests/http_first_byte_timeout.rs index f8a0ac3..4882966 100644 --- a/tests/http_first_byte_timeout.rs +++ b/tests/http_first_byte_timeout.rs @@ -1,8 +1,4 @@ -use wstd::http::{ - error::{ErrorVariant, WasiHttpErrorCode}, - Client, Request, -}; -use wstd::io::empty; +use wstd::http::{error::ErrorCode, Body, Client, Request}; #[wstd::main] async fn main() -> Result<(), Box> { @@ -11,15 +7,15 @@ async fn main() -> Result<(), Box> { client.set_first_byte_timeout(std::time::Duration::from_millis(500)); // This get request will connect to the server, which will then wait 1 second before // returning a response. - let request = Request::get("https://postman-echo.com/delay/1").body(empty())?; + let request = Request::get("https://postman-echo.com/delay/1").body(Body::empty())?; let result = client.send(request).await; assert!(result.is_err(), "response should be an error"); let error = result.unwrap_err(); assert!( matches!( - error.variant(), - ErrorVariant::WasiHttp(WasiHttpErrorCode::ConnectionReadTimeout) + error.downcast_ref::(), + Some(ErrorCode::ConnectionReadTimeout) ), "expected ConnectionReadTimeout error, got: {error:?>}" ); diff --git a/tests/http_get.rs b/tests/http_get.rs index 9100237..4f4bc33 100644 --- a/tests/http_get.rs +++ b/tests/http_get.rs @@ -1,41 +1,39 @@ use std::error::Error; use wstd::http::{Body, Client, HeaderValue, Request}; -use wstd::io::{empty, AsyncRead}; #[wstd::test] async fn main() -> Result<(), Box> { let request = Request::get("https://postman-echo.com/get") .header("my-header", HeaderValue::from_str("my-value")?) - .body(empty())?; + .body(Body::empty())?; - let mut response = Client::new().send(request).await?; + let response = Client::new().send(request).await?; let content_type = response .headers() .get("Content-Type") - .ok_or_else(|| "response expected to have Content-Type header")?; + .ok_or("response expected to have Content-Type header")?; assert_eq!(content_type, "application/json; charset=utf-8"); - let body = response.body_mut(); + let mut body = response.into_body().into_body(); let body_len = body - .len() - .ok_or_else(|| "GET postman-echo.com/get is supposed to provide a content-length")?; + .content_length() + .ok_or("GET postman-echo.com/get is supposed to provide a content-length")?; - let mut body_buf = Vec::new(); - body.read_to_end(&mut body_buf).await?; + let contents = body.contents().await?; assert_eq!( - body_buf.len(), + contents.len() as u64, body_len, - "read_to_end length should match content-length" + "contents length should match content-length" ); - let val: serde_json::Value = serde_json::from_slice(&body_buf)?; + let val: serde_json::Value = serde_json::from_slice(contents)?; let body_url = val .get("url") - .ok_or_else(|| "body json has url")? + .ok_or("body json has url")? .as_str() - .ok_or_else(|| "body json url is str")?; + .ok_or("body json url is str")?; assert!( body_url.contains("postman-echo.com/get"), "expected body url to contain the authority and path, got: {body_url}" @@ -43,11 +41,11 @@ async fn main() -> Result<(), Box> { assert_eq!( val.get("headers") - .ok_or_else(|| "body json has headers")? + .ok_or("body json has headers")? .get("my-header") - .ok_or_else(|| "headers contains my-header")? + .ok_or("headers contains my-header")? .as_str() - .ok_or_else(|| "my-header is a str")?, + .ok_or("my-header is a str")?, "my-value" ); diff --git a/tests/http_get_json.rs b/tests/http_get_json.rs index de409d3..15548db 100644 --- a/tests/http_get_json.rs +++ b/tests/http_get_json.rs @@ -1,7 +1,6 @@ use serde::Deserialize; use std::error::Error; -use wstd::http::{Client, Request}; -use wstd::io::empty; +use wstd::http::{Body, Client, Request}; #[derive(Deserialize)] struct Echo { @@ -10,17 +9,17 @@ struct Echo { #[wstd::test] async fn main() -> Result<(), Box> { - let request = Request::get("https://postman-echo.com/get").body(empty())?; + let request = Request::get("https://postman-echo.com/get").body(Body::empty())?; - let mut response = Client::new().send(request).await?; + let response = Client::new().send(request).await?; let content_type = response .headers() .get("Content-Type") - .ok_or_else(|| "response expected to have Content-Type header")?; + .ok_or("response expected to have Content-Type header")?; assert_eq!(content_type, "application/json; charset=utf-8"); - let Echo { url } = response.body_mut().json::().await?; + let Echo { url } = response.into_body().into_body().json::().await?; assert!( url.contains("postman-echo.com/get"), "expected body url to contain the authority and path, got: {url}" diff --git a/tests/http_post.rs b/tests/http_post.rs index c2257a4..3926e60 100644 --- a/tests/http_post.rs +++ b/tests/http_post.rs @@ -1,6 +1,5 @@ use std::error::Error; -use wstd::http::{Client, HeaderValue, IntoBody, Request}; -use wstd::io::AsyncRead; +use wstd::http::{Body, Client, HeaderValue, Request}; #[wstd::test] async fn main() -> Result<(), Box> { @@ -9,25 +8,25 @@ async fn main() -> Result<(), Box> { "content-type", HeaderValue::from_str("application/json; charset=utf-8")?, ) - .body("{\"test\": \"data\"}".into_body())?; + .body(Body::from_string("{\"test\": \"data\"}"))?; - let mut response = Client::new().send(request).await?; + let response = Client::new().send(request).await?; let content_type = response .headers() .get("Content-Type") - .ok_or_else(|| "response expected to have Content-Type header")?; + .ok_or("response expected to have Content-Type header")?; assert_eq!(content_type, "application/json; charset=utf-8"); - let mut body_buf = Vec::new(); - response.body_mut().read_to_end(&mut body_buf).await?; + let mut body = response.into_body().into_body(); + let body_buf = body.contents().await?; - let val: serde_json::Value = serde_json::from_slice(&body_buf)?; + let val: serde_json::Value = serde_json::from_slice(body_buf)?; let body_url = val .get("url") - .ok_or_else(|| "body json has url")? + .ok_or("body json has url")? .as_str() - .ok_or_else(|| "body json url is str")?; + .ok_or("body json url is str")?; assert!( body_url.contains("postman-echo.com/post"), "expected body url to contain the authority and path, got: {body_url}" @@ -35,7 +34,7 @@ async fn main() -> Result<(), Box> { let posted_json = val .get("json") - .ok_or_else(|| "body json has 'json' key")? + .ok_or("body json has 'json' key")? .as_object() .ok_or_else(|| format!("body json 'json' is object. got {val:?}"))?; @@ -43,9 +42,9 @@ async fn main() -> Result<(), Box> { assert_eq!( posted_json .get("test") - .ok_or_else(|| "returned json has 'test' key")? + .ok_or("returned json has 'test' key")? .as_str() - .ok_or_else(|| "returned json 'test' key should be str value")?, + .ok_or("returned json 'test' key should be str value")?, "data" ); diff --git a/tests/http_post_json.rs b/tests/http_post_json.rs index 5ccb0cc..6a3e5ae 100644 --- a/tests/http_post_json.rs +++ b/tests/http_post_json.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use std::error::Error; -use wstd::http::{request::JsonRequest, Client, Request}; +use wstd::http::{Body, Client, HeaderValue, Request}; #[derive(Serialize)] struct TestData { @@ -17,23 +17,23 @@ async fn main() -> Result<(), Box> { let test_data = TestData { test: "data".to_string(), }; - let request = Request::post("https://postman-echo.com/post").json(&test_data)?; + let mut request = + Request::post("https://postman-echo.com/post").body(Body::from_json(&test_data)?)?; - let content_type = request - .headers() - .get("Content-Type") - .ok_or_else(|| "request expected to have Content-Type header")?; - assert_eq!(content_type, "application/json; charset=utf-8"); + request.headers_mut().insert( + "Content-Type", + HeaderValue::from_static("application/json; charset=utf-8"), + ); - let mut response = Client::new().send(request).await?; + let response = Client::new().send(request).await?; let content_type = response .headers() .get("Content-Type") - .ok_or_else(|| "response expected to have Content-Type header")?; + .ok_or("response expected to have Content-Type header")?; assert_eq!(content_type, "application/json; charset=utf-8"); - let Echo { url } = response.body_mut().json::().await?; + let Echo { url } = response.into_body().into_body().json::().await?; assert!( url.contains("postman-echo.com/post"), "expected body url to contain the authority and path, got: {url}" diff --git a/tests/http_timeout.rs b/tests/http_timeout.rs index dea1ac9..96d40de 100644 --- a/tests/http_timeout.rs +++ b/tests/http_timeout.rs @@ -1,13 +1,12 @@ use wstd::future::FutureExt; -use wstd::http::{Client, Request}; -use wstd::io::empty; +use wstd::http::{Body, Client, Request}; use wstd::time::Duration; #[wstd::test] async fn http_timeout() -> Result<(), Box> { // This get request will connect to the server, which will then wait 1 second before // returning a response. - let request = Request::get("https://postman-echo.com/delay/1").body(empty())?; + let request = Request::get("https://postman-echo.com/delay/1").body(Body::empty())?; let result = Client::new() .send(request) .timeout(Duration::from_millis(500))