Skip to content

Commit 658c4ee

Browse files
committed
switch up constructors
1 parent 3503f5b commit 658c4ee

File tree

5 files changed

+113
-60
lines changed

5 files changed

+113
-60
lines changed

examples/complex_http_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ async fn main() -> Result<()> {
9898
Some(Ok(t))
9999
}
100100
});
101-
let request = request.body(body)?;
101+
let request = request.body(Body::from_http_body(body))?;
102102

103103
// Send the request.
104104
eprintln!("> {} / {:?}", request.method(), request.version());

examples/http_server.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,16 @@ async fn http_wait_body(_request: Request<Body>) -> Result<Response<Body>> {
4949
// Get the time now
5050
let now = Instant::now();
5151

52-
let body = StreamBody::new(once_future(async move {
52+
let body = async move {
5353
// Sleep for one second.
5454
wstd::task::sleep(Duration::from_secs(1)).await;
5555

5656
// Compute how long we slept for.
5757
let elapsed = Instant::now().duration_since(now).as_millis();
58-
anyhow::Ok(Frame::data(Bytes::from(format!(
59-
"slept for {elapsed} millis\n"
60-
))))
61-
}));
58+
Ok(Bytes::from(format!("slept for {elapsed} millis\n")))
59+
};
6260

63-
Ok(Response::new(body.into()))
61+
Ok(Response::new(Body::from_try_stream(once_future(body))))
6462
}
6563

6664
async fn http_echo(request: Request<Body>) -> Result<Response<Body>> {
@@ -85,7 +83,7 @@ async fn http_echo_trailers(request: Request<Body>) -> Result<Response<Body>> {
8583
let body = StreamBody::new(once_future(async move {
8684
anyhow::Ok(Frame::<Bytes>::trailers(trailers))
8785
}));
88-
Ok(Response::new(body.into()))
86+
Ok(Response::new(Body::from_http_body(body)))
8987
}
9088

9189
async fn http_response_status(request: Request<Body>) -> Result<Response<Body>> {
@@ -112,7 +110,7 @@ async fn http_body_fail(_request: Request<Body>) -> Result<Response<Body>> {
112110
Err::<Frame<Bytes>, _>(anyhow::anyhow!("error creating body"))
113111
}));
114112

115-
Ok(Response::new(body.into()))
113+
Ok(Response::new(Body::from_http_body(body)))
116114
}
117115

118116
async fn http_not_found(_request: Request<Body>) -> Result<Response<Body>> {

src/http/body.rs

Lines changed: 91 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,20 @@ pub mod util {
2424
pub use http_body_util::*;
2525
}
2626

27+
/// A HTTP Body.
28+
///
29+
/// Construct this HTTP body using:
30+
/// * `Body::empty` for the empty body, or `impl From<()> for Body`
31+
/// * `From<&[u8]>` (which will make a clone) or `From<Vec<u8>>` or
32+
/// `From<Bytes>` for a `Body` from bytes.
33+
/// * `From<&str>` (which will make a clone) or `From<String>` for a `Body`
34+
/// from strings.
35+
/// * `Body::from_json` for a `Body` from a `Serialize` (requires feature
36+
/// `json`)
37+
/// * `Body::from_stream` or `Body::from_try_stream` for a `Body` from a
38+
/// `Stream` of `Into<Bytes>`
39+
///
40+
/// Consume
2741
#[derive(Debug)]
2842
pub struct Body(BodyInner);
2943

@@ -41,7 +55,7 @@ enum BodyInner {
4155
}
4256

4357
impl Body {
44-
pub async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> {
58+
pub(crate) async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> {
4559
match self.0 {
4660
BodyInner::Incoming(incoming) => incoming.send(outgoing_body).await,
4761
BodyInner::Boxed(box_body) => {
@@ -107,17 +121,6 @@ impl Body {
107121
}
108122
}
109123

110-
pub fn as_boxed_body(&mut self) -> &mut UnsyncBoxBody<Bytes, Error> {
111-
let mut prev = Self::empty();
112-
std::mem::swap(self, &mut prev);
113-
self.0 = BodyInner::Boxed(prev.into_boxed_body());
114-
115-
match &mut self.0 {
116-
BodyInner::Boxed(ref mut b) => b,
117-
_ => unreachable!(),
118-
}
119-
}
120-
121124
pub async fn contents(&mut self) -> Result<&[u8], Error> {
122125
match &mut self.0 {
123126
BodyInner::Complete { ref data, .. } => Ok(data.as_ref()),
@@ -154,34 +157,22 @@ impl Body {
154157
}
155158
}
156159

160+
/// Construct an empty Body
157161
pub fn empty() -> Self {
158162
Body(BodyInner::Complete {
159163
data: Bytes::new(),
160164
trailers: None,
161165
})
162166
}
163167

164-
pub fn from_string(s: impl Into<String>) -> Self {
165-
let s = s.into();
166-
Body(BodyInner::Complete {
167-
data: Bytes::from_owner(s.into_bytes()),
168-
trailers: None,
169-
})
170-
}
171-
172168
pub async fn str_contents(&mut self) -> Result<&str, Error> {
173169
let bs = self.contents().await?;
174170
std::str::from_utf8(bs).context("decoding body contents as string")
175171
}
176172

177-
pub fn from_bytes(b: impl Into<Bytes>) -> Self {
178-
let b = b.into();
179-
Body::from(http_body_util::Full::new(b))
180-
}
181-
182173
#[cfg(feature = "json")]
183174
pub fn from_json<T: serde::Serialize>(data: &T) -> Result<Self, serde_json::Error> {
184-
Ok(Self::from_string(serde_json::to_string(data)?))
175+
Ok(Self::from(serde_json::to_vec(data)?))
185176
}
186177

187178
#[cfg(feature = "json")]
@@ -190,28 +181,38 @@ impl Body {
190181
serde_json::from_str(str).context("decoding body contents as json")
191182
}
192183

193-
pub fn from_input_stream(r: crate::io::AsyncInputStream) -> Self {
194-
use futures_lite::stream::StreamExt;
195-
Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync(
196-
http_body_util::StreamBody::new(r.into_stream().map(|res| {
197-
res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec)))
198-
.map_err(Into::into)
199-
})),
200-
)))
201-
}
202-
203184
pub(crate) fn from_incoming(body: WasiIncomingBody, size_hint: BodyHint) -> Self {
204185
Body(BodyInner::Incoming(Incoming { body, size_hint }))
205186
}
206-
}
207187

208-
impl<B> From<B> for Body
209-
where
210-
B: HttpBody + Send + 'static,
211-
<B as HttpBody>::Data: Into<Bytes>,
212-
<B as HttpBody>::Error: Into<Error>,
213-
{
214-
fn from(http_body: B) -> Body {
188+
pub fn from_stream<S>(stream: S) -> Self
189+
where
190+
S: futures_lite::Stream + Send + 'static,
191+
<S as futures_lite::Stream>::Item: Into<Bytes>,
192+
{
193+
use futures_lite::StreamExt;
194+
Self::from_http_body(http_body_util::StreamBody::new(
195+
stream.map(|bs| Ok::<_, Error>(Frame::data(bs.into()))),
196+
))
197+
}
198+
199+
pub fn from_try_stream<S, D>(stream: S) -> Self
200+
where
201+
S: futures_lite::Stream<Item = Result<D, Error>> + Send + 'static,
202+
D: Into<Bytes>,
203+
{
204+
use futures_lite::StreamExt;
205+
Self::from_http_body(http_body_util::StreamBody::new(
206+
stream.map(|bs| Ok::<_, Error>(Frame::data(bs?.into()))),
207+
))
208+
}
209+
210+
pub fn from_http_body<B>(http_body: B) -> Self
211+
where
212+
B: HttpBody + Send + 'static,
213+
<B as HttpBody>::Data: Into<Bytes>,
214+
<B as HttpBody>::Error: Into<Error>,
215+
{
215216
use util::BodyExt;
216217
Body(BodyInner::Boxed(
217218
http_body
@@ -222,9 +223,52 @@ where
222223
}
223224
}
224225

225-
impl From<Incoming> for Body {
226-
fn from(incoming: Incoming) -> Body {
227-
Body(BodyInner::Incoming(incoming))
226+
impl From<()> for Body {
227+
fn from(_: ()) -> Body {
228+
Body::empty()
229+
}
230+
}
231+
impl From<&[u8]> for Body {
232+
fn from(bytes: &[u8]) -> Body {
233+
Body::from(bytes.to_owned())
234+
}
235+
}
236+
impl From<Vec<u8>> for Body {
237+
fn from(bytes: Vec<u8>) -> Body {
238+
Body::from(Bytes::from(bytes))
239+
}
240+
}
241+
impl From<Bytes> for Body {
242+
fn from(data: Bytes) -> Body {
243+
Body(BodyInner::Complete {
244+
data,
245+
trailers: None,
246+
})
247+
}
248+
}
249+
impl From<&str> for Body {
250+
fn from(data: &str) -> Body {
251+
Body::from(data.as_bytes())
252+
}
253+
}
254+
impl From<String> for Body {
255+
fn from(data: String) -> Body {
256+
Body::from(data.into_bytes())
257+
}
258+
}
259+
260+
impl From<crate::io::AsyncInputStream> for Body {
261+
fn from(r: crate::io::AsyncInputStream) -> Body {
262+
// TODO: with another BodyInner variant for a boxed AsyncRead for which
263+
// as_input_stream is_some, this could allow for use of
264+
// crate::io::copy.
265+
use futures_lite::stream::StreamExt;
266+
Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync(
267+
http_body_util::StreamBody::new(r.into_stream().map(|res| {
268+
res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec)))
269+
.map_err(Into::into)
270+
})),
271+
)))
228272
}
229273
}
230274

src/io/streams.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,18 @@ impl AsyncRead for AsyncInputStream {
116116
}
117117
}
118118

119+
#[async_trait::async_trait(?Send)]
120+
impl AsyncRead for &AsyncInputStream {
121+
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
122+
Self::read(self, buf).await
123+
}
124+
125+
#[inline]
126+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
127+
Some(self)
128+
}
129+
}
130+
119131
/// Wrapper of `AsyncInputStream` that impls `futures_lite::stream::Stream`
120132
/// with an item of `Result<Vec<u8>, std::io::Error>`
121133
pub struct AsyncInputChunkStream {

tests/http_post.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::error::Error;
2-
use wstd::http::{Body, Client, HeaderValue, Request};
2+
use wstd::http::{Client, HeaderValue, Request};
33

44
#[wstd::test]
55
async fn main() -> Result<(), Box<dyn Error>> {
@@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
88
"content-type",
99
HeaderValue::from_str("application/json; charset=utf-8")?,
1010
)
11-
.body(Body::from_string("{\"test\": \"data\"}"))?;
11+
.body("{\"test\": \"data\"}")?;
1212

1313
let response = Client::new().send(request).await?;
1414

@@ -19,9 +19,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
1919
assert_eq!(content_type, "application/json; charset=utf-8");
2020

2121
let mut body = response.into_body();
22-
let body_buf = body.contents().await?;
22+
let val: serde_json::Value = body.json().await?;
2323

24-
let val: serde_json::Value = serde_json::from_slice(body_buf)?;
2524
let body_url = val
2625
.get("url")
2726
.ok_or("body json has url")?

0 commit comments

Comments
 (0)