Skip to content

Commit 922dfcf

Browse files
authored
Merge pull request #37 from yoshuawuyts/pch/iostreams
Introduce AsyncInputStream, AsyncOutputStream, and stdio
2 parents 3053229 + 9e460ee commit 922dfcf

File tree

9 files changed

+318
-138
lines changed

9 files changed

+318
-138
lines changed

src/http/client.rs

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{response::IncomingBody, Body, Error, Request, Response, Result};
2-
use crate::io::{self, AsyncWrite};
2+
use crate::io::{self, AsyncOutputStream};
33
use crate::runtime::Reactor;
44
use crate::time::Duration;
55
use wasi::http::types::{OutgoingBody, RequestOptions as WasiRequestOptions};
@@ -27,9 +27,7 @@ impl Client {
2727
let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap();
2828

2929
// 2. Start sending the request body
30-
io::copy(body, OutputStream::new(body_stream))
31-
.await
32-
.expect("io::copy broke oh no");
30+
io::copy(body, AsyncOutputStream::new(body_stream)).await?;
3331

3432
// 3. Finish sending the request body
3533
let trailers = None;
@@ -74,33 +72,6 @@ impl Client {
7472
}
7573
}
7674

77-
struct OutputStream {
78-
stream: wasi::http::types::OutputStream,
79-
}
80-
81-
impl OutputStream {
82-
fn new(stream: wasi::http::types::OutputStream) -> Self {
83-
Self { stream }
84-
}
85-
}
86-
87-
impl AsyncWrite for OutputStream {
88-
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
89-
let max = self.stream.check_write().unwrap() as usize;
90-
let max = max.min(buf.len());
91-
let buf = &buf[0..max];
92-
self.stream.write(buf).unwrap();
93-
Reactor::current().wait_for(self.stream.subscribe()).await;
94-
Ok(max)
95-
}
96-
97-
async fn flush(&mut self) -> io::Result<()> {
98-
self.stream.flush().unwrap();
99-
Reactor::current().wait_for(self.stream.subscribe()).await;
100-
Ok(())
101-
}
102-
}
103-
10475
#[derive(Default, Debug)]
10576
struct RequestOptions {
10677
connect_timeout: Option<Duration>,

src/http/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ impl fmt::Debug for Error {
2424
ErrorVariant::HeaderName(e) => write!(f, "header name error: {e:?}"),
2525
ErrorVariant::HeaderValue(e) => write!(f, "header value error: {e:?}"),
2626
ErrorVariant::Method(e) => write!(f, "method error: {e:?}"),
27+
ErrorVariant::BodyIo(e) => write!(f, "body error: {e:?}"),
2728
ErrorVariant::Other(e) => write!(f, "{e}"),
2829
}
2930
}
@@ -37,6 +38,7 @@ impl fmt::Display for Error {
3738
ErrorVariant::HeaderName(e) => write!(f, "header name error: {e}"),
3839
ErrorVariant::HeaderValue(e) => write!(f, "header value error: {e}"),
3940
ErrorVariant::Method(e) => write!(f, "method error: {e}"),
41+
ErrorVariant::BodyIo(e) => write!(f, "body error: {e}"),
4042
ErrorVariant::Other(e) => write!(f, "{e}"),
4143
}
4244
}
@@ -100,12 +102,19 @@ impl From<InvalidMethod> for Error {
100102
}
101103
}
102104

105+
impl From<std::io::Error> for Error {
106+
fn from(e: std::io::Error) -> Error {
107+
ErrorVariant::BodyIo(e).into()
108+
}
109+
}
110+
103111
#[derive(Debug)]
104112
pub enum ErrorVariant {
105113
WasiHttp(WasiHttpErrorCode),
106114
WasiHeader(WasiHttpHeaderError),
107115
HeaderName(InvalidHeaderName),
108116
HeaderValue(InvalidHeaderValue),
109117
Method(InvalidMethod),
118+
BodyIo(std::io::Error),
110119
Other(String),
111120
}

src/http/response.rs

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
use wasi::http::types::{IncomingBody as WasiIncomingBody, IncomingResponse};
2-
use wasi::io::streams::{InputStream, StreamError};
32

43
use super::{fields::header_map_from_wasi, Body, Error, HeaderMap, Result, StatusCode};
5-
use crate::io::AsyncRead;
6-
use crate::runtime::Reactor;
7-
8-
/// Stream 2kb chunks at a time
9-
const CHUNK_SIZE: u64 = 2048;
4+
use crate::io::{AsyncInputStream, AsyncRead};
105

116
/// An HTTP response
127
#[derive(Debug)]
@@ -57,9 +52,7 @@ impl Response<IncomingBody> {
5752

5853
let body = IncomingBody {
5954
kind,
60-
buf_offset: 0,
61-
buf: None,
62-
body_stream,
55+
body_stream: AsyncInputStream::new(body_stream),
6356
_incoming_body: incoming_body,
6457
};
6558

@@ -96,54 +89,15 @@ impl<B: Body> Response<B> {
9689
#[derive(Debug)]
9790
pub struct IncomingBody {
9891
kind: BodyKind,
99-
buf: Option<Vec<u8>>,
100-
// How many bytes have we already read from the buf?
101-
buf_offset: usize,
102-
10392
// IMPORTANT: the order of these fields here matters. `body_stream` must
10493
// be dropped before `_incoming_body`.
105-
body_stream: InputStream,
94+
body_stream: AsyncInputStream,
10695
_incoming_body: WasiIncomingBody,
10796
}
10897

10998
impl AsyncRead for IncomingBody {
11099
async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result<usize> {
111-
let buf = match &mut self.buf {
112-
Some(ref mut buf) => buf,
113-
None => {
114-
// Wait for an event to be ready
115-
let pollable = self.body_stream.subscribe();
116-
Reactor::current().wait_for(pollable).await;
117-
118-
// Read the bytes from the body stream
119-
let buf = match self.body_stream.read(CHUNK_SIZE) {
120-
Ok(buf) => buf,
121-
Err(StreamError::Closed) => return Ok(0),
122-
Err(StreamError::LastOperationFailed(err)) => {
123-
return Err(std::io::Error::other(format!(
124-
"last operation failed: {}",
125-
err.to_debug_string()
126-
)))
127-
}
128-
};
129-
self.buf.insert(buf)
130-
}
131-
};
132-
133-
// copy bytes
134-
let len = (buf.len() - self.buf_offset).min(out_buf.len());
135-
let max = self.buf_offset + len;
136-
let slice = &buf[self.buf_offset..max];
137-
out_buf[0..len].copy_from_slice(slice);
138-
self.buf_offset += len;
139-
140-
// reset the local slice if necessary
141-
if self.buf_offset == buf.len() {
142-
self.buf = None;
143-
self.buf_offset = 0;
144-
}
145-
146-
Ok(len)
100+
self.body_stream.read(out_buf).await
147101
}
148102
}
149103

src/io/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@ mod cursor;
55
mod empty;
66
mod read;
77
mod seek;
8+
mod stdio;
9+
mod streams;
810
mod write;
911

12+
pub use crate::runtime::AsyncPollable;
1013
pub use copy::*;
1114
pub use cursor::*;
1215
pub use empty::*;
1316
pub use read::*;
1417
pub use seek::*;
18+
pub use stdio::*;
19+
pub use streams::*;
1520
pub use write::*;
1621

1722
/// The error type for I/O operations.

src/io/stdio.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
use super::{AsyncInputStream, AsyncOutputStream};
2+
use std::cell::LazyCell;
3+
use wasi::cli::terminal_input::TerminalInput;
4+
use wasi::cli::terminal_output::TerminalOutput;
5+
6+
/// Use the program's stdin as an `AsyncInputStream`.
7+
#[derive(Debug)]
8+
pub struct Stdin {
9+
stream: AsyncInputStream,
10+
terminput: LazyCell<Option<TerminalInput>>,
11+
}
12+
13+
/// Get the program's stdin for use as an `AsyncInputStream`.
14+
pub fn stdin() -> Stdin {
15+
let stream = AsyncInputStream::new(wasi::cli::stdin::get_stdin());
16+
Stdin {
17+
stream,
18+
terminput: LazyCell::new(|| wasi::cli::terminal_stdin::get_terminal_stdin()),
19+
}
20+
}
21+
22+
impl std::ops::Deref for Stdin {
23+
type Target = AsyncInputStream;
24+
fn deref(&self) -> &AsyncInputStream {
25+
&self.stream
26+
}
27+
}
28+
impl std::ops::DerefMut for Stdin {
29+
fn deref_mut(&mut self) -> &mut AsyncInputStream {
30+
&mut self.stream
31+
}
32+
}
33+
34+
impl Stdin {
35+
/// Check if stdin is a terminal.
36+
pub fn is_terminal(&self) -> bool {
37+
LazyCell::force(&self.terminput).is_some()
38+
}
39+
}
40+
41+
/// Use the program's stdout as an `AsyncOutputStream`.
42+
#[derive(Debug)]
43+
pub struct Stdout {
44+
stream: AsyncOutputStream,
45+
termoutput: LazyCell<Option<TerminalOutput>>,
46+
}
47+
48+
/// Get the program's stdout for use as an `AsyncOutputStream`.
49+
pub fn stdout() -> Stdout {
50+
let stream = AsyncOutputStream::new(wasi::cli::stdout::get_stdout());
51+
Stdout {
52+
stream,
53+
termoutput: LazyCell::new(|| wasi::cli::terminal_stdout::get_terminal_stdout()),
54+
}
55+
}
56+
57+
impl Stdout {
58+
/// Check if stdout is a terminal.
59+
pub fn is_terminal(&self) -> bool {
60+
LazyCell::force(&self.termoutput).is_some()
61+
}
62+
}
63+
64+
impl std::ops::Deref for Stdout {
65+
type Target = AsyncOutputStream;
66+
fn deref(&self) -> &AsyncOutputStream {
67+
&self.stream
68+
}
69+
}
70+
impl std::ops::DerefMut for Stdout {
71+
fn deref_mut(&mut self) -> &mut AsyncOutputStream {
72+
&mut self.stream
73+
}
74+
}
75+
76+
/// Use the program's stdout as an `AsyncOutputStream`.
77+
#[derive(Debug)]
78+
pub struct Stderr {
79+
stream: AsyncOutputStream,
80+
termoutput: LazyCell<Option<TerminalOutput>>,
81+
}
82+
83+
/// Get the program's stdout for use as an `AsyncOutputStream`.
84+
pub fn stderr() -> Stderr {
85+
let stream = AsyncOutputStream::new(wasi::cli::stderr::get_stderr());
86+
Stderr {
87+
stream,
88+
termoutput: LazyCell::new(|| wasi::cli::terminal_stderr::get_terminal_stderr()),
89+
}
90+
}
91+
92+
impl Stderr {
93+
/// Check if stderr is a terminal.
94+
pub fn is_terminal(&self) -> bool {
95+
LazyCell::force(&self.termoutput).is_some()
96+
}
97+
}
98+
99+
impl std::ops::Deref for Stderr {
100+
type Target = AsyncOutputStream;
101+
fn deref(&self) -> &AsyncOutputStream {
102+
&self.stream
103+
}
104+
}
105+
impl std::ops::DerefMut for Stderr {
106+
fn deref_mut(&mut self) -> &mut AsyncOutputStream {
107+
&mut self.stream
108+
}
109+
}
110+
111+
#[cfg(test)]
112+
mod test {
113+
use crate::io::AsyncWrite;
114+
use crate::runtime::block_on;
115+
#[test]
116+
// No internal predicate. Run test with --nocapture and inspect output manually.
117+
fn stdout_println_hello_world() {
118+
block_on(async {
119+
let mut stdout = super::stdout();
120+
let term = if stdout.is_terminal() { "is" } else { "is not" };
121+
stdout
122+
.write_all(format!("hello, world! stdout {term} a terminal\n",).as_bytes())
123+
.await
124+
.unwrap();
125+
})
126+
}
127+
#[test]
128+
// No internal predicate. Run test with --nocapture and inspect output manually.
129+
fn stderr_println_hello_world() {
130+
block_on(async {
131+
let mut stdout = super::stdout();
132+
let term = if stdout.is_terminal() { "is" } else { "is not" };
133+
stdout
134+
.write_all(format!("hello, world! stderr {term} a terminal\n",).as_bytes())
135+
.await
136+
.unwrap();
137+
})
138+
}
139+
}

0 commit comments

Comments
 (0)