Skip to content

Commit 8bafcb2

Browse files
authored
replace MyStd{in|out}Stream with shareable streams (#1982)
`MyStd{in|out}Stream` were written based on the assumption that `Std{in|out}Stream::stream` would only ever be called once. Turns out that's not true for guest components which are composed of multiple subcomponents, since each subcomponent will potentially want its own handle, so they need to be shareable. The easiest way to do that is provide cloneable implementations of `Host{In|Out}putStream` which operate synchronously. Note that this amounts to doing synchronous I/O in an asynchronous context, which we'd normally prefer to avoid, but the properly asynchronous implementations `Host{In|Out}putStream` based on `AsyncRead`/`AsyncWrite` are quite hairy and probably not worth it for "normal" stdio streams in Spin. If this does prove to be a performance bottleneck, though, we can certainly revisit it. Signed-off-by: Joel Dice <[email protected]>
1 parent 46da3d7 commit 8bafcb2

File tree

2 files changed

+104
-22
lines changed

2 files changed

+104
-22
lines changed

crates/core/src/io.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, HostOutputStream};
22

33
/// An in-memory stdio output buffer.
4+
#[derive(Clone)]
45
pub struct OutputBuffer(MemoryOutputPipe);
56

67
impl OutputBuffer {

crates/core/src/store.rs

Lines changed: 103 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use anyhow::{anyhow, Result};
2+
use bytes::Bytes;
23
use std::{
34
io::{Read, Write},
45
path::{Path, PathBuf},
5-
sync::Mutex,
6+
sync::{Arc, Mutex},
67
time::{Duration, Instant},
78
};
89
use system_interface::io::ReadReady;
@@ -11,10 +12,12 @@ use wasi_common_preview1 as wasi_preview1;
1112
use wasmtime_wasi as wasmtime_wasi_preview1;
1213
use wasmtime_wasi::preview2::{
1314
self as wasi_preview2, HostInputStream, HostOutputStream, StdinStream, StdoutStream,
15+
StreamError, StreamResult, Subscribe,
1416
};
1517
use wasmtime_wasi_http::types::WasiHttpCtx;
1618

1719
use crate::{
20+
async_trait,
1821
host_component::{HostComponents, HostComponentsData},
1922
io::OutputBuffer,
2023
limits::StoreLimitsAsync,
@@ -182,9 +185,7 @@ impl StoreBuilder {
182185
ctx.set_stdin(Box::new(wasi_preview1::pipe::ReadPipe::new(r)))
183186
}
184187
WasiCtxBuilder::Preview2(ctx) => {
185-
ctx.stdin(MyStdinStream(Mutex::new(Some(Box::new(
186-
wasi_preview2::pipe::AsyncReadStream::new(r),
187-
)))));
188+
ctx.stdin(PipeStdinStream::new(r));
188189
}
189190
})
190191
}
@@ -221,9 +222,7 @@ impl StoreBuilder {
221222
ctx.set_stdout(Box::new(wasi_preview1::pipe::WritePipe::new(w)))
222223
}
223224
WasiCtxBuilder::Preview2(ctx) => {
224-
ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new(
225-
wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w),
226-
)))));
225+
ctx.stdout(PipeStdoutStream::new(w));
227226
}
228227
})
229228
}
@@ -238,7 +237,7 @@ impl StoreBuilder {
238237
"`Store::stdout_buffered` only supported with WASI Preview 2"
239238
)),
240239
WasiCtxBuilder::Preview2(ctx) => {
241-
ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new(buffer.writer())))));
240+
ctx.stdout(BufferStdoutStream(buffer.clone()));
242241
Ok(())
243242
}
244243
})?;
@@ -264,9 +263,7 @@ impl StoreBuilder {
264263
ctx.set_stderr(Box::new(wasi_preview1::pipe::WritePipe::new(w)))
265264
}
266265
WasiCtxBuilder::Preview2(ctx) => {
267-
ctx.stderr(MyStdoutStream(Mutex::new(Some(Box::new(
268-
wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w),
269-
)))));
266+
ctx.stderr(PipeStdoutStream::new(w));
270267
}
271268
})
272269
}
@@ -426,31 +423,115 @@ impl StoreBuilder {
426423
}
427424
}
428425

429-
struct MyStdinStream(Mutex<Option<Box<dyn HostInputStream>>>);
426+
struct PipeStdinStream<T> {
427+
buffer: Vec<u8>,
428+
inner: Arc<Mutex<T>>,
429+
}
430430

431-
impl StdinStream for MyStdinStream {
432-
fn stream(&self) -> Box<dyn HostInputStream> {
433-
self.0
431+
impl<T> PipeStdinStream<T> {
432+
fn new(inner: T) -> Self {
433+
Self {
434+
buffer: vec![0_u8; 64 * 1024],
435+
inner: Arc::new(Mutex::new(inner)),
436+
}
437+
}
438+
}
439+
440+
impl<T> Clone for PipeStdinStream<T> {
441+
fn clone(&self) -> Self {
442+
Self {
443+
buffer: vec![0_u8; 64 * 1024],
444+
inner: self.inner.clone(),
445+
}
446+
}
447+
}
448+
449+
impl<T: Read + Send + Sync + 'static> HostInputStream for PipeStdinStream<T> {
450+
fn read(&mut self, size: usize) -> StreamResult<Bytes> {
451+
let size = size.min(self.buffer.len());
452+
453+
let count = self
454+
.inner
434455
.lock()
435456
.unwrap()
436-
.take()
437-
.expect("MyStdinStream::stream should only be called once")
457+
.read(&mut self.buffer[..size])
458+
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
459+
460+
Ok(Bytes::copy_from_slice(&self.buffer[..count]))
461+
}
462+
}
463+
464+
#[async_trait]
465+
impl<T: Read + Send + Sync + 'static> Subscribe for PipeStdinStream<T> {
466+
async fn ready(&mut self) {}
467+
}
468+
469+
impl<T: Read + Send + Sync + 'static> StdinStream for PipeStdinStream<T> {
470+
fn stream(&self) -> Box<dyn HostInputStream> {
471+
Box::new(self.clone())
438472
}
439473

440474
fn isatty(&self) -> bool {
441475
false
442476
}
443477
}
444478

445-
struct MyStdoutStream(Mutex<Option<Box<dyn HostOutputStream>>>);
479+
struct PipeStdoutStream<T>(Arc<Mutex<T>>);
446480

447-
impl StdoutStream for MyStdoutStream {
448-
fn stream(&self) -> Box<dyn HostOutputStream> {
481+
impl<T> Clone for PipeStdoutStream<T> {
482+
fn clone(&self) -> Self {
483+
Self(self.0.clone())
484+
}
485+
}
486+
487+
impl<T> PipeStdoutStream<T> {
488+
fn new(inner: T) -> Self {
489+
Self(Arc::new(Mutex::new(inner)))
490+
}
491+
}
492+
493+
impl<T: Write + Send + Sync + 'static> HostOutputStream for PipeStdoutStream<T> {
494+
fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
449495
self.0
450496
.lock()
451497
.unwrap()
452-
.take()
453-
.expect("MyStdoutStream::stream should only be called once")
498+
.write_all(&bytes)
499+
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
500+
}
501+
502+
fn flush(&mut self) -> Result<(), StreamError> {
503+
self.0
504+
.lock()
505+
.unwrap()
506+
.flush()
507+
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
508+
}
509+
510+
fn check_write(&mut self) -> Result<usize, StreamError> {
511+
Ok(1024 * 1024)
512+
}
513+
}
514+
515+
impl<T: Write + Send + Sync + 'static> StdoutStream for PipeStdoutStream<T> {
516+
fn stream(&self) -> Box<dyn HostOutputStream> {
517+
Box::new(self.clone())
518+
}
519+
520+
fn isatty(&self) -> bool {
521+
false
522+
}
523+
}
524+
525+
#[async_trait]
526+
impl<T: Write + Send + Sync + 'static> Subscribe for PipeStdoutStream<T> {
527+
async fn ready(&mut self) {}
528+
}
529+
530+
struct BufferStdoutStream(OutputBuffer);
531+
532+
impl StdoutStream for BufferStdoutStream {
533+
fn stream(&self) -> Box<dyn HostOutputStream> {
534+
Box::new(self.0.writer())
454535
}
455536

456537
fn isatty(&self) -> bool {

0 commit comments

Comments
 (0)