diff --git a/lucet-runtime/lucet-runtime-internals/src/future.rs b/lucet-runtime/lucet-runtime-internals/src/future.rs index 313917fd6..f88e73a39 100644 --- a/lucet-runtime/lucet-runtime-internals/src/future.rs +++ b/lucet-runtime/lucet-runtime-internals/src/future.rs @@ -1,112 +1,158 @@ -use crate::error::Error; -use crate::instance::{InstanceHandle, InternalRunResult, RunResult, State, TerminationDetails}; +use crate::instance::{InstanceHandle, InternalRunResult, RunResult, TerminationDetails}; use crate::module::FunctionHandle; -use crate::val::{UntypedRetVal, Val}; +use crate::val::Val; use crate::vmctx::{Vmctx, VmctxInternal}; -use std::any::Any; -use std::future::Future; +use crate::{error::Error, instance::EmptyYieldVal}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; +use std::{any::Any, future::Future}; -/// This is the same type defined by the `futures` library, but we don't need the rest of the -/// library for this purpose. -type BoxFuture<'a, T> = Pin + Send + 'a>>; +/// a representation of AsyncContext which can be freely cloned +#[doc(hidden)] +#[derive(Clone)] +pub struct AsyncContext { + waker: Waker, +} -/// A unique type that wraps a boxed future with a boxed return value. -/// -/// Type and lifetime guarantees are maintained by `Vmctx::block_on` and `Instance::run_async`. The -/// user never sees this type. -struct YieldedFuture(BoxFuture<'static, ResumeVal>); +const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; + +/// A value representing that the guest instance yielded because it was blocked on a future. +struct AsyncYielded; -/// A unique type for a boxed return value. The user never sees this type. -pub struct ResumeVal(Box); +/// Providing the private `AsyncResume` as a resume value certifies that +/// RunAsync upheld the invariants necessary to safely resume the instance. +struct AsyncResume; -unsafe impl Send for ResumeVal {} +/// An error representing a failure of `try_block_on` +#[doc(hidden)] +pub enum BlockOnError { + NeedsAsyncContext, +} + +impl From for TerminationDetails { + fn from(err: BlockOnError) -> Self { + match err { + BlockOnError::NeedsAsyncContext => TerminationDetails::BlockOnNeedsAsync, + } + } +} impl Vmctx { /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. /// - /// Lucet hostcalls are synchronous `extern "C" fn` functions called from WebAssembly. In that - /// context, we cannot use `.await` directly because the hostcall is not `async`. While we could - /// block on an executor using `futures::executor::block_on` or - /// `tokio::runtime::Runtime::block_on`, that has two drawbacks: + /// While this method is supported and part of the public API, it's easiest to define the hostcall + /// function itself as async (the `#[lucet_hostcall]` macro simply calls this function). + /// If you need to provide a synchronous fallback, use [`Vmctx::try_block_on()`] instead. + /// + /// Note: + /// - This method may only be used if `Instance::run_async` was used to run the VM, + /// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. + #[doc(hidden)] + #[inline(always)] + pub fn block_on(&self, f: impl Future) -> R { + match self.try_block_on(f) { + Ok(res) => res, + Err(err) => panic!(TerminationDetails::from(err)), + } + } + + /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. 
+    ///
+    /// The easiest way to make a hostcall async is simply to add the `async` modifier:
     ///
-    /// - If the Lucet instance was originally invoked from an async context, trying to block on the
-    ///   same runtime will fail if the executor cannot be nested (all executors we know of have this
-    ///   restriction).
+    /// ```ignore
+    /// #[lucet_hostcall]
+    /// #[no_mangle]
+    /// pub async fn hostcall_async(vmctx: &Vmctx) {
+    ///     foobar().await
+    /// }
+    /// ```
     ///
-    /// - The current OS thread would be blocked on the result of the computation, rather than being
-    ///   able to run other async tasks while awaiting. This means an application will need more
-    ///   threads than otherwise would be necessary.
+    /// Of course, we can only expose synchronous interfaces to Wasm guests. To implement
+    /// async hostcalls, Lucet blocks guest execution while waiting for the future to complete,
+    /// yielding control of the thread back to the async executor so that other tasks can make
+    /// progress in the meantime.
     ///
-    /// Instead, this block_on operator is designed to work only when called within an invocation
-    /// of [`Instance::run_async`]. The `run_async` method executes instance code within a
-    /// trampoline, itself running within an async context, making it possible to temporarily pause
-    /// guest execution, jump back to the trampoline, and await there. The future given to block_on
-    /// is in passed back to that trampoline, and runs on the same runtime that invoked
-    /// `run_async`, avoiding problems of nesting, and allowing the current OS thread to continue
-    /// performing other async work.
+    /// It's recommended to use the async modifier rather than calling this function directly.
+    /// The primary reason to call it directly is to provide a fallback implementation for the
+    /// case when a hostcall is invoked from outside of an async context.
     ///
-    /// Note that this method may only be used if `Instance::run_async` was used to run the VM,
-    /// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`.
-    pub fn block_on<'a, R>(&'a self, f: impl Future<Output = R> + Send + 'a) -> R
-    where
-        R: Any + Send + 'static,
-    {
-        // Die if we aren't in Instance::run_async
-        match self.instance().state {
-            State::Running { async_context } => {
-                if !async_context {
-                    panic!(TerminationDetails::BlockOnNeedsAsync)
-                }
+    /// There is no additional yield to the host when the future passed is immediately ready.
+    /// This behavior is the same as implemented by the `.await` operator.
+    ///
+    /// The future passed is polled from within the guest execution context. If the future is not
+    /// immediately ready, the instance will yield and return control to the async executor.
+    /// Later, once the future is ready to make progress, the async executor will return us to
+    /// the guest context, where Lucet will resume polling the future to completion.
+    ///
+    /// For async hostcalls that may yield to the async executor many times, it's recommended that
+    /// you use `tokio::spawn`, or the equivalent from your async executor, which will spawn the task
+    /// to be run from the host execution context. This avoids the overhead of context switching into
+    /// the guest execution context every time the future needs to make progress.
+    ///
+    /// If `Instance::run_async` is not being used to run the VM, this function will return
+    /// `Err(BlockOnError::NeedsAsyncContext)`.
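+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of the fallback pattern described above. The helpers `do_work_async`
+    /// and `do_work_sync` are hypothetical stand-ins for an embedder's own implementations:
+    ///
+    /// ```ignore
+    /// #[lucet_hostcall]
+    /// #[no_mangle]
+    /// pub fn hostcall_do_work(vmctx: &Vmctx) -> u32 {
+    ///     match vmctx.try_block_on(do_work_async()) {
+    ///         // We were driven by `Instance::run_async`; the future ran on that executor.
+    ///         Ok(n) => n,
+    ///         // Not running under `run_async`; fall back to a blocking implementation.
+    ///         Err(_) => do_work_sync(),
+    ///     }
+    /// }
+    /// ```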
+    pub fn try_block_on<R>(&self, mut f: impl Future<Output = R>) -> Result<R, BlockOnError> {
+        // We pin the future to the stack (a requirement for being able to poll the future).
+        // By pinning to the stack instead of using `Box::pin`, we avoid heap allocations for
+        // immediately-ready futures.
+        //
+        // SAFETY: We must uphold the invariants of Pin, namely that the future does not move
+        // until it is dropped. By shadowing the variable named `f`, it is impossible to access
+        // `f` again, except through the pinned reference.
+        let mut f = unsafe { Pin::new_unchecked(&mut f) };
+
+        loop {
+            // Get the AsyncContext, or die if we aren't async
+            let arc_cx = match &self.instance().async_ctx {
+                Some(cx) => cx.clone(),
+                None => return Err(BlockOnError::NeedsAsyncContext),
+            };
+
+            // build an std::task::Context
+            let mut cx = Context::from_waker(&arc_cx.waker);
+
+            if let Poll::Ready(ret) = f.as_mut().poll(&mut cx) {
+                return Ok(ret);
             }
-            _ => unreachable!("Access to vmctx implies instance is Running"),
-        }
-        // Wrap the Output of `f` as a boxed ResumeVal. Then, box the entire
-        // async computation.
-        let f = Box::pin(async move { ResumeVal(Box::new(f.await)) });
-        // Change the lifetime of the async computation from `'a` to `'static.
-        // We need to lie about this lifetime so that `YieldedFuture` may impl
-        // `Any` and be passed through the yield. `Instance::run_async`
-        // rehydrates this lifetime to be at most as long as the Vmctx's `'a`.
-        // This is safe because the stack frame that `'a` is tied to gets
-        // frozen in place as part of `self.yield_val_expecting_val`.
-        let f = unsafe {
-            std::mem::transmute::<BoxFuture<'a, ResumeVal>, BoxFuture<'static, ResumeVal>>(f)
-        };
-        // Wrap the computation in `YieldedFuture` so that
-        // `Instance::run_async` can catch and run it. We will get the
-        // `ResumeVal` we applied to `f` above.
-        self.yield_impl::<YieldedFuture, ResumeVal>(YieldedFuture(f), false, false);
-        let ResumeVal(v) = self.take_resumed_val();
-        // We may now downcast and unbox the returned Box<dyn Any> into an `R`
-        // again.
-        *v.downcast().expect("run_async broke invariant")
-    }
-}
-/// A simple future that yields once. We use this to yield when a runtime bound is reached.
-///
-/// Inspired by Tokio's `yield_now()`.
-struct YieldNow {
-    yielded: bool,
-}
+            // The future is pending, so we need to yield to the async executor
+            self.yield_impl::<AsyncYielded, AsyncResume>(AsyncYielded, false, false);
-impl YieldNow {
-    fn new() -> Self {
-        Self { yielded: false }
-    }
-}
+            // Poll is synchronous and may have yielded back to the host, so it is possible that we are
+            // executing from a new RunAsync future (and thus, a new async executor). If the future has awoken
+            // the previous waker, we would miss a wakeup.
+            //
+            // Rather than try to prevent this from happening at all, we check if the waker changed from under us,
+            // and if so, we simply poll the future again. If the future is Ready by that point, there's no need
+            // for another wakeup (we've already done so!). Otherwise, polling the future registers the new waker,
+            // allowing us to yield back to the async executor without risking a missed wakeup.
+            //
+            // Note: Waking a waker unnecessarily will not cause unsafety or logic errors. In the worst case, the
+            // async executor may waste CPU time polling pending futures an extra time.
+
+            match &self.instance().async_ctx {
+                Some(new_cx) => {
+                    let same_waker = Arc::ptr_eq(&arc_cx, &new_cx) || arc_cx.waker.will_wake(&new_cx.waker);
+
+                    if !same_waker {
+                        // The AsyncContext changed on us.
+                        // Poll the future again before yielding to the executor in order to register the new waker.
+ continue; + } + }, + _ => panic!("Lucet instance blocked on a future, but no longer running in async context. Make sure to use resume_async when resuming an async guest.") + } -impl Future for YieldNow { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.yielded { - Poll::Ready(()) - } else { - self.yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending + // Providing the private `AsyncResume` as a resume value certifies that + // RunAsync upheld the invariants necessary for us to avoid a borrow check. + // + // If we resume with any other value, the instance may have been modified, and it is + // unsound to resume the instance. + let AsyncResume = self.take_resumed_val::(); } } } @@ -114,26 +160,29 @@ impl Future for YieldNow { impl InstanceHandle { /// Run a WebAssembly function with arguments in the guest context at the given entrypoint. /// - /// This method is similar to `Instance::run()`, but allows the Wasm program to invoke hostcalls - /// that use `Vmctx::block_on` and provides the trampoline that `.await`s those futures on - /// behalf of the guest. + /// This method is similar to `Instance::run()`, but allows the Wasm program to invoke async hostcalls. /// - /// If `runtime_bound` is provided, it will also pause the Wasm execution and yield a future - /// that resumes it after (approximately) that many Wasm opcodes have executed. + /// To define an async hostcall, simply add an `async` modifier to your hostcall: + /// + /// ```ignore + /// #[lucet_hostcall] + /// #[no_mangle] + /// pub async fn hostcall_async(vmctx: &Vmctx) { + /// foobar().await + /// } + /// ``` /// - /// # `Vmctx` Restrictions + /// See [`Vmctx::try_block_on()`] for details. /// - /// This method permits the use of `Vmctx::block_on`, but disallows all other uses of `Vmctx:: - /// yield_val_expecting_val` and family (`Vmctx::yield_`, `Vmctx::yield_expecting_val`, - /// `Vmctx::yield_val`). - pub async fn run_async<'a>( - &'a mut self, - entrypoint: &'a str, - args: &'a [Val], - runtime_bound: Option, - ) -> Result { - let func = self.module.get_export_func(entrypoint)?; - self.run_async_internal(func, args, runtime_bound).await + /// If `runtime_bound` is provided, it will also pause the Wasm execution and yield a future + /// that resumes it after (approximately) that many Wasm opcodes have executed. + pub fn run_async<'a>(&'a mut self, entrypoint: &'a str, args: &'a [Val]) -> RunAsync<'a> { + let func = self.module.get_export_func(entrypoint); + + match func { + Ok(func) => self.run_async_internal(func, args), + Err(err) => self.run_async_failed(err), + } } /// Run the module's [start function][start], if one exists. @@ -150,91 +199,155 @@ impl InstanceHandle { /// runtime between async future yields (invocations of `.poll()` on the /// underlying generated future) if `runtime_bound` is provided. This /// behaves the same way as `Instance::run_async()`. - pub async fn run_async_start<'a>( - &'a mut self, - runtime_bound: Option, - ) -> Result<(), Error> { + /// + /// Just like `Instance::run_start()`, hostcalls, including async hostcalls, + /// cannot be called from the instance start function. + /// + /// The result of the `RunAsync` future is unspecified, and should not be relied on. + pub fn run_async_start<'a>(&'a mut self) -> RunAsync<'a> { if !self.is_not_started() { - return Err(Error::StartAlreadyRun); + return self.run_async_failed(Error::StartAlreadyRun); } - let start = match self.module.get_start_func()? 
{ - Some(start) => start, - None => return Ok(()), + + let func = match self.module.get_start_func() { + Ok(start) => start.expect("NotStarted, but no start function"), // can only be in NotStarted state if a start function exists, + Err(err) => return self.run_async_failed(err), }; - self.run_async_internal(start, &[], runtime_bound).await?; - Ok(()) + + self.run_async_internal(func, &[]) } - /// Shared async run-loop implementation for both `run_async()` and - /// `run_start_async()`. - async fn run_async_internal<'a>( - &'a mut self, - func: FunctionHandle, - args: &'a [Val], - runtime_bound: Option, - ) -> Result { - if self.is_yielded() { - return Err(Error::Unsupported( - "cannot run_async a yielded instance".to_owned(), - )); + /// Resume async execution of an instance that has yielded, providing a value to the guest. + /// + /// If an async execution context yields from within a future, resuming with [`Instance::resume()`], + /// [`Instance::resume_with_val()`], may panic if the instance needs to block on an async function. + /// Use this function instead, which will resume the instance within an async context. + /// + /// The provided value will be dynamically typechecked against the type the guest expects to + /// receive, and if that check fails, this call will fail with `Error::InvalidArgument`. + /// + /// See [`Instance::resume()`], [`Instance::resume_with_val()`], and [`Instance::run_async()`]. + /// + /// # Safety + /// + /// The foreign code safety caveat of [`Instance::run()`](struct.Instance.html#method.run) + /// applies. + pub fn resume_async_with_val<'a>(&'a mut self, val: impl Any + 'static + Send) -> RunAsync<'a> { + let val = Box::new(val) as Box; + + RunAsync { + inst: self, + inst_count_bound: DEFAULT_INST_COUNT_BOUND, + state: RunAsyncState::ResumeYielded(val), } + } - // Store the ResumeVal here when we get it. - let mut resume_val: Option = None; - loop { - // Run the WebAssembly program - let run_result = if self.is_yielded() { - // A previous iteration of the loop stored the ResumeVal in - // `resume_val`, send it back to the guest ctx and continue - // running: - self.resume_with_val_impl( - resume_val - .take() - .expect("is_yielded implies resume_value is some"), - true, - runtime_bound, - ) - } else if self.is_bound_expired() { - self.resume_bounded( - runtime_bound.expect("should have bound if guest had expired bound"), - ) - } else { + /// Resume execution of an instance that has yielded without providing a value to the guest. + /// + /// See [`Instance::resume_async_with_val()`] + pub fn resume_async<'a>(&'a mut self) -> RunAsync<'a> { + self.resume_async_with_val(EmptyYieldVal) + } + + /// Returns a `RunAsync` that will asynchronously execute the guest instnace. + fn run_async_internal<'a>(&'a mut self, func: FunctionHandle, args: &'a [Val]) -> RunAsync<'a> { + RunAsync { + inst: self, + inst_count_bound: DEFAULT_INST_COUNT_BOUND, + state: RunAsyncState::Start(func, args), + } + } + + /// Returns a `RunAsync` that will immediately fail with the given error, without executing the guest instance. + fn run_async_failed<'a>(&'a mut self, err: Error) -> RunAsync<'a> { + RunAsync { + inst: self, + inst_count_bound: DEFAULT_INST_COUNT_BOUND, + state: RunAsyncState::Failed(err), + } + } +} + +/// A Future implementation that enables asynchronous execution of bounded slices of +/// WebAssembly, and of underlying async hostcalls that it invokes. 
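+///
+/// A minimal sketch of driving this future from an embedder, assuming a loaded instance
+/// `inst` with a guest export named "main" (any executor works; `futures_executor` is just
+/// an example, and the bound of 10_000 instructions is arbitrary):
+///
+/// ```ignore
+/// let result = futures_executor::block_on(
+///     // yield back to the executor roughly every 10_000 Wasm instructions
+///     inst.run_async("main", &[]).bound_inst_count(10_000),
+/// );
+/// ```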
+/// +/// See [`Vmctx::try_block_on()`] for more details about how this works, and how +/// to define an async hostcall. +pub struct RunAsync<'a> { + inst: &'a mut InstanceHandle, + state: RunAsyncState<'a>, + /// The instance count bound. Can be changed at any time, taking effect on the next entry to the guest execution context + pub inst_count_bound: u64, +} + +impl<'a> RunAsync<'a> { + /// Set the instance count bound + pub fn bound_inst_count(&mut self, inst_count_bound: u64) -> &mut Self { + self.inst_count_bound = inst_count_bound; + self + } +} + +enum RunAsyncState<'a> { + Start(FunctionHandle, &'a [Val]), + ResumeYielded(Box), + BoundExpired, + Failed(Error), +} + +impl<'a> Future for RunAsync<'a> { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inst_count_bound = self.inst_count_bound; + + let waker = cx.waker(); + let cx = AsyncContext { + waker: waker.clone(), + }; + + let state = std::mem::replace( + &mut self.state, + RunAsyncState::Failed(Error::InvalidArgument("Polled an invalid future")), + ); + let run_result = match state { + RunAsyncState::Start(func, args) => { // This is the first iteration, call the entrypoint: - self.run_func(func, args, true, runtime_bound) - }; - match run_result? { - InternalRunResult::Normal(RunResult::Returned(rval)) => { - // Finished running, return UntypedReturnValue - return Ok(rval); - } - InternalRunResult::Normal(RunResult::Yielded(yval)) => { - // Check if the yield came from Vmctx::block_on: - if yval.is::() { - let YieldedFuture(future) = *yval.downcast::().unwrap(); - // Rehydrate the lifetime from `'static` to `'a`, which - // is morally the same lifetime as was passed into - // `Vmctx::block_on`. - let future = future as BoxFuture<'a, ResumeVal>; - - // await on the computation. Store its result in - // `resume_val`. - resume_val = Some(future.await); - // Now we want to `Instance::resume_with_val` and start - // this cycle over. - } else { - // Any other yielded value is not supported - die with an error. - return Err(Error::Unsupported( - "cannot yield anything besides a future in Instance::run_async" - .to_owned(), - )); + self.inst + .run_func(func, args, Some(cx), Some(inst_count_bound)) + } + RunAsyncState::ResumeYielded(val) => { + self.inst + .resume_with_val_impl(val, Some(cx), Some(inst_count_bound)) + } + RunAsyncState::BoundExpired => self.inst.resume_bounded(cx, inst_count_bound), + RunAsyncState::Failed(err) => Err(err), + }; + + let res = match run_result { + Ok(InternalRunResult::Normal(r @ RunResult::Returned(_))) => Ok(r), + Ok(InternalRunResult::Normal(RunResult::Yielded(yval))) => { + match yval.downcast::() { + Ok(_) => { + // When this future is polled next, we'll resume the guest instance using `AsyncResume` + self.state = RunAsyncState::ResumeYielded(Box::new(AsyncResume)); + return Poll::Pending; } - } - InternalRunResult::BoundExpired => { - // Await on a simple future that yields once then is ready. - YieldNow::new().await + Err(yval) => Ok(RunResult::Yielded(yval)), } } - } + Ok(InternalRunResult::BoundExpired) => { + // The instruction count bound expired. Yield to the async exeuctor and immediately wake. 
+ // + // By immediately waking, the future will be scheduled to run later (similar to tokio's yield_now()) + self.state = RunAsyncState::BoundExpired; + waker.wake_by_ref(); + return Poll::Pending; + } + Err(err) => Err(err), + }; + + Poll::Ready(res) } } @@ -251,8 +364,8 @@ mod test { #[allow(unreachable_code)] fn _dont_run_me() { let mut _inst: InstanceHandle = unimplemented!(); - _assert_send(&_inst.run_async("", &[], None)); - _assert_send(&_inst.run_async_start(None)); + _assert_send(&_inst.run_async("", &[])); + _assert_send(&_inst.run_async_start()); } } } diff --git a/lucet-runtime/lucet-runtime-internals/src/instance.rs b/lucet-runtime/lucet-runtime-internals/src/instance.rs index 822cbe315..fdc953775 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance.rs @@ -7,7 +7,6 @@ pub use crate::instance::execution::{KillError, KillState, KillSuccess, KillSwit pub use crate::instance::signals::{signal_handler_none, SignalBehavior, SignalHandler}; pub use crate::instance::state::State; -use crate::alloc::Alloc; use crate::context::Context; use crate::embed_ctx::CtxMap; use crate::error::Error; @@ -18,13 +17,13 @@ use crate::region::RegionInternal; use crate::sysdeps::HOST_PAGE_SIZE_EXPECTED; use crate::val::{UntypedRetVal, Val}; use crate::WASM_PAGE_SIZE; +use crate::{alloc::Alloc, future::AsyncContext}; use libc::{c_void, pthread_self, siginfo_t, uintptr_t}; use lucet_module::InstanceRuntimeData; use memoffset::offset_of; use std::any::Any; use std::cell::{BorrowError, BorrowMutError, Ref, RefCell, RefMut, UnsafeCell}; use std::convert::TryFrom; -use std::marker::PhantomData; use std::mem; use std::ops::{Deref, DerefMut}; use std::ptr::{self, NonNull}; @@ -228,6 +227,10 @@ pub struct Instance { /// Small mutexed state used for remote kill switch functionality pub(crate) kill_state: Arc, + /// Indicates whether the instance is running in an async context (`Instance::run_async`) + /// or not. Needed by `Vmctx::block_on`. + pub(crate) async_ctx: Option>, + #[cfg(feature = "concurrent_testpoints")] /// Conditionally-present helpers to force permutations of possible races in testing. pub lock_testpoints: Arc, @@ -515,7 +518,7 @@ impl Instance { /// in the future. pub fn run(&mut self, entrypoint: &str, args: &[Val]) -> Result { let func = self.module.get_export_func(entrypoint)?; - Ok(self.run_func(func, &args, false, None)?.unwrap()) + Ok(self.run_func(func, &args, None, None)?.unwrap()) } /// Run a function with arguments in the guest context from the [WebAssembly function @@ -531,7 +534,7 @@ impl Instance { args: &[Val], ) -> Result { let func = self.module.get_func_from_idx(table_idx, func_idx)?; - Ok(self.run_func(func, &args, false, None)?.unwrap()) + Ok(self.run_func(func, &args, None, None)?.unwrap()) } /// Resume execution of an instance that has yielded without providing a value to the guest. @@ -562,19 +565,21 @@ impl Instance { /// The foreign code safety caveat of [`Instance::run()`](struct.Instance.html#method.run) /// applies. pub fn resume_with_val(&mut self, val: A) -> Result { - Ok(self.resume_with_val_impl(val, false, None)?.unwrap()) + Ok(self + .resume_with_val_impl(Box::new(val), None, None)? + .unwrap()) } - pub(crate) fn resume_with_val_impl( + pub(crate) fn resume_with_val_impl( &mut self, - val: A, - async_context: bool, + val: Box, + async_context: Option, max_insn_count: Option, ) -> Result { match &self.state { State::Yielded { expecting, .. 
} => { // make sure the resumed value is of the right type - if !expecting.is::>() { + if &(*val).type_id() != expecting { return Err(Error::InvalidArgument( "type mismatch between yielded instance expected value and resumed value", )); @@ -583,7 +588,7 @@ impl Instance { _ => return Err(Error::InvalidArgument("can only resume a yielded instance")), } - self.resumed_val = Some(Box::new(val) as Box); + self.resumed_val = Some(val); self.set_instruction_bound_delta(max_insn_count); self.swap_and_return(async_context) @@ -602,6 +607,7 @@ impl Instance { /// applies. pub(crate) fn resume_bounded( &mut self, + async_context: AsyncContext, max_insn_count: u64, ) -> Result { if !self.state.is_bound_expired() { @@ -610,7 +616,7 @@ impl Instance { )); } self.set_instruction_bound_delta(Some(max_insn_count)); - self.swap_and_return(true) + self.swap_and_return(Some(async_context)) } /// Run the module's [start function][start], if one exists. @@ -648,7 +654,7 @@ impl Instance { if !self.is_not_started() { return Err(Error::StartAlreadyRun); } - self.run_func(start, &[], false, None)?; + self.run_func(start, &[], None, None)?; } Ok(()) } @@ -1021,6 +1027,7 @@ impl Instance { entrypoint: None, resumed_val: None, terminate_on_heap_oom: false, + async_ctx: None, _padding: (), }; inst.set_globals_ptr(globals_ptr); @@ -1090,7 +1097,7 @@ impl Instance { &mut self, func: FunctionHandle, args: &[Val], - async_context: bool, + async_context: Option, inst_count_bound: Option, ) -> Result { let needs_start = self.state.is_not_started() && !func.is_start_func; @@ -1191,7 +1198,10 @@ impl Instance { /// This must only be called for an instance in a ready, non-fatally faulted, or yielded state, /// or in the not-started state on the start function. The public wrappers around this function /// should make sure the state is appropriate. - fn swap_and_return(&mut self, async_context: bool) -> Result { + fn swap_and_return<'a>( + &mut self, + async_context: Option, + ) -> Result { let is_start_func = self .entrypoint .expect("we always have an entrypoint by now") @@ -1203,7 +1213,10 @@ impl Instance { || self.state.is_yielded() || self.state.is_bound_expired() ); - self.state = State::Running { async_context }; + + self.async_ctx = async_context.map(|cx| Arc::new(cx)); + + self.state = State::Running; let res = self.with_current_instance(|i| { i.with_signals_on(|i| { @@ -1217,6 +1230,9 @@ impl Instance { }) }); + // remove async ctx + self.async_ctx.take(); + #[cfg(feature = "concurrent_testpoints")] self.lock_testpoints .instance_after_clearing_current_instance diff --git a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs index 97247b0e5..a2b53c2ca 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs @@ -2,7 +2,7 @@ use crate::instance::siginfo_ext::SiginfoExt; use crate::instance::{FaultDetails, TerminationDetails, YieldedVal}; use crate::sysdeps::UContext; use libc::{SIGBUS, SIGSEGV}; -use std::any::Any; +use std::any::TypeId; use std::ffi::{CStr, CString}; /// The representation of a Lucet instance's state machine. @@ -22,11 +22,7 @@ pub enum State { /// Transitions to `Ready` when the guest function returns normally, or to `Faulted`, /// `Terminating`, or `Yielding` if the instance faults, terminates, or yields, or to /// `BoundExpired` if the instance is run with an instruction bound and reaches it. 
- Running { - /// Indicates whether the instance is running in an async context (`Instance::run_async`) - /// or not. Needed by `Vmctx::block_on`. - async_context: bool, - }, + Running, /// The instance has faulted, potentially fatally. /// @@ -56,11 +52,8 @@ pub enum State { /// `RunResult` before anything else happens to the instance. Yielding { val: YieldedVal, - /// A phantom value carrying the type of the expected resumption value. - /// - /// Concretely, this should only ever be `Box>` where `R` is the type - /// the guest expects upon resumption. - expecting: Box, + /// The type of the expected resumption value + expecting: TypeId, }, /// The instance has yielded. @@ -69,10 +62,7 @@ pub enum State { /// instance is reset. Yielded { /// A phantom value carrying the type of the expected resumption value. - /// - /// Concretely, this should only ever be `Box>` where `R` is the type - /// the guest expects upon resumption. - expecting: Box, + expecting: TypeId, }, /// The instance has reached an instruction-count bound. @@ -96,12 +86,7 @@ impl std::fmt::Display for State { match self { State::NotStarted => write!(f, "not started"), State::Ready => write!(f, "ready"), - State::Running { - async_context: false, - } => write!(f, "running"), - State::Running { - async_context: true, - } => write!(f, "running (in async context)"), + State::Running => write!(f, "running"), State::Faulted { details, siginfo, .. } => { @@ -162,14 +147,6 @@ impl State { } } - pub fn is_running_async(&self) -> bool { - if let State::Running { async_context } = self { - *async_context - } else { - false - } - } - pub fn is_faulted(&self) -> bool { if let State::Faulted { .. } = self { true diff --git a/lucet-runtime/lucet-runtime-internals/src/vmctx.rs b/lucet-runtime/lucet-runtime-internals/src/vmctx.rs index 11e14937d..e7ef8bffa 100644 --- a/lucet-runtime/lucet-runtime-internals/src/vmctx.rs +++ b/lucet-runtime/lucet-runtime-internals/src/vmctx.rs @@ -13,10 +13,9 @@ use crate::instance::{ CURRENT_INSTANCE, HOST_CTX, }; use lucet_module::{FunctionHandle, GlobalValue}; -use std::any::Any; +use std::any::{Any, TypeId}; use std::borrow::{Borrow, BorrowMut}; use std::cell::{Ref, RefCell, RefMut}; -use std::marker::PhantomData; /// An opaque handle to a running instance's context. #[derive(Debug)] @@ -436,10 +435,9 @@ impl Vmctx { if is_bound_expiration { inst.state = State::BoundExpired; } else { - let expecting: Box> = Box::new(PhantomData); inst.state = State::Yielding { val: YieldedVal::new(val), - expecting: expecting as Box, + expecting: TypeId::of::(), }; } diff --git a/lucet-runtime/lucet-runtime-macros/src/lib.rs b/lucet-runtime/lucet-runtime-macros/src/lib.rs index 9cf89e230..7c91b7857 100644 --- a/lucet-runtime/lucet-runtime-macros/src/lib.rs +++ b/lucet-runtime/lucet-runtime-macros/src/lib.rs @@ -101,6 +101,17 @@ pub fn lucet_hostcall(_attr: TokenStream, item: TokenStream) -> TokenStream { quote! { lucet_runtime::TerminationDetails } }; + let res_ident = quote::format_ident!("res"); + + let block_if_async = match raw_sig.asyncness.take() { + Some(_) => { + quote! { let #res_ident = vmctx.block_on(#res_ident); } + } + None => { + quote! {} + } + }; + let raw_hostcall = quote! 
{ #(#attrs)* #vis @@ -111,7 +122,13 @@ pub fn lucet_hostcall(_attr: TokenStream, item: TokenStream) -> TokenStream { let vmctx = #vmctx_mod::Vmctx::from_raw(vmctx_raw); #vmctx_mod::VmctxInternal::instance_mut(&vmctx).uninterruptable(|| { let res = std::panic::catch_unwind(move || { - #hostcall_ident(&#vmctx_mod::Vmctx::from_raw(vmctx_raw), #(#impl_args),*) + let vmctx = #vmctx_mod::Vmctx::from_raw(vmctx_raw); + + let #res_ident = #hostcall_ident(&vmctx, #(#impl_args),*); + + #block_if_async + + #res_ident }); match res { Ok(res) => res, diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json index f9837ac99..ce0eb173b 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json @@ -1,5 +1,8 @@ { "env": { - "hostcall_containing_block_on": "hostcall_containing_block_on" + "hostcall_containing_block_on": "hostcall_containing_block_on", + "hostcall_containing_yielding_block_on": "hostcall_containing_yielding_block_on", + "hostcall_async_containing_yielding_block_on": "hostcall_async_containing_yielding_block_on", + "await_manual_future": "await_manual_future" } } diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c index 846e12a0f..5b6422a70 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c @@ -1,9 +1,30 @@ #include extern void hostcall_containing_block_on(int); +extern void hostcall_containing_yielding_block_on(int); +extern int hostcall_async_containing_yielding_block_on(int, int); int main(void) { hostcall_containing_block_on(1312); return 0; } + +int yielding() +{ + hostcall_containing_yielding_block_on(0); + hostcall_containing_yielding_block_on(1); + hostcall_containing_yielding_block_on(2); + hostcall_containing_yielding_block_on(3); + + int six = hostcall_async_containing_yielding_block_on(3, 6); + hostcall_async_containing_yielding_block_on(3, six); + + return 0; +} + +int manual_future() +{ + await_manual_future(); + return 0; +} \ No newline at end of file diff --git a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs index 2af09d06b..cd23836c9 100644 --- a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs +++ b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs @@ -1,8 +1,53 @@ +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +enum StubFutureInner { + NeverPolled, + Polled(Waker), + Ready, +} +#[derive(Clone)] +pub struct StubFuture(Arc>); +impl StubFuture { + pub fn new() -> Self { + StubFuture(Arc::new(Mutex::new(StubFutureInner::NeverPolled))) + } + pub fn make_ready(&self) { + let mut inner = self.0.lock().unwrap(); + match std::mem::replace(&mut *inner, StubFutureInner::Ready) { + StubFutureInner::Polled(waker) => { + waker.wake(); + } + _ => panic!("never polled"), + } + } +} + +impl Future for StubFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut inner = self.0.lock().unwrap(); + + match *inner { + StubFutureInner::Ready => Poll::Ready(()), + _ => { + *inner = StubFutureInner::Polled(cx.waker().clone()); + Poll::Pending + } + } + } +} + #[macro_export] macro_rules! 
async_hostcall_tests { ( $( $region_id:ident => $TestRegion:path ),* ) => { - use lucet_runtime::{vmctx::Vmctx, lucet_hostcall}; + use std::future::Future; + use std::task::{Waker, Context}; + use $crate::async_hostcall::StubFuture; + #[lucet_hostcall] #[no_mangle] @@ -11,13 +56,76 @@ macro_rules! async_hostcall_tests { assert_eq!(asynced_value, value); } + #[lucet_hostcall] + #[no_mangle] + pub fn hostcall_containing_yielding_block_on(vmctx: &Vmctx, times: u32) { + struct YieldingFuture { times: u32 } + + impl Future for YieldingFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + if self.times == 0 { + return std::task::Poll::Ready(()) + } else { + self.get_mut().times -= 1; + + cx.waker().wake_by_ref(); + + return std::task::Poll::Pending + } + } + } + + vmctx.block_on(YieldingFuture { times }); + } + + #[lucet_hostcall] + #[no_mangle] + pub async fn hostcall_async_containing_yielding_block_on(vmctx: &Vmctx, times: u32, times_double: u32) -> u32 { + assert_eq!(times * 2, times_double); + + struct YieldingFuture { times: u32 } + + impl Future for YieldingFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + if self.times == 0 { + return std::task::Poll::Ready(()) + } else { + self.get_mut().times -= 1; + + cx.waker().wake_by_ref(); + + return std::task::Poll::Pending + } + } + } + + for i in 0..times { + YieldingFuture { times: 2 }.await + } + + return times * 2; + } + + #[lucet_hostcall] + #[no_mangle] + pub async fn await_manual_future(vmctx: &Vmctx) { + vmctx.yield_(); + vmctx.get_embed_ctx_mut::>().take().unwrap().await; + } + $( mod $region_id { - use lucet_runtime::{DlModule, Error, Limits, Region, RegionCreate, TerminationDetails}; - use std::sync::Arc; + use lucet_runtime::{DlModule, Error, Limits, Region, RegionCreate, TerminationDetails, RunResult}; + use std::sync::{Arc}; use $TestRegion as TestRegion; use $crate::build::test_module_c; + use $crate::async_hostcall::StubFuture; + #[test] fn ensure_linked() { lucet_runtime::lucet_internal_ensure_linked(); @@ -45,10 +153,11 @@ macro_rules! async_hostcall_tests { futures_executor::block_on( inst.run_async( "main", - &[0u32.into(), 0i32.into()], - // Run with bounded execution to test its interaction with block_on - Some(1), - )); + &[0u32.into(), 0i32.into()] + ) + // Run with bounded execution to test its interaction with block_on + .bound_inst_count(1) + ); match correct_run_res { Ok(_) => {} // expected - UntypedRetVal is (), so no reason to inspect value _ => panic!( @@ -73,9 +182,8 @@ macro_rules! async_hostcall_tests { futures_executor::block_on( inst.run_async( "main", - &[0u32.into(), 0i32.into()], - Some(1), - )); + &[0u32.into(), 0i32.into()] + ).bound_inst_count(1)); match correct_run_res_2 { Ok(_) => {} // expected _ => panic!( @@ -83,8 +191,66 @@ macro_rules! 
async_hostcall_tests { correct_run_res_2 ), } + + let correct_run_res_3 = + futures_executor::block_on( + inst.run_async( + "yielding", + &[] + ).bound_inst_count(10)); + + match correct_run_res_3 { + Ok(_) => {} // expected + _ => panic!( + "run_async yielding should return successfully, got {:?}", + correct_run_res_3 + ), + } } + + #[test] + fn yield_from_within_future() { + let module = test_module_c("async_hostcall", "hostcall_block_on.c") + .expect("module compiled and loaded"); + let region = ::create(1, &Limits::default()) + .expect("region can be created"); + let mut inst = region + .new_instance(module) + .expect("instance can be created"); + + inst.run_start().expect("start section runs"); + + let manual_future = StubFuture::new(); + + inst.insert_embed_ctx(Some(manual_future.clone())); + + let run_res = + futures_executor::block_on( + inst.run_async( + "manual_future", + &[] + )); + + if let Ok(RunResult::Yielded(_)) = run_res { /* expected */ } else { panic!("did not yield"); } + + // The loop within try_block_on polled the future returned by await_manual_future, + // and the waker that will be passed to poll `manuall_future` is from the old + // executor. + // + // However, we yielded from the guest prior to polling manual_future, so we + // need to spawn a thread that will wake manual_future _after_ it has been polled. + std::thread::spawn(move || { + // the instance some time, so that it polls and blocks on manual_future + std::thread::sleep(std::time::Duration::from_millis(5)); + // wake the future manually. this will force us to miss the wakeup + manual_future.make_ready(); + }); + + let run_res = futures_executor::block_on(inst.resume_async()); + + if let Ok(RunResult::Returned(_)) = run_res { /* expected */ } else { panic!("did not return"); } + } } )* }; diff --git a/lucet-runtime/src/lib.rs b/lucet-runtime/src/lib.rs index a235c9788..c732e1495 100644 --- a/lucet-runtime/src/lib.rs +++ b/lucet-runtime/src/lib.rs @@ -406,6 +406,7 @@ pub mod c_api; pub use lucet_module::{PublicKey, TrapCode}; pub use lucet_runtime_internals::alloc::{AllocStrategy, Limits, DEFAULT_SIGNAL_STACK_SIZE}; pub use lucet_runtime_internals::error::Error; +pub use lucet_runtime_internals::future::RunAsync; pub use lucet_runtime_internals::instance::signals::{ install_lucet_signal_handler, remove_lucet_signal_handler, }; diff --git a/lucet-runtime/tests/instruction_counting.rs b/lucet-runtime/tests/instruction_counting.rs index 4a5e8d267..74a8697de 100644 --- a/lucet-runtime/tests/instruction_counting.rs +++ b/lucet-runtime/tests/instruction_counting.rs @@ -187,10 +187,12 @@ fn check_instruction_count_with_periodic_yields_internal(want_start_function: bo } let yields = if want_start_function { - let future = Box::pin(inst.run_async_start(Some(1000))); + let mut future = Box::pin(inst.run_async_start()); + future.bound_inst_count(1000); future_loop(future) } else { - let future = Box::pin(inst.run_async("test_function", &[], Some(1000))); + let mut future = Box::pin(inst.run_async("test_function", &[])); + future.bound_inst_count(1000); future_loop(future) };
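// A sketch of resuming a guest that yielded while running under `run_async` (assumptions:
// `inst` is an `InstanceHandle` whose exported "main" yields from a hostcall, and
// `futures_executor` stands in for whatever executor the embedder uses):
let res = futures_executor::block_on(inst.run_async("main", &[])).expect("guest ran");
if let RunResult::Yielded(_) = res {
    // `resume()`/`resume_with_val()` may panic if the guest needs to block on a future again,
    // so resume within an async context instead.
    let _ret = futures_executor::block_on(inst.resume_async()).expect("guest resumed");
}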