Skip to content

Commit ca0ba2d

Browse files
authored
Merge pull request #2738 from fermyon/working-stdout
Switch back to the old sync version of HostOutputStream.
2 parents 66f357a + 5c5e794 commit ca0ba2d

File tree

4 files changed

+147
-20
lines changed

4 files changed

+147
-20
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-wasi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = { workspace = true }
66

77
[dependencies]
88
async-trait = "0.1"
9+
bytes = "1.0"
910
cap-primitives = "3.0.0"
1011
spin-common = { path = "../common" }
1112
spin-factors = { path = "../factors" }

crates/factor-wasi/src/io.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use std::io::{Read, Write};
2+
use std::sync::{Arc, Mutex};
3+
4+
use async_trait::async_trait;
5+
use spin_factors::anyhow;
6+
use wasmtime_wasi::{
7+
HostInputStream, HostOutputStream, StdinStream, StdoutStream, StreamError, Subscribe,
8+
};
9+
10+
/// A [`HostOutputStream`] that writes to a `Write` type.
11+
///
12+
/// `StdinStream::stream` and `StdoutStream::new` can be called more than once in components
13+
/// which are composed of multiple subcomponents, since each subcomponent will potentially want
14+
/// its own handle. This means the streams need to be shareable. The easiest way to do that is
15+
/// provide cloneable implementations of streams which operate synchronously.
16+
///
17+
/// Note that this amounts to doing synchronous I/O in an asynchronous context, which we'd normally
18+
/// prefer to avoid, but the properly asynchronous implementations Host{In|Out}putStream based on
19+
/// `AsyncRead`/`AsyncWrite`` are quite hairy and probably not worth it for "normal" stdio streams in
20+
/// Spin. If this does prove to be a performance bottleneck, though, we can certainly revisit it.
21+
pub struct PipedWriteStream<T>(Arc<Mutex<T>>);
22+
23+
impl<T> PipedWriteStream<T> {
24+
pub fn new(inner: T) -> Self {
25+
Self(Arc::new(Mutex::new(inner)))
26+
}
27+
}
28+
29+
impl<T> Clone for PipedWriteStream<T> {
30+
fn clone(&self) -> Self {
31+
Self(self.0.clone())
32+
}
33+
}
34+
35+
impl<T: Write + Send + Sync + 'static> HostOutputStream for PipedWriteStream<T> {
36+
fn write(&mut self, bytes: bytes::Bytes) -> Result<(), StreamError> {
37+
self.0
38+
.lock()
39+
.unwrap()
40+
.write_all(&bytes)
41+
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
42+
}
43+
44+
fn flush(&mut self) -> Result<(), StreamError> {
45+
self.0
46+
.lock()
47+
.unwrap()
48+
.flush()
49+
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
50+
}
51+
52+
fn check_write(&mut self) -> Result<usize, StreamError> {
53+
Ok(1024 * 1024)
54+
}
55+
}
56+
57+
impl<T: Write + Send + Sync + 'static> StdoutStream for PipedWriteStream<T> {
58+
fn stream(&self) -> Box<dyn HostOutputStream> {
59+
Box::new(self.clone())
60+
}
61+
62+
fn isatty(&self) -> bool {
63+
false
64+
}
65+
}
66+
67+
#[async_trait]
68+
impl<T: Write + Send + Sync + 'static> Subscribe for PipedWriteStream<T> {
69+
async fn ready(&mut self) {}
70+
}
71+
72+
/// A [`HostInputStream`] that reads to a `Read` type.
73+
///
74+
/// See [`PipedWriteStream`] for more information on why this is synchronous.
75+
pub struct PipeReadStream<T> {
76+
buffer: Vec<u8>,
77+
inner: Arc<Mutex<T>>,
78+
}
79+
80+
impl<T> PipeReadStream<T> {
81+
pub fn new(inner: T) -> Self {
82+
Self {
83+
buffer: vec![0_u8; 64 * 1024],
84+
inner: Arc::new(Mutex::new(inner)),
85+
}
86+
}
87+
}
88+
89+
impl<T> Clone for PipeReadStream<T> {
90+
fn clone(&self) -> Self {
91+
Self {
92+
buffer: vec![0_u8; 64 * 1024],
93+
inner: self.inner.clone(),
94+
}
95+
}
96+
}
97+
98+
impl<T: Read + Send + Sync + 'static> HostInputStream for PipeReadStream<T> {
99+
fn read(&mut self, size: usize) -> wasmtime_wasi::StreamResult<bytes::Bytes> {
100+
let size = size.min(self.buffer.len());
101+
102+
let count = self
103+
.inner
104+
.lock()
105+
.unwrap()
106+
.read(&mut self.buffer[..size])
107+
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
108+
109+
Ok(bytes::Bytes::copy_from_slice(&self.buffer[..count]))
110+
}
111+
}
112+
113+
#[async_trait]
114+
impl<T: Read + Send + Sync + 'static> Subscribe for PipeReadStream<T> {
115+
async fn ready(&mut self) {}
116+
}
117+
118+
impl<T: Read + Send + Sync + 'static> StdinStream for PipeReadStream<T> {
119+
fn stream(&self) -> Box<dyn HostInputStream> {
120+
Box::new(self.clone())
121+
}
122+
123+
fn isatty(&self) -> bool {
124+
false
125+
}
126+
}

crates/factor-wasi/src/lib.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
mod io;
12
pub mod spin;
23
mod wasi_2023_10_18;
34
mod wasi_2023_11_10;
45

5-
use std::{future::Future, net::SocketAddr, path::Path};
6+
use std::{
7+
future::Future,
8+
io::{Read, Write},
9+
net::SocketAddr,
10+
path::Path,
11+
};
612

13+
use io::{PipeReadStream, PipedWriteStream};
714
use spin_factors::{
815
anyhow, AppComponent, Factor, FactorInstanceBuilder, InitContext, InstanceBuilders,
916
PrepareContext, RuntimeFactors, RuntimeFactorsInstanceState,
1017
};
11-
use tokio::io::{AsyncRead, AsyncWrite};
1218
use wasmtime_wasi::{
13-
pipe::{AsyncReadStream, AsyncWriteStream},
14-
AsyncStdinStream, AsyncStdoutStream, DirPerms, FilePerms, ResourceTable, StdinStream,
15-
StdoutStream, WasiCtx, WasiCtxBuilder, WasiImpl, WasiView,
19+
DirPerms, FilePerms, ResourceTable, StdinStream, StdoutStream, WasiCtx, WasiCtxBuilder,
20+
WasiImpl, WasiView,
1621
};
1722

1823
pub use wasmtime_wasi::SocketAddrUse;
@@ -179,35 +184,29 @@ impl InstanceBuilder {
179184
self.ctx.stdin(stdin);
180185
}
181186

182-
/// Sets the WASI `stdin` descriptor to the given [`AsyncRead`]er.
183-
pub fn stdin_pipe(&mut self, r: impl AsyncRead + Send + Unpin + 'static) {
184-
self.stdin(AsyncStdinStream::new(AsyncReadStream::new(r)));
187+
/// Sets the WASI `stdin` descriptor to the given [`Read`]er.
188+
pub fn stdin_pipe(&mut self, r: impl Read + Send + Sync + Unpin + 'static) {
189+
self.stdin(PipeReadStream::new(r));
185190
}
186191

187192
/// Sets the WASI `stdout` descriptor to the given [`StdoutStream`].
188193
pub fn stdout(&mut self, stdout: impl StdoutStream + 'static) {
189194
self.ctx.stdout(stdout);
190195
}
191196

192-
/// Sets the WASI `stdout` descriptor to the given [`AsyncWrite`]r.
193-
pub fn stdout_pipe(&mut self, w: impl AsyncWrite + Send + Unpin + 'static) {
194-
self.stdout(AsyncStdoutStream::new(AsyncWriteStream::new(
195-
1024 * 1024,
196-
w,
197-
)));
197+
/// Sets the WASI `stdout` descriptor to the given [`Write`]r.
198+
pub fn stdout_pipe(&mut self, w: impl Write + Send + Sync + Unpin + 'static) {
199+
self.stdout(PipedWriteStream::new(w));
198200
}
199201

200202
/// Sets the WASI `stderr` descriptor to the given [`StdoutStream`].
201203
pub fn stderr(&mut self, stderr: impl StdoutStream + 'static) {
202204
self.ctx.stderr(stderr);
203205
}
204206

205-
/// Sets the WASI `stderr` descriptor to the given [`AsyncWrite`]r.
206-
pub fn stderr_pipe(&mut self, w: impl AsyncWrite + Send + Unpin + 'static) {
207-
self.stderr(AsyncStdoutStream::new(AsyncWriteStream::new(
208-
1024 * 1024,
209-
w,
210-
)));
207+
/// Sets the WASI `stderr` descriptor to the given [`Write`]r.
208+
pub fn stderr_pipe(&mut self, w: impl Write + Send + Sync + Unpin + 'static) {
209+
self.stderr(PipedWriteStream::new(w));
211210
}
212211

213212
/// Appends the given strings to the WASI 'args'.

0 commit comments

Comments
 (0)