Skip to content

Commit da75dda

Browse files
committed
wip
1 parent 8437866 commit da75dda

File tree

2 files changed

+110
-25
lines changed

2 files changed

+110
-25
lines changed

src/http/body.rs

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,21 @@ pub mod util {
2424
pub use http_body_util::*;
2525
}
2626

27+
/// A HTTP Body.
28+
///
29+
/// Construct this HTTP body using:
30+
/// * `Body::empty` for the empty body, or `impl From<()> for Body`
31+
/// * `From<&[u8]>` (which will make a clone) or `From<Vec<u8>>` or
32+
/// `From<Bytes>` for a `Body` from bytes.
33+
/// * `From<&str>` (which will make a clone) or `From<String>` for a `Body`
34+
/// from strings.
35+
/// * `Body::from_json` for a `Body` from a `Serialize` (requires feature
36+
/// `json`)
37+
/// * `Body::from_stream` or `Body::from_try_stream` for a `Body` from a
38+
/// `Stream` of `Into<Bytes>`
39+
/// * `From<AsyncInputStream>`
40+
///
41+
/// Consume
2742
#[derive(Debug)]
2843
pub struct Body(BodyInner);
2944

@@ -33,6 +48,8 @@ enum BodyInner {
3348
Boxed(UnsyncBoxBody<Bytes, Error>),
3449
// a body created from a wasi-http incoming-body (WasiIncomingBody)
3550
Incoming(Incoming),
51+
// a body created from an AsyncRead which has a AsyncInputStream
52+
InputStream(BoxedInputStream),
3653
// a body in memory
3754
Complete {
3855
data: Bytes,
@@ -41,9 +58,10 @@ enum BodyInner {
4158
}
4259

4360
impl Body {
44-
pub async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> {
61+
pub(crate) async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> {
4562
match self.0 {
4663
BodyInner::Incoming(incoming) => incoming.send(outgoing_body).await,
64+
BodyInner::InputStream(input_stream) => input_stream.send(outgoing_body).await,
4765
BodyInner::Boxed(box_body) => {
4866
let mut out_stream = AsyncOutputStream::new(
4967
outgoing_body
@@ -104,17 +122,7 @@ impl Body {
104122
.with_trailers(async move { Ok(trailers).transpose() })
105123
.boxed_unsync(),
106124
BodyInner::Boxed(b) => b,
107-
}
108-
}
109-
110-
pub fn as_boxed_body(&mut self) -> &mut UnsyncBoxBody<Bytes, Error> {
111-
let mut prev = Self::empty();
112-
std::mem::swap(self, &mut prev);
113-
self.0 = BodyInner::Boxed(prev.into_boxed_body());
114-
115-
match &mut self.0 {
116-
BodyInner::Boxed(ref mut b) => b,
117-
_ => unreachable!(),
125+
BodyInner::InputStream(b) => b.boxed_unsync(),
118126
}
119127
}
120128

@@ -129,6 +137,7 @@ impl Body {
129137
std::mem::swap(inner, &mut prev);
130138
let boxed_body = match prev {
131139
BodyInner::Incoming(i) => i.into_http_body().boxed_unsync(),
140+
BodyInner::InputStream(i) => i.boxed_unsync(),
132141
BodyInner::Boxed(b) => b,
133142
BodyInner::Complete { .. } => unreachable!(),
134143
};
@@ -151,9 +160,11 @@ impl Body {
151160
BodyInner::Boxed(b) => b.size_hint().exact(),
152161
BodyInner::Complete { data, .. } => Some(data.len() as u64),
153162
BodyInner::Incoming(i) => i.size_hint.content_length(),
163+
BodyInner::InputStream(_) => None,
154164
}
155165
}
156166

167+
/// Construct an empty Body
157168
pub fn empty() -> Self {
158169
Body(BodyInner::Complete {
159170
data: Bytes::new(),
@@ -177,20 +188,22 @@ impl Body {
177188
serde_json::from_str(str).context("decoding body contents as json")
178189
}
179190

180-
pub fn from_input_stream(r: crate::io::AsyncInputStream) -> Self {
181-
use futures_lite::stream::StreamExt;
182-
Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync(
183-
http_body_util::StreamBody::new(r.into_stream().map(|res| {
184-
res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec)))
185-
.map_err(Into::into)
186-
})),
187-
)))
188-
}
189-
190191
pub(crate) fn from_incoming(body: WasiIncomingBody, size_hint: BodyHint) -> Self {
191192
Body(BodyInner::Incoming(Incoming { body, size_hint }))
192193
}
193194

195+
pub fn from_async_read<R>(read: R) -> Self
196+
where
197+
R: crate::io::AsyncRead + Send + 'static,
198+
{
199+
use futures_lite::StreamExt;
200+
if read.as_async_input_stream().is_some() {
201+
Body(BodyInner::InputStream(BoxedInputStream::new(read)))
202+
} else {
203+
Self::from_try_stream(read.map(|res| res.map_err(Into::into)))
204+
}
205+
}
206+
194207
pub fn from_stream<S>(stream: S) -> Self
195208
where
196209
S: futures_lite::Stream + Send + 'static,
@@ -202,10 +215,10 @@ impl Body {
202215
))
203216
}
204217

205-
pub fn from_try_stream<S, B>(stream: S) -> Self
218+
pub fn from_try_stream<S, D>(stream: S) -> Self
206219
where
207-
S: futures_lite::Stream<Item = Result<B, Error>> + Send + 'static,
208-
B: Into<Bytes>,
220+
S: futures_lite::Stream<Item = Result<D, Error>> + Send + 'static,
221+
D: Into<Bytes>,
209222
{
210223
use futures_lite::StreamExt;
211224
Self::from_http_body(http_body_util::StreamBody::new(
@@ -263,6 +276,21 @@ impl From<String> for Body {
263276
}
264277
}
265278

279+
impl From<crate::io::AsyncInputStream> for Body {
280+
fn from(r: crate::io::AsyncInputStream) -> Body {
281+
// TODO: with another BodyInner variant for a boxed AsyncRead for which
282+
// as_input_stream is_some, this could allow for use of
283+
// crate::io::copy.
284+
use futures_lite::stream::StreamExt;
285+
Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync(
286+
http_body_util::StreamBody::new(r.into_stream().map(|res| {
287+
res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec)))
288+
.map_err(Into::into)
289+
})),
290+
)))
291+
}
292+
}
293+
266294
#[derive(Debug)]
267295
struct Incoming {
268296
body: WasiIncomingBody,
@@ -549,3 +577,48 @@ impl TrailersState {
549577
}
550578
}
551579
}
580+
581+
struct BoxedInputStream(Box<dyn crate::io::AsyncRead + Send>);
582+
impl fmt::Debug for BoxedInputStream {
583+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
584+
write!(f, "BoxedInputStream")
585+
}
586+
}
587+
588+
impl BoxedInputStream {
589+
fn new(r: impl crate::io::AsyncRead + Send + 'static) -> Self {
590+
if r.as_async_input_stream().is_none() {
591+
panic!()
592+
}
593+
Self(Box::new(r))
594+
}
595+
async fn send(self, outgoing_body: WasiOutgoingBody) -> Result<(), Error> {
596+
let mut in_stream = self.0.as_async_input_stream().unwrap();
597+
let mut out_stream = AsyncOutputStream::new(
598+
outgoing_body
599+
.write()
600+
.expect("outgoing body already written"),
601+
);
602+
crate::io::copy(&mut in_stream, &mut out_stream)
603+
.await
604+
.map_err(|e| {
605+
Error::from(e).context("copying incoming body stream to outgoing body stream")
606+
})?;
607+
drop(in_stream);
608+
drop(out_stream);
609+
WasiOutgoingBody::finish(outgoing_body, None)
610+
.map_err(|e| Error::from(e).context("finishing outgoing body"))?;
611+
Ok(())
612+
}
613+
}
614+
615+
impl HttpBody for BoxedInputStream {
616+
type Data = Bytes;
617+
type Error = Error;
618+
fn poll_frame(
619+
self: Pin<&mut Self>,
620+
_cx: &mut Context<'_>,
621+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
622+
todo!()
623+
}
624+
}

src/io/streams.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,18 @@ impl AsyncRead for AsyncInputStream {
116116
}
117117
}
118118

119+
#[async_trait::async_trait(?Send)]
120+
impl AsyncRead for &AsyncInputStream {
121+
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
122+
Self::read(self, buf).await
123+
}
124+
125+
#[inline]
126+
fn as_async_input_stream(&self) -> Option<&AsyncInputStream> {
127+
Some(self)
128+
}
129+
}
130+
119131
/// Wrapper of `AsyncInputStream` that impls `futures_lite::stream::Stream`
120132
/// with an item of `Result<Vec<u8>, std::io::Error>`
121133
pub struct AsyncInputChunkStream {

0 commit comments

Comments
 (0)