
Commit b727aa1

Move StoreContextMut::on_fiber to StoreOpaque::on_fiber (#10496)
This allows code that needs to run operations on a fiber, but which only has an untyped, opaque store, to do so.
1 parent: 6392f56
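In signature terms, `on_fiber` now lives on the untyped `StoreOpaque` and takes `impl FnOnce(&mut Self) -> R + Send`, while `StoreContextMut::on_fiber` becomes a thin wrapper that delegates to it and recovers the typed store inside the closure (see the final hunk). The following is a minimal, runnable analogue of that shape using hypothetical stand-in types, not Wasmtime's, and synchronous rather than async so it stays self-contained; it only illustrates why moving the method onto the erased core lets code without a `T` drive the fiber machinery.

// Hypothetical stand-ins: `OpaqueCore` plays StoreOpaque, `TypedStore<T>` plays
// StoreInner<T>, and `TypedContext<'_, T>` plays StoreContextMut<'_, T>.
struct OpaqueCore {
    fibers_run: usize, // stand-in for the fiber-stack machinery
}

impl OpaqueCore {
    // Erased entry point: anything holding `&mut OpaqueCore` can use it, with
    // no `T` (and no `T: Send` bound) in sight.
    fn on_fiber<R>(&mut self, func: impl FnOnce(&mut OpaqueCore) -> R) -> R {
        self.fibers_run += 1; // pretend a fiber was set up and resumed here
        func(self)
    }
}

struct TypedStore<T> {
    core: OpaqueCore,
    data: T,
}

struct TypedContext<'a, T>(&'a mut TypedStore<T>);

impl<T> TypedContext<'_, T> {
    // Typed wrapper that delegates to the erased core. The real wrapper
    // recovers a full typed store inside the closure via an unsafe downcast
    // (`opaque.traitobj().cast::<StoreInner<T>>()`); this safe sketch simply
    // splits the borrow up front instead.
    fn on_fiber<R>(&mut self, func: impl FnOnce(&mut OpaqueCore, &mut T) -> R) -> R {
        let TypedStore { core, data } = &mut *self.0;
        core.on_fiber(|core| func(core, data))
    }
}

fn main() {
    let mut store = TypedStore {
        core: OpaqueCore { fibers_run: 0 },
        data: String::from("hi"),
    };

    // Typed callers keep going through the wrapper...
    let len = TypedContext(&mut store).on_fiber(|_core, data| data.len());
    assert_eq!(len, 2);

    // ...while code that only sees the erased core can now reach the same
    // machinery directly, which is what this commit enables.
    let n = store.core.on_fiber(|core| core.fibers_run);
    assert_eq!(n, 2);
}

The design point mirrors the commit: the fiber machinery only needs the erased state, so hosting `on_fiber` on `StoreOpaque` lets crate-internal code that never sees `T` run work on a fiber, while the typed method keeps its old signature by delegating.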

1 file changed

crates/wasmtime/src/runtime/store/async_.rs

Lines changed: 160 additions & 147 deletions
@@ -209,143 +209,6 @@ impl<T> StoreInner<T> {
 
 #[doc(hidden)]
 impl StoreOpaque {
-    #[cfg(feature = "gc")]
-    pub async fn gc_async(&mut self) {
-        assert!(
-            self.async_support(),
-            "cannot use `gc_async` without enabling async support in the config",
-        );
-
-        // If the GC heap hasn't been initialized, there is nothing to collect.
-        if self.gc_store.is_none() {
-            return;
-        }
-
-        log::trace!("============ Begin Async GC ===========");
-
-        // Take the GC roots out of `self` so we can borrow it mutably but still
-        // call mutable methods on `self`.
-        let mut roots = core::mem::take(&mut self.gc_roots_list);
-
-        self.trace_roots_async(&mut roots).await;
-        self.unwrap_gc_store_mut()
-            .gc_async(unsafe { roots.iter() })
-            .await;
-
-        // Restore the GC roots for the next GC.
-        roots.clear();
-        self.gc_roots_list = roots;
-
-        log::trace!("============ End Async GC ===========");
-    }
-
-    #[inline]
-    #[cfg(not(feature = "gc"))]
-    pub async fn gc_async(&mut self) {
-        // Nothing to collect.
-        //
-        // Note that this is *not* a public method, this is just defined for the
-        // crate-internal `StoreOpaque` type. This is a convenience so that we
-        // don't have to `cfg` every call site.
-    }
-
-    #[cfg(feature = "gc")]
-    async fn trace_roots_async(&mut self, gc_roots_list: &mut crate::runtime::vm::GcRootsList) {
-        use crate::runtime::vm::Yield;
-
-        log::trace!("Begin trace GC roots");
-
-        // We shouldn't have any leftover, stale GC roots.
-        assert!(gc_roots_list.is_empty());
-
-        self.trace_wasm_stack_roots(gc_roots_list);
-        Yield::new().await;
-        self.trace_vmctx_roots(gc_roots_list);
-        Yield::new().await;
-        self.trace_user_roots(gc_roots_list);
-
-        log::trace!("End trace GC roots")
-    }
-
-    /// Yields the async context, assuming that we are executing on a fiber and
-    /// that fiber is not in the process of dying. This function will return
-    /// None in the latter case (the fiber is dying), and panic if
-    /// `async_support()` is false.
-    #[inline]
-    pub fn async_cx(&self) -> Option<AsyncCx> {
-        assert!(self.async_support());
-
-        let poll_cx_box_ptr = self.async_state.current_poll_cx.get();
-        if poll_cx_box_ptr.is_null() {
-            return None;
-        }
-
-        let poll_cx_inner_ptr = unsafe { *poll_cx_box_ptr };
-        if poll_cx_inner_ptr.future_context.is_null() {
-            return None;
-        }
-
-        Some(AsyncCx {
-            current_suspend: self.async_state.current_suspend.get(),
-            current_poll_cx: unsafe { &raw mut (*poll_cx_box_ptr).future_context },
-            track_pkey_context_switch: self.pkey.is_some(),
-        })
-    }
-
-    /// Yields execution to the caller on out-of-gas or epoch interruption.
-    ///
-    /// This only works on async futures and stores, and assumes that we're
-    /// executing on a fiber. This will yield execution back to the caller once.
-    pub fn async_yield_impl(&mut self) -> Result<()> {
-        use crate::runtime::vm::Yield;
-
-        let mut future = Yield::new();
-
-        // When control returns, we have a `Result<()>` passed
-        // in from the host fiber. If this finished successfully then
-        // we were resumed normally via a `poll`, so keep going. If
-        // the future was dropped while we were yielded, then we need
-        // to clean up this fiber. Do so by raising a trap which will
-        // abort all wasm and get caught on the other side to clean
-        // things up.
-        unsafe {
-            self.async_cx()
-                .expect("attempted to pull async context during shutdown")
-                .block_on(Pin::new_unchecked(&mut future))
-        }
-    }
-
-    fn allocate_fiber_stack(&mut self) -> Result<wasmtime_fiber::FiberStack> {
-        if let Some(stack) = self.async_state.last_fiber_stack.take() {
-            return Ok(stack);
-        }
-        self.engine().allocator().allocate_fiber_stack()
-    }
-
-    fn deallocate_fiber_stack(&mut self, stack: wasmtime_fiber::FiberStack) {
-        self.flush_fiber_stack();
-        self.async_state.last_fiber_stack = Some(stack);
-    }
-
-    /// Releases the last fiber stack to the underlying instance allocator, if
-    /// present.
-    pub fn flush_fiber_stack(&mut self) {
-        if let Some(stack) = self.async_state.last_fiber_stack.take() {
-            unsafe {
-                self.engine.allocator().deallocate_fiber_stack(stack);
-            }
-        }
-    }
-
-    pub(crate) fn async_guard_range(&self) -> Range<*mut u8> {
-        unsafe {
-            let ptr = self.async_state.current_poll_cx.get();
-            (*ptr).guard_range_start..(*ptr).guard_range_end
-        }
-    }
-}
-
-impl<T> StoreContextMut<'_, T> {
     /// Executes a synchronous computation `func` asynchronously on a new fiber.
     ///
     /// This function will convert the synchronous `func` into an asynchronous
@@ -357,20 +220,17 @@ impl<T> StoreContextMut<'_, T> {
     /// that the various comments are illuminating as to what's going on here.
     pub(crate) async fn on_fiber<R>(
         &mut self,
-        func: impl FnOnce(&mut StoreContextMut<'_, T>) -> R + Send,
-    ) -> Result<R>
-    where
-        T: Send,
-    {
+        func: impl FnOnce(&mut Self) -> R + Send,
+    ) -> Result<R> {
         let config = self.engine().config();
-        debug_assert!(self.0.async_support());
+        debug_assert!(self.async_support());
         debug_assert!(config.async_stack_size > 0);
 
         let mut slot = None;
         let mut future = {
-            let current_poll_cx = self.0.async_state.current_poll_cx.get();
-            let current_suspend = self.0.async_state.current_suspend.get();
-            let stack = self.0.allocate_fiber_stack()?;
+            let current_poll_cx = self.async_state.current_poll_cx.get();
+            let current_suspend = self.async_state.current_suspend.get();
+            let stack = self.allocate_fiber_stack()?;
 
             let engine = self.engine().clone();
             let slot = &mut slot;
@@ -411,7 +271,7 @@ impl<T> StoreContextMut<'_, T> {
         let stack = future.fiber.take().map(|f| f.into_stack());
         drop(future);
         if let Some(stack) = stack {
-            self.0.deallocate_fiber_stack(stack);
+            self.deallocate_fiber_stack(stack);
         }
 
         return Ok(slot.unwrap());
@@ -645,6 +505,159 @@ impl<T> StoreContextMut<'_, T> {
             }
         }
     }
+
+    #[cfg(feature = "gc")]
+    pub async fn gc_async(&mut self) {
+        assert!(
+            self.async_support(),
+            "cannot use `gc_async` without enabling async support in the config",
+        );
+
+        // If the GC heap hasn't been initialized, there is nothing to collect.
+        if self.gc_store.is_none() {
+            return;
+        }
+
+        log::trace!("============ Begin Async GC ===========");
+
+        // Take the GC roots out of `self` so we can borrow it mutably but still
+        // call mutable methods on `self`.
+        let mut roots = core::mem::take(&mut self.gc_roots_list);
+
+        self.trace_roots_async(&mut roots).await;
+        self.unwrap_gc_store_mut()
+            .gc_async(unsafe { roots.iter() })
+            .await;
+
+        // Restore the GC roots for the next GC.
+        roots.clear();
+        self.gc_roots_list = roots;
+
+        log::trace!("============ End Async GC ===========");
+    }
+
+    #[inline]
+    #[cfg(not(feature = "gc"))]
+    pub async fn gc_async(&mut self) {
+        // Nothing to collect.
+        //
+        // Note that this is *not* a public method, this is just defined for the
+        // crate-internal `StoreOpaque` type. This is a convenience so that we
+        // don't have to `cfg` every call site.
+    }
+
+    #[cfg(feature = "gc")]
+    async fn trace_roots_async(&mut self, gc_roots_list: &mut crate::runtime::vm::GcRootsList) {
+        use crate::runtime::vm::Yield;
+
+        log::trace!("Begin trace GC roots");
+
+        // We shouldn't have any leftover, stale GC roots.
+        assert!(gc_roots_list.is_empty());
+
+        self.trace_wasm_stack_roots(gc_roots_list);
+        Yield::new().await;
+        self.trace_vmctx_roots(gc_roots_list);
+        Yield::new().await;
+        self.trace_user_roots(gc_roots_list);
+
+        log::trace!("End trace GC roots")
+    }
+
+    /// Yields the async context, assuming that we are executing on a fiber and
+    /// that fiber is not in the process of dying. This function will return
+    /// None in the latter case (the fiber is dying), and panic if
+    /// `async_support()` is false.
+    #[inline]
+    pub fn async_cx(&self) -> Option<AsyncCx> {
+        assert!(self.async_support());
+
+        let poll_cx_box_ptr = self.async_state.current_poll_cx.get();
+        if poll_cx_box_ptr.is_null() {
+            return None;
+        }
+
+        let poll_cx_inner_ptr = unsafe { *poll_cx_box_ptr };
+        if poll_cx_inner_ptr.future_context.is_null() {
+            return None;
+        }
+
+        Some(AsyncCx {
+            current_suspend: self.async_state.current_suspend.get(),
+            current_poll_cx: unsafe { &raw mut (*poll_cx_box_ptr).future_context },
+            track_pkey_context_switch: self.pkey.is_some(),
+        })
+    }
+
+    /// Yields execution to the caller on out-of-gas or epoch interruption.
+    ///
+    /// This only works on async futures and stores, and assumes that we're
+    /// executing on a fiber. This will yield execution back to the caller once.
+    pub fn async_yield_impl(&mut self) -> Result<()> {
+        use crate::runtime::vm::Yield;
+
+        let mut future = Yield::new();
+
+        // When control returns, we have a `Result<()>` passed
+        // in from the host fiber. If this finished successfully then
+        // we were resumed normally via a `poll`, so keep going. If
+        // the future was dropped while we were yielded, then we need
+        // to clean up this fiber. Do so by raising a trap which will
+        // abort all wasm and get caught on the other side to clean
+        // things up.
+        unsafe {
+            self.async_cx()
+                .expect("attempted to pull async context during shutdown")
+                .block_on(Pin::new_unchecked(&mut future))
+        }
+    }
+
+    fn allocate_fiber_stack(&mut self) -> Result<wasmtime_fiber::FiberStack> {
+        if let Some(stack) = self.async_state.last_fiber_stack.take() {
+            return Ok(stack);
+        }
+        self.engine().allocator().allocate_fiber_stack()
+    }
+
+    fn deallocate_fiber_stack(&mut self, stack: wasmtime_fiber::FiberStack) {
+        self.flush_fiber_stack();
+        self.async_state.last_fiber_stack = Some(stack);
+    }
+
+    /// Releases the last fiber stack to the underlying instance allocator, if
+    /// present.
+    pub fn flush_fiber_stack(&mut self) {
+        if let Some(stack) = self.async_state.last_fiber_stack.take() {
+            unsafe {
+                self.engine.allocator().deallocate_fiber_stack(stack);
+            }
+        }
+    }
+
+    pub(crate) fn async_guard_range(&self) -> Range<*mut u8> {
+        unsafe {
+            let ptr = self.async_state.current_poll_cx.get();
+            (*ptr).guard_range_start..(*ptr).guard_range_end
+        }
+    }
+}
+
+impl<T> StoreContextMut<'_, T> {
+    /// Executes a synchronous computation `func` asynchronously on a new fiber.
+    pub(crate) async fn on_fiber<R>(
+        &mut self,
+        func: impl FnOnce(&mut StoreContextMut<'_, T>) -> R + Send,
+    ) -> Result<R>
+    where
+        T: Send,
+    {
+        self.0
+            .on_fiber(|opaque| {
+                let store = unsafe { opaque.traitobj().cast::<StoreInner<T>>().as_mut() };
+                func(&mut StoreContextMut(store))
+            })
+            .await
+    }
 }
 
 pub struct AsyncCx {
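For reference, the kind of caller the commit message has in mind would look roughly like the hypothetical crate-internal helper below; only the `on_fiber` signature is taken from the diff above, the helper itself is illustrative and not part of this commit.

// Hypothetical crate-internal helper: it holds only an untyped
// `&mut StoreOpaque`, yet can run synchronous, possibly blocking work on a
// fiber via the method added above.
async fn with_blocking_section(store: &mut StoreOpaque) -> Result<u64> {
    store
        .on_fiber(|_store| {
            // synchronous work runs here, on its own fiber stack
            42
        })
        .await
}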
