From 17bed2319ee722b5f2240bb7f4668733ab929e3c Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Sun, 7 Jan 2024 16:56:38 +0100 Subject: [PATCH 01/12] Required changes for Golem's durable worker executor --- crates/wasi-http/src/body.rs | 3 ++ crates/wasi-http/src/types_impl.rs | 4 +-- crates/wasi/src/preview2/ctx.rs | 21 +++++++++++ crates/wasi/src/preview2/filesystem.rs | 5 +++ crates/wasi/src/preview2/host/clocks.rs | 31 ++++++++++------ crates/wasi/src/preview2/host/env.rs | 8 +++-- crates/wasi/src/preview2/host/io.rs | 4 +-- crates/wasi/src/preview2/host/random.rs | 14 +++++--- crates/wasi/src/preview2/host/tcp.rs | 2 +- crates/wasi/src/preview2/host/udp.rs | 6 ++-- crates/wasi/src/preview2/ip_name_lookup.rs | 2 +- crates/wasi/src/preview2/mod.rs | 18 ++++++++++ crates/wasi/src/preview2/pipe.rs | 13 +++++++ crates/wasi/src/preview2/poll.rs | 42 +++++++++++++++++----- crates/wasi/src/preview2/preview1.rs | 35 +++++++++++++----- crates/wasi/src/preview2/stdio.rs | 4 +++ crates/wasi/src/preview2/stream.rs | 5 ++- crates/wasi/src/preview2/tcp.rs | 5 +++ crates/wasi/src/preview2/write_stream.rs | 5 +++ crates/wit-bindgen/src/lib.rs | 11 +++--- 20 files changed, 187 insertions(+), 51 deletions(-) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 92e73d420534..8f0f8938e17c 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -8,6 +8,7 @@ use std::future::Future; use std::mem; use std::task::{Context, Poll}; use std::{pin::Pin, sync::Arc, time::Duration}; +use std::any::Any; use tokio::sync::{mpsc, oneshot}; use wasmtime_wasi::preview2::{ self, poll_noop, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, StreamError, @@ -567,6 +568,8 @@ impl BodyWriteStream { #[async_trait::async_trait] impl HostOutputStream for BodyWriteStream { + fn as_any(&self) -> &dyn Any { self } + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let len = bytes.len(); match self.writer.try_send(bytes) { diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 351f8f59d775..6cf1d7546284 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -644,7 +644,7 @@ impl crate::bindings::http::types::HostFutureTrailers for T { &mut self, index: Resource, ) -> wasmtime::Result> { - wasmtime_wasi::preview2::subscribe(self.table(), index) + wasmtime_wasi::preview2::subscribe(self.table(), index, None) } fn get( @@ -852,7 +852,7 @@ impl crate::bindings::http::types::HostFutureIncomingResponse f &mut self, id: Resource, ) -> wasmtime::Result> { - wasmtime_wasi::preview2::subscribe(self.table(), id) + wasmtime_wasi::preview2::subscribe(self.table(), id, None) } } diff --git a/crates/wasi/src/preview2/ctx.rs b/crates/wasi/src/preview2/ctx.rs index a0447711adf4..4a77917ad609 100644 --- a/crates/wasi/src/preview2/ctx.rs +++ b/crates/wasi/src/preview2/ctx.rs @@ -12,6 +12,7 @@ use crate::preview2::{ use cap_rand::{Rng, RngCore, SeedableRng}; use std::sync::Arc; use std::{mem, net::SocketAddr}; +use std::time::Duration; use wasmtime::component::ResourceTable; pub struct WasiCtxBuilder { @@ -28,6 +29,8 @@ pub struct WasiCtxBuilder { wall_clock: Box, monotonic_clock: Box, allowed_network_uses: AllowedNetworkUses, + suspend_threshold: Duration, + suspend_signal: Box anyhow::Error + Send + Sync + 'static>, built: bool, } @@ -78,6 +81,8 @@ impl WasiCtxBuilder { wall_clock: wall_clock(), monotonic_clock: monotonic_clock(), allowed_network_uses: AllowedNetworkUses::default(), + suspend_threshold: Duration::MAX, + suspend_signal: Box::new(|_| unreachable!("suspend_signal not set")), built: false, } } @@ -222,6 +227,16 @@ impl WasiCtxBuilder { self } + pub fn set_suspend( + &mut self, + suspend_threshold: Duration, + suspend_signal: impl Fn(Duration) -> anyhow::Error + Send + Sync + 'static, + ) -> &mut Self { + self.suspend_threshold = suspend_threshold; + self.suspend_signal = Box::new(suspend_signal); + self + } + /// Uses the configured context so far to construct the final `WasiCtx`. /// /// Note that each `WasiCtxBuilder` can only be used to "build" once, and @@ -247,6 +262,8 @@ impl WasiCtxBuilder { wall_clock, monotonic_clock, allowed_network_uses, + suspend_threshold, + suspend_signal, built: _, } = mem::replace(self, Self::new()); self.built = true; @@ -265,6 +282,8 @@ impl WasiCtxBuilder { wall_clock, monotonic_clock, allowed_network_uses, + suspend_signal, + suspend_threshold, } } } @@ -290,6 +309,8 @@ pub struct WasiCtx { pub(crate) stderr: Box, pub(crate) socket_addr_check: SocketAddrCheck, pub(crate) allowed_network_uses: AllowedNetworkUses, + pub(crate) suspend_threshold: Duration, + pub(crate) suspend_signal: Box anyhow::Error + Send + Sync + 'static>, } pub struct AllowedNetworkUses { diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index 1f1d149dd93f..b29f8f7f7712 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -4,6 +4,7 @@ use crate::preview2::{ }; use anyhow::anyhow; use bytes::{Bytes, BytesMut}; +use std::any::Any; use std::io; use std::mem; use std::sync::Arc; @@ -215,6 +216,10 @@ impl FileOutputStream { const FILE_WRITE_CAPACITY: usize = 1024 * 1024; impl HostOutputStream for FileOutputStream { + fn as_any(&self) -> &dyn Any { + self + } + fn write(&mut self, buf: Bytes) -> Result<(), StreamError> { use system_interface::fs::FileIoExt; match self.state { diff --git a/crates/wasi/src/preview2/host/clocks.rs b/crates/wasi/src/preview2/host/clocks.rs index 2f16b9e10490..f67a319488b6 100644 --- a/crates/wasi/src/preview2/host/clocks.rs +++ b/crates/wasi/src/preview2/host/clocks.rs @@ -6,6 +6,7 @@ use crate::preview2::bindings::{ }; use crate::preview2::poll::{subscribe, Subscribe}; use crate::preview2::{Pollable, WasiView}; +use async_trait::async_trait; use cap_std::time::SystemTime; use std::time::Duration; use wasmtime::component::Resource; @@ -24,8 +25,9 @@ impl TryFrom for Datetime { } } +#[async_trait] impl wall_clock::Host for T { - fn now(&mut self) -> anyhow::Result { + async fn now(&mut self) -> anyhow::Result { let now = self.ctx().wall_clock.now(); Ok(Datetime { seconds: now.as_secs(), @@ -33,7 +35,7 @@ impl wall_clock::Host for T { }) } - fn resolution(&mut self) -> anyhow::Result { + async fn resolution(&mut self) -> anyhow::Result { let res = self.ctx().wall_clock.resolution(); Ok(Datetime { seconds: res.as_secs(), @@ -46,31 +48,35 @@ fn subscribe_to_duration( table: &mut wasmtime::component::ResourceTable, duration: tokio::time::Duration, ) -> anyhow::Result> { - let sleep = if duration.is_zero() { - table.push(Deadline::Past)? + let (sleep, suspend_until) = if duration.is_zero() { + (table.push(Deadline::Past)?, None) } else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) { // NB: this resource created here is not actually exposed to wasm, it's // only an internal implementation detail used to match the signature // expected by `subscribe`. - table.push(Deadline::Instant(deadline))? + ( + table.push(Deadline::Instant(deadline))?, + Some(deadline.into()), + ) } else { // If the user specifies a time so far in the future we can't // represent it, wait forever rather than trap. - table.push(Deadline::Never)? + (table.push(Deadline::Never)?, None) }; - subscribe(table, sleep) + subscribe(table, sleep, suspend_until) } +#[async_trait] impl monotonic_clock::Host for T { - fn now(&mut self) -> anyhow::Result { + async fn now(&mut self) -> anyhow::Result { Ok(self.ctx().monotonic_clock.now()) } - fn resolution(&mut self) -> anyhow::Result { + async fn resolution(&mut self) -> anyhow::Result { Ok(self.ctx().monotonic_clock.resolution()) } - fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result> { + async fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result> { let clock_now = self.ctx().monotonic_clock.now(); let duration = if when > clock_now { Duration::from_nanos(when - clock_now) @@ -80,7 +86,10 @@ impl monotonic_clock::Host for T { subscribe_to_duration(&mut self.table_mut(), duration) } - fn subscribe_duration(&mut self, duration: WasiDuration) -> anyhow::Result> { + async fn subscribe_duration( + &mut self, + duration: WasiDuration, + ) -> anyhow::Result> { subscribe_to_duration(&mut self.table_mut(), Duration::from_nanos(duration)) } } diff --git a/crates/wasi/src/preview2/host/env.rs b/crates/wasi/src/preview2/host/env.rs index e765dbd817f0..af1567d9d962 100644 --- a/crates/wasi/src/preview2/host/env.rs +++ b/crates/wasi/src/preview2/host/env.rs @@ -1,14 +1,16 @@ use crate::preview2::bindings::cli::environment; use crate::preview2::WasiView; +use async_trait::async_trait; +#[async_trait] impl environment::Host for T { - fn get_environment(&mut self) -> anyhow::Result> { + async fn get_environment(&mut self) -> anyhow::Result> { Ok(self.ctx().env.clone()) } - fn get_arguments(&mut self) -> anyhow::Result> { + async fn get_arguments(&mut self) -> anyhow::Result> { Ok(self.ctx().args.clone()) } - fn initial_cwd(&mut self) -> anyhow::Result> { + async fn initial_cwd(&mut self) -> anyhow::Result> { // FIXME: expose cwd in builder and save in ctx Ok(None) } diff --git a/crates/wasi/src/preview2/host/io.rs b/crates/wasi/src/preview2/host/io.rs index d04747fc6ddd..499b216cfd18 100644 --- a/crates/wasi/src/preview2/host/io.rs +++ b/crates/wasi/src/preview2/host/io.rs @@ -49,7 +49,7 @@ impl streams::HostOutputStream for T { } fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { - subscribe(self.table_mut(), stream) + subscribe(self.table_mut(), stream, None) } async fn blocking_write_and_flush( @@ -222,7 +222,7 @@ impl streams::HostInputStream for T { } fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table_mut(), stream) + crate::preview2::poll::subscribe(self.table_mut(), stream, None) } } diff --git a/crates/wasi/src/preview2/host/random.rs b/crates/wasi/src/preview2/host/random.rs index d2483b39f9b9..7b2684b2eb4e 100644 --- a/crates/wasi/src/preview2/host/random.rs +++ b/crates/wasi/src/preview2/host/random.rs @@ -1,35 +1,39 @@ use crate::preview2::bindings::random::{insecure, insecure_seed, random}; use crate::preview2::WasiView; +use async_trait::async_trait; use cap_rand::{distributions::Standard, Rng}; +#[async_trait] impl random::Host for T { - fn get_random_bytes(&mut self, len: u64) -> anyhow::Result> { + async fn get_random_bytes(&mut self, len: u64) -> anyhow::Result> { Ok((&mut self.ctx_mut().random) .sample_iter(Standard) .take(len as usize) .collect()) } - fn get_random_u64(&mut self) -> anyhow::Result { + async fn get_random_u64(&mut self) -> anyhow::Result { Ok(self.ctx_mut().random.sample(Standard)) } } +#[async_trait] impl insecure::Host for T { - fn get_insecure_random_bytes(&mut self, len: u64) -> anyhow::Result> { + async fn get_insecure_random_bytes(&mut self, len: u64) -> anyhow::Result> { Ok((&mut self.ctx_mut().insecure_random) .sample_iter(Standard) .take(len as usize) .collect()) } - fn get_insecure_random_u64(&mut self) -> anyhow::Result { + async fn get_insecure_random_u64(&mut self) -> anyhow::Result { Ok(self.ctx_mut().insecure_random.sample(Standard)) } } +#[async_trait] impl insecure_seed::Host for T { - fn insecure_seed(&mut self) -> anyhow::Result<(u64, u64)> { + async fn insecure_seed(&mut self) -> anyhow::Result<(u64, u64)> { let seed: u128 = self.ctx_mut().insecure_random_seed; Ok((seed as u64, (seed >> 64) as u64)) } diff --git a/crates/wasi/src/preview2/host/tcp.rs b/crates/wasi/src/preview2/host/tcp.rs index 67bd04429c64..24ad91e09a07 100644 --- a/crates/wasi/src/preview2/host/tcp.rs +++ b/crates/wasi/src/preview2/host/tcp.rs @@ -560,7 +560,7 @@ impl crate::preview2::host::tcp::tcp::HostTcpSocket for T { } fn subscribe(&mut self, this: Resource) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table_mut(), this) + crate::preview2::poll::subscribe(self.table_mut(), this, None) } fn shutdown( diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index 8b96a686a089..8e1de1fd0972 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -283,7 +283,7 @@ impl udp::HostUdpSocket for T { } fn subscribe(&mut self, this: Resource) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table_mut(), this) + crate::preview2::poll::subscribe(self.table_mut(), this, None) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -363,7 +363,7 @@ impl udp::HostIncomingDatagramStream for T { &mut self, this: Resource, ) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table_mut(), this) + crate::preview2::poll::subscribe(self.table_mut(), this, None) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -497,7 +497,7 @@ impl udp::HostOutgoingDatagramStream for T { &mut self, this: Resource, ) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table_mut(), this) + crate::preview2::poll::subscribe(self.table_mut(), this, None) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { diff --git a/crates/wasi/src/preview2/ip_name_lookup.rs b/crates/wasi/src/preview2/ip_name_lookup.rs index ab69f8f1bf9c..e3cd01f60207 100644 --- a/crates/wasi/src/preview2/ip_name_lookup.rs +++ b/crates/wasi/src/preview2/ip_name_lookup.rs @@ -69,7 +69,7 @@ impl HostResolveAddressStream for T { &mut self, resource: Resource, ) -> Result> { - subscribe(self.table_mut(), resource) + subscribe(self.table_mut(), resource, None) } fn drop(&mut self, resource: Resource) -> Result<()> { diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 16087504364c..09acf063317e 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -89,6 +89,7 @@ pub mod bindings { } }); } + pub use self::_internal::wasi::{filesystem, io}; } @@ -152,6 +153,18 @@ pub mod bindings { "poll", "[method]pollable.block", "[method]pollable.ready", + "get-random-bytes", + "get-random-u64", + "insecure-seed", + "get-insecure-random-bytes", + "get-insecure-random-u64", + "now", + "resolution", + "subscribe-instant", + "subscribe-duration", + "get-environment", + "get-arguments", + "initial-cwd" ], }, trappable_error_type: { @@ -190,27 +203,32 @@ pub(crate) static RUNTIME: once_cell::sync::Lazy = }); pub struct AbortOnDropJoinHandle(tokio::task::JoinHandle); + impl Drop for AbortOnDropJoinHandle { fn drop(&mut self) { self.0.abort() } } + impl std::ops::Deref for AbortOnDropJoinHandle { type Target = tokio::task::JoinHandle; fn deref(&self) -> &Self::Target { &self.0 } } + impl std::ops::DerefMut for AbortOnDropJoinHandle { fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle { &mut self.0 } } + impl From> for AbortOnDropJoinHandle { fn from(jh: tokio::task::JoinHandle) -> Self { AbortOnDropJoinHandle(jh) } } + impl Future for AbortOnDropJoinHandle { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index bc8d20b1de56..c88b66365999 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -11,6 +11,7 @@ use crate::preview2::poll::Subscribe; use crate::preview2::{HostInputStream, HostOutputStream, StreamError}; use anyhow::anyhow; use bytes::Bytes; +use std::any::Any; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; @@ -76,6 +77,10 @@ impl MemoryOutputPipe { } impl HostOutputStream for MemoryOutputPipe { + fn as_any(&self) -> &dyn Any { + self + } + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut buf = self.buffer.lock().unwrap(); if bytes.len() > self.capacity - buf.len() { @@ -211,6 +216,10 @@ impl Subscribe for AsyncReadStream { pub struct SinkOutputStream; impl HostOutputStream for SinkOutputStream { + fn as_any(&self) -> &dyn Any { + self + } + fn write(&mut self, _buf: Bytes) -> Result<(), StreamError> { Ok(()) } @@ -251,6 +260,10 @@ impl Subscribe for ClosedInputStream { pub struct ClosedOutputStream; impl HostOutputStream for ClosedOutputStream { + fn as_any(&self) -> &dyn Any { + self + } + fn write(&mut self, _: Bytes) -> Result<(), StreamError> { Err(StreamError::Closed) } diff --git a/crates/wasi/src/preview2/poll.rs b/crates/wasi/src/preview2/poll.rs index 4fba2a9940e0..e85031fe05ae 100644 --- a/crates/wasi/src/preview2/poll.rs +++ b/crates/wasi/src/preview2/poll.rs @@ -6,8 +6,9 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use wasmtime::component::{Resource, ResourceTable}; +use std::time::Instant; -pub type PollableFuture<'a> = Pin + Send + 'a>>; +pub type PollableFuture<'a> = Pin + Send + 'a>>; pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>; pub type ClosureFuture = Box PollableFuture<'static> + Send + Sync + 'static>; @@ -21,6 +22,7 @@ pub struct Pollable { index: u32, make_future: MakeFuture, remove_index_on_delete: Option Result<()>>, + pub supports_suspend: Option, } #[async_trait::async_trait] @@ -35,13 +37,17 @@ pub trait Subscribe: Send + Sync + 'static { /// resource is deleted. Otherwise the returned resource is considered a "child" /// of the given `resource` which means that the given resource cannot be /// deleted while the `pollable` is still alive. -pub fn subscribe(table: &mut ResourceTable, resource: Resource) -> Result> +pub fn subscribe( + table: &mut ResourceTable, + resource: Resource, + supports_suspend: Option +) -> Result> where T: Subscribe, { fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a> - where - T: Subscribe, + where + T: Subscribe, { stream.downcast_mut::().unwrap().ready() } @@ -58,6 +64,7 @@ where None }, make_future: make_future::, + supports_suspend, }; Ok(table.push_child(pollable, &resource)?) @@ -68,22 +75,41 @@ impl poll::Host for T { async fn poll(&mut self, pollables: Vec>) -> Result> { type ReadylistIndex = u32; - let table = self.table_mut(); - let mut table_futures: HashMap)> = HashMap::new(); + let mut all_supports_suspend = Some(None); for (ix, p) in pollables.iter().enumerate() { let ix: u32 = ix.try_into()?; - let pollable = table.get(p)?; + let pollable = self.table_mut().get(p)?; let (_, list) = table_futures .entry(pollable.index) .or_insert((pollable.make_future, Vec::new())); list.push(ix); + + match pollable.supports_suspend { + None => { + all_supports_suspend = None; + } + Some(maximum_suspend_time) => { + all_supports_suspend = all_supports_suspend.map(|maybe_max| match maybe_max { + None => Some(maximum_suspend_time), + Some(max) => Some(std::cmp::min(max, maximum_suspend_time)), + }); + } + } } + if let Some(Some(deadline)) = all_supports_suspend { + let duration = deadline.duration_since(Instant::now()); + if duration >= self.ctx().suspend_threshold { + return Err((self.ctx().suspend_signal)(duration)); + } + } let mut futures: Vec<(PollableFuture<'_>, Vec)> = Vec::new(); - for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) { + for (entry, (make_future, readylist_indices)) in + self.table_mut().iter_entries(table_futures) + { let entry = entry?; futures.push((make_future(entry), readylist_indices)); } diff --git a/crates/wasi/src/preview2/preview1.rs b/crates/wasi/src/preview2/preview1.rs index c692bf0b34b3..b098b2a07820 100644 --- a/crates/wasi/src/preview2/preview1.rs +++ b/crates/wasi/src/preview2/preview1.rs @@ -523,7 +523,8 @@ wiggle::from_witx!({ fd_filestat_set_times, fd_read, fd_pread, fd_seek, fd_sync, fd_readdir, fd_write, fd_pwrite, poll_oneoff, path_create_directory, path_filestat_get, path_filestat_set_times, path_link, path_open, path_readlink, path_remove_directory, - path_rename, path_symlink, path_unlink_file + path_rename, path_symlink, path_unlink_file, + random_get, clock_res_get, clock_time_get, args_get, args_sizes_get, environ_get, environ_sizes_get } }, errors: { errno => trappable Error }, @@ -542,7 +543,8 @@ mod sync { fd_filestat_set_times, fd_read, fd_pread, fd_seek, fd_sync, fd_readdir, fd_write, fd_pwrite, poll_oneoff, path_create_directory, path_filestat_get, path_filestat_set_times, path_link, path_open, path_readlink, path_remove_directory, - path_rename, path_symlink, path_unlink_file + path_rename, path_symlink, path_unlink_file, + random_get, clock_res_get, clock_time_get, args_get, args_sizes_get, environ_get, environ_sizes_get } }, errors: { errno => trappable Error }, @@ -861,12 +863,13 @@ impl< > wasi_snapshot_preview1::WasiSnapshotPreview1 for T { #[instrument(skip(self))] - fn args_get<'b>( + async fn args_get<'b>( &mut self, argv: &GuestPtr<'b, GuestPtr<'b, u8>>, argv_buf: &GuestPtr<'b, u8>, ) -> Result<(), types::Error> { self.get_arguments() + .await .context("failed to call `get-arguments`") .map_err(types::Error::trap)? .into_iter() @@ -883,9 +886,10 @@ impl< } #[instrument(skip(self))] - fn args_sizes_get(&mut self) -> Result<(types::Size, types::Size), types::Error> { + async fn args_sizes_get(&mut self) -> Result<(types::Size, types::Size), types::Error> { let args = self .get_arguments() + .await .context("failed to call `get-arguments`") .map_err(types::Error::trap)?; let num = args.len().try_into().map_err(|_| types::Errno::Overflow)?; @@ -899,12 +903,13 @@ impl< } #[instrument(skip(self))] - fn environ_get<'b>( + async fn environ_get<'b>( &mut self, environ: &GuestPtr<'b, GuestPtr<'b, u8>>, environ_buf: &GuestPtr<'b, u8>, ) -> Result<(), types::Error> { self.get_environment() + .await .context("failed to call `get-environment`") .map_err(types::Error::trap)? .into_iter() @@ -926,9 +931,10 @@ impl< } #[instrument(skip(self))] - fn environ_sizes_get(&mut self) -> Result<(types::Size, types::Size), types::Error> { + async fn environ_sizes_get(&mut self) -> Result<(types::Size, types::Size), types::Error> { let environ = self .get_environment() + .await .context("failed to call `get-environment`") .map_err(types::Error::trap)?; let num = environ.len().try_into()?; @@ -941,13 +947,18 @@ impl< } #[instrument(skip(self))] - fn clock_res_get(&mut self, id: types::Clockid) -> Result { + async fn clock_res_get( + &mut self, + id: types::Clockid, + ) -> Result { let res = match id { types::Clockid::Realtime => wall_clock::Host::resolution(self) + .await .context("failed to call `wall_clock::resolution`") .map_err(types::Error::trap)? .try_into()?, types::Clockid::Monotonic => monotonic_clock::Host::resolution(self) + .await .context("failed to call `monotonic_clock::resolution`") .map_err(types::Error::trap)?, types::Clockid::ProcessCputimeId | types::Clockid::ThreadCputimeId => { @@ -958,17 +969,19 @@ impl< } #[instrument(skip(self))] - fn clock_time_get( + async fn clock_time_get( &mut self, id: types::Clockid, _precision: types::Timestamp, ) -> Result { let now = match id { types::Clockid::Realtime => wall_clock::Host::now(self) + .await .context("failed to call `wall_clock::now`") .map_err(types::Error::trap)? .try_into()?, types::Clockid::Monotonic => monotonic_clock::Host::now(self) + .await .context("failed to call `monotonic_clock::now`") .map_err(types::Error::trap)?, types::Clockid::ProcessCputimeId | types::Clockid::ThreadCputimeId => { @@ -2064,6 +2077,7 @@ impl< types::Clockid::Realtime if !absolute => (timeout, false), types::Clockid::Realtime => { let now = wall_clock::Host::now(self) + .await .context("failed to call `wall_clock::now`") .map_err(types::Error::trap)?; @@ -2087,10 +2101,12 @@ impl< }; if absolute { monotonic_clock::Host::subscribe_instant(self, timeout) + .await .context("failed to call `monotonic_clock::subscribe_instant`") .map_err(types::Error::trap)? } else { monotonic_clock::Host::subscribe_duration(self, timeout) + .await .context("failed to call `monotonic_clock::subscribe_duration`") .map_err(types::Error::trap)? } @@ -2299,13 +2315,14 @@ impl< } #[instrument(skip(self))] - fn random_get<'a>( + async fn random_get<'a>( &mut self, buf: &GuestPtr<'a, u8>, buf_len: types::Size, ) -> Result<(), types::Error> { let rand = self .get_random_bytes(buf_len.into()) + .await .context("failed to call `get-random-bytes`") .map_err(types::Error::trap)?; write_bytes(buf, &rand)?; diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index f99a5d2a245c..a09f24902bf1 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -8,6 +8,7 @@ use crate::preview2::{ HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, WasiView, }; use bytes::Bytes; +use std::any::Any; use std::io::IsTerminal; use wasmtime::component::Resource; @@ -155,6 +156,9 @@ enum OutputStream { } impl HostOutputStream for OutputStream { + fn as_any(&self) -> &dyn Any { + self + } fn write(&mut self, bytes: Bytes) -> StreamResult<()> { use std::io::Write; match self { diff --git a/crates/wasi/src/preview2/stream.rs b/crates/wasi/src/preview2/stream.rs index bf2ce502ba5c..a49aba8143a8 100644 --- a/crates/wasi/src/preview2/stream.rs +++ b/crates/wasi/src/preview2/stream.rs @@ -2,6 +2,7 @@ use crate::preview2::filesystem::FileInputStream; use crate::preview2::poll::Subscribe; use anyhow::Result; use bytes::Bytes; +use std::any::Any; /// Host trait for implementing the `wasi:io/streams.input-stream` resource: A /// bytestream which can be read from. @@ -81,7 +82,7 @@ impl From for StreamError { /// Host trait for implementing the `wasi:io/streams.output-stream` resource: /// A bytestream which can be written to. #[async_trait::async_trait] -pub trait HostOutputStream: Subscribe { +pub trait HostOutputStream: Subscribe + Any { /// Write bytes after obtaining a permit to write those bytes /// /// Prior to calling [`write`](Self::write) the caller must call @@ -153,6 +154,8 @@ pub trait HostOutputStream: Subscribe { self.ready().await; self.check_write() } + + fn as_any(&self) -> &dyn Any; } #[async_trait::async_trait] diff --git a/crates/wasi/src/preview2/tcp.rs b/crates/wasi/src/preview2/tcp.rs index fc766b167cf0..d7450812a2ee 100644 --- a/crates/wasi/src/preview2/tcp.rs +++ b/crates/wasi/src/preview2/tcp.rs @@ -8,6 +8,7 @@ use anyhow::{Error, Result}; use cap_net_ext::{AddressFamily, Blocking}; use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike}; use rustix::net::sockopt; +use std::any::Any; use std::io; use std::mem; use std::sync::Arc; @@ -185,6 +186,10 @@ impl TcpWriteStream { } impl HostOutputStream for TcpWriteStream { + fn as_any(&self) -> &dyn Any { + self + } + fn write(&mut self, mut bytes: bytes::Bytes) -> Result<(), StreamError> { match self.last_write { LastWrite::Done => {} diff --git a/crates/wasi/src/preview2/write_stream.rs b/crates/wasi/src/preview2/write_stream.rs index afc2305f007a..3692172adefa 100644 --- a/crates/wasi/src/preview2/write_stream.rs +++ b/crates/wasi/src/preview2/write_stream.rs @@ -1,6 +1,7 @@ use crate::preview2::{HostOutputStream, StreamError, Subscribe}; use anyhow::anyhow; use bytes::Bytes; +use std::any::Any; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -162,6 +163,10 @@ impl AsyncWriteStream { } impl HostOutputStream for AsyncWriteStream { + fn as_any(&self) -> &dyn Any { + self + } + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut state = self.worker.state(); state.check_error()?; diff --git a/crates/wit-bindgen/src/lib.rs b/crates/wit-bindgen/src/lib.rs index a73f6bcca4e5..1a88d63412e9 100644 --- a/crates/wit-bindgen/src/lib.rs +++ b/crates/wit-bindgen/src/lib.rs @@ -25,6 +25,7 @@ macro_rules! uwriteln { mod rust; mod source; mod types; + use source::Source; #[derive(Clone)] @@ -998,7 +999,7 @@ impl<'a> InterfaceGenerator<'a> { FunctionKind::Method(resource) | FunctionKind::Static(resource) | FunctionKind::Constructor(resource) - if id == resource => {} + if id == resource => {} _ => continue, } @@ -1194,7 +1195,7 @@ impl<'a> InterfaceGenerator<'a> { fn print_rust_enum<'b>( &mut self, id: TypeId, - cases: impl IntoIterator, &'b Docs, Option<&'b Type>)> + Clone, + cases: impl IntoIterator, &'b Docs, Option<&'b Type>)> + Clone, docs: &Docs, derive_component: &str, ) where @@ -1278,7 +1279,7 @@ impl<'a> InterfaceGenerator<'a> { id: TypeId, mode: TypeMode, name: &str, - cases: impl IntoIterator)>, + cases: impl IntoIterator)>, ) where Self: Sized, { @@ -2067,7 +2068,7 @@ fn func_field_name(resolve: &Resolve, func: &Function) -> String { name.to_snake_case() } -fn get_resources<'a>(resolve: &'a Resolve, id: InterfaceId) -> impl Iterator + 'a { +fn get_resources<'a>(resolve: &'a Resolve, id: InterfaceId) -> impl Iterator + 'a { resolve.interfaces[id] .types .iter() @@ -2080,7 +2081,7 @@ fn get_resources<'a>(resolve: &'a Resolve, id: InterfaceId) -> impl Iterator( resolve: &'a Resolve, id: WorldId, -) -> impl Iterator + 'a { +) -> impl Iterator + 'a { resolve.worlds[id] .imports .iter() From dfdd3f0f0c61f4fbce69d8452ec2ae7a3da86dad Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 8 Jan 2024 13:25:31 +0100 Subject: [PATCH 02/12] Making the wasi-http handler function public --- crates/wasi-http/src/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 2a9a92b98125..efc67775a95e 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -139,7 +139,7 @@ pub fn default_send_request( Ok(fut) } -async fn handler( +pub async fn handler( authority: String, use_tls: bool, connect_timeout: Duration, From 0b8218ac050b4660f3d8ec7f5ca198d59a515374 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 11 Jan 2024 16:24:23 +0100 Subject: [PATCH 03/12] More changes for wasi-http durability --- crates/wasi-http/src/body.rs | 2 + crates/wasi-http/src/lib.rs | 7 ++- crates/wasi-http/src/types.rs | 9 +++- crates/wasi-http/src/types_impl.rs | 45 +++++++++++++++++-- crates/wasi/src/preview2/pipe.rs | 12 +++++ .../src/preview2/stdio/worker_thread_stdin.rs | 4 ++ crates/wasi/src/preview2/stream.rs | 4 +- crates/wasi/src/preview2/tcp.rs | 4 ++ 8 files changed, 79 insertions(+), 8 deletions(-) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 8f0f8938e17c..a744e209c76c 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -204,6 +204,8 @@ impl HostInputStream for HostIncomingBodyStream { } } } + + fn as_any(&self) -> &dyn Any { self } } #[async_trait::async_trait] diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index e7218250a225..e1df49c7d75c 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -16,7 +16,12 @@ pub mod bindings { import wasi:http/types@0.2.0; ", tracing: true, - async: false, + async: { + only_imports: [ + "[method]future-incoming-response.get", + "[method]future-trailers.get", + ], + }, with: { "wasi:io/error": wasmtime_wasi::preview2::bindings::io::error, "wasi:io/streams": wasmtime_wasi::preview2::bindings::io::streams, diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index efc67775a95e..076898b04cf8 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -139,7 +139,7 @@ pub fn default_send_request( Ok(fut) } -pub async fn handler( +pub(crate) async fn handler( authority: String, use_tls: bool, connect_timeout: Duration, @@ -405,6 +405,7 @@ pub enum HostFutureIncomingResponse { Pending(FutureIncomingResponseHandle), Ready(anyhow::Result>), Consumed, + Deferred(OutgoingRequest) } impl HostFutureIncomingResponse { @@ -412,6 +413,10 @@ impl HostFutureIncomingResponse { Self::Pending(handle) } + pub fn deferred(request: OutgoingRequest) -> Self { + Self::Deferred(request) + } + pub fn is_ready(&self) -> bool { matches!(self, Self::Ready(_)) } @@ -421,7 +426,7 @@ impl HostFutureIncomingResponse { ) -> anyhow::Result> { match self { Self::Ready(res) => res, - Self::Pending(_) | Self::Consumed => { + Self::Pending(_) | Self::Consumed | Self::Deferred(_) => { panic!("unwrap_ready called on a pending HostFutureIncomingResponse") } } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 6cf1d7546284..e0a5160fcb0b 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -1,3 +1,4 @@ +use crate::types::OutgoingRequest; use crate::{ bindings::http::types::{self, Headers, Method, Scheme, StatusCode, Trailers}, body::{HostFutureTrailers, HostIncomingBody, HostOutgoingBody, StreamContext}, @@ -8,7 +9,8 @@ use crate::{ }, WasiHttpView, }; -use anyhow::Context; +use anyhow::{anyhow, Context}; +use async_trait::async_trait; use std::any::Any; use std::str::FromStr; use wasmtime::component::{Resource, ResourceTable}; @@ -60,7 +62,7 @@ fn move_fields(table: &mut ResourceTable, id: Resource) -> wasmtime: } } -fn get_fields<'a>( +pub fn get_fields<'a>( table: &'a mut ResourceTable, id: &Resource, ) -> wasmtime::Result<&'a FieldMap> { @@ -631,6 +633,7 @@ impl crate::bindings::http::types::HostIncomingResponse for T { } } +#[async_trait] impl crate::bindings::http::types::HostFutureTrailers for T { fn drop(&mut self, id: Resource) -> wasmtime::Result<()> { let _ = self @@ -647,7 +650,7 @@ impl crate::bindings::http::types::HostFutureTrailers for T { wasmtime_wasi::preview2::subscribe(self.table(), index, None) } - fn get( + async fn get( &mut self, id: Resource, ) -> wasmtime::Result>, types::ErrorCode>, ()>>> @@ -798,13 +801,14 @@ impl crate::bindings::http::types::HostOutgoingResponse for T { } } +#[async_trait] impl crate::bindings::http::types::HostFutureIncomingResponse for T { fn drop(&mut self, id: Resource) -> wasmtime::Result<()> { let _ = self.table().delete(id)?; Ok(()) } - fn get( + async fn get( &mut self, id: Resource, ) -> wasmtime::Result< @@ -816,6 +820,39 @@ impl crate::bindings::http::types::HostFutureIncomingResponse f HostFutureIncomingResponse::Pending(_) => return Ok(None), HostFutureIncomingResponse::Consumed => return Ok(Some(Err(()))), HostFutureIncomingResponse::Ready(_) => {} + HostFutureIncomingResponse::Deferred(_) => { + let (tx, rx) = tokio::sync::oneshot::channel(); + let handle = wasmtime_wasi::preview2::spawn(async move { + let request = rx.await.map_err(|err| anyhow!(err))?; + let HostFutureIncomingResponse::Deferred(OutgoingRequest { + use_tls, + authority, + request, + connect_timeout, + first_byte_timeout, + between_bytes_timeout, + }) = request + else { + return Err(anyhow!("unexpected incoming response state".to_string())); + }; + let resp = crate::types::handler( + authority, + use_tls, + connect_timeout, + first_byte_timeout, + request, + between_bytes_timeout, + ) + .await; + Ok(resp) + }); + tx.send(std::mem::replace( + resp, + HostFutureIncomingResponse::Pending(handle), + )) + .map_err(|_| anyhow!("failed to send request to handler"))?; + return Ok(None); + } } let resp = diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index c88b66365999..7a6cd608277e 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -46,6 +46,10 @@ impl HostInputStream for MemoryInputPipe { let read = buffer.split_to(size); Ok(read) } + + fn as_any(&self) -> &dyn Any { + self + } } #[async_trait::async_trait] @@ -195,6 +199,10 @@ impl HostInputStream for AsyncReadStream { ))), } } + + fn as_any(&self) -> &dyn Any { + self + } } #[async_trait::async_trait] impl Subscribe for AsyncReadStream { @@ -248,6 +256,10 @@ impl HostInputStream for ClosedInputStream { fn read(&mut self, _size: usize) -> Result { Err(StreamError::Closed) } + + fn as_any(&self) -> &dyn Any { + self + } } #[async_trait::async_trait] diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index cb2f50f2c61b..e78b18238e30 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -145,6 +145,10 @@ impl HostInputStream for Stdin { } } } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } #[async_trait::async_trait] diff --git a/crates/wasi/src/preview2/stream.rs b/crates/wasi/src/preview2/stream.rs index a49aba8143a8..acb037d9b452 100644 --- a/crates/wasi/src/preview2/stream.rs +++ b/crates/wasi/src/preview2/stream.rs @@ -7,7 +7,7 @@ use std::any::Any; /// Host trait for implementing the `wasi:io/streams.input-stream` resource: A /// bytestream which can be read from. #[async_trait::async_trait] -pub trait HostInputStream: Subscribe { +pub trait HostInputStream: Subscribe + Any { /// Reads up to `size` bytes, returning a buffer holding these bytes on /// success. /// @@ -30,6 +30,8 @@ pub trait HostInputStream: Subscribe { let bs = self.read(nelem)?; Ok(bs.len()) } + + fn as_any(&self) -> &dyn Any; } /// Representation of the `error` resource type in the `wasi:io/error` diff --git a/crates/wasi/src/preview2/tcp.rs b/crates/wasi/src/preview2/tcp.rs index d7450812a2ee..3d3a1a6e2f17 100644 --- a/crates/wasi/src/preview2/tcp.rs +++ b/crates/wasi/src/preview2/tcp.rs @@ -124,6 +124,10 @@ impl HostInputStream for TcpReadStream { buf.truncate(n); Ok(buf.freeze()) } + + fn as_any(&self) -> &dyn Any { + self + } } #[async_trait::async_trait] From 5c365a9db7da54bea59dbf3000feb9a9ac0d7398 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 15 Jan 2024 12:50:35 +0100 Subject: [PATCH 04/12] Support failing response bodies --- crates/wasi-http/src/body.rs | 59 ++++++++++++++++++++++++++---- crates/wasi-http/src/types_impl.rs | 2 +- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index a744e209c76c..a8921bdbfb68 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -9,11 +9,9 @@ use std::mem; use std::task::{Context, Poll}; use std::{pin::Pin, sync::Arc, time::Duration}; use std::any::Any; +use async_trait::async_trait; use tokio::sync::{mpsc, oneshot}; -use wasmtime_wasi::preview2::{ - self, poll_noop, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, StreamError, - Subscribe, -}; +use wasmtime_wasi::preview2::{self, poll_noop, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, StreamError, Subscribe, StreamResult, InputStream}; pub type HyperIncomingBody = BoxBody; @@ -86,6 +84,16 @@ pub struct HostIncomingBody { worker: Option>>, } +impl std::fmt::Debug for HostIncomingBody { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.body { + IncomingBodyState::Start(_) => write!(f, "HostIncomingBody {{ Start }}"), + IncomingBodyState::InBodyStream(_) => write!(f, "HostIncomingBody {{ InBodyStream }}"), + IncomingBodyState::Failing(_) => write!(f, "HostIncomingBody {{ Failing }}"), + } + } +} + enum IncomingBodyState { /// The body is stored here meaning that within `HostIncomingBody` the /// `take_stream` method can be called for example. @@ -95,6 +103,8 @@ enum IncomingBodyState { /// currently owned here. The body will be sent back over this channel when /// it's done, however. InBodyStream(oneshot::Receiver), + + Failing(String), } /// Message sent when a `HostIncomingBodyStream` is done to the @@ -118,26 +128,35 @@ impl HostIncomingBody { } } + pub fn failing(error: String) -> HostIncomingBody { + HostIncomingBody { + body: IncomingBodyState::Failing(error), + worker: None, + } + } + pub fn retain_worker(&mut self, worker: &Arc>) { assert!(self.worker.is_none()); self.worker = Some(worker.clone()); } - pub fn take_stream(&mut self) -> Option { + pub fn take_stream(&mut self) -> Option { match &mut self.body { IncomingBodyState::Start(_) => {} + IncomingBodyState::Failing(error) => return Some(InputStream::Host(Box::new(FailingStream { error: error.clone() }))), IncomingBodyState::InBodyStream(_) => return None, } let (tx, rx) = oneshot::channel(); let body = match mem::replace(&mut self.body, IncomingBodyState::InBodyStream(rx)) { IncomingBodyState::Start(b) => b, IncomingBodyState::InBodyStream(_) => unreachable!(), + IncomingBodyState::Failing(_) => unreachable!(), }; - Some(HostIncomingBodyStream { + Some(InputStream::Host(Box::new(HostIncomingBodyStream { state: IncomingBodyStreamState::Open { body, tx }, buffer: Bytes::new(), error: None, - }) + }))) } pub fn into_future_trailers(self) -> HostFutureTrailers { @@ -345,9 +364,14 @@ impl Subscribe for HostFutureTrailers { HostFutureTrailers::Done(_) => return, HostFutureTrailers::Consumed => return, }; + if let IncomingBodyState::Failing(_) = &mut body.body { + *self = HostFutureTrailers::Done(Err(types::ErrorCode::ConnectionTerminated)); + return; + } let hyper_body = match &mut body.body { IncomingBodyState::Start(body) => body, IncomingBodyState::InBodyStream(_) => unreachable!(), + IncomingBodyState::Failing(_) => unreachable!(), }; let result = loop { match hyper_body.frame().await { @@ -639,3 +663,24 @@ impl Subscribe for BodyWriteStream { let _ = self.writer.reserve().await; } } + + +pub struct FailingStream { + error: String +} + +#[async_trait] +impl Subscribe for FailingStream { + async fn ready(&mut self) { + } +} + +impl HostInputStream for FailingStream { + fn read(&mut self, _size: usize) -> StreamResult { + Err(StreamError::trap(&self.error)) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index e0a5160fcb0b..8aa895934ceb 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -624,6 +624,7 @@ impl crate::bindings::http::types::HostIncomingResponse for T { match r.body.take() { Some(body) => { + println!("Got incoming body {body:?}"); let id = self.table().push(body)?; Ok(Ok(id)) } @@ -689,7 +690,6 @@ impl crate::bindings::http::types::HostIncomingBody for T { let body = self.table().get_mut(&id)?; if let Some(stream) = body.take_stream() { - let stream = InputStream::Host(Box::new(stream)); let stream = self.table().push_child(stream, &id)?; return Ok(Ok(stream)); } From e5172dea66bd4cca3c8118f097061511814edffe Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 15 Jan 2024 16:47:11 +0100 Subject: [PATCH 05/12] Removed println --- crates/wasi-http/src/types_impl.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 8aa895934ceb..de74394c265b 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -624,7 +624,6 @@ impl crate::bindings::http::types::HostIncomingResponse for T { match r.body.take() { Some(body) => { - println!("Got incoming body {body:?}"); let id = self.table().push(body)?; Ok(Ok(id)) } From 5aceb5f48117e4601105a4e0d26008e1bcd8109b Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 25 Jan 2024 17:48:52 +0100 Subject: [PATCH 06/12] get-directories made async and removed the preview1 support --- crates/wasi/src/preview2/host/filesystem.rs | 4 +- crates/wasi/src/preview2/mod.rs | 7 +- crates/wasi/src/preview2/preview1.rs | 2379 ------------------- 3 files changed, 5 insertions(+), 2385 deletions(-) delete mode 100644 crates/wasi/src/preview2/preview1.rs diff --git a/crates/wasi/src/preview2/host/filesystem.rs b/crates/wasi/src/preview2/host/filesystem.rs index 4a70efee51e3..02bd323d6132 100644 --- a/crates/wasi/src/preview2/host/filesystem.rs +++ b/crates/wasi/src/preview2/host/filesystem.rs @@ -9,11 +9,13 @@ use crate::preview2::filesystem::{FileInputStream, FileOutputStream}; use crate::preview2::{DirPerms, FilePerms, FsError, FsResult, WasiView}; use anyhow::Context; use wasmtime::component::{Resource, ResourceTable}; +use async_trait::async_trait; mod sync; +#[async_trait] impl preopens::Host for T { - fn get_directories( + async fn get_directories( &mut self, ) -> Result, String)>, anyhow::Error> { let mut results = Vec::new(); diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 09acf063317e..8c9fd90d7458 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -29,10 +29,6 @@ mod ip_name_lookup; mod network; pub mod pipe; mod poll; -#[cfg(feature = "preview1-on-preview2")] -pub mod preview0; -#[cfg(feature = "preview1-on-preview2")] -pub mod preview1; mod random; mod stdio; mod stream; @@ -164,7 +160,8 @@ pub mod bindings { "subscribe-duration", "get-environment", "get-arguments", - "initial-cwd" + "initial-cwd", + "get-directories" ], }, trappable_error_type: { diff --git a/crates/wasi/src/preview2/preview1.rs b/crates/wasi/src/preview2/preview1.rs deleted file mode 100644 index b098b2a07820..000000000000 --- a/crates/wasi/src/preview2/preview1.rs +++ /dev/null @@ -1,2379 +0,0 @@ -use crate::preview2::bindings::{ - self, - cli::{ - stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin, - terminal_stdout, - }, - clocks::{monotonic_clock, wall_clock}, - filesystem::{preopens, types as filesystem}, - io::{poll, streams}, -}; -use crate::preview2::{FsError, IsATTY, StreamError, StreamResult, WasiView}; -use anyhow::{bail, Context}; -use std::borrow::Borrow; -use std::collections::{BTreeMap, HashSet}; -use std::mem::{self, size_of, size_of_val}; -use std::ops::{Deref, DerefMut}; -use std::slice; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use wasmtime::component::Resource; -use wiggle::tracing::instrument; -use wiggle::{GuestError, GuestPtr, GuestStrCow, GuestType}; - -#[derive(Debug)] -struct File { - /// The handle to the preview2 descriptor that this file is referencing. - fd: Resource, - - /// The current-position pointer. - position: Arc, - - /// In append mode, all writes append to the file. - append: bool, - - /// When blocking, read and write calls dispatch to blocking_read and - /// blocking_check_write on the underlying streams. When false, read and write - /// dispatch to stream's plain read and check_write. - blocking_mode: BlockingMode, -} - -#[derive(Clone, Copy, Debug)] -enum BlockingMode { - Blocking, - NonBlocking, -} -impl BlockingMode { - fn from_fdflags(flags: &types::Fdflags) -> Self { - if flags.contains(types::Fdflags::NONBLOCK) { - BlockingMode::NonBlocking - } else { - BlockingMode::Blocking - } - } - async fn read( - &self, - host: &mut impl streams::Host, - input_stream: Resource, - max_size: usize, - ) -> Result, types::Error> { - let max_size = max_size.try_into().unwrap_or(u64::MAX); - match self { - BlockingMode::Blocking => { - match streams::HostInputStream::blocking_read(host, input_stream, max_size).await { - Ok(r) if r.is_empty() => Err(types::Errno::Intr.into()), - Ok(r) => Ok(r), - Err(StreamError::Closed) => Ok(Vec::new()), - Err(e) => Err(e.into()), - } - } - - BlockingMode::NonBlocking => { - match streams::HostInputStream::read(host, input_stream, max_size).await { - Ok(r) => Ok(r), - Err(StreamError::Closed) => Ok(Vec::new()), - Err(e) => Err(e.into()), - } - } - } - } - async fn write( - &self, - host: &mut (impl streams::Host + poll::Host), - output_stream: Resource, - bytes: GuestPtr<'_, [u8]>, - ) -> StreamResult { - use streams::HostOutputStream as Streams; - - let bytes = bytes.as_cow().map_err(|e| StreamError::Trap(e.into()))?; - let mut bytes = &bytes[..]; - - match self { - BlockingMode::Blocking => { - let total = bytes.len(); - while !bytes.is_empty() { - // NOTE: blocking_write_and_flush takes at most one 4k buffer. - let len = bytes.len().min(4096); - let (chunk, rest) = bytes.split_at(len); - bytes = rest; - - Streams::blocking_write_and_flush( - host, - output_stream.borrowed(), - Vec::from(chunk), - ) - .await? - } - - Ok(total) - } - BlockingMode::NonBlocking => { - let n = match Streams::check_write(host, output_stream.borrowed()) { - Ok(n) => n, - Err(StreamError::Closed) => 0, - Err(e) => Err(e)?, - }; - - let len = bytes.len().min(n as usize); - if len == 0 { - return Ok(0); - } - - match Streams::write(host, output_stream.borrowed(), bytes[..len].to_vec()) { - Ok(()) => {} - Err(StreamError::Closed) => return Ok(0), - Err(e) => Err(e)?, - } - - match Streams::blocking_flush(host, output_stream.borrowed()).await { - Ok(()) => {} - Err(StreamError::Closed) => return Ok(0), - Err(e) => Err(e)?, - }; - - Ok(len) - } - } - } -} - -#[derive(Debug)] -enum Descriptor { - Stdin { - stream: Resource, - isatty: IsATTY, - }, - Stdout { - stream: Resource, - isatty: IsATTY, - }, - Stderr { - stream: Resource, - isatty: IsATTY, - }, - PreopenDirectory((Resource, String)), - File(File), -} - -#[derive(Debug, Default)] -pub struct WasiPreview1Adapter { - descriptors: Option, -} - -#[derive(Debug, Default)] -struct Descriptors { - used: BTreeMap, - free: Vec, -} - -impl Deref for Descriptors { - type Target = BTreeMap; - - fn deref(&self) -> &Self::Target { - &self.used - } -} - -impl DerefMut for Descriptors { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.used - } -} - -impl Descriptors { - /// Initializes [Self] using `preopens` - fn new( - host: &mut (impl preopens::Host - + stdin::Host - + stdout::Host - + stderr::Host - + terminal_stdin::Host - + terminal_stdout::Host - + terminal_stderr::Host - + terminal_input::Host - + terminal_output::Host - + ?Sized), - ) -> Result { - let mut descriptors = Self::default(); - descriptors.push(Descriptor::Stdin { - stream: host - .get_stdin() - .context("failed to call `get-stdin`") - .map_err(types::Error::trap)?, - isatty: if let Some(term_in) = host - .get_terminal_stdin() - .context("failed to call `get-terminal-stdin`") - .map_err(types::Error::trap)? - { - terminal_input::HostTerminalInput::drop(host, term_in) - .context("failed to call `drop-terminal-input`") - .map_err(types::Error::trap)?; - IsATTY::Yes - } else { - IsATTY::No - }, - })?; - descriptors.push(Descriptor::Stdout { - stream: host - .get_stdout() - .context("failed to call `get-stdout`") - .map_err(types::Error::trap)?, - isatty: if let Some(term_out) = host - .get_terminal_stdout() - .context("failed to call `get-terminal-stdout`") - .map_err(types::Error::trap)? - { - terminal_output::HostTerminalOutput::drop(host, term_out) - .context("failed to call `drop-terminal-output`") - .map_err(types::Error::trap)?; - IsATTY::Yes - } else { - IsATTY::No - }, - })?; - descriptors.push(Descriptor::Stderr { - stream: host - .get_stderr() - .context("failed to call `get-stderr`") - .map_err(types::Error::trap)?, - isatty: if let Some(term_out) = host - .get_terminal_stderr() - .context("failed to call `get-terminal-stderr`") - .map_err(types::Error::trap)? - { - terminal_output::HostTerminalOutput::drop(host, term_out) - .context("failed to call `drop-terminal-output`") - .map_err(types::Error::trap)?; - IsATTY::Yes - } else { - IsATTY::No - }, - })?; - - for dir in host - .get_directories() - .context("failed to call `get-directories`") - .map_err(types::Error::trap)? - { - descriptors.push(Descriptor::PreopenDirectory((dir.0, dir.1)))?; - } - Ok(descriptors) - } - - /// Returns next descriptor number, which was never assigned - fn unused(&self) -> Result { - match self.last_key_value() { - Some((fd, _)) => { - if let Some(fd) = fd.checked_add(1) { - return Ok(fd); - } - if self.len() == u32::MAX as usize { - return Err(types::Errno::Loop.into()); - } - // TODO: Optimize - Ok((0..u32::MAX) - .rev() - .find(|fd| !self.contains_key(fd)) - .expect("failed to find an unused file descriptor")) - } - None => Ok(0), - } - } - - /// Removes the [Descriptor] corresponding to `fd` - fn remove(&mut self, fd: types::Fd) -> Option { - let fd = fd.into(); - let desc = self.used.remove(&fd)?; - self.free.push(fd); - Some(desc) - } - - /// Pushes the [Descriptor] returning corresponding number. - /// This operation will try to reuse numbers previously removed via [`Self::remove`] - /// and rely on [`Self::unused`] if no free numbers are recorded - fn push(&mut self, desc: Descriptor) -> Result { - let fd = if let Some(fd) = self.free.pop() { - fd - } else { - self.unused()? - }; - assert!(self.insert(fd, desc).is_none()); - Ok(fd) - } - - /// Like [Self::push], but for [`File`] - fn push_file(&mut self, file: File) -> Result { - self.push(Descriptor::File(file)) - } -} - -impl WasiPreview1Adapter { - pub fn new() -> Self { - Self::default() - } -} - -// Any context that needs to support preview 1 will impl this trait. They can -// construct the needed member with WasiPreview1Adapter::new(). -pub trait WasiPreview1View: WasiView { - fn adapter(&self) -> &WasiPreview1Adapter; - fn adapter_mut(&mut self) -> &mut WasiPreview1Adapter; -} - -/// A mutably-borrowed [`WasiPreview1View`] implementation, which provides access to the stored -/// state. It can be thought of as an in-flight [`WasiPreview1Adapter`] transaction, all -/// changes will be recorded in the underlying [`WasiPreview1Adapter`] returned by -/// [`WasiPreview1View::adapter_mut`] on [`Drop`] of this struct. -// NOTE: This exists for the most part just due to the fact that `bindgen` generates methods with -// `&mut self` receivers and so this struct lets us extend the lifetime of the `&mut self` borrow -// of the [`WasiPreview1View`] to provide means to return mutably and immutably borrowed [`Descriptors`] -// without having to rely on something like `Arc>`, while also being able to -// call methods like [`Descriptor::is_file`] and hiding complexity from preview1 method implementations. -struct Transaction<'a, T: WasiPreview1View + ?Sized> { - view: &'a mut T, - descriptors: Descriptors, -} - -impl Drop for Transaction<'_, T> { - /// Record changes in the [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] - fn drop(&mut self) { - let descriptors = mem::take(&mut self.descriptors); - self.view.adapter_mut().descriptors = Some(descriptors); - } -} - -impl Transaction<'_, T> { - /// Borrows [`Descriptor`] corresponding to `fd`. - /// - /// # Errors - /// - /// Returns [`types::Errno::Badf`] if no [`Descriptor`] is found - fn get_descriptor(&self, fd: types::Fd) -> Result<&Descriptor> { - let fd = fd.into(); - let desc = self.descriptors.get(&fd).ok_or(types::Errno::Badf)?; - Ok(desc) - } - - /// Borrows [`File`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type - fn get_file(&self, fd: types::Fd) -> Result<&File> { - let fd = fd.into(); - match self.descriptors.get(&fd) { - Some(Descriptor::File(file @ File { fd, .. })) => { - self.view.table().get(fd)?.file()?; - Ok(file) - } - _ => Err(types::Errno::Badf.into()), - } - } - - /// Mutably borrows [`File`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type - fn get_file_mut(&mut self, fd: types::Fd) -> Result<&mut File> { - let fd = fd.into(); - match self.descriptors.get_mut(&fd) { - Some(Descriptor::File(file)) => { - self.view.table().get(&file.fd)?.file()?; - Ok(file) - } - _ => Err(types::Errno::Badf.into()), - } - } - - /// Borrows [`File`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type. - /// - /// # Errors - /// - /// Returns [`types::Errno::Spipe`] if the descriptor corresponds to stdio - fn get_seekable(&self, fd: types::Fd) -> Result<&File> { - let fd = fd.into(); - match self.descriptors.get(&fd) { - Some(Descriptor::File(file @ File { fd, .. })) - if self.view.table().get(fd)?.is_file() => - { - Ok(file) - } - Some( - Descriptor::Stdin { .. } | Descriptor::Stdout { .. } | Descriptor::Stderr { .. }, - ) => { - // NOTE: legacy implementation returns SPIPE here - Err(types::Errno::Spipe.into()) - } - _ => Err(types::Errno::Badf.into()), - } - } - - /// Returns [`filesystem::Descriptor`] corresponding to `fd` - fn get_fd(&self, fd: types::Fd) -> Result> { - match self.get_descriptor(fd)? { - Descriptor::File(File { fd, .. }) => Ok(fd.borrowed()), - Descriptor::PreopenDirectory((fd, _)) => Ok(fd.borrowed()), - Descriptor::Stdin { .. } | Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => { - Err(types::Errno::Badf.into()) - } - } - } - - /// Returns [`filesystem::Descriptor`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type - fn get_file_fd(&self, fd: types::Fd) -> Result> { - self.get_file(fd).map(|File { fd, .. }| fd.borrowed()) - } - - /// Returns [`filesystem::Descriptor`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] or [`Descriptor::PreopenDirectory`] - /// of [`crate::preview2::filesystem::Dir`] type - fn get_dir_fd(&self, fd: types::Fd) -> Result> { - let fd = fd.into(); - match self.descriptors.get(&fd) { - Some(Descriptor::File(File { fd, .. })) if self.view.table().get(fd)?.is_dir() => { - Ok(fd.borrowed()) - } - Some(Descriptor::PreopenDirectory((fd, _))) => Ok(fd.borrowed()), - _ => Err(types::Errno::Badf.into()), - } - } -} - -trait WasiPreview1ViewExt: - WasiPreview1View - + preopens::Host - + stdin::Host - + stdout::Host - + stderr::Host - + terminal_input::Host - + terminal_output::Host - + terminal_stdin::Host - + terminal_stdout::Host - + terminal_stderr::Host -{ - /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] - /// and returns [`Transaction`] on success - fn transact(&mut self) -> Result, types::Error> { - let descriptors = if let Some(descriptors) = self.adapter_mut().descriptors.take() { - descriptors - } else { - Descriptors::new(self)? - } - .into(); - Ok(Transaction { - view: self, - descriptors, - }) - } - - /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] - /// and returns [`filesystem::Descriptor`] corresponding to `fd` - fn get_fd(&mut self, fd: types::Fd) -> Result, types::Error> { - let st = self.transact()?; - let fd = st.get_fd(fd)?; - Ok(fd) - } - - /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] - /// and returns [`filesystem::Descriptor`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type - fn get_file_fd( - &mut self, - fd: types::Fd, - ) -> Result, types::Error> { - let st = self.transact()?; - let fd = st.get_file_fd(fd)?; - Ok(fd) - } - - /// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`] - /// and returns [`filesystem::Descriptor`] corresponding to `fd` - /// if it describes a [`Descriptor::File`] or [`Descriptor::PreopenDirectory`] - /// of [`crate::preview2::filesystem::Dir`] type - fn get_dir_fd( - &mut self, - fd: types::Fd, - ) -> Result, types::Error> { - let st = self.transact()?; - let fd = st.get_dir_fd(fd)?; - Ok(fd) - } -} - -impl WasiPreview1ViewExt for T {} - -pub fn add_to_linker_async( - linker: &mut wasmtime::Linker, -) -> anyhow::Result<()> { - wasi_snapshot_preview1::add_to_linker(linker, |t| t) -} - -pub fn add_to_linker_sync( - linker: &mut wasmtime::Linker, -) -> anyhow::Result<()> { - sync::add_wasi_snapshot_preview1_to_linker(linker, |t| t) -} - -// Generate the wasi_snapshot_preview1::WasiSnapshotPreview1 trait, -// and the module types. -// None of the generated modules, traits, or types should be used externally -// to this module. -wiggle::from_witx!({ - witx: ["$CARGO_MANIFEST_DIR/witx/preview1/wasi_snapshot_preview1.witx"], - async: { - wasi_snapshot_preview1::{ - fd_advise, fd_close, fd_datasync, fd_fdstat_get, fd_filestat_get, fd_filestat_set_size, - fd_filestat_set_times, fd_read, fd_pread, fd_seek, fd_sync, fd_readdir, fd_write, - fd_pwrite, poll_oneoff, path_create_directory, path_filestat_get, - path_filestat_set_times, path_link, path_open, path_readlink, path_remove_directory, - path_rename, path_symlink, path_unlink_file, - random_get, clock_res_get, clock_time_get, args_get, args_sizes_get, environ_get, environ_sizes_get - } - }, - errors: { errno => trappable Error }, -}); - -mod sync { - use anyhow::Result; - use std::future::Future; - - wiggle::wasmtime_integration!({ - witx: ["$CARGO_MANIFEST_DIR/witx/preview1/wasi_snapshot_preview1.witx"], - target: super, - block_on[in_tokio]: { - wasi_snapshot_preview1::{ - fd_advise, fd_close, fd_datasync, fd_fdstat_get, fd_filestat_get, fd_filestat_set_size, - fd_filestat_set_times, fd_read, fd_pread, fd_seek, fd_sync, fd_readdir, fd_write, - fd_pwrite, poll_oneoff, path_create_directory, path_filestat_get, - path_filestat_set_times, path_link, path_open, path_readlink, path_remove_directory, - path_rename, path_symlink, path_unlink_file, - random_get, clock_res_get, clock_time_get, args_get, args_sizes_get, environ_get, environ_sizes_get - } - }, - errors: { errno => trappable Error }, - }); - - // Small wrapper around `in_tokio` to add a `Result` layer which is always - // `Ok` - fn in_tokio(future: F) -> Result { - Ok(crate::preview2::in_tokio(future)) - } -} - -impl wiggle::GuestErrorType for types::Errno { - fn success() -> Self { - Self::Success - } -} - -impl From for types::Error { - fn from(err: StreamError) -> Self { - match err { - StreamError::Closed => types::Errno::Io.into(), - StreamError::LastOperationFailed(e) => match e.downcast::() { - Ok(err) => filesystem::ErrorCode::from(err).into(), - Err(e) => { - log::debug!("dropping error {e:?}"); - types::Errno::Io.into() - } - }, - StreamError::Trap(e) => types::Error::trap(e), - } - } -} - -impl From for types::Error { - fn from(err: FsError) -> Self { - match err.downcast() { - Ok(code) => code.into(), - Err(e) => types::Error::trap(e), - } - } -} - -fn systimespec(set: bool, ts: types::Timestamp, now: bool) -> Result { - if set && now { - Err(types::Errno::Inval.into()) - } else if set { - Ok(filesystem::NewTimestamp::Timestamp(filesystem::Datetime { - seconds: ts / 1_000_000_000, - nanoseconds: (ts % 1_000_000_000) as _, - })) - } else if now { - Ok(filesystem::NewTimestamp::Now) - } else { - Ok(filesystem::NewTimestamp::NoChange) - } -} - -impl TryFrom for types::Timestamp { - type Error = types::Errno; - - fn try_from( - wall_clock::Datetime { - seconds, - nanoseconds, - }: wall_clock::Datetime, - ) -> Result { - types::Timestamp::from(seconds) - .checked_mul(1_000_000_000) - .and_then(|ns| ns.checked_add(nanoseconds.into())) - .ok_or(types::Errno::Overflow) - } -} - -impl From for filesystem::PathFlags { - fn from(flags: types::Lookupflags) -> Self { - if flags.contains(types::Lookupflags::SYMLINK_FOLLOW) { - filesystem::PathFlags::SYMLINK_FOLLOW - } else { - filesystem::PathFlags::empty() - } - } -} - -impl From for filesystem::OpenFlags { - fn from(flags: types::Oflags) -> Self { - let mut out = filesystem::OpenFlags::empty(); - if flags.contains(types::Oflags::CREAT) { - out |= filesystem::OpenFlags::CREATE; - } - if flags.contains(types::Oflags::DIRECTORY) { - out |= filesystem::OpenFlags::DIRECTORY; - } - if flags.contains(types::Oflags::EXCL) { - out |= filesystem::OpenFlags::EXCLUSIVE; - } - if flags.contains(types::Oflags::TRUNC) { - out |= filesystem::OpenFlags::TRUNCATE; - } - out - } -} - -impl From for filesystem::Advice { - fn from(advice: types::Advice) -> Self { - match advice { - types::Advice::Normal => filesystem::Advice::Normal, - types::Advice::Sequential => filesystem::Advice::Sequential, - types::Advice::Random => filesystem::Advice::Random, - types::Advice::Willneed => filesystem::Advice::WillNeed, - types::Advice::Dontneed => filesystem::Advice::DontNeed, - types::Advice::Noreuse => filesystem::Advice::NoReuse, - } - } -} - -impl TryFrom for types::Filetype { - type Error = anyhow::Error; - - fn try_from(ty: filesystem::DescriptorType) -> Result { - match ty { - filesystem::DescriptorType::RegularFile => Ok(types::Filetype::RegularFile), - filesystem::DescriptorType::Directory => Ok(types::Filetype::Directory), - filesystem::DescriptorType::BlockDevice => Ok(types::Filetype::BlockDevice), - filesystem::DescriptorType::CharacterDevice => Ok(types::Filetype::CharacterDevice), - // preview1 never had a FIFO code. - filesystem::DescriptorType::Fifo => Ok(types::Filetype::Unknown), - // TODO: Add a way to disginguish between FILETYPE_SOCKET_STREAM and - // FILETYPE_SOCKET_DGRAM. - filesystem::DescriptorType::Socket => { - bail!("sockets are not currently supported") - } - filesystem::DescriptorType::SymbolicLink => Ok(types::Filetype::SymbolicLink), - filesystem::DescriptorType::Unknown => Ok(types::Filetype::Unknown), - } - } -} - -impl From for types::Filetype { - fn from(isatty: IsATTY) -> Self { - match isatty { - IsATTY::Yes => types::Filetype::CharacterDevice, - IsATTY::No => types::Filetype::Unknown, - } - } -} - -impl From for types::Errno { - fn from(code: filesystem::ErrorCode) -> Self { - match code { - filesystem::ErrorCode::Access => types::Errno::Acces, - filesystem::ErrorCode::WouldBlock => types::Errno::Again, - filesystem::ErrorCode::Already => types::Errno::Already, - filesystem::ErrorCode::BadDescriptor => types::Errno::Badf, - filesystem::ErrorCode::Busy => types::Errno::Busy, - filesystem::ErrorCode::Deadlock => types::Errno::Deadlk, - filesystem::ErrorCode::Quota => types::Errno::Dquot, - filesystem::ErrorCode::Exist => types::Errno::Exist, - filesystem::ErrorCode::FileTooLarge => types::Errno::Fbig, - filesystem::ErrorCode::IllegalByteSequence => types::Errno::Ilseq, - filesystem::ErrorCode::InProgress => types::Errno::Inprogress, - filesystem::ErrorCode::Interrupted => types::Errno::Intr, - filesystem::ErrorCode::Invalid => types::Errno::Inval, - filesystem::ErrorCode::Io => types::Errno::Io, - filesystem::ErrorCode::IsDirectory => types::Errno::Isdir, - filesystem::ErrorCode::Loop => types::Errno::Loop, - filesystem::ErrorCode::TooManyLinks => types::Errno::Mlink, - filesystem::ErrorCode::MessageSize => types::Errno::Msgsize, - filesystem::ErrorCode::NameTooLong => types::Errno::Nametoolong, - filesystem::ErrorCode::NoDevice => types::Errno::Nodev, - filesystem::ErrorCode::NoEntry => types::Errno::Noent, - filesystem::ErrorCode::NoLock => types::Errno::Nolck, - filesystem::ErrorCode::InsufficientMemory => types::Errno::Nomem, - filesystem::ErrorCode::InsufficientSpace => types::Errno::Nospc, - filesystem::ErrorCode::Unsupported => types::Errno::Notsup, - filesystem::ErrorCode::NotDirectory => types::Errno::Notdir, - filesystem::ErrorCode::NotEmpty => types::Errno::Notempty, - filesystem::ErrorCode::NotRecoverable => types::Errno::Notrecoverable, - filesystem::ErrorCode::NoTty => types::Errno::Notty, - filesystem::ErrorCode::NoSuchDevice => types::Errno::Nxio, - filesystem::ErrorCode::Overflow => types::Errno::Overflow, - filesystem::ErrorCode::NotPermitted => types::Errno::Perm, - filesystem::ErrorCode::Pipe => types::Errno::Pipe, - filesystem::ErrorCode::ReadOnly => types::Errno::Rofs, - filesystem::ErrorCode::InvalidSeek => types::Errno::Spipe, - filesystem::ErrorCode::TextFileBusy => types::Errno::Txtbsy, - filesystem::ErrorCode::CrossDevice => types::Errno::Xdev, - } - } -} - -impl From for types::Error { - fn from(_: std::num::TryFromIntError) -> Self { - types::Errno::Overflow.into() - } -} - -impl From for types::Error { - fn from(err: GuestError) -> Self { - use wiggle::GuestError::*; - match err { - InvalidFlagValue { .. } => types::Errno::Inval.into(), - InvalidEnumValue { .. } => types::Errno::Inval.into(), - // As per - // https://github.com/WebAssembly/wasi/blob/main/legacy/tools/witx-docs.md#pointers - // - // > If a misaligned pointer is passed to a function, the function - // > shall trap. - // > - // > If an out-of-bounds pointer is passed to a function and the - // > function needs to dereference it, the function shall trap. - // - // so this turns OOB and misalignment errors into traps. - PtrOverflow { .. } | PtrOutOfBounds { .. } | PtrNotAligned { .. } => { - types::Error::trap(err.into()) - } - PtrBorrowed { .. } => types::Errno::Fault.into(), - InvalidUtf8 { .. } => types::Errno::Ilseq.into(), - TryFromIntError { .. } => types::Errno::Overflow.into(), - SliceLengthsDiffer { .. } => types::Errno::Fault.into(), - BorrowCheckerOutOfHandles { .. } => types::Errno::Fault.into(), - InFunc { err, .. } => types::Error::from(*err), - } - } -} - -impl From for types::Error { - fn from(code: filesystem::ErrorCode) -> Self { - types::Errno::from(code).into() - } -} - -impl From for types::Error { - fn from(err: wasmtime::component::ResourceTableError) -> Self { - types::Error::trap(err.into()) - } -} - -type Result = std::result::Result; - -fn write_bytes<'a>( - ptr: impl Borrow>, - buf: &[u8], -) -> Result, types::Error> { - // NOTE: legacy implementation always returns Inval errno - - let buf = buf.as_ref(); - let len = buf.len().try_into()?; - - let ptr = ptr.borrow(); - ptr.as_array(len).copy_from_slice(buf)?; - let next = ptr.add(len)?; - Ok(next) -} - -fn write_byte<'a>(ptr: impl Borrow>, byte: u8) -> Result> { - let ptr = ptr.borrow(); - ptr.write(byte)?; - let next = ptr.add(1)?; - Ok(next) -} - -fn read_str<'a>(ptr: impl Borrow>) -> Result> { - let s = ptr.borrow().as_cow()?; - Ok(s) -} - -fn read_string<'a>(ptr: impl Borrow>) -> Result { - read_str(ptr).map(|s| s.to_string()) -} - -// Find first non-empty buffer. -fn first_non_empty_ciovec<'a, 'b>( - ciovs: &'a types::CiovecArray<'b>, -) -> Result>> { - for iov in ciovs.iter() { - let iov = iov?.read()?; - if iov.buf_len == 0 { - continue; - } - return Ok(Some(iov.buf.as_array(iov.buf_len))); - } - Ok(None) -} - -// Find first non-empty buffer. -fn first_non_empty_iovec<'a>(iovs: &types::IovecArray<'a>) -> Result>> { - iovs.iter() - .map(|iov| { - let iov = iov?.read()?; - if iov.buf_len == 0 { - return Ok(None); - } - let slice = iov.buf.as_array(iov.buf_len); - Ok(Some(slice)) - }) - .find_map(Result::transpose) - .transpose() -} - -#[async_trait::async_trait] -// Implement the WasiSnapshotPreview1 trait using only the traits that are -// required for T, i.e., in terms of the preview 2 wit interface, and state -// stored in the WasiPreview1Adapter struct. -impl< - T: WasiPreview1View - + bindings::cli::environment::Host - + bindings::cli::exit::Host - + bindings::filesystem::preopens::Host - + bindings::filesystem::types::Host - + bindings::io::poll::Host - + bindings::random::random::Host - + bindings::io::streams::Host - + bindings::clocks::monotonic_clock::Host - + bindings::clocks::wall_clock::Host, - > wasi_snapshot_preview1::WasiSnapshotPreview1 for T -{ - #[instrument(skip(self))] - async fn args_get<'b>( - &mut self, - argv: &GuestPtr<'b, GuestPtr<'b, u8>>, - argv_buf: &GuestPtr<'b, u8>, - ) -> Result<(), types::Error> { - self.get_arguments() - .await - .context("failed to call `get-arguments`") - .map_err(types::Error::trap)? - .into_iter() - .try_fold((*argv, *argv_buf), |(argv, argv_buf), arg| -> Result<_> { - argv.write(argv_buf)?; - let argv = argv.add(1)?; - - let argv_buf = write_bytes(argv_buf, arg.as_bytes())?; - let argv_buf = write_byte(argv_buf, 0)?; - - Ok((argv, argv_buf)) - })?; - Ok(()) - } - - #[instrument(skip(self))] - async fn args_sizes_get(&mut self) -> Result<(types::Size, types::Size), types::Error> { - let args = self - .get_arguments() - .await - .context("failed to call `get-arguments`") - .map_err(types::Error::trap)?; - let num = args.len().try_into().map_err(|_| types::Errno::Overflow)?; - let len = args - .iter() - .map(|buf| buf.len() + 1) // Each argument is expected to be `\0` terminated. - .sum::() - .try_into() - .map_err(|_| types::Errno::Overflow)?; - Ok((num, len)) - } - - #[instrument(skip(self))] - async fn environ_get<'b>( - &mut self, - environ: &GuestPtr<'b, GuestPtr<'b, u8>>, - environ_buf: &GuestPtr<'b, u8>, - ) -> Result<(), types::Error> { - self.get_environment() - .await - .context("failed to call `get-environment`") - .map_err(types::Error::trap)? - .into_iter() - .try_fold( - (*environ, *environ_buf), - |(environ, environ_buf), (k, v)| -> Result<_, types::Error> { - environ.write(environ_buf)?; - let environ = environ.add(1)?; - - let environ_buf = write_bytes(environ_buf, k.as_bytes())?; - let environ_buf = write_byte(environ_buf, b'=')?; - let environ_buf = write_bytes(environ_buf, v.as_bytes())?; - let environ_buf = write_byte(environ_buf, 0)?; - - Ok((environ, environ_buf)) - }, - )?; - Ok(()) - } - - #[instrument(skip(self))] - async fn environ_sizes_get(&mut self) -> Result<(types::Size, types::Size), types::Error> { - let environ = self - .get_environment() - .await - .context("failed to call `get-environment`") - .map_err(types::Error::trap)?; - let num = environ.len().try_into()?; - let len = environ - .iter() - .map(|(k, v)| k.len() + 1 + v.len() + 1) // Key/value pairs are expected to be joined with `=`s, and terminated with `\0`s. - .sum::() - .try_into()?; - Ok((num, len)) - } - - #[instrument(skip(self))] - async fn clock_res_get( - &mut self, - id: types::Clockid, - ) -> Result { - let res = match id { - types::Clockid::Realtime => wall_clock::Host::resolution(self) - .await - .context("failed to call `wall_clock::resolution`") - .map_err(types::Error::trap)? - .try_into()?, - types::Clockid::Monotonic => monotonic_clock::Host::resolution(self) - .await - .context("failed to call `monotonic_clock::resolution`") - .map_err(types::Error::trap)?, - types::Clockid::ProcessCputimeId | types::Clockid::ThreadCputimeId => { - return Err(types::Errno::Badf.into()) - } - }; - Ok(res) - } - - #[instrument(skip(self))] - async fn clock_time_get( - &mut self, - id: types::Clockid, - _precision: types::Timestamp, - ) -> Result { - let now = match id { - types::Clockid::Realtime => wall_clock::Host::now(self) - .await - .context("failed to call `wall_clock::now`") - .map_err(types::Error::trap)? - .try_into()?, - types::Clockid::Monotonic => monotonic_clock::Host::now(self) - .await - .context("failed to call `monotonic_clock::now`") - .map_err(types::Error::trap)?, - types::Clockid::ProcessCputimeId | types::Clockid::ThreadCputimeId => { - return Err(types::Errno::Badf.into()) - } - }; - Ok(now) - } - - #[instrument(skip(self))] - async fn fd_advise( - &mut self, - fd: types::Fd, - offset: types::Filesize, - len: types::Filesize, - advice: types::Advice, - ) -> Result<(), types::Error> { - let fd = self.get_file_fd(fd)?; - self.advise(fd, offset, len, advice.into()) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `advise`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Force the allocation of space in a file. - /// NOTE: This is similar to `posix_fallocate` in POSIX. - #[instrument(skip(self))] - fn fd_allocate( - &mut self, - fd: types::Fd, - _offset: types::Filesize, - _len: types::Filesize, - ) -> Result<(), types::Error> { - self.get_file_fd(fd)?; - Err(types::Errno::Notsup.into()) - } - - /// Close a file descriptor. - /// NOTE: This is similar to `close` in POSIX. - #[instrument(skip(self))] - async fn fd_close(&mut self, fd: types::Fd) -> Result<(), types::Error> { - let desc = self - .transact()? - .descriptors - .remove(fd) - .ok_or(types::Errno::Badf)?; - match desc { - Descriptor::Stdin { stream, .. } => streams::HostInputStream::drop(self, stream) - .context("failed to call `drop` on `input-stream`"), - Descriptor::Stdout { stream, .. } | Descriptor::Stderr { stream, .. } => { - streams::HostOutputStream::drop(self, stream) - .context("failed to call `drop` on `output-stream`") - } - Descriptor::File(File { fd, .. }) | Descriptor::PreopenDirectory((fd, _)) => { - filesystem::HostDescriptor::drop(self, fd).context("failed to call `drop`") - } - } - .map_err(types::Error::trap) - } - - /// Synchronize the data of a file to disk. - /// NOTE: This is similar to `fdatasync` in POSIX. - #[instrument(skip(self))] - async fn fd_datasync(&mut self, fd: types::Fd) -> Result<(), types::Error> { - let fd = self.get_file_fd(fd)?; - self.sync_data(fd).await.map_err(|e| { - e.try_into() - .context("failed to call `sync-data`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Get the attributes of a file descriptor. - /// NOTE: This returns similar flags to `fsync(fd, F_GETFL)` in POSIX, as well as additional fields. - #[instrument(skip(self))] - async fn fd_fdstat_get(&mut self, fd: types::Fd) -> Result { - let (fd, blocking, append) = match self.transact()?.get_descriptor(fd)? { - Descriptor::Stdin { isatty, .. } => { - let fs_rights_base = types::Rights::FD_READ; - return Ok(types::Fdstat { - fs_filetype: (*isatty).into(), - fs_flags: types::Fdflags::empty(), - fs_rights_base, - fs_rights_inheriting: fs_rights_base, - }); - } - Descriptor::Stdout { isatty, .. } | Descriptor::Stderr { isatty, .. } => { - let fs_rights_base = types::Rights::FD_WRITE; - return Ok(types::Fdstat { - fs_filetype: (*isatty).into(), - fs_flags: types::Fdflags::empty(), - fs_rights_base, - fs_rights_inheriting: fs_rights_base, - }); - } - Descriptor::PreopenDirectory((_, _)) => { - // Hard-coded set or rights expected by many userlands: - let fs_rights_base = types::Rights::PATH_CREATE_DIRECTORY - | types::Rights::PATH_CREATE_FILE - | types::Rights::PATH_LINK_SOURCE - | types::Rights::PATH_LINK_TARGET - | types::Rights::PATH_OPEN - | types::Rights::FD_READDIR - | types::Rights::PATH_READLINK - | types::Rights::PATH_RENAME_SOURCE - | types::Rights::PATH_RENAME_TARGET - | types::Rights::PATH_SYMLINK - | types::Rights::PATH_REMOVE_DIRECTORY - | types::Rights::PATH_UNLINK_FILE - | types::Rights::PATH_FILESTAT_GET - | types::Rights::PATH_FILESTAT_SET_TIMES - | types::Rights::FD_FILESTAT_GET - | types::Rights::FD_FILESTAT_SET_TIMES; - - let fs_rights_inheriting = fs_rights_base - | types::Rights::FD_DATASYNC - | types::Rights::FD_READ - | types::Rights::FD_SEEK - | types::Rights::FD_FDSTAT_SET_FLAGS - | types::Rights::FD_SYNC - | types::Rights::FD_TELL - | types::Rights::FD_WRITE - | types::Rights::FD_ADVISE - | types::Rights::FD_ALLOCATE - | types::Rights::FD_FILESTAT_GET - | types::Rights::FD_FILESTAT_SET_SIZE - | types::Rights::FD_FILESTAT_SET_TIMES - | types::Rights::POLL_FD_READWRITE; - - return Ok(types::Fdstat { - fs_filetype: types::Filetype::Directory, - fs_flags: types::Fdflags::empty(), - fs_rights_base, - fs_rights_inheriting, - }); - } - Descriptor::File(File { - fd, - blocking_mode, - append, - .. - }) => (fd.borrowed(), *blocking_mode, *append), - }; - let flags = self.get_flags(fd.borrowed()).await.map_err(|e| { - e.try_into() - .context("failed to call `get-flags`") - .unwrap_or_else(types::Error::trap) - })?; - let fs_filetype = self - .get_type(fd.borrowed()) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `get-type`") - .unwrap_or_else(types::Error::trap) - })? - .try_into() - .map_err(types::Error::trap)?; - let mut fs_flags = types::Fdflags::empty(); - let mut fs_rights_base = types::Rights::all(); - if let types::Filetype::Directory = fs_filetype { - fs_rights_base &= !types::Rights::FD_SEEK; - } - if !flags.contains(filesystem::DescriptorFlags::READ) { - fs_rights_base &= !types::Rights::FD_READ; - } - if !flags.contains(filesystem::DescriptorFlags::WRITE) { - fs_rights_base &= !types::Rights::FD_WRITE; - } - if flags.contains(filesystem::DescriptorFlags::DATA_INTEGRITY_SYNC) { - fs_flags |= types::Fdflags::DSYNC; - } - if flags.contains(filesystem::DescriptorFlags::REQUESTED_WRITE_SYNC) { - fs_flags |= types::Fdflags::RSYNC; - } - if flags.contains(filesystem::DescriptorFlags::FILE_INTEGRITY_SYNC) { - fs_flags |= types::Fdflags::SYNC; - } - if append { - fs_flags |= types::Fdflags::APPEND; - } - if matches!(blocking, BlockingMode::NonBlocking) { - fs_flags |= types::Fdflags::NONBLOCK; - } - Ok(types::Fdstat { - fs_filetype, - fs_flags, - fs_rights_base, - fs_rights_inheriting: fs_rights_base, - }) - } - - /// Adjust the flags associated with a file descriptor. - /// NOTE: This is similar to `fcntl(fd, F_SETFL, flags)` in POSIX. - #[instrument(skip(self))] - fn fd_fdstat_set_flags( - &mut self, - fd: types::Fd, - flags: types::Fdflags, - ) -> Result<(), types::Error> { - let mut st = self.transact()?; - let File { - append, - blocking_mode, - .. - } = st.get_file_mut(fd)?; - - // Only support changing the NONBLOCK or APPEND flags. - if flags.contains(types::Fdflags::DSYNC) - || flags.contains(types::Fdflags::SYNC) - || flags.contains(types::Fdflags::RSYNC) - { - return Err(types::Errno::Inval.into()); - } - *append = flags.contains(types::Fdflags::APPEND); - *blocking_mode = BlockingMode::from_fdflags(&flags); - Ok(()) - } - - /// Does not do anything if `fd` corresponds to a valid descriptor and returns `[types::Errno::Badf]` error otherwise. - #[instrument(skip(self))] - fn fd_fdstat_set_rights( - &mut self, - fd: types::Fd, - _fs_rights_base: types::Rights, - _fs_rights_inheriting: types::Rights, - ) -> Result<(), types::Error> { - self.get_fd(fd)?; - Ok(()) - } - - /// Return the attributes of an open file. - #[instrument(skip(self))] - async fn fd_filestat_get(&mut self, fd: types::Fd) -> Result { - let t = self.transact()?; - let desc = t.get_descriptor(fd)?; - match desc { - Descriptor::Stdin { isatty, .. } - | Descriptor::Stdout { isatty, .. } - | Descriptor::Stderr { isatty, .. } => Ok(types::Filestat { - dev: 0, - ino: 0, - filetype: (*isatty).into(), - nlink: 0, - size: 0, - atim: 0, - mtim: 0, - ctim: 0, - }), - Descriptor::PreopenDirectory((fd, _)) | Descriptor::File(File { fd, .. }) => { - let fd = fd.borrowed(); - drop(t); - let filesystem::DescriptorStat { - type_, - link_count: nlink, - size, - data_access_timestamp, - data_modification_timestamp, - status_change_timestamp, - } = self.stat(fd.borrowed()).await.map_err(|e| { - e.try_into() - .context("failed to call `stat`") - .unwrap_or_else(types::Error::trap) - })?; - let metadata_hash = self.metadata_hash(fd).await.map_err(|e| { - e.try_into() - .context("failed to call `metadata_hash`") - .unwrap_or_else(types::Error::trap) - })?; - let filetype = type_.try_into().map_err(types::Error::trap)?; - let zero = wall_clock::Datetime { - seconds: 0, - nanoseconds: 0, - }; - let atim = data_access_timestamp.unwrap_or(zero).try_into()?; - let mtim = data_modification_timestamp.unwrap_or(zero).try_into()?; - let ctim = status_change_timestamp.unwrap_or(zero).try_into()?; - Ok(types::Filestat { - dev: 1, - ino: metadata_hash.lower, - filetype, - nlink, - size, - atim, - mtim, - ctim, - }) - } - } - } - - /// Adjust the size of an open file. If this increases the file's size, the extra bytes are filled with zeros. - /// NOTE: This is similar to `ftruncate` in POSIX. - #[instrument(skip(self))] - async fn fd_filestat_set_size( - &mut self, - fd: types::Fd, - size: types::Filesize, - ) -> Result<(), types::Error> { - let fd = self.get_file_fd(fd)?; - self.set_size(fd, size).await.map_err(|e| { - e.try_into() - .context("failed to call `set-size`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Adjust the timestamps of an open file or directory. - /// NOTE: This is similar to `futimens` in POSIX. - #[instrument(skip(self))] - async fn fd_filestat_set_times( - &mut self, - fd: types::Fd, - atim: types::Timestamp, - mtim: types::Timestamp, - fst_flags: types::Fstflags, - ) -> Result<(), types::Error> { - let atim = systimespec( - fst_flags.contains(types::Fstflags::ATIM), - atim, - fst_flags.contains(types::Fstflags::ATIM_NOW), - )?; - let mtim = systimespec( - fst_flags.contains(types::Fstflags::MTIM), - mtim, - fst_flags.contains(types::Fstflags::MTIM_NOW), - )?; - - let fd = self.get_fd(fd)?; - self.set_times(fd, atim, mtim).await.map_err(|e| { - e.try_into() - .context("failed to call `set-times`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Read from a file descriptor. - /// NOTE: This is similar to `readv` in POSIX. - #[instrument(skip(self))] - async fn fd_read<'a>( - &mut self, - fd: types::Fd, - iovs: &types::IovecArray<'a>, - ) -> Result { - let t = self.transact()?; - let desc = t.get_descriptor(fd)?; - let (buf, read) = match desc { - Descriptor::File(File { - fd, - blocking_mode, - position, - .. - }) if t.view.table().get(fd)?.is_file() => { - let fd = fd.borrowed(); - let blocking_mode = *blocking_mode; - let position = position.clone(); - drop(t); - let Some(buf) = first_non_empty_iovec(iovs)? else { - return Ok(0); - }; - - let pos = position.load(Ordering::Relaxed); - let stream = self.read_via_stream(fd.borrowed(), pos).map_err(|e| { - e.try_into() - .context("failed to call `read-via-stream`") - .unwrap_or_else(types::Error::trap) - })?; - let read = blocking_mode - .read(self, stream.borrowed(), buf.len().try_into()?) - .await; - streams::HostInputStream::drop(self, stream).map_err(|e| types::Error::trap(e))?; - let read = read?; - let n = read.len().try_into()?; - let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?; - position.store(pos, Ordering::Relaxed); - - (buf, read) - } - Descriptor::Stdin { stream, .. } => { - let stream = stream.borrowed(); - drop(t); - let Some(buf) = first_non_empty_iovec(iovs)? else { - return Ok(0); - }; - let read = BlockingMode::Blocking - .read(self, stream, buf.len().try_into()?) - .await?; - (buf, read) - } - _ => return Err(types::Errno::Badf.into()), - }; - if read.len() > buf.len().try_into()? { - return Err(types::Errno::Range.into()); - } - let buf = buf.get_range(0..u32::try_from(read.len())?).unwrap(); - buf.copy_from_slice(&read)?; - let n = read.len().try_into()?; - Ok(n) - } - - /// Read from a file descriptor, without using and updating the file descriptor's offset. - /// NOTE: This is similar to `preadv` in POSIX. - #[instrument(skip(self))] - async fn fd_pread<'a>( - &mut self, - fd: types::Fd, - iovs: &types::IovecArray<'a>, - offset: types::Filesize, - ) -> Result { - let t = self.transact()?; - let desc = t.get_descriptor(fd)?; - let (buf, read) = match desc { - Descriptor::File(File { - fd, blocking_mode, .. - }) if t.view.table().get(fd)?.is_file() => { - let fd = fd.borrowed(); - let blocking_mode = *blocking_mode; - drop(t); - let Some(buf) = first_non_empty_iovec(iovs)? else { - return Ok(0); - }; - - let stream = self.read_via_stream(fd, offset).map_err(|e| { - e.try_into() - .context("failed to call `read-via-stream`") - .unwrap_or_else(types::Error::trap) - })?; - let read = blocking_mode - .read(self, stream.borrowed(), buf.len().try_into()?) - .await; - streams::HostInputStream::drop(self, stream).map_err(|e| types::Error::trap(e))?; - (buf, read?) - } - Descriptor::Stdin { .. } => { - // NOTE: legacy implementation returns SPIPE here - return Err(types::Errno::Spipe.into()); - } - _ => return Err(types::Errno::Badf.into()), - }; - if read.len() > buf.len().try_into()? { - return Err(types::Errno::Range.into()); - } - let buf = buf.get_range(0..u32::try_from(read.len())?).unwrap(); - buf.copy_from_slice(&read)?; - let n = read.len().try_into()?; - Ok(n) - } - - /// Write to a file descriptor. - /// NOTE: This is similar to `writev` in POSIX. - #[instrument(skip(self))] - async fn fd_write<'a>( - &mut self, - fd: types::Fd, - ciovs: &types::CiovecArray<'a>, - ) -> Result { - let t = self.transact()?; - let desc = t.get_descriptor(fd)?; - match desc { - Descriptor::File(File { - fd, - blocking_mode, - append, - position, - }) if t.view.table().get(fd)?.is_file() => { - let fd = fd.borrowed(); - let fd2 = fd.borrowed(); - let blocking_mode = *blocking_mode; - let position = position.clone(); - let append = *append; - drop(t); - let Some(buf) = first_non_empty_ciovec(ciovs)? else { - return Ok(0); - }; - let (stream, pos) = if append { - let stream = self.append_via_stream(fd).map_err(|e| { - e.try_into() - .context("failed to call `append-via-stream`") - .unwrap_or_else(types::Error::trap) - })?; - (stream, 0) - } else { - let pos = position.load(Ordering::Relaxed); - let stream = self.write_via_stream(fd, pos).map_err(|e| { - e.try_into() - .context("failed to call `write-via-stream`") - .unwrap_or_else(types::Error::trap) - })?; - (stream, pos) - }; - let n = blocking_mode.write(self, stream.borrowed(), buf).await; - streams::HostOutputStream::drop(self, stream).map_err(|e| types::Error::trap(e))?; - let n = n?; - if append { - let len = self.stat(fd2).await?; - position.store(len.size, Ordering::Relaxed); - } else { - let pos = pos.checked_add(n as u64).ok_or(types::Errno::Overflow)?; - position.store(pos, Ordering::Relaxed); - } - let n = n.try_into()?; - Ok(n) - } - Descriptor::Stdout { stream, .. } | Descriptor::Stderr { stream, .. } => { - let stream = stream.borrowed(); - drop(t); - let Some(buf) = first_non_empty_ciovec(ciovs)? else { - return Ok(0); - }; - let n = BlockingMode::Blocking - .write(self, stream, buf) - .await? - .try_into()?; - Ok(n) - } - _ => Err(types::Errno::Badf.into()), - } - } - - /// Write to a file descriptor, without using and updating the file descriptor's offset. - /// NOTE: This is similar to `pwritev` in POSIX. - #[instrument(skip(self))] - async fn fd_pwrite<'a>( - &mut self, - fd: types::Fd, - ciovs: &types::CiovecArray<'a>, - offset: types::Filesize, - ) -> Result { - let t = self.transact()?; - let desc = t.get_descriptor(fd)?; - let n = match desc { - Descriptor::File(File { - fd, blocking_mode, .. - }) if t.view.table().get(fd)?.is_file() => { - let fd = fd.borrowed(); - let blocking_mode = *blocking_mode; - drop(t); - let Some(buf) = first_non_empty_ciovec(ciovs)? else { - return Ok(0); - }; - let stream = self.write_via_stream(fd, offset).map_err(|e| { - e.try_into() - .context("failed to call `write-via-stream`") - .unwrap_or_else(types::Error::trap) - })?; - let result = blocking_mode.write(self, stream.borrowed(), buf).await; - streams::HostOutputStream::drop(self, stream).map_err(|e| types::Error::trap(e))?; - result? - } - Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => { - // NOTE: legacy implementation returns SPIPE here - return Err(types::Errno::Spipe.into()); - } - _ => return Err(types::Errno::Badf.into()), - }; - Ok(n.try_into()?) - } - - /// Return a description of the given preopened file descriptor. - #[instrument(skip(self))] - fn fd_prestat_get(&mut self, fd: types::Fd) -> Result { - if let Descriptor::PreopenDirectory((_, p)) = self.transact()?.get_descriptor(fd)? { - let pr_name_len = p.len().try_into()?; - return Ok(types::Prestat::Dir(types::PrestatDir { pr_name_len })); - } - Err(types::Errno::Badf.into()) // NOTE: legacy implementation returns BADF here - } - - /// Return a description of the given preopened file descriptor. - #[instrument(skip(self))] - fn fd_prestat_dir_name<'a>( - &mut self, - fd: types::Fd, - path: &GuestPtr<'a, u8>, - path_max_len: types::Size, - ) -> Result<(), types::Error> { - let path_max_len = path_max_len.try_into()?; - if let Descriptor::PreopenDirectory((_, p)) = self.transact()?.get_descriptor(fd)? { - if p.len() > path_max_len { - return Err(types::Errno::Nametoolong.into()); - } - write_bytes(path, p.as_bytes())?; - return Ok(()); - } - Err(types::Errno::Notdir.into()) // NOTE: legacy implementation returns NOTDIR here - } - - /// Atomically replace a file descriptor by renumbering another file descriptor. - #[instrument(skip(self))] - fn fd_renumber(&mut self, from: types::Fd, to: types::Fd) -> Result<(), types::Error> { - let mut st = self.transact()?; - let desc = st.descriptors.remove(from).ok_or(types::Errno::Badf)?; - st.descriptors.insert(to.into(), desc); - Ok(()) - } - - /// Move the offset of a file descriptor. - /// NOTE: This is similar to `lseek` in POSIX. - #[instrument(skip(self))] - async fn fd_seek( - &mut self, - fd: types::Fd, - offset: types::Filedelta, - whence: types::Whence, - ) -> Result { - let t = self.transact()?; - let File { fd, position, .. } = t.get_seekable(fd)?; - let fd = fd.borrowed(); - let position = position.clone(); - drop(t); - let pos = match whence { - types::Whence::Set if offset >= 0 => { - offset.try_into().map_err(|_| types::Errno::Inval)? - } - types::Whence::Cur => position - .load(Ordering::Relaxed) - .checked_add_signed(offset) - .ok_or(types::Errno::Inval)?, - types::Whence::End => { - let filesystem::DescriptorStat { size, .. } = self.stat(fd).await.map_err(|e| { - e.try_into() - .context("failed to call `stat`") - .unwrap_or_else(types::Error::trap) - })?; - size.checked_add_signed(offset).ok_or(types::Errno::Inval)? - } - _ => return Err(types::Errno::Inval.into()), - }; - position.store(pos, Ordering::Relaxed); - Ok(pos) - } - - /// Synchronize the data and metadata of a file to disk. - /// NOTE: This is similar to `fsync` in POSIX. - #[instrument(skip(self))] - async fn fd_sync(&mut self, fd: types::Fd) -> Result<(), types::Error> { - let fd = self.get_file_fd(fd)?; - self.sync(fd).await.map_err(|e| { - e.try_into() - .context("failed to call `sync`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Return the current offset of a file descriptor. - /// NOTE: This is similar to `lseek(fd, 0, SEEK_CUR)` in POSIX. - #[instrument(skip(self))] - fn fd_tell(&mut self, fd: types::Fd) -> Result { - let pos = self - .transact()? - .get_seekable(fd) - .map(|File { position, .. }| position.load(Ordering::Relaxed))?; - Ok(pos) - } - - #[instrument(skip(self))] - async fn fd_readdir<'a>( - &mut self, - fd: types::Fd, - buf: &GuestPtr<'a, u8>, - buf_len: types::Size, - cookie: types::Dircookie, - ) -> Result { - let fd = self.get_dir_fd(fd)?; - let stream = self.read_directory(fd.borrowed()).await.map_err(|e| { - e.try_into() - .context("failed to call `read-directory`") - .unwrap_or_else(types::Error::trap) - })?; - let dir_metadata_hash = self.metadata_hash(fd.borrowed()).await.map_err(|e| { - e.try_into() - .context("failed to call `metadata-hash`") - .unwrap_or_else(types::Error::trap) - })?; - let cookie = cookie.try_into().map_err(|_| types::Errno::Overflow)?; - - let head = [ - ( - types::Dirent { - d_next: 1u64.to_le(), - d_ino: dir_metadata_hash.lower.to_le(), - d_type: types::Filetype::Directory, - d_namlen: 1u32.to_le(), - }, - ".".into(), - ), - ( - types::Dirent { - d_next: 2u64.to_le(), - d_ino: dir_metadata_hash.lower.to_le(), // NOTE: incorrect, but legacy implementation returns `fd` inode here - d_type: types::Filetype::Directory, - d_namlen: 2u32.to_le(), - }, - "..".into(), - ), - ]; - - let mut dir = Vec::new(); - for (entry, d_next) in self - .table_mut() - // remove iterator from table and use it directly: - .delete(stream)? - .into_iter() - .zip(3u64..) - { - let filesystem::DirectoryEntry { type_, name } = entry.map_err(|e| { - e.try_into() - .context("failed to inspect `read-directory` entry") - .unwrap_or_else(types::Error::trap) - })?; - let metadata_hash = self - .metadata_hash_at(fd.borrowed(), filesystem::PathFlags::empty(), name.clone()) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `metadata-hash-at`") - .unwrap_or_else(types::Error::trap) - })?; - let d_type = type_.try_into().map_err(types::Error::trap)?; - let d_namlen: u32 = name.len().try_into().map_err(|_| types::Errno::Overflow)?; - dir.push(( - types::Dirent { - d_next: d_next.to_le(), - d_ino: metadata_hash.lower.to_le(), - d_type, // endian-invariant - d_namlen: d_namlen.to_le(), - }, - name, - )) - } - - // assume that `types::Dirent` size always fits in `u32` - const DIRENT_SIZE: u32 = size_of::() as _; - assert_eq!( - types::Dirent::guest_size(), - DIRENT_SIZE, - "Dirent guest repr and host repr should match" - ); - let mut buf = *buf; - let mut cap = buf_len; - for (ref entry, path) in head.into_iter().chain(dir.into_iter()).skip(cookie) { - let mut path = path.into_bytes(); - assert_eq!( - 1, - size_of_val(&entry.d_type), - "Dirent member d_type should be endian-invariant" - ); - let entry_len = cap.min(DIRENT_SIZE); - let entry = entry as *const _ as _; - let entry = unsafe { slice::from_raw_parts(entry, entry_len as _) }; - cap = cap.checked_sub(entry_len).unwrap(); - buf = write_bytes(buf, entry)?; - if cap == 0 { - return Ok(buf_len); - } - - if let Ok(cap) = cap.try_into() { - // `path` cannot be longer than `usize`, only truncate if `cap` fits in `usize` - path.truncate(cap); - } - cap = cap.checked_sub(path.len() as _).unwrap(); - buf = write_bytes(buf, &path)?; - if cap == 0 { - return Ok(buf_len); - } - } - Ok(buf_len.checked_sub(cap).unwrap()) - } - - #[instrument(skip(self))] - async fn path_create_directory<'a>( - &mut self, - dirfd: types::Fd, - path: &GuestPtr<'a, str>, - ) -> Result<(), types::Error> { - let dirfd = self.get_dir_fd(dirfd)?; - let path = read_string(path)?; - self.create_directory_at(dirfd.borrowed(), path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `create-directory-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Return the attributes of a file or directory. - /// NOTE: This is similar to `stat` in POSIX. - #[instrument(skip(self))] - async fn path_filestat_get<'a>( - &mut self, - dirfd: types::Fd, - flags: types::Lookupflags, - path: &GuestPtr<'a, str>, - ) -> Result { - let dirfd = self.get_dir_fd(dirfd)?; - let path = read_string(path)?; - let filesystem::DescriptorStat { - type_, - link_count: nlink, - size, - data_access_timestamp, - data_modification_timestamp, - status_change_timestamp, - } = self - .stat_at(dirfd.borrowed(), flags.into(), path.clone()) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `stat-at`") - .unwrap_or_else(types::Error::trap) - })?; - let metadata_hash = self - .metadata_hash_at(dirfd, flags.into(), path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `metadata-hash-at`") - .unwrap_or_else(types::Error::trap) - })?; - let filetype = type_.try_into().map_err(types::Error::trap)?; - let zero = wall_clock::Datetime { - seconds: 0, - nanoseconds: 0, - }; - let atim = data_access_timestamp.unwrap_or(zero).try_into()?; - let mtim = data_modification_timestamp.unwrap_or(zero).try_into()?; - let ctim = status_change_timestamp.unwrap_or(zero).try_into()?; - Ok(types::Filestat { - dev: 1, - ino: metadata_hash.lower, - filetype, - nlink, - size, - atim, - mtim, - ctim, - }) - } - - /// Adjust the timestamps of a file or directory. - /// NOTE: This is similar to `utimensat` in POSIX. - #[instrument(skip(self))] - async fn path_filestat_set_times<'a>( - &mut self, - dirfd: types::Fd, - flags: types::Lookupflags, - path: &GuestPtr<'a, str>, - atim: types::Timestamp, - mtim: types::Timestamp, - fst_flags: types::Fstflags, - ) -> Result<(), types::Error> { - let atim = systimespec( - fst_flags.contains(types::Fstflags::ATIM), - atim, - fst_flags.contains(types::Fstflags::ATIM_NOW), - )?; - let mtim = systimespec( - fst_flags.contains(types::Fstflags::MTIM), - mtim, - fst_flags.contains(types::Fstflags::MTIM_NOW), - )?; - - let dirfd = self.get_dir_fd(dirfd)?; - let path = read_string(path)?; - self.set_times_at(dirfd, flags.into(), path, atim, mtim) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `set-times-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Create a hard link. - /// NOTE: This is similar to `linkat` in POSIX. - #[instrument(skip(self))] - async fn path_link<'a>( - &mut self, - src_fd: types::Fd, - src_flags: types::Lookupflags, - src_path: &GuestPtr<'a, str>, - target_fd: types::Fd, - target_path: &GuestPtr<'a, str>, - ) -> Result<(), types::Error> { - let src_fd = self.get_dir_fd(src_fd)?; - let target_fd = self.get_dir_fd(target_fd)?; - let src_path = read_string(src_path)?; - let target_path = read_string(target_path)?; - self.link_at(src_fd, src_flags.into(), src_path, target_fd, target_path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `link-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Open a file or directory. - /// NOTE: This is similar to `openat` in POSIX. - #[instrument(skip(self))] - async fn path_open<'a>( - &mut self, - dirfd: types::Fd, - dirflags: types::Lookupflags, - path: &GuestPtr<'a, str>, - oflags: types::Oflags, - fs_rights_base: types::Rights, - _fs_rights_inheriting: types::Rights, - fdflags: types::Fdflags, - ) -> Result { - let path = read_string(path)?; - - let mut flags = filesystem::DescriptorFlags::empty(); - if fs_rights_base.contains(types::Rights::FD_READ) { - flags |= filesystem::DescriptorFlags::READ; - } - if fs_rights_base.contains(types::Rights::FD_WRITE) { - flags |= filesystem::DescriptorFlags::WRITE; - } - if fdflags.contains(types::Fdflags::SYNC) { - flags |= filesystem::DescriptorFlags::FILE_INTEGRITY_SYNC; - } - if fdflags.contains(types::Fdflags::DSYNC) { - flags |= filesystem::DescriptorFlags::DATA_INTEGRITY_SYNC; - } - if fdflags.contains(types::Fdflags::RSYNC) { - flags |= filesystem::DescriptorFlags::REQUESTED_WRITE_SYNC; - } - - let t = self.transact()?; - let dirfd = match t.get_descriptor(dirfd)? { - Descriptor::PreopenDirectory((fd, _)) => fd.borrowed(), - Descriptor::File(File { fd, .. }) => { - t.view.table().get(fd)?.dir()?; - fd.borrowed() - } - _ => return Err(types::Errno::Badf.into()), - }; - drop(t); - let fd = self - .open_at(dirfd, dirflags.into(), path, oflags.into(), flags) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `open-at`") - .unwrap_or_else(types::Error::trap) - })?; - let fd = self.transact()?.descriptors.push_file(File { - fd, - position: Default::default(), - append: fdflags.contains(types::Fdflags::APPEND), - blocking_mode: BlockingMode::from_fdflags(&fdflags), - })?; - Ok(fd.into()) - } - - /// Read the contents of a symbolic link. - /// NOTE: This is similar to `readlinkat` in POSIX. - #[instrument(skip(self))] - async fn path_readlink<'a>( - &mut self, - dirfd: types::Fd, - path: &GuestPtr<'a, str>, - buf: &GuestPtr<'a, u8>, - buf_len: types::Size, - ) -> Result { - let dirfd = self.get_dir_fd(dirfd)?; - let path = read_string(path)?; - let mut path = self - .readlink_at(dirfd, path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `readlink-at`") - .unwrap_or_else(types::Error::trap) - })? - .into_bytes(); - if let Ok(buf_len) = buf_len.try_into() { - // `path` cannot be longer than `usize`, only truncate if `buf_len` fits in `usize` - path.truncate(buf_len); - } - let n = path.len().try_into().map_err(|_| types::Errno::Overflow)?; - write_bytes(buf, &path)?; - Ok(n) - } - - #[instrument(skip(self))] - async fn path_remove_directory<'a>( - &mut self, - dirfd: types::Fd, - path: &GuestPtr<'a, str>, - ) -> Result<(), types::Error> { - let dirfd = self.get_dir_fd(dirfd)?; - let path = read_string(path)?; - self.remove_directory_at(dirfd, path).await.map_err(|e| { - e.try_into() - .context("failed to call `remove-directory-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - /// Rename a file or directory. - /// NOTE: This is similar to `renameat` in POSIX. - #[instrument(skip(self))] - async fn path_rename<'a>( - &mut self, - src_fd: types::Fd, - src_path: &GuestPtr<'a, str>, - dest_fd: types::Fd, - dest_path: &GuestPtr<'a, str>, - ) -> Result<(), types::Error> { - let src_fd = self.get_dir_fd(src_fd)?; - let dest_fd = self.get_dir_fd(dest_fd)?; - let src_path = read_string(src_path)?; - let dest_path = read_string(dest_path)?; - self.rename_at(src_fd, src_path, dest_fd, dest_path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `rename-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - #[instrument(skip(self))] - async fn path_symlink<'a>( - &mut self, - src_path: &GuestPtr<'a, str>, - dirfd: types::Fd, - dest_path: &GuestPtr<'a, str>, - ) -> Result<(), types::Error> { - let dirfd = self.get_dir_fd(dirfd)?; - let src_path = read_string(src_path)?; - let dest_path = read_string(dest_path)?; - self.symlink_at(dirfd.borrowed(), src_path, dest_path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `symlink-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - #[instrument(skip(self))] - async fn path_unlink_file<'a>( - &mut self, - dirfd: types::Fd, - path: &GuestPtr<'a, str>, - ) -> Result<(), types::Error> { - let dirfd = self.get_dir_fd(dirfd)?; - let path = path.as_cow()?.to_string(); - self.unlink_file_at(dirfd.borrowed(), path) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `unlink-file-at`") - .unwrap_or_else(types::Error::trap) - }) - } - - #[instrument(skip(self))] - async fn poll_oneoff<'a>( - &mut self, - subs: &GuestPtr<'a, types::Subscription>, - events: &GuestPtr<'a, types::Event>, - nsubscriptions: types::Size, - ) -> Result { - if nsubscriptions == 0 { - // Indefinite sleeping is not supported in preview1. - return Err(types::Errno::Inval.into()); - } - let subs = subs.as_array(nsubscriptions); - let events = events.as_array(nsubscriptions); - - let n = usize::try_from(nsubscriptions).unwrap_or(usize::MAX); - let mut pollables = Vec::with_capacity(n); - for sub in subs.iter() { - let sub = sub?.read()?; - let p = match sub.u { - types::SubscriptionU::Clock(types::SubscriptionClock { - id, - timeout, - flags, - .. - }) => { - let absolute = flags.contains(types::Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME); - let (timeout, absolute) = match id { - types::Clockid::Monotonic => (timeout, absolute), - types::Clockid::Realtime if !absolute => (timeout, false), - types::Clockid::Realtime => { - let now = wall_clock::Host::now(self) - .await - .context("failed to call `wall_clock::now`") - .map_err(types::Error::trap)?; - - // Convert `timeout` to `Datetime` format. - let seconds = timeout / 1_000_000_000; - let nanoseconds = timeout % 1_000_000_000; - - let timeout = if now.seconds < seconds - || now.seconds == seconds - && u64::from(now.nanoseconds) < nanoseconds - { - // `now` is less than `timeout`, which is expressable as u64, - // substract the nanosecond counts directly - now.seconds * 1_000_000_000 + u64::from(now.nanoseconds) - timeout - } else { - 0 - }; - (timeout, false) - } - _ => return Err(types::Errno::Inval.into()), - }; - if absolute { - monotonic_clock::Host::subscribe_instant(self, timeout) - .await - .context("failed to call `monotonic_clock::subscribe_instant`") - .map_err(types::Error::trap)? - } else { - monotonic_clock::Host::subscribe_duration(self, timeout) - .await - .context("failed to call `monotonic_clock::subscribe_duration`") - .map_err(types::Error::trap)? - } - } - types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite { - file_descriptor, - }) => { - let stream = { - let t = self.transact()?; - let desc = t.get_descriptor(file_descriptor)?; - match desc { - Descriptor::Stdin { stream, .. } => stream.borrowed(), - Descriptor::File(File { fd, position, .. }) - if t.view.table().get(fd)?.is_file() => - { - let pos = position.load(Ordering::Relaxed); - let fd = fd.borrowed(); - drop(t); - self.read_via_stream(fd, pos).map_err(|e| { - e.try_into() - .context("failed to call `read-via-stream`") - .unwrap_or_else(types::Error::trap) - })? - } - // TODO: Support sockets - _ => return Err(types::Errno::Badf.into()), - } - }; - streams::HostInputStream::subscribe(self, stream) - .context("failed to call `subscribe` on `input-stream`") - .map_err(types::Error::trap)? - } - types::SubscriptionU::FdWrite(types::SubscriptionFdReadwrite { - file_descriptor, - }) => { - let stream = { - let t = self.transact()?; - let desc = t.get_descriptor(file_descriptor)?; - match desc { - Descriptor::Stdout { stream, .. } - | Descriptor::Stderr { stream, .. } => stream.borrowed(), - Descriptor::File(File { - fd, - position, - append, - .. - }) if t.view.table().get(fd)?.is_file() => { - let fd = fd.borrowed(); - let position = position.clone(); - let append = *append; - drop(t); - if append { - self.append_via_stream(fd).map_err(|e| { - e.try_into() - .context("failed to call `append-via-stream`") - .unwrap_or_else(types::Error::trap) - })? - } else { - let pos = position.load(Ordering::Relaxed); - self.write_via_stream(fd, pos).map_err(|e| { - e.try_into() - .context("failed to call `write-via-stream`") - .unwrap_or_else(types::Error::trap) - })? - } - } - // TODO: Support sockets - _ => return Err(types::Errno::Badf.into()), - } - }; - streams::HostOutputStream::subscribe(self, stream) - .context("failed to call `subscribe` on `output-stream`") - .map_err(types::Error::trap)? - } - }; - pollables.push(p); - } - let ready: HashSet<_> = self - .poll(pollables) - .await - .context("failed to call `poll-oneoff`") - .map_err(types::Error::trap)? - .into_iter() - .collect(); - - let mut count: types::Size = 0; - for (sub, event) in (0..) - .zip(subs.iter()) - .filter_map(|(idx, sub)| ready.contains(&idx).then_some(sub)) - .zip(events.iter()) - { - let sub = sub?.read()?; - let event = event?; - let e = match sub.u { - types::SubscriptionU::Clock(..) => types::Event { - userdata: sub.userdata, - error: types::Errno::Success, - type_: types::Eventtype::Clock, - fd_readwrite: types::EventFdReadwrite { - flags: types::Eventrwflags::empty(), - nbytes: 0, - }, - }, - types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite { - file_descriptor, - }) => { - let t = self.transact()?; - let desc = t.get_descriptor(file_descriptor)?; - match desc { - Descriptor::Stdin { .. } => types::Event { - userdata: sub.userdata, - error: types::Errno::Success, - type_: types::Eventtype::FdRead, - fd_readwrite: types::EventFdReadwrite { - flags: types::Eventrwflags::empty(), - nbytes: 1, - }, - }, - Descriptor::File(File { fd, position, .. }) - if t.view.table().get(fd)?.is_file() => - { - let fd = fd.borrowed(); - let position = position.clone(); - drop(t); - match self.stat(fd).await? { - filesystem::DescriptorStat { size, .. } => { - let pos = position.load(Ordering::Relaxed); - let nbytes = size.saturating_sub(pos); - types::Event { - userdata: sub.userdata, - error: types::Errno::Success, - type_: types::Eventtype::FdRead, - fd_readwrite: types::EventFdReadwrite { - flags: if nbytes == 0 { - types::Eventrwflags::FD_READWRITE_HANGUP - } else { - types::Eventrwflags::empty() - }, - nbytes: 1, - }, - } - } - } - } - // TODO: Support sockets - _ => return Err(types::Errno::Badf.into()), - } - } - types::SubscriptionU::FdWrite(types::SubscriptionFdReadwrite { - file_descriptor, - }) => { - let t = self.transact()?; - let desc = t.get_descriptor(file_descriptor)?; - match desc { - Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => types::Event { - userdata: sub.userdata, - error: types::Errno::Success, - type_: types::Eventtype::FdWrite, - fd_readwrite: types::EventFdReadwrite { - flags: types::Eventrwflags::empty(), - nbytes: 1, - }, - }, - Descriptor::File(File { fd, .. }) if t.view.table().get(fd)?.is_file() => { - types::Event { - userdata: sub.userdata, - error: types::Errno::Success, - type_: types::Eventtype::FdWrite, - fd_readwrite: types::EventFdReadwrite { - flags: types::Eventrwflags::empty(), - nbytes: 1, - }, - } - } - // TODO: Support sockets - _ => return Err(types::Errno::Badf.into()), - } - } - }; - event.write(e)?; - count = count - .checked_add(1) - .ok_or_else(|| types::Error::from(types::Errno::Overflow))? - } - Ok(count) - } - - #[instrument(skip(self))] - fn proc_exit(&mut self, status: types::Exitcode) -> anyhow::Error { - // Check that the status is within WASI's range. - if status >= 126 { - return anyhow::Error::msg("exit with invalid exit status outside of [0..126)"); - } - crate::preview2::I32Exit(status as i32).into() - } - - #[instrument(skip(self))] - fn proc_raise(&mut self, _sig: types::Signal) -> Result<(), types::Error> { - Err(types::Errno::Notsup.into()) - } - - #[instrument(skip(self))] - fn sched_yield(&mut self) -> Result<(), types::Error> { - // No such thing in preview 2. Intentionally left empty. - Ok(()) - } - - #[instrument(skip(self))] - async fn random_get<'a>( - &mut self, - buf: &GuestPtr<'a, u8>, - buf_len: types::Size, - ) -> Result<(), types::Error> { - let rand = self - .get_random_bytes(buf_len.into()) - .await - .context("failed to call `get-random-bytes`") - .map_err(types::Error::trap)?; - write_bytes(buf, &rand)?; - Ok(()) - } - - #[allow(unused_variables)] - #[instrument(skip(self))] - fn sock_accept( - &mut self, - fd: types::Fd, - flags: types::Fdflags, - ) -> Result { - todo!("preview1 sock_accept is not implemented") - } - - #[allow(unused_variables)] - #[instrument(skip(self))] - fn sock_recv<'a>( - &mut self, - fd: types::Fd, - ri_data: &types::IovecArray<'a>, - ri_flags: types::Riflags, - ) -> Result<(types::Size, types::Roflags), types::Error> { - todo!("preview1 sock_recv is not implemented") - } - - #[allow(unused_variables)] - #[instrument(skip(self))] - fn sock_send<'a>( - &mut self, - fd: types::Fd, - si_data: &types::CiovecArray<'a>, - _si_flags: types::Siflags, - ) -> Result { - todo!("preview1 sock_send is not implemented") - } - - #[allow(unused_variables)] - #[instrument(skip(self))] - fn sock_shutdown(&mut self, fd: types::Fd, how: types::Sdflags) -> Result<(), types::Error> { - todo!("preview1 sock_shutdown is not implemented") - } -} - -trait ResourceExt { - fn borrowed(&self) -> Resource; -} - -impl ResourceExt for Resource { - fn borrowed(&self) -> Resource { - Resource::new_borrow(self.rep()) - } -} From 9898bacc45d51d89675d12566db495ef440b1494 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 29 Jan 2024 17:18:10 +0100 Subject: [PATCH 07/12] Storing path in file/dir descriptors --- crates/wasi/src/preview2/ctx.rs | 3 ++- crates/wasi/src/preview2/filesystem.rs | 9 +++++++-- crates/wasi/src/preview2/host/filesystem.rs | 6 ++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/wasi/src/preview2/ctx.rs b/crates/wasi/src/preview2/ctx.rs index 4a77917ad609..7b40e5be4d9c 100644 --- a/crates/wasi/src/preview2/ctx.rs +++ b/crates/wasi/src/preview2/ctx.rs @@ -12,6 +12,7 @@ use crate::preview2::{ use cap_rand::{Rng, RngCore, SeedableRng}; use std::sync::Arc; use std::{mem, net::SocketAddr}; +use std::path::PathBuf; use std::time::Duration; use wasmtime::component::ResourceTable; @@ -150,7 +151,7 @@ impl WasiCtxBuilder { path: impl AsRef, ) -> &mut Self { self.preopens - .push((Dir::new(dir, perms, file_perms), path.as_ref().to_owned())); + .push((Dir::new(dir, perms, file_perms, PathBuf::from(path.as_ref())), path.as_ref().to_owned())); self } diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index b29f8f7f7712..8defa57d5b22 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -7,6 +7,7 @@ use bytes::{Bytes, BytesMut}; use std::any::Any; use std::io; use std::mem; +use std::path::PathBuf; use std::sync::Arc; pub type FsResult = Result; @@ -75,13 +76,15 @@ pub struct File { /// [`spawn_blocking`]: Self::spawn_blocking pub file: Arc, pub perms: FilePerms, + pub path: PathBuf } impl File { - pub fn new(file: cap_std::fs::File, perms: FilePerms) -> Self { + pub fn new(file: cap_std::fs::File, perms: FilePerms, path: PathBuf) -> Self { Self { file: Arc::new(file), perms, + path } } @@ -110,14 +113,16 @@ pub struct Dir { pub dir: Arc, pub perms: DirPerms, pub file_perms: FilePerms, + pub path: PathBuf } impl Dir { - pub fn new(dir: cap_std::fs::Dir, perms: DirPerms, file_perms: FilePerms) -> Self { + pub fn new(dir: cap_std::fs::Dir, perms: DirPerms, file_perms: FilePerms, path: PathBuf) -> Self { Dir { dir: Arc::new(dir), perms, file_perms, + path } } diff --git a/crates/wasi/src/preview2/host/filesystem.rs b/crates/wasi/src/preview2/host/filesystem.rs index 02bd323d6132..d026f900d5ec 100644 --- a/crates/wasi/src/preview2/host/filesystem.rs +++ b/crates/wasi/src/preview2/host/filesystem.rs @@ -564,9 +564,10 @@ impl HostDescriptor for T { NotDir, } + let path_clone = path.clone(); let opened = d .spawn_blocking::<_, std::io::Result>(move |d| { - let mut opened = d.open_with(&path, &opts)?; + let mut opened = d.open_with(&path_clone, &opts)?; if opened.metadata()?.is_dir() { Ok(OpenResult::Dir(cap_std::fs::Dir::from_std_file( opened.into_std(), @@ -585,12 +586,13 @@ impl HostDescriptor for T { match opened { OpenResult::Dir(dir) => { - Ok(table.push(Descriptor::Dir(Dir::new(dir, d.perms, d.file_perms)))?) + Ok(table.push(Descriptor::Dir(Dir::new(dir, d.perms, d.file_perms, d.path.join(path))))?) } OpenResult::File(file) => Ok(table.push(Descriptor::File(File::new( file, mask_file_perms(d.file_perms, flags), + d.path.join(path) )))?), OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()), From e7761ba728c63be0f86ff860de0861f3715317c7 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 29 Jan 2024 17:41:30 +0100 Subject: [PATCH 08/12] Passing the host path for preopened directories --- crates/wasi/src/preview2/ctx.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/wasi/src/preview2/ctx.rs b/crates/wasi/src/preview2/ctx.rs index 7b40e5be4d9c..bd39a308e423 100644 --- a/crates/wasi/src/preview2/ctx.rs +++ b/crates/wasi/src/preview2/ctx.rs @@ -149,9 +149,10 @@ impl WasiCtxBuilder { perms: DirPerms, file_perms: FilePerms, path: impl AsRef, + host_path: PathBuf, ) -> &mut Self { self.preopens - .push((Dir::new(dir, perms, file_perms, PathBuf::from(path.as_ref())), path.as_ref().to_owned())); + .push((Dir::new(dir, perms, file_perms, host_path), path.as_ref().to_owned())); self } From 54bfd9118ea9d17f4f17397f0656619ac003aca0 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Wed, 31 Jan 2024 10:30:01 +0100 Subject: [PATCH 09/12] Opened ReaddirIterator --- crates/wasi/src/preview2/filesystem.rs | 2 +- crates/wasi/src/preview2/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index 8defa57d5b22..9ad4b1c96edb 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -315,7 +315,7 @@ pub struct ReaddirIterator( ); impl ReaddirIterator { - pub(crate) fn new( + pub fn new( i: impl Iterator> + Send + 'static, ) -> Self { ReaddirIterator(std::sync::Mutex::new(Box::new(i))) diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 8c9fd90d7458..5b236025f842 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -39,7 +39,7 @@ mod write_stream; pub use self::clocks::{HostMonotonicClock, HostWallClock}; pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView}; pub use self::error::{I32Exit, TrappableError}; -pub use self::filesystem::{DirPerms, FilePerms, FsError, FsResult}; +pub use self::filesystem::{DirPerms, FilePerms, FsError, FsResult, ReaddirIterator}; pub use self::network::{Network, SocketError, SocketResult}; pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe}; pub use self::random::{thread_rng, Deterministic}; From b5eace206dcb7031f273e237f4087497eec29eea Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Wed, 31 Jan 2024 12:43:02 +0100 Subject: [PATCH 10/12] resolve_addresses made async --- crates/wasi/src/preview2/ip_name_lookup.rs | 2 +- crates/wasi/src/preview2/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/wasi/src/preview2/ip_name_lookup.rs b/crates/wasi/src/preview2/ip_name_lookup.rs index e3cd01f60207..011d0e068a7a 100644 --- a/crates/wasi/src/preview2/ip_name_lookup.rs +++ b/crates/wasi/src/preview2/ip_name_lookup.rs @@ -20,7 +20,7 @@ pub enum ResolveAddressStream { #[async_trait::async_trait] impl Host for T { - fn resolve_addresses( + async fn resolve_addresses( &mut self, network: Resource, name: String, diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 5b236025f842..9dd748393584 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -161,7 +161,8 @@ pub mod bindings { "get-environment", "get-arguments", "initial-cwd", - "get-directories" + "get-directories", + "resolve-addresses" ], }, trappable_error_type: { From df86b3ea017d87d38831e3315af96adb38569aa7 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Thu, 18 Jan 2024 17:54:47 +0100 Subject: [PATCH 11/12] fix: allow dynamic owned resources to be used as borrowed parameters (#7783) * fix: allow dynamic owned resources to be used as borrowed parameters Signed-off-by: Roman Volosatovs * tests: add `can_use_own_for_borrow` test Signed-off-by: Roman Volosatovs --------- Signed-off-by: Roman Volosatovs (cherry picked from commit 2c86e26b25e87548be189cfcc9903fda8548c13a) --- crates/wasmtime/src/component/func.rs | 3 +- crates/wasmtime/src/component/func/host.rs | 2 +- crates/wasmtime/src/component/types.rs | 11 +++- crates/wasmtime/src/component/values.rs | 12 ++-- tests/all/component_model/resources.rs | 66 ++++++++++++++++++++++ 5 files changed, 85 insertions(+), 9 deletions(-) diff --git a/crates/wasmtime/src/component/func.rs b/crates/wasmtime/src/component/func.rs index 0b69aff34d8c..cf4f710f1b1b 100644 --- a/crates/wasmtime/src/component/func.rs +++ b/crates/wasmtime/src/component/func.rs @@ -362,7 +362,8 @@ impl Func { } for (param, ty) in params.iter().zip(param_tys.iter()) { - ty.check(param).context("type mismatch with parameters")?; + ty.is_supertype_of(param) + .context("type mismatch with parameters")?; } self.call_raw( diff --git a/crates/wasmtime/src/component/func/host.rs b/crates/wasmtime/src/component/func/host.rs index a048275b9090..c00f38c833bd 100644 --- a/crates/wasmtime/src/component/func/host.rs +++ b/crates/wasmtime/src/component/func/host.rs @@ -385,7 +385,7 @@ where let mut cx = LowerContext::new(store, &options, types, instance); let instance = cx.instance_type(); for (val, ty) in result_vals.iter().zip(result_tys.types.iter()) { - Type::from(ty, &instance).check(val)?; + Type::from(ty, &instance).is_supertype_of(val)?; } if let Some(cnt) = result_tys.abi.flat_count(MAX_FLAT_RESULTS) { let mut dst = storage[..cnt].iter_mut(); diff --git a/crates/wasmtime/src/component/types.rs b/crates/wasmtime/src/component/types.rs index f751079fa037..401ba15ec407 100644 --- a/crates/wasmtime/src/component/types.rs +++ b/crates/wasmtime/src/component/types.rs @@ -675,9 +675,16 @@ impl Type { } } - pub(crate) fn check(&self, value: &Val) -> Result<()> { + /// Checks whether type of `value` is a subtype of `self`. + /// + /// # Errors + /// + /// Returns an error in case of a type mismatch + pub(crate) fn is_supertype_of(&self, value: &Val) -> Result<()> { let other = &value.ty(); - if self == other { + if self == other + || matches!((self, other), (Self::Borrow(s), Self::Own(other)) if s == other) + { Ok(()) } else if mem::discriminant(self) != mem::discriminant(other) { Err(anyhow!( diff --git a/crates/wasmtime/src/component/values.rs b/crates/wasmtime/src/component/values.rs index 332a0659a875..0775dcf7b2da 100644 --- a/crates/wasmtime/src/component/values.rs +++ b/crates/wasmtime/src/component/values.rs @@ -26,7 +26,7 @@ impl List { let element_type = ty.ty(); for (index, value) in values.iter().enumerate() { element_type - .check(value) + .is_supertype_of(value) .with_context(|| format!("type mismatch for element {index} of list"))?; } @@ -83,7 +83,7 @@ impl Record { if name == field.name { field .ty - .check(&value) + .is_supertype_of(&value) .with_context(|| format!("type mismatch for field {name} of record"))?; values.push(value); @@ -150,7 +150,7 @@ impl Tuple { } for (index, (value, ty)) in values.iter().zip(ty.types()).enumerate() { - ty.check(value) + ty.is_supertype_of(value) .with_context(|| format!("type mismatch for field {index} of tuple"))?; } @@ -256,7 +256,7 @@ impl Variant { fn typecheck_payload(name: &str, case_type: Option<&Type>, value: Option<&Val>) -> Result<()> { match (case_type, value) { (Some(expected), Some(actual)) => expected - .check(&actual) + .is_supertype_of(&actual) .with_context(|| format!("type mismatch for case {name} of variant")), (None, None) => Ok(()), (Some(_), None) => bail!("expected a payload for case `{name}`"), @@ -341,7 +341,9 @@ impl OptionVal { pub fn new(ty: &types::OptionType, value: Option) -> Result { let value = value .map(|value| { - ty.ty().check(&value).context("type mismatch for option")?; + ty.ty() + .is_supertype_of(&value) + .context("type mismatch for option")?; Ok::<_, Error>(value) }) diff --git a/tests/all/component_model/resources.rs b/tests/all/component_model/resources.rs index 448eceecf7ad..bb593894419b 100644 --- a/tests/all/component_model/resources.rs +++ b/tests/all/component_model/resources.rs @@ -856,6 +856,72 @@ fn cannot_use_borrow_for_own() -> Result<()> { Ok(()) } +#[test] +fn can_use_own_for_borrow() -> Result<()> { + let engine = super::engine(); + let c = Component::new( + &engine, + r#" + (component + (import "t" (type $t (sub resource))) + + (core func $drop (canon resource.drop $t)) + + (core module $m + (import "" "drop" (func $drop (param i32))) + (func (export "f") (param i32) + (call $drop (local.get 0)) + ) + ) + (core instance $i (instantiate $m + (with "" (instance + (export "drop" (func $drop)) + )) + )) + + (func (export "f") (param "x" (borrow $t)) + (canon lift (core func $i "f"))) + ) + "#, + )?; + + struct MyType; + + let mut store = Store::new(&engine, ()); + let mut linker = Linker::new(&engine); + let ty_idx = linker + .root() + .resource("t", ResourceType::host::(), |_, _| Ok(()))?; + let i_pre = linker.instantiate_pre(&c)?; + let i = i_pre.instantiate(&mut store)?; + + let f = i.get_func(&mut store, "f").unwrap(); + let f_typed = i.get_typed_func::<(&Resource,), ()>(&mut store, "f")?; + + let resource = Resource::new_own(100); + f_typed.call(&mut store, (&resource,))?; + f_typed.post_return(&mut store)?; + + let resource = Resource::new_borrow(200); + f_typed.call(&mut store, (&resource,))?; + f_typed.post_return(&mut store)?; + + let resource = + Resource::::new_own(300).try_into_resource_any(&mut store, &i_pre, ty_idx)?; + f.call(&mut store, &[Val::Resource(resource)], &mut [])?; + f.post_return(&mut store)?; + resource.resource_drop(&mut store)?; + + // TODO: Enable once https://github.com/bytecodealliance/wasmtime/issues/7793 is fixed + //let resource = + // Resource::::new_borrow(400).try_into_resource_any(&mut store, &i_pre, ty_idx)?; + //f.call(&mut store, &[Val::Resource(resource)], &mut [])?; + //f.post_return(&mut store)?; + //resource.resource_drop(&mut store)?; + + Ok(()) +} + #[test] fn passthrough_wrong_type() -> Result<()> { let engine = super::engine(); From c2e97ff31dfaa0da2deeb9365e53551183039dd3 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 28 Mar 2024 15:40:55 +0100 Subject: [PATCH 12/12] wasi-http handle() made async --- crates/wasi-http/src/http_impl.rs | 4 +++- crates/wasi-http/src/lib.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 7804edffc3a2..e7134c3f7c6c 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use crate::{ bindings::http::{ outgoing_handler, @@ -12,8 +13,9 @@ use http_body_util::{BodyExt, Empty}; use hyper::Method; use wasmtime::component::Resource; +#[async_trait] impl outgoing_handler::Host for T { - fn handle( + async fn handle( &mut self, request_id: Resource, options: Option>, diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index e1df49c7d75c..fca206b43d6c 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -18,6 +18,7 @@ pub mod bindings { tracing: true, async: { only_imports: [ + "handle", "[method]future-incoming-response.get", "[method]future-trailers.get", ],