Skip to content

add some futures_core::stream::Stream impls for AsyncInputStream #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 129 additions & 11 deletions src/io/streams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::{AsyncPollable, AsyncRead, AsyncWrite};
use std::cell::OnceCell;
use std::io::Result;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{Context, Poll};
use wasi::io::streams::{InputStream, OutputStream, StreamError};

/// A wrapper for WASI's `InputStream` resource that provides implementations of `AsyncRead` and
Expand All @@ -21,18 +23,23 @@ impl AsyncInputStream {
stream,
}
}
/// Await for read readiness.
async fn ready(&self) {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
// Lazily initialize the AsyncPollable
let subscription = self
.subscription
.get_or_init(|| AsyncPollable::new(self.stream.subscribe()));
// Wait on readiness
subscription.wait_for().await;
let wait_for = subscription.wait_for();
let mut pinned = std::pin::pin!(wait_for);
pinned.as_mut().poll(cx)
}
/// Await for read readiness.
async fn ready(&self) {
poll_fn(|cx| self.poll_ready(cx)).await
}
/// Asynchronously read from the input stream.
/// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`.
pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
pub async fn read(&self, buf: &mut [u8]) -> std::io::Result<usize> {
let read = loop {
self.ready().await;
// Ideally, the ABI would be able to read directly into buf.
Expand All @@ -56,10 +63,40 @@ impl AsyncInputStream {
buf[0..len].copy_from_slice(&read);
Ok(len)
}

/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
/// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
/// will be at most 8k. If you want to control chunk size, use
/// `Self::into_stream_of`.
pub fn into_stream(self) -> AsyncInputChunkStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling into_stream() on an AsyncInputStream feels odd. 😆

Could this instead be:

impl AsyncInputChunkStream {
    pub fn new(stream: AsyncInputStream) -> Self {
        Self {
            stream,
            chunk_size: 8 * 1024,
        }
    }

    pub fn with_size(stream: AsyncInputStream, chunk_size: usize) -> Self {
        Self {
            stream,
            chunk_size,
        }
    }
}

which makes it somewhat analogous to Vec::new and Vec::with_capacity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anyone wants to think about AsyncInputChunkStream as a type itself, I would make it a private type and just return impl Stream<Item = ...> since that is all that matters, except we need to return a concrete type in case the user wants the into_inner method on the ChunkStream/ByteStream. I don't think the analogy to Vec really holds much water.

AsyncInputChunkStream {
stream: self,
chunk_size: 8 * 1024,
}
}

/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
/// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
/// will be at most the `chunk_size` argument specified.
pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream {
AsyncInputChunkStream {
stream: self,
chunk_size,
}
}

/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
/// items of `Result<u8, std::io::Error>`.
pub fn into_bytestream(self) -> AsyncInputByteStream {
AsyncInputByteStream {
stream: self.into_stream(),
buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())),
}
}
}

impl AsyncRead for AsyncInputStream {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Self::read(self, buf).await
}

Expand All @@ -69,6 +106,87 @@ impl AsyncRead for AsyncInputStream {
}
}

/// Wrapper of `AsyncInputStream` that impls `futures_core::stream::Stream`
/// with an item of `Result<Vec<u8>, std::io::Error>`
pub struct AsyncInputChunkStream {
stream: AsyncInputStream,
chunk_size: usize,
}

impl AsyncInputChunkStream {
/// Extract the `AsyncInputStream` which backs this stream.
pub fn into_inner(self) -> AsyncInputStream {
self.stream
}
}

impl futures_core::stream::Stream for AsyncInputChunkStream {
type Item = Result<Vec<u8>, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.stream.poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => match self.stream.stream.read(self.chunk_size as u64) {
Ok(r) if r.is_empty() => Poll::Pending,
Ok(r) => Poll::Ready(Some(Ok(r))),
Err(StreamError::LastOperationFailed(err)) => {
Poll::Ready(Some(Err(std::io::Error::other(err.to_debug_string()))))
}
Err(StreamError::Closed) => Poll::Ready(None),
},
}
}
}

pin_project_lite::pin_project! {
/// Wrapper of `AsyncInputStream` that impls
/// `futures_core::stream::Stream` with item `Result<u8, std::io::Error>`.
pub struct AsyncInputByteStream {
#[pin]
stream: AsyncInputChunkStream,
buffer: std::io::Bytes<std::io::Cursor<Vec<u8>>>,
}
}

impl AsyncInputByteStream {
/// Extract the `AsyncInputStream` which backs this stream, and any bytes
/// read from the `AsyncInputStream` which have not yet been yielded by
/// the byte stream.
pub fn into_inner(self) -> (AsyncInputStream, Vec<u8>) {
(
self.stream.into_inner(),
self.buffer
.collect::<Result<Vec<u8>, std::io::Error>>()
.expect("read of Cursor<Vec<u8>> is infallible"),
)
}
}

impl futures_core::stream::Stream for AsyncInputByteStream {
type Item = Result<u8, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.buffer.next() {
Some(byte) => Poll::Ready(Some(Ok(byte.expect("cursor on Vec<u8> is infallible")))),
None => match futures_core::stream::Stream::poll_next(this.stream, cx) {
Poll::Ready(Some(Ok(bytes))) => {
let mut bytes = std::io::Read::bytes(std::io::Cursor::new(bytes));
match bytes.next() {
Some(Ok(byte)) => {
*this.buffer = bytes;
Poll::Ready(Some(Ok(byte)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
}
}
}

/// A wrapper for WASI's `output-stream` resource that provides implementations of `AsyncWrite` and
/// `AsyncPollable`.
#[derive(Debug)]
Expand Down Expand Up @@ -104,7 +222,7 @@ impl AsyncOutputStream {
/// a `std::io::Error` indicating either an error returned by the stream write
/// using the debug string provided by the WASI error, or else that the,
/// indicated by `std::io::ErrorKind::ConnectionReset`.
pub async fn write(&self, buf: &[u8]) -> Result<usize> {
pub async fn write(&self, buf: &[u8]) -> std::io::Result<usize> {
// Loops at most twice.
loop {
match self.stream.check_write() {
Expand Down Expand Up @@ -145,7 +263,7 @@ impl AsyncOutputStream {
/// the stream flush, using the debug string provided by the WASI error,
/// or else that the stream is closed, indicated by
/// `std::io::ErrorKind::ConnectionReset`.
pub async fn flush(&self) -> Result<()> {
pub async fn flush(&self) -> std::io::Result<()> {
match self.stream.flush() {
Ok(()) => {
self.ready().await;
Expand All @@ -162,10 +280,10 @@ impl AsyncOutputStream {
}
impl AsyncWrite for AsyncOutputStream {
// Required methods
async fn write(&mut self, buf: &[u8]) -> Result<usize> {
async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Self::write(self, buf).await
}
async fn flush(&mut self) -> Result<()> {
async fn flush(&mut self) -> std::io::Result<()> {
Self::flush(self).await
}

Expand All @@ -180,7 +298,7 @@ pub(crate) async fn splice(
reader: &AsyncInputStream,
writer: &AsyncOutputStream,
len: u64,
) -> core::result::Result<u64, StreamError> {
) -> Result<u64, StreamError> {
// Wait for both streams to be ready.
let r = reader.ready();
writer.ready().await;
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ impl AsyncPollable {
pub fn new(pollable: Pollable) -> Self {
Reactor::current().schedule(pollable)
}
// TODO: can I instead return a Pin<&mut WaitFor> here? so we dont keep
// recreating this.
/// Create a Future that waits for the Pollable's readiness.
pub fn wait_for(&self) -> WaitFor {
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
Loading