7 changes: 7 additions & 0 deletions crates/wasmtime/src/engine.rs
@@ -720,6 +720,13 @@ impl Engine {
crate::runtime::vm::tls_eager_initialize();
}

/// Returns a [`PoolingAllocatorMetrics`] if this engine was configured with
/// [`InstanceAllocationStrategy::Pooling`].
#[cfg(feature = "pooling-allocator")]
pub fn pooling_allocator_metrics(&self) -> Option<crate::vm::PoolingAllocatorMetrics> {
crate::runtime::vm::PoolingAllocatorMetrics::new(self)
}

pub(crate) fn allocator(&self) -> &dyn crate::runtime::vm::InstanceAllocator {
self.inner.allocator.as_ref()
}
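For context, a minimal sketch of how the new `Engine::pooling_allocator_metrics` API above can be consumed from application code (not part of this diff; assumes a `wasmtime` build with the `pooling-allocator` feature enabled):

```rust
use wasmtime::{Config, Engine, InstanceAllocationStrategy};

fn main() {
    // Metrics are only available when the engine uses the pooling allocator.
    let mut config = Config::new();
    config.allocation_strategy(InstanceAllocationStrategy::pooling());
    let engine = Engine::new(&config).unwrap();

    // `pooling_allocator_metrics` returns `None` for on-demand allocation,
    // so unwrapping here is safe only because pooling was configured above.
    let metrics = engine
        .pooling_allocator_metrics()
        .expect("engine was configured with the pooling allocator");

    // Nothing has been instantiated yet, so every live count starts at zero.
    assert_eq!(metrics.core_instances(), 0);
    assert_eq!(metrics.component_instances(), 0);
    assert_eq!(metrics.memories(), 0);
    assert_eq!(metrics.tables(), 0);
}
```

As the new `metrics.rs` file later in this diff notes, the handle is a cheap `Clone` wrapper around an `Engine` clone, so it can be handed to a separate monitoring task and polled periodically.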
2 changes: 1 addition & 1 deletion crates/wasmtime/src/runtime.rs
@@ -99,7 +99,7 @@ pub use values::*;
pub(crate) use uninhabited::*;

#[cfg(feature = "pooling-allocator")]
pub use vm::PoolConcurrencyLimitError;
pub use vm::{PoolConcurrencyLimitError, PoolingAllocatorMetrics};

#[cfg(feature = "profiling")]
mod profiling;
2 changes: 1 addition & 1 deletion crates/wasmtime/src/runtime/vm.rs
@@ -105,7 +105,7 @@ pub use crate::runtime::vm::instance::{
};
#[cfg(feature = "pooling-allocator")]
pub use crate::runtime::vm::instance::{
InstanceLimits, PoolConcurrencyLimitError, PoolingInstanceAllocator,
InstanceLimits, PoolConcurrencyLimitError, PoolingAllocatorMetrics, PoolingInstanceAllocator,
PoolingInstanceAllocatorConfig,
};
pub use crate::runtime::vm::interpreter::*;
8 changes: 7 additions & 1 deletion crates/wasmtime/src/runtime/vm/instance/allocator.rs
@@ -30,7 +30,7 @@ pub use self::on_demand::OnDemandInstanceAllocator;
mod pooling;
#[cfg(feature = "pooling-allocator")]
pub use self::pooling::{
InstanceLimits, PoolConcurrencyLimitError, PoolingInstanceAllocator,
InstanceLimits, PoolConcurrencyLimitError, PoolingAllocatorMetrics, PoolingInstanceAllocator,
PoolingInstanceAllocatorConfig,
};

@@ -286,6 +286,12 @@ pub unsafe trait InstanceAllocator: Send + Sync {

/// Allow access to memory regions protected by any protection key.
fn allow_all_pkeys(&self);

/// Returns `Some(&PoolingInstanceAllocator)` if this allocator is the pooling allocator, or `None` otherwise.
#[cfg(feature = "pooling-allocator")]
fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
None
}
}

impl dyn InstanceAllocator + '_ {
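The `as_pooling` hook added above uses a defaulted trait method as a lightweight downcast: every allocator answers `None` unless it explicitly overrides the method to return itself. A standalone sketch of that pattern with hypothetical types (not part of this diff):

```rust
// Hypothetical trait and types illustrating the defaulted-method downcast:
// only the concrete type of interest overrides `as_pooling`, so callers can
// recover it from a `dyn Allocator` without `std::any::Any` machinery.
trait Allocator {
    fn as_pooling(&self) -> Option<&Pooling> {
        None
    }
}

struct OnDemand;
struct Pooling;

impl Allocator for OnDemand {}

impl Allocator for Pooling {
    fn as_pooling(&self) -> Option<&Pooling> {
        Some(self)
    }
}

fn main() {
    let allocators: Vec<Box<dyn Allocator>> = vec![Box::new(OnDemand), Box::new(Pooling)];
    assert!(allocators[0].as_pooling().is_none());
    assert!(allocators[1].as_pooling().is_some());
}
```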
86 changes: 60 additions & 26 deletions crates/wasmtime/src/runtime/vm/instance/allocator/pooling.rs
@@ -21,6 +21,7 @@
mod decommit_queue;
mod index_allocator;
mod memory_pool;
mod metrics;
mod table_pool;

#[cfg(feature = "gc")]
@@ -54,6 +55,7 @@ use crate::runtime::vm::{
mpk::{self, ProtectionKey, ProtectionMask},
sys::vm::PageMap,
};
use core::sync::atomic::AtomicUsize;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::{Mutex, MutexGuard};
@@ -65,6 +67,8 @@ use wasmtime_environ::{
DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
};

pub use self::metrics::PoolingAllocatorMetrics;

#[cfg(feature = "gc")]
use super::GcHeapAllocationIndex;
#[cfg(feature = "gc")]
@@ -305,8 +309,12 @@ pub struct PoolingInstanceAllocator {
live_component_instances: AtomicU64,

decommit_queue: Mutex<DecommitQueue>,

memories: MemoryPool,
live_memories: AtomicUsize,

tables: TablePool,
live_tables: AtomicUsize,

#[cfg(feature = "gc")]
gc_heaps: GcHeapPool,
@@ -335,6 +343,8 @@ impl Drop for PoolingInstanceAllocator {

debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);

debug_assert!(self.memories.is_empty());
debug_assert!(self.tables.is_empty());
@@ -357,7 +367,9 @@ impl PoolingInstanceAllocator {
live_core_instances: AtomicU64::new(0),
decommit_queue: Mutex::new(DecommitQueue::default()),
memories: MemoryPool::new(config, tunables)?,
live_memories: AtomicUsize::new(0),
tables: TablePool::new(config)?,
live_tables: AtomicUsize::new(0),
#[cfg(feature = "gc")]
gc_heaps: GcHeapPool::new(config)?,
#[cfg(feature = "async")]
@@ -652,23 +664,29 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
ty: &wasmtime_environ::Memory,
memory_index: Option<DefinedMemoryIndex>,
) -> Result<(MemoryAllocationIndex, Memory)> {
// FIXME(rust-lang/rust#145127) this should ideally use a version of
// `with_flush_and_retry` but adapted for async closures instead of only
// sync closures. Right now that won't compile though so this is the
// manually expanded version of the method.
let e = match self.memories.allocate(request, ty, memory_index).await {
Ok(result) => return Ok(result),
Err(e) => e,
};
async {
// FIXME(rust-lang/rust#145127) this should ideally use a version of
// `with_flush_and_retry` but adapted for async closures instead of only
// sync closures. Right now that won't compile though so this is the
// manually expanded version of the method.
let e = match self.memories.allocate(request, ty, memory_index).await {
Ok(result) => return Ok(result),
Err(e) => e,
};

if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return self.memories.allocate(request, ty, memory_index).await;
if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return self.memories.allocate(request, ty, memory_index).await;
}
}
}

Err(e)
Err(e)
}
.await
.inspect(|_| {
self.live_memories.fetch_add(1, Ordering::Relaxed);
})
}

unsafe fn deallocate_memory(
@@ -677,6 +695,9 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
allocation_index: MemoryAllocationIndex,
memory: Memory,
) {
let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
debug_assert!(prev > 0);

// Reset the image slot. If there is any error clearing the
// image, just drop it here, and let the drop handler for the
// slot unmap in a way that retains the address space
@@ -712,21 +733,27 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
ty: &wasmtime_environ::Table,
_table_index: DefinedTableIndex,
) -> Result<(super::TableAllocationIndex, Table)> {
// FIXME: see `allocate_memory` above for comments about duplication
// with `with_flush_and_retry`.
let e = match self.tables.allocate(request, ty).await {
Ok(result) => return Ok(result),
Err(e) => e,
};
async {
// FIXME: see `allocate_memory` above for comments about duplication
// with `with_flush_and_retry`.
let e = match self.tables.allocate(request, ty).await {
Ok(result) => return Ok(result),
Err(e) => e,
};

if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return self.tables.allocate(request, ty).await;
if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return self.tables.allocate(request, ty).await;
}
}
}

Err(e)
Err(e)
}
.await
.inspect(|_| {
self.live_tables.fetch_add(1, Ordering::Relaxed);
})
}

unsafe fn deallocate_table(
@@ -735,6 +762,9 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
allocation_index: TableAllocationIndex,
mut table: Table,
) {
let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
debug_assert!(prev > 0);

let mut queue = DecommitQueue::default();
// SAFETY: This table is no longer in use by the allocator when this
// method is called and additionally all image ranges are pushed with
@@ -816,6 +846,10 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
) -> (MemoryAllocationIndex, Memory) {
self.gc_heaps.deallocate(allocation_index, gc_heap)
}

fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
Some(self)
}
}

#[cfg(test)]
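The counters introduced above follow a simple live-resource pattern: increment only when an allocation actually succeeds (via `Result::inspect`), decrement on deallocation with a `debug_assert!` against underflow, using relaxed atomic ordering as in the diff. A minimal standalone sketch with a hypothetical pool type (not part of this diff):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical pool used only to illustrate the live-counter bookkeeping.
struct Pool {
    live: AtomicUsize,
}

impl Pool {
    fn allocate(&self, exhausted: bool) -> Result<u32, &'static str> {
        let result = if exhausted { Err("pool exhausted") } else { Ok(42) };
        // Bump the counter only on success; failures leave it untouched.
        result.inspect(|_| {
            self.live.fetch_add(1, Ordering::Relaxed);
        })
    }

    fn deallocate(&self, _handle: u32) {
        let prev = self.live.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0, "deallocated more than was allocated");
    }
}

fn main() {
    let pool = Pool { live: AtomicUsize::new(0) };
    assert!(pool.allocate(true).is_err());
    assert_eq!(pool.live.load(Ordering::Relaxed), 0);

    let handle = pool.allocate(false).unwrap();
    assert_eq!(pool.live.load(Ordering::Relaxed), 1);

    pool.deallocate(handle);
    assert_eq!(pool.live.load(Ordering::Relaxed), 0);
}
```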
115 changes: 115 additions & 0 deletions crates/wasmtime/src/runtime/vm/instance/allocator/pooling/metrics.rs
@@ -0,0 +1,115 @@
use core::sync::atomic::Ordering;

use crate::{Engine, vm::PoolingInstanceAllocator};

/// `PoolingAllocatorMetrics` provides access to runtime metrics of a pooling
/// allocator configured with [`crate::InstanceAllocationStrategy::Pooling`].
///
/// This is a cheap cloneable handle which can be obtained with
/// [`Engine::pooling_allocator_metrics`].
#[derive(Clone)]
pub struct PoolingAllocatorMetrics {
engine: Engine,
}

impl PoolingAllocatorMetrics {
pub(crate) fn new(engine: &Engine) -> Option<Self> {
engine.allocator().as_pooling().map(|_| Self {
engine: engine.clone(),
})
}

/// Returns the number of core (module) instances currently allocated.
pub fn core_instances(&self) -> u64 {
self.allocator().live_core_instances.load(Ordering::Relaxed)
}

/// Returns the number of component instances currently allocated.
pub fn component_instances(&self) -> u64 {
self.allocator()
.live_component_instances
.load(Ordering::Relaxed)
}

/// Returns the number of WebAssembly memories currently allocated.
pub fn memories(&self) -> usize {
self.allocator().live_memories.load(Ordering::Relaxed)
}

/// Returns the number of WebAssembly tables currently allocated.
pub fn tables(&self) -> usize {
self.allocator().live_tables.load(Ordering::Relaxed)
}

fn allocator(&self) -> &PoolingInstanceAllocator {
self.engine
.allocator()
.as_pooling()
.expect("engine should have pooling allocator")
}
}

#[cfg(test)]
mod tests {
use crate::{
Config, InstanceAllocationStrategy, Store,
component::{Component, Linker},
};

use super::*;

// A component with 1 core instance, 1 memory, 1 table
const TEST_COMPONENT: &[u8] = b"
(component
(core module $m
(memory 1)
(table 1 funcref)
)
(core instance (instantiate (module $m)))
)
";

#[test]
#[cfg_attr(miri, ignore)]
fn smoke_test() {
// Start with nothing
let engine =
Engine::new(&Config::new().allocation_strategy(InstanceAllocationStrategy::pooling()))
.unwrap();
let metrics = engine.pooling_allocator_metrics().unwrap();

assert_eq!(metrics.core_instances(), 0);
assert_eq!(metrics.component_instances(), 0);
assert_eq!(metrics.memories(), 0);
assert_eq!(metrics.tables(), 0);

// Instantiate one of each
let mut store = Store::new(&engine, ());
let component = Component::new(&engine, TEST_COMPONENT).unwrap();
let linker = Linker::new(&engine);
let instance = linker.instantiate(&mut store, &component).unwrap();

assert_eq!(metrics.core_instances(), 1);
assert_eq!(metrics.component_instances(), 1);
assert_eq!(metrics.memories(), 1);
assert_eq!(metrics.tables(), 1);

// Back to nothing
let _ = (instance, store);

assert_eq!(metrics.core_instances(), 0);
assert_eq!(metrics.component_instances(), 0);
assert_eq!(metrics.memories(), 0);
assert_eq!(metrics.tables(), 0);
}

#[test]
fn test_non_pooling_allocator() {
let engine =
Engine::new(&Config::new().allocation_strategy(InstanceAllocationStrategy::OnDemand))
.unwrap();

let maybe_metrics = engine.pooling_allocator_metrics();
assert!(maybe_metrics.is_none());
}
}