From e781d6bb84fad9e5b955927ce6d14a691e1eeb73 Mon Sep 17 00:00:00 2001
From: Pat Hickey
Date: Fri, 8 Aug 2025 14:45:40 -0700
Subject: [PATCH 01/13] add some futures_core::stream::Stream impls for
AsyncInputStream
---
src/io/streams.rs | 140 +++++++++++++++++++++++++++++++++++++----
src/runtime/reactor.rs | 2 +
2 files changed, 131 insertions(+), 11 deletions(-)
diff --git a/src/io/streams.rs b/src/io/streams.rs
index d7139c1..c437dfa 100644
--- a/src/io/streams.rs
+++ b/src/io/streams.rs
@@ -1,6 +1,8 @@
use super::{AsyncPollable, AsyncRead, AsyncWrite};
use std::cell::OnceCell;
-use std::io::Result;
+use std::future::{poll_fn, Future};
+use std::pin::Pin;
+use std::task::{Context, Poll};
use wasi::io::streams::{InputStream, OutputStream, StreamError};
/// A wrapper for WASI's `InputStream` resource that provides implementations of `AsyncRead` and
@@ -21,18 +23,23 @@ impl AsyncInputStream {
stream,
}
}
- /// Await for read readiness.
- async fn ready(&self) {
+ fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
// Lazily initialize the AsyncPollable
let subscription = self
.subscription
.get_or_init(|| AsyncPollable::new(self.stream.subscribe()));
// Wait on readiness
- subscription.wait_for().await;
+ let wait_for = subscription.wait_for();
+ let mut pinned = std::pin::pin!(wait_for);
+ pinned.as_mut().poll(cx)
+ }
+ /// Await for read readiness.
+ async fn ready(&self) {
+ poll_fn(|cx| self.poll_ready(cx)).await
}
/// Asynchronously read from the input stream.
/// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`.
- pub async fn read(&self, buf: &mut [u8]) -> Result {
+ pub async fn read(&self, buf: &mut [u8]) -> std::io::Result {
let read = loop {
self.ready().await;
// Ideally, the ABI would be able to read directly into buf.
@@ -56,10 +63,40 @@ impl AsyncInputStream {
buf[0..len].copy_from_slice(&read);
Ok(len)
}
+
+ /// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
+ /// items of `Result, std::io::Error>`. The returned byte vectors
+ /// will be at most 8k. If you want to control chunk size, use
+ /// `Self::into_stream_of`.
+ pub fn into_stream(self) -> AsyncInputChunkStream {
+ AsyncInputChunkStream {
+ stream: self,
+ chunk_size: 8 * 1024,
+ }
+ }
+
+ /// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
+ /// items of `Result, std::io::Error>`. The returned byte vectors
+ /// will be at most the `chunk_size` argument specified.
+ pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream {
+ AsyncInputChunkStream {
+ stream: self,
+ chunk_size,
+ }
+ }
+
+ /// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
+ /// items of `Result`.
+ pub fn into_bytestream(self) -> AsyncInputByteStream {
+ AsyncInputByteStream {
+ stream: self.into_stream(),
+ buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())),
+ }
+ }
}
impl AsyncRead for AsyncInputStream {
- async fn read(&mut self, buf: &mut [u8]) -> Result {
+ async fn read(&mut self, buf: &mut [u8]) -> std::io::Result {
Self::read(self, buf).await
}
@@ -69,6 +106,87 @@ impl AsyncRead for AsyncInputStream {
}
}
+/// Wrapper of `AsyncInputStream` that impls `futures_core::stream::Stream`
+/// with an item of `Result, std::io::Error>`
+pub struct AsyncInputChunkStream {
+ stream: AsyncInputStream,
+ chunk_size: usize,
+}
+
+impl AsyncInputChunkStream {
+ /// Extract the `AsyncInputStream` which backs this stream.
+ pub fn into_inner(self) -> AsyncInputStream {
+ self.stream
+ }
+}
+
+impl futures_core::stream::Stream for AsyncInputChunkStream {
+ type Item = Result, std::io::Error>;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll