Skip to content

Commit c6e9750

Browse files
committed
asyncread for body stream
1 parent 824506d commit c6e9750

File tree

1 file changed

+40
-29
lines changed

1 file changed

+40
-29
lines changed

src/http/response.rs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use wasi::http::types::{IncomingBody as WasiIncomingBody, IncomingResponse};
22
use wasi::io::streams::{InputStream, StreamError};
33

4-
use super::{Body, Fields, Headers, StatusCode};
4+
use super::{Body, Headers, StatusCode};
55
use crate::io::AsyncRead;
6-
use crate::iter::AsyncIterator;
76
use crate::runtime::Reactor;
87

98
/// Stream 2kb chunks at a time
@@ -63,7 +62,8 @@ impl Response<IncomingBody> {
6362
.expect("cannot call `stream` twice on an incoming body");
6463

6564
let body = IncomingBody {
66-
bytes_read: 0,
65+
buf_offset: 0,
66+
buf: None,
6767
reactor,
6868
body_stream,
6969
_incoming_body: incoming_body,
@@ -100,8 +100,10 @@ impl<B: Body> Response<B> {
100100

101101
#[derive(Debug)]
102102
pub struct IncomingBody {
103-
bytes_read: u64,
104103
reactor: Reactor,
104+
buf: Option<Vec<u8>>,
105+
// How many bytes have we already read from the buf?
106+
buf_offset: usize,
105107

106108
// IMPORTANT: the order of these fields here matters. `incoming_body` must
107109
// be dropped before `body_stream`.
@@ -110,30 +112,39 @@ pub struct IncomingBody {
110112
}
111113

112114
impl AsyncRead for IncomingBody {
113-
async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result<usize> {
114-
// // Wait for an event to be ready
115-
// let pollable = self.body_stream.subscribe();
116-
// self.reactor.wait_for(pollable).await;
117-
118-
// // Read the bytes from the body stream
119-
// let buf = self.body_stream.read(CHUNK_SIZE);
120-
// // self.bytes_read += len;
121-
// Some(buf)
122-
todo!();
123-
}
124-
}
125-
126-
impl AsyncIterator for IncomingBody {
127-
type Item = Result<Vec<u8>, StreamError>;
128-
129-
async fn next(&mut self) -> Option<Self::Item> {
130-
// Wait for an event to be ready
131-
let pollable = self.body_stream.subscribe();
132-
self.reactor.wait_for(pollable).await;
133-
134-
// Read the bytes from the body stream
135-
let buf = self.body_stream.read(CHUNK_SIZE);
136-
// self.bytes_read += len;
137-
Some(buf)
115+
async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result<usize> {
116+
loop {
117+
let buf = match &mut self.buf {
118+
Some(ref mut buf) => buf,
119+
None => {
120+
// Wait for an event to be ready
121+
let pollable = self.body_stream.subscribe();
122+
self.reactor.wait_for(pollable).await;
123+
124+
// Read the bytes from the body stream
125+
let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err {
126+
StreamError::LastOperationFailed(err) => {
127+
std::io::Error::other(format!("{}", err.to_debug_string()))
128+
}
129+
StreamError::Closed => std::io::Error::other("Connection closed"),
130+
})?;
131+
self.buf.insert(buf)
132+
}
133+
};
134+
135+
// copy bytes
136+
let max = (buf.len() - self.buf_offset).min(out_buf.len());
137+
let slice = &buf[self.buf_offset..max];
138+
out_buf[0..max].copy_from_slice(slice);
139+
self.buf_offset += max;
140+
141+
// reset the local slice if necessary
142+
if self.buf_offset == buf.len() {
143+
self.buf = None;
144+
self.buf_offset = 0;
145+
}
146+
147+
break Ok(dbg!(max));
148+
}
138149
}
139150
}

0 commit comments

Comments
 (0)