Skip to content

Commit e781d6b

Browse files
committed
add some futures_core::stream::Stream impls for AsyncInputStream
1 parent 383902a commit e781d6b

File tree

2 files changed

+131
-11
lines changed

2 files changed

+131
-11
lines changed

src/io/streams.rs

Lines changed: 129 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use super::{AsyncPollable, AsyncRead, AsyncWrite};
22
use std::cell::OnceCell;
3-
use std::io::Result;
3+
use std::future::{poll_fn, Future};
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
46
use wasi::io::streams::{InputStream, OutputStream, StreamError};
57

68
/// A wrapper for WASI's `InputStream` resource that provides implementations of `AsyncRead` and
@@ -21,18 +23,23 @@ impl AsyncInputStream {
2123
stream,
2224
}
2325
}
24-
/// Await for read readiness.
25-
async fn ready(&self) {
26+
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
2627
// Lazily initialize the AsyncPollable
2728
let subscription = self
2829
.subscription
2930
.get_or_init(|| AsyncPollable::new(self.stream.subscribe()));
3031
// Wait on readiness
31-
subscription.wait_for().await;
32+
let wait_for = subscription.wait_for();
33+
let mut pinned = std::pin::pin!(wait_for);
34+
pinned.as_mut().poll(cx)
35+
}
36+
/// Await for read readiness.
37+
async fn ready(&self) {
38+
poll_fn(|cx| self.poll_ready(cx)).await
3239
}
3340
/// Asynchronously read from the input stream.
3441
/// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`.
35-
pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
42+
pub async fn read(&self, buf: &mut [u8]) -> std::io::Result<usize> {
3643
let read = loop {
3744
self.ready().await;
3845
// Ideally, the ABI would be able to read directly into buf.
@@ -56,10 +63,40 @@ impl AsyncInputStream {
5663
buf[0..len].copy_from_slice(&read);
5764
Ok(len)
5865
}
66+
67+
/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
68+
/// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
69+
/// will be at most 8k. If you want to control chunk size, use
70+
/// `Self::into_stream_of`.
71+
pub fn into_stream(self) -> AsyncInputChunkStream {
72+
AsyncInputChunkStream {
73+
stream: self,
74+
chunk_size: 8 * 1024,
75+
}
76+
}
77+
78+
/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
79+
/// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
80+
/// will be at most the `chunk_size` argument specified.
81+
pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream {
82+
AsyncInputChunkStream {
83+
stream: self,
84+
chunk_size,
85+
}
86+
}
87+
88+
/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
89+
/// items of `Result<u8, std::io::Error>`.
90+
pub fn into_bytestream(self) -> AsyncInputByteStream {
91+
AsyncInputByteStream {
92+
stream: self.into_stream(),
93+
buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())),
94+
}
95+
}
5996
}
6097

6198
impl AsyncRead for AsyncInputStream {
62-
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
99+
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
63100
Self::read(self, buf).await
64101
}
65102

@@ -69,6 +106,87 @@ impl AsyncRead for AsyncInputStream {
69106
}
70107
}
71108

109+
/// Wrapper of `AsyncInputStream` that impls `futures_core::stream::Stream`
110+
/// with an item of `Result<Vec<u8>, std::io::Error>`
111+
pub struct AsyncInputChunkStream {
112+
stream: AsyncInputStream,
113+
chunk_size: usize,
114+
}
115+
116+
impl AsyncInputChunkStream {
117+
/// Extract the `AsyncInputStream` which backs this stream.
118+
pub fn into_inner(self) -> AsyncInputStream {
119+
self.stream
120+
}
121+
}
122+
123+
impl futures_core::stream::Stream for AsyncInputChunkStream {
124+
type Item = Result<Vec<u8>, std::io::Error>;
125+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126+
match self.stream.poll_ready(cx) {
127+
Poll::Pending => Poll::Pending,
128+
Poll::Ready(()) => match self.stream.stream.read(self.chunk_size as u64) {
129+
Ok(r) if r.is_empty() => Poll::Pending,
130+
Ok(r) => Poll::Ready(Some(Ok(r))),
131+
Err(StreamError::LastOperationFailed(err)) => {
132+
Poll::Ready(Some(Err(std::io::Error::other(err.to_debug_string()))))
133+
}
134+
Err(StreamError::Closed) => Poll::Ready(None),
135+
},
136+
}
137+
}
138+
}
139+
140+
pin_project_lite::pin_project! {
141+
/// Wrapper of `AsyncInputStream` that impls
142+
/// `futures_core::stream::Stream` with item `Result<u8, std::io::Error>`.
143+
pub struct AsyncInputByteStream {
144+
#[pin]
145+
stream: AsyncInputChunkStream,
146+
buffer: std::io::Bytes<std::io::Cursor<Vec<u8>>>,
147+
}
148+
}
149+
150+
impl AsyncInputByteStream {
151+
/// Extract the `AsyncInputStream` which backs this stream, and any bytes
152+
/// read from the `AsyncInputStream` which have not yet been yielded by
153+
/// the byte stream.
154+
pub fn into_inner(self) -> (AsyncInputStream, Vec<u8>) {
155+
(
156+
self.stream.into_inner(),
157+
self.buffer
158+
.collect::<Result<Vec<u8>, std::io::Error>>()
159+
.expect("read of Cursor<Vec<u8>> is infallible"),
160+
)
161+
}
162+
}
163+
164+
impl futures_core::stream::Stream for AsyncInputByteStream {
165+
type Item = Result<u8, std::io::Error>;
166+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167+
let this = self.project();
168+
match this.buffer.next() {
169+
Some(byte) => Poll::Ready(Some(Ok(byte.expect("cursor on Vec<u8> is infallible")))),
170+
None => match futures_core::stream::Stream::poll_next(this.stream, cx) {
171+
Poll::Ready(Some(Ok(bytes))) => {
172+
let mut bytes = std::io::Read::bytes(std::io::Cursor::new(bytes));
173+
match bytes.next() {
174+
Some(Ok(byte)) => {
175+
*this.buffer = bytes;
176+
Poll::Ready(Some(Ok(byte)))
177+
}
178+
Some(Err(err)) => Poll::Ready(Some(Err(err))),
179+
None => Poll::Ready(None),
180+
}
181+
}
182+
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
183+
Poll::Ready(None) => Poll::Ready(None),
184+
Poll::Pending => Poll::Pending,
185+
},
186+
}
187+
}
188+
}
189+
72190
/// A wrapper for WASI's `output-stream` resource that provides implementations of `AsyncWrite` and
73191
/// `AsyncPollable`.
74192
#[derive(Debug)]
@@ -104,7 +222,7 @@ impl AsyncOutputStream {
104222
/// a `std::io::Error` indicating either an error returned by the stream write
105223
/// using the debug string provided by the WASI error, or else that the,
106224
/// indicated by `std::io::ErrorKind::ConnectionReset`.
107-
pub async fn write(&self, buf: &[u8]) -> Result<usize> {
225+
pub async fn write(&self, buf: &[u8]) -> std::io::Result<usize> {
108226
// Loops at most twice.
109227
loop {
110228
match self.stream.check_write() {
@@ -145,7 +263,7 @@ impl AsyncOutputStream {
145263
/// the stream flush, using the debug string provided by the WASI error,
146264
/// or else that the stream is closed, indicated by
147265
/// `std::io::ErrorKind::ConnectionReset`.
148-
pub async fn flush(&self) -> Result<()> {
266+
pub async fn flush(&self) -> std::io::Result<()> {
149267
match self.stream.flush() {
150268
Ok(()) => {
151269
self.ready().await;
@@ -162,10 +280,10 @@ impl AsyncOutputStream {
162280
}
163281
impl AsyncWrite for AsyncOutputStream {
164282
// Required methods
165-
async fn write(&mut self, buf: &[u8]) -> Result<usize> {
283+
async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
166284
Self::write(self, buf).await
167285
}
168-
async fn flush(&mut self) -> Result<()> {
286+
async fn flush(&mut self) -> std::io::Result<()> {
169287
Self::flush(self).await
170288
}
171289

@@ -180,7 +298,7 @@ pub(crate) async fn splice(
180298
reader: &AsyncInputStream,
181299
writer: &AsyncOutputStream,
182300
len: u64,
183-
) -> core::result::Result<u64, StreamError> {
301+
) -> Result<u64, StreamError> {
184302
// Wait for both streams to be ready.
185303
let r = reader.ready();
186304
writer.ready().await;

src/runtime/reactor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ impl AsyncPollable {
3838
pub fn new(pollable: Pollable) -> Self {
3939
Reactor::current().schedule(pollable)
4040
}
41+
// TODO: can I instead return a Pin<&mut WaitFor> here? so we dont keep
42+
// recreating this.
4143
/// Create a Future that waits for the Pollable's readiness.
4244
pub fn wait_for(&self) -> WaitFor {
4345
use std::sync::atomic::{AtomicUsize, Ordering};

0 commit comments

Comments
 (0)