Skip to content

Commit b4b3469

Browse files
committed
http: convert IncomingBody to use AsyncInputStream
this eliminates the buffering behavior that IncomingBody previously had. Will have to define a BufReader for AsyncRead at some point so users can opt back into that if they want it.
1 parent b5ad808 commit b4b3469

File tree

3 files changed

+9
-50
lines changed

3 files changed

+9
-50
lines changed

src/http/response.rs

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
use wasi::http::types::{IncomingBody as WasiIncomingBody, IncomingResponse};
2-
use wasi::io::streams::{InputStream, StreamError};
32

43
use super::{fields::header_map_from_wasi, Body, Error, HeaderMap, Result, StatusCode};
5-
use crate::io::AsyncRead;
6-
use crate::runtime::Reactor;
7-
8-
/// Stream 2kb chunks at a time
9-
const CHUNK_SIZE: u64 = 2048;
4+
use crate::io::{AsyncInputStream, AsyncRead};
105

116
/// An HTTP response
127
#[derive(Debug)]
@@ -57,9 +52,7 @@ impl Response<IncomingBody> {
5752

5853
let body = IncomingBody {
5954
kind,
60-
buf_offset: 0,
61-
buf: None,
62-
body_stream,
55+
body_stream: AsyncInputStream::new(body_stream),
6356
_incoming_body: incoming_body,
6457
};
6558

@@ -96,54 +89,15 @@ impl<B: Body> Response<B> {
9689
#[derive(Debug)]
9790
pub struct IncomingBody {
9891
kind: BodyKind,
99-
buf: Option<Vec<u8>>,
100-
// How many bytes have we already read from the buf?
101-
buf_offset: usize,
102-
10392
// IMPORTANT: the order of these fields here matters. `body_stream` must
10493
// be dropped before `_incoming_body`.
105-
body_stream: InputStream,
94+
body_stream: AsyncInputStream,
10695
_incoming_body: WasiIncomingBody,
10796
}
10897

10998
impl AsyncRead for IncomingBody {
11099
async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result<usize> {
111-
let buf = match &mut self.buf {
112-
Some(ref mut buf) => buf,
113-
None => {
114-
// Wait for an event to be ready
115-
let pollable = self.body_stream.subscribe();
116-
Reactor::current().wait_for(pollable).await;
117-
118-
// Read the bytes from the body stream
119-
let buf = match self.body_stream.read(CHUNK_SIZE) {
120-
Ok(buf) => buf,
121-
Err(StreamError::Closed) => return Ok(0),
122-
Err(StreamError::LastOperationFailed(err)) => {
123-
return Err(std::io::Error::other(format!(
124-
"last operation failed: {}",
125-
err.to_debug_string()
126-
)))
127-
}
128-
};
129-
self.buf.insert(buf)
130-
}
131-
};
132-
133-
// copy bytes
134-
let len = (buf.len() - self.buf_offset).min(out_buf.len());
135-
let max = self.buf_offset + len;
136-
let slice = &buf[self.buf_offset..max];
137-
out_buf[0..len].copy_from_slice(slice);
138-
self.buf_offset += len;
139-
140-
// reset the local slice if necessary
141-
if self.buf_offset == buf.len() {
142-
self.buf = None;
143-
self.buf_offset = 0;
144-
}
145-
146-
Ok(len)
100+
self.body_stream.read(out_buf).await
147101
}
148102
}
149103

src/io/stdio.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use wasi::cli::terminal_input::TerminalInput;
44
use wasi::cli::terminal_output::TerminalOutput;
55

66
/// Use the program's stdin as an `AsyncInputStream`.
7+
#[derive(Debug)]
78
pub struct Stdin {
89
stream: AsyncInputStream,
910
terminput: LazyCell<Option<TerminalInput>>,
@@ -38,6 +39,7 @@ impl Stdin {
3839
}
3940

4041
/// Use the program's stdout as an `AsyncOutputStream`.
42+
#[derive(Debug)]
4143
pub struct Stdout {
4244
stream: AsyncOutputStream,
4345
termoutput: LazyCell<Option<TerminalOutput>>,
@@ -72,6 +74,7 @@ impl std::ops::DerefMut for Stdout {
7274
}
7375

7476
/// Use the program's stdout as an `AsyncOutputStream`.
77+
#[derive(Debug)]
7578
pub struct Stderr {
7679
stream: AsyncOutputStream,
7780
termoutput: LazyCell<Option<TerminalOutput>>,

src/io/streams.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::cell::RefCell;
33
use std::io::Result;
44
use wasi::io::streams::{InputStream, OutputStream, StreamError};
55

6+
#[derive(Debug)]
67
pub struct AsyncInputStream {
78
// Lazily initialized pollable, used for lifetime of stream to check readiness.
89
// Field ordering matters: this child must be dropped before stream
@@ -51,6 +52,7 @@ impl AsyncRead for AsyncInputStream {
5152
}
5253
}
5354

55+
#[derive(Debug)]
5456
pub struct AsyncOutputStream {
5557
// Lazily initialized pollable, used for lifetime of stream to check readiness.
5658
// Field ordering matters: this child must be dropped before stream

0 commit comments

Comments
 (0)