Skip to content

Commit 46d0188

Browse files
committed
add AsyncInputStream and AsyncOutputStream
1 parent 3053229 commit 46d0188

File tree

3 files changed

+131
-0
lines changed

3 files changed

+131
-0
lines changed

src/io/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ mod cursor;
55
mod empty;
66
mod read;
77
mod seek;
8+
mod streams;
89
mod write;
910

11+
pub use crate::runtime::AsyncPollable;
1012
pub use copy::*;
1113
pub use cursor::*;
1214
pub use empty::*;
1315
pub use read::*;
1416
pub use seek::*;
17+
pub use streams::*;
1518
pub use write::*;
1619

1720
/// The error type for I/O operations.

src/io/streams.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use super::{AsyncPollable, AsyncRead, AsyncWrite};
2+
use std::cell::RefCell;
3+
use std::io::Result;
4+
use wasi::io::streams::{InputStream, OutputStream, StreamError};
5+
6+
pub struct AsyncInputStream {
7+
// Lazily initialized pollable, used for lifetime of stream to check readiness.
8+
// Field ordering matters: this child must be dropped before stream
9+
subscription: RefCell<Option<AsyncPollable>>,
10+
stream: InputStream,
11+
}
12+
13+
impl AsyncInputStream {
14+
pub fn new(stream: InputStream) -> Self {
15+
Self {
16+
subscription: RefCell::new(None),
17+
stream,
18+
}
19+
}
20+
async fn ready(&self) {
21+
// Lazily initialize the AsyncPollable
22+
if self.subscription.borrow().is_none() {
23+
self.subscription
24+
.replace(Some(AsyncPollable::new(self.stream.subscribe())));
25+
}
26+
// Wait on readiness
27+
self.subscription
28+
.borrow()
29+
.as_ref()
30+
.expect("populated refcell")
31+
.wait_for()
32+
.await;
33+
}
34+
}
35+
36+
impl AsyncRead for AsyncInputStream {
37+
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
38+
self.ready().await;
39+
// Ideally, the ABI would be able to read directly into buf. However, with the default
40+
// generated bindings, it returns a newly allocated vec, which we need to copy into buf.
41+
let read = match self.stream.read(buf.len() as u64) {
42+
Ok(r) => r,
43+
Err(StreamError::Closed) => return Ok(0),
44+
Err(StreamError::LastOperationFailed(err)) => {
45+
return Err(std::io::Error::other(err.to_debug_string()))
46+
}
47+
};
48+
let len = read.len();
49+
buf[0..len].copy_from_slice(&read);
50+
Ok(len)
51+
}
52+
}
53+
54+
pub struct AsyncOutputStream {
55+
// Lazily initialized pollable, used for lifetime of stream to check readiness.
56+
// Field ordering matters: this child must be dropped before stream
57+
subscription: RefCell<Option<AsyncPollable>>,
58+
stream: OutputStream,
59+
}
60+
61+
impl AsyncOutputStream {
62+
pub fn new(stream: OutputStream) -> Self {
63+
Self {
64+
subscription: RefCell::new(None),
65+
stream,
66+
}
67+
}
68+
async fn ready(&self) {
69+
// Lazily initialize the AsyncPollable
70+
if self.subscription.borrow().is_none() {
71+
self.subscription
72+
.replace(Some(AsyncPollable::new(self.stream.subscribe())));
73+
}
74+
// Wait on readiness
75+
self.subscription
76+
.borrow()
77+
.as_ref()
78+
.expect("populated refcell")
79+
.wait_for()
80+
.await;
81+
}
82+
}
83+
impl AsyncWrite for AsyncOutputStream {
84+
// Required methods
85+
async fn write(&mut self, buf: &[u8]) -> Result<usize> {
86+
// Loops at most twice.
87+
loop {
88+
match self.stream.check_write() {
89+
Ok(0) => {
90+
self.ready().await;
91+
// Next loop guaranteed to have nonzero check_write, or error.
92+
continue;
93+
}
94+
Ok(some) => {
95+
let writable = some.try_into().unwrap_or(usize::MAX).min(buf.len());
96+
match self.stream.write(&buf[0..writable]) {
97+
Ok(()) => return Ok(writable),
98+
Err(StreamError::Closed) => return Ok(0),
99+
Err(StreamError::LastOperationFailed(err)) => {
100+
return Err(std::io::Error::other(err.to_debug_string()))
101+
}
102+
}
103+
}
104+
Err(StreamError::Closed) => return Ok(0),
105+
Err(StreamError::LastOperationFailed(err)) => {
106+
return Err(std::io::Error::other(err.to_debug_string()))
107+
}
108+
}
109+
}
110+
}
111+
async fn flush(&mut self) -> Result<()> {
112+
match self.stream.flush() {
113+
Ok(()) => {
114+
self.ready().await;
115+
Ok(())
116+
}
117+
Err(StreamError::Closed) => Ok(()),
118+
Err(StreamError::LastOperationFailed(err)) => {
119+
Err(std::io::Error::other(err.to_debug_string()))
120+
}
121+
}
122+
}
123+
}

src/runtime/reactor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ impl Drop for Registration {
3333
pub struct AsyncPollable(Rc<Registration>);
3434

3535
impl AsyncPollable {
36+
/// Create an `AsyncPollable` from a Wasi `Pollable`. Schedules the `Pollable` with the current
37+
/// `Reactor`.
38+
pub fn new(pollable: Pollable) -> Self {
39+
Reactor::current().schedule(pollable)
40+
}
3641
/// Create a Future that waits for the Pollable's readiness.
3742
pub fn wait_for(&self) -> WaitFor {
3843
use std::sync::atomic::{AtomicUsize, Ordering};

0 commit comments

Comments
 (0)