diff --git a/Cargo.lock b/Cargo.lock index 45df23c9b941..b3ba92fb883b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4457,6 +4457,7 @@ dependencies = [ "target-lexicon", "tempfile", "tokio", + "tracing-subscriber", "wasm-encoder", "wasm-wave", "wasmparser 0.239.0", @@ -4565,6 +4566,7 @@ dependencies = [ "tokio", "toml", "tracing", + "tracing-subscriber", "walkdir", "wasi-common", "wasm-encoder", diff --git a/Cargo.toml b/Cargo.toml index 21839c551ee5..eb496e36e9c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,10 @@ wasmtime-cranelift = { workspace = true, optional = true } wasmtime-environ = { workspace = true } wasmtime-explorer = { workspace = true, optional = true } wasmtime-wast = { workspace = true, optional = true } -wasi-common = { workspace = true, default-features = true, features = ["exit", "tokio"], optional = true } +wasi-common = { workspace = true, default-features = true, features = [ + "exit", + "tokio", +], optional = true } wasmtime-wasi = { workspace = true, default-features = true, optional = true } wasmtime-wasi-nn = { workspace = true, optional = true } wasmtime-wasi-config = { workspace = true, optional = true } @@ -82,22 +85,36 @@ pulley-interpreter = { workspace = true, optional = true } async-trait = { workspace = true } bytes = { workspace = true } cfg-if = { workspace = true } -tokio = { workspace = true, optional = true, features = [ "signal", "macros" ] } +tokio = { workspace = true, optional = true, features = ["signal", "macros"] } hyper = { workspace = true, optional = true } http = { workspace = true, optional = true } http-body-util = { workspace = true, optional = true } +tracing-subscriber.workspace = true [target.'cfg(unix)'.dependencies] rustix = { workspace = true, features = ["mm", "process"] } [dev-dependencies] # depend again on wasmtime to activate its default features for tests -wasmtime = { workspace = true, features = ['default', 'winch', 'pulley', 'all-arch', 'call-hook', 'memory-protection-keys', 'component-model-async'] } +wasmtime = { workspace = true, features = [ + 'default', + 'winch', + 'pulley', + 'all-arch', + 'call-hook', + 'memory-protection-keys', + 'component-model-async', +] } env_logger = { workspace = true } log = { workspace = true } filecheck = { workspace = true } tempfile = { workspace = true } -tokio = { workspace = true, features = ["rt", "time", "macros", "rt-multi-thread"] } +tokio = { workspace = true, features = [ + "rt", + "time", + "macros", + "rt-multi-thread", +] } wast = { workspace = true } criterion = { workspace = true } num_cpus = "1.13.0" @@ -107,7 +124,10 @@ wat = { workspace = true } rayon = "1.5.0" wasmtime-wast = { workspace = true, features = ['component-model'] } wasmtime-component-util = { workspace = true } -wasmtime-test-util = { workspace = true, features = ['wasmtime-wast', 'component'] } +wasmtime-test-util = { workspace = true, features = [ + 'wasmtime-wast', + 'component', +] } bstr = "1.6.0" libc = { workspace = true } serde = { workspace = true } @@ -117,7 +137,11 @@ test-programs-artifacts = { workspace = true } bytesize = "2.0.1" wit-component = { workspace = true } cranelift-filetests = { workspace = true } -cranelift-codegen = { workspace = true, features = ["disas", "trace-log", "timing"] } +cranelift-codegen = { workspace = true, features = [ + "disas", + "trace-log", + "timing", +] } cranelift-reader = { workspace = true } toml = { workspace = true } similar = { workspace = true } @@ -170,9 +194,7 @@ members = [ "fuzz", "winch/codegen", ] -exclude = [ - 'docs/rust_wasi_markdown_parser', -] +exclude = ['docs/rust_wasi_markdown_parser'] [workspace.package] version = "38.0.0" @@ -253,8 +275,8 @@ wasmtime-wmemcheck = { path = "crates/wmemcheck", version = "=38.0.0", package = wasmtime-c-api-macros = { path = "crates/c-api-macros", version = "=38.0.0", package = 'wasmtime-internal-c-api-macros' } wasmtime-cache = { path = "crates/cache", version = "=38.0.0", package = 'wasmtime-internal-cache' } wasmtime-cranelift = { path = "crates/cranelift", version = "=38.0.0", package = 'wasmtime-internal-cranelift' } -wasmtime-winch = { path = "crates/winch", version = "=38.0.0", package = 'wasmtime-internal-winch' } -wasmtime-explorer = { path = "crates/explorer", version = "=38.0.0", package = 'wasmtime-internal-explorer' } +wasmtime-winch = { path = "crates/winch", version = "=38.0.0", package = 'wasmtime-internal-winch' } +wasmtime-explorer = { path = "crates/explorer", version = "=38.0.0", package = 'wasmtime-internal-explorer' } wasmtime-fiber = { path = "crates/fiber", version = "=38.0.0", package = 'wasmtime-internal-fiber' } wasmtime-jit-debug = { path = "crates/jit-debug", version = "=38.0.0", package = 'wasmtime-internal-jit-debug' } wasmtime-component-util = { path = "crates/component-util", version = "=38.0.0", package = 'wasmtime-internal-component-util' } @@ -263,7 +285,7 @@ wasmtime-versioned-export-macros = { path = "crates/versioned-export-macros", ve wasmtime-slab = { path = "crates/slab", version = "=38.0.0", package = 'wasmtime-internal-slab' } wasmtime-jit-icache-coherence = { path = "crates/jit-icache-coherence", version = "=38.0.0", package = 'wasmtime-internal-jit-icache-coherence' } wasmtime-wit-bindgen = { path = "crates/wit-bindgen", version = "=38.0.0", package = 'wasmtime-internal-wit-bindgen' } -wasmtime-math = { path = "crates/math", version = "=38.0.0", package = 'wasmtime-internal-math' } +wasmtime-math = { path = "crates/math", version = "=38.0.0", package = 'wasmtime-internal-math' } wasmtime-unwinder = { path = "crates/unwinder", version = "=38.0.0", package = 'wasmtime-internal-unwinder' } # Miscellaneous crates without a `wasmtime-*` prefix in their name but still @@ -277,7 +299,10 @@ pulley-macros = { path = 'pulley/macros', version = "=38.0.0" } # Cranelift crates in this workspace cranelift-assembler-x64 = { path = "cranelift/assembler-x64", version = "0.125.0" } -cranelift-codegen = { path = "cranelift/codegen", version = "0.125.0", default-features = false, features = ["std", "unwind"] } +cranelift-codegen = { path = "cranelift/codegen", version = "0.125.0", default-features = false, features = [ + "std", + "unwind", +] } cranelift-frontend = { path = "cranelift/frontend", version = "0.125.0" } cranelift-entity = { path = "cranelift/entity", version = "0.125.0" } cranelift-native = { path = "cranelift/native", version = "0.125.0" } @@ -328,7 +353,9 @@ wit-bindgen = { version = "0.46.0", default-features = false } wit-bindgen-rust-macro = { version = "0.46.0", default-features = false } # wasm-tools family: -wasmparser = { version = "0.239.0", default-features = false, features = ['simd'] } +wasmparser = { version = "0.239.0", default-features = false, features = [ + 'simd', +] } wat = "1.239.0" wast = "239.0.0" wasmprinter = "0.239.0" @@ -346,14 +373,20 @@ json-from-wast = "0.239.0" arbitrary = "1.4.0" mutatis = "0.3.2" cc = "1.0" -object = { version = "0.37.3", default-features = false, features = ['read_core', 'elf'] } +object = { version = "0.37.3", default-features = false, features = [ + 'read_core', + 'elf', +] } gimli = { version = "0.32.0", default-features = false, features = ['read'] } addr2line = { version = "0.25.0", default-features = false } anyhow = { version = "1.0.93", default-features = false } windows-sys = "0.60.0" env_logger = "0.11.5" log = { version = "0.4.27", default-features = false } -clap = { version = "4.5.17", default-features = false, features = ["std", "derive"] } +clap = { version = "4.5.17", default-features = false, features = [ + "std", + "derive", +] } clap_complete = "4.4.7" hashbrown = { version = "0.15", default-features = false } capstone = "0.13.0" @@ -382,7 +415,7 @@ tempfile = "3.21.0" filecheck = "0.5.0" libc = { version = "0.2.112", default-features = true } file-per-thread-logger = "0.2.0" -tokio = { version = "1.43.0", features = [ "rt", "time" ] } +tokio = { version = "1.43.0", features = ["rt", "time"] } hyper = "1.0.1" http = "1.0.0" http-body = "1.0.0" @@ -394,10 +427,18 @@ syn = "2.0.25" quote = "1.0" proc-macro2 = "1.0" test-log = { version = "0.2", default-features = false, features = ["trace"] } -tracing-subscriber = { version = "0.3.1", default-features = false, features = ['fmt', 'env-filter', 'ansi', 'tracing-log'] } +tracing-subscriber = { version = "0.3.1", default-features = false, features = [ + 'fmt', + 'env-filter', + 'ansi', + 'tracing-log', +] } url = "2.3.1" postcard = { version = "1.0.8", default-features = false, features = ['alloc'] } -criterion = { version = "0.6.0", default-features = false, features = ["html_reports", "rayon"] } +criterion = { version = "0.6.0", default-features = false, features = [ + "html_reports", + "rayon", +] } rustc-hash = "2.0.0" libtest-mimic = "0.8.1" semver = { version = "1.0.17", default-features = false } @@ -505,14 +546,23 @@ disable-logging = ["log/max_level_off", "tracing/max_level_off"] wasi-nn = ["dep:wasmtime-wasi-nn"] wasi-tls = ["dep:wasmtime-wasi-tls"] wasi-threads = ["dep:wasmtime-wasi-threads", "threads"] -wasi-http = ["component-model", "dep:wasmtime-wasi-http", "dep:tokio", "dep:hyper", "wasmtime-wasi-http/default-send-request"] +wasi-http = [ + "component-model", + "dep:wasmtime-wasi-http", + "dep:tokio", + "dep:hyper", + "wasmtime-wasi-http/default-send-request", +] wasi-config = ["dep:wasmtime-wasi-config"] wasi-keyvalue = ["dep:wasmtime-wasi-keyvalue"] -pooling-allocator = ["wasmtime/pooling-allocator", "wasmtime-cli-flags/pooling-allocator"] +pooling-allocator = [ + "wasmtime/pooling-allocator", + "wasmtime-cli-flags/pooling-allocator", +] component-model = [ "wasmtime/component-model", "wasmtime-wast?/component-model", - "wasmtime-cli-flags/component-model" + "wasmtime-cli-flags/component-model", ] wat = ["dep:wat", "wasmtime/wat"] cache = ["dep:wasmtime-cache", "wasmtime-cli-flags/cache"] @@ -529,7 +579,10 @@ gc = ["wasmtime-cli-flags/gc", "wasmtime/gc"] gc-drc = ["gc", "wasmtime/gc-drc", "wasmtime-cli-flags/gc-drc"] gc-null = ["gc", "wasmtime/gc-null", "wasmtime-cli-flags/gc-null"] pulley = ["wasmtime-cli-flags/pulley"] -stack-switching = ["wasmtime/stack-switching", "wasmtime-cli-flags/stack-switching"] +stack-switching = [ + "wasmtime/stack-switching", + "wasmtime-cli-flags/stack-switching", +] # CLI subcommands for the `wasmtime` executable. See `wasmtime $cmd --help` # for more information on each subcommand. diff --git a/crates/cli-flags/src/lib.rs b/crates/cli-flags/src/lib.rs index f0cea981c550..243d07c9df09 100644 --- a/crates/cli-flags/src/lib.rs +++ b/crates/cli-flags/src/lib.rs @@ -381,6 +381,9 @@ wasmtime_option_group! { /// Component model support for async lifting/lowering: this corresponds /// to the ๐ŸšŸ emoji in the component model specification. pub component_model_async_stackful: Option, + /// Component model support for threading: this corresponds + /// to the ๐Ÿงต emoji in the component model specification. + pub component_model_threading: Option, /// Component model support for `error-context`: this corresponds /// to the ๐Ÿ“ emoji in the component model specification. pub component_model_error_context: Option, @@ -1052,6 +1055,7 @@ impl CommonOptions { ("component-model-async", component_model_async, wasm_component_model_async) ("component-model-async", component_model_async_builtins, wasm_component_model_async_builtins) ("component-model-async", component_model_async_stackful, wasm_component_model_async_stackful) + ("component-model-async", component_model_threading, wasm_component_model_threading) ("component-model", component_model_error_context, wasm_component_model_error_context) ("threads", threads, wasm_threads) ("gc", gc, wasm_gc) diff --git a/crates/cranelift/src/compiler/component.rs b/crates/cranelift/src/compiler/component.rs index 50df4c20ebb5..401401c5b387 100644 --- a/crates/cranelift/src/compiler/component.rs +++ b/crates/cranelift/src/compiler/component.rs @@ -789,6 +789,92 @@ impl<'a> TrampolineCompiler<'a> { }, ); } + Trampoline::ThreadIndex => { + self.translate_libcall( + host::thread_index, + TrapSentinel::NegativeOne, + WasmArgs::InRegisters, + |_, _| {}, + ); + } + Trampoline::ThreadNewIndirect { + instance, + start_func_table_idx, + start_func_ty_idx, + } => { + self.translate_libcall( + host::thread_new_indirect, + TrapSentinel::NegativeOne, + WasmArgs::InRegisters, + |me, params| { + params.push(me.index_value(*instance)); + params.push(me.index_value(*start_func_table_idx)); + params.push(me.index_value(*start_func_ty_idx)); + }, + ); + } + Trampoline::ThreadSwitchTo { + instance, + cancellable, + } => { + self.translate_libcall( + host::thread_switch_to, + TrapSentinel::NegativeOne, + WasmArgs::InRegisters, + |me, params| { + params.push(me.index_value(*instance)); + params.push( + me.builder + .ins() + .iconst(ir::types::I8, i64::from(*cancellable)), + ); + }, + ); + } + Trampoline::ThreadSuspend { + instance, + cancellable, + } => { + self.translate_libcall( + host::thread_suspend, + TrapSentinel::NegativeOne, + WasmArgs::InRegisters, + |me, params| { + params.push(me.index_value(*instance)); + params.push( + me.builder + .ins() + .iconst(ir::types::I8, i64::from(*cancellable)), + ); + }, + ); + } + Trampoline::ThreadResumeLater => { + self.translate_libcall( + host::thread_resume_later, + TrapSentinel::Falsy, + WasmArgs::InRegisters, + |_, _| {}, + ); + } + Trampoline::ThreadYieldTo { + instance, + cancellable, + } => { + self.translate_libcall( + host::thread_yield_to, + TrapSentinel::NegativeOne, + WasmArgs::InRegisters, + |me, params| { + params.push(me.index_value(*instance)); + params.push( + me.builder + .ins() + .iconst(ir::types::I8, i64::from(*cancellable)), + ); + }, + ); + } } } diff --git a/crates/environ/src/component.rs b/crates/environ/src/component.rs index 3c7087728a38..dec960fee091 100644 --- a/crates/environ/src/component.rs +++ b/crates/environ/src/component.rs @@ -187,6 +187,18 @@ macro_rules! foreach_builtin_component_function { context_get(vmctx: vmctx, caller_instance: u32, slot: u32) -> u64; #[cfg(feature = "component-model-async")] context_set(vmctx: vmctx, caller_instance: u32, slot: u32, val: u32) -> bool; + #[cfg(feature = "component-model-async")] + thread_index(vmctx: vmctx) -> u64; + #[cfg(feature = "component-model-async")] + thread_new_indirect(vmctx: vmctx, caller_instance: u32, func_ty_id: u32, func_table_idx: u32, func_idx: u32, context: u32) -> u64; + #[cfg(feature = "component-model-async")] + thread_switch_to(vmctx: vmctx, caller_instance: u32, cancellable: u8, thread_idx: u32) -> u32; + #[cfg(feature = "component-model-async")] + thread_suspend(vmctx: vmctx, caller_instance: u32, cancellable: u8) -> u32; + #[cfg(feature = "component-model-async")] + thread_resume_later(vmctx: vmctx, thread_idx: u32) -> bool; + #[cfg(feature = "component-model-async")] + thread_yield_to(vmctx: vmctx, caller_instance: u32, cancellable: u8, thread_idx: u32) -> u32; trap(vmctx: vmctx, code: u8) -> bool; diff --git a/crates/environ/src/component/dfg.rs b/crates/environ/src/component/dfg.rs index a85a80cea066..bb3f9cdcef39 100644 --- a/crates/environ/src/component/dfg.rs +++ b/crates/environ/src/component/dfg.rs @@ -64,9 +64,12 @@ pub struct ComponentDfg { /// Same as `reallocs`, but for post-return. pub post_returns: Intern, - /// Same as `reallocs`, but for post-return. + /// Same as `reallocs`, but for memories. pub memories: Intern>, + /// Same as `reallocs`, but for tables. + pub tables: Intern>, + /// Metadata about identified fused adapters. /// /// Note that this list is required to be populated in-order where the @@ -476,6 +479,25 @@ pub enum Trampoline { instance: RuntimeComponentInstanceIndex, slot: u32, }, + ThreadIndex, + ThreadNewIndirect { + instance: RuntimeComponentInstanceIndex, + start_func_ty_idx: ComponentTypeIndex, + start_func_table_id: TableId, + }, + ThreadSwitchTo { + instance: RuntimeComponentInstanceIndex, + cancellable: bool, + }, + ThreadSuspend { + instance: RuntimeComponentInstanceIndex, + cancellable: bool, + }, + ThreadResumeLater, + ThreadYieldTo { + instance: RuntimeComponentInstanceIndex, + cancellable: bool, + }, } #[derive(Copy, Clone, Hash, Eq, PartialEq)] @@ -823,6 +845,15 @@ impl LinearizeDfg<'_> { ) } + fn runtime_table(&mut self, table: TableId) -> RuntimeTableIndex { + self.intern( + table, + |me| &mut me.runtime_tables, + |me, table| me.core_export(&me.dfg.tables[table]), + |index, export| GlobalInitializer::ExtractTable(ExtractTable { index, export }), + ) + } + fn runtime_realloc(&mut self, realloc: ReallocId) -> RuntimeReallocIndex { self.intern( realloc, @@ -1118,6 +1149,38 @@ impl LinearizeDfg<'_> { instance: *instance, slot: *slot, }, + Trampoline::ThreadIndex => info::Trampoline::ThreadIndex, + Trampoline::ThreadNewIndirect { + instance, + start_func_ty_idx, + start_func_table_id, + } => info::Trampoline::ThreadNewIndirect { + instance: *instance, + start_func_ty_idx: *start_func_ty_idx, + start_func_table_idx: self.runtime_table(*start_func_table_id), + }, + Trampoline::ThreadSwitchTo { + instance, + cancellable, + } => info::Trampoline::ThreadSwitchTo { + instance: *instance, + cancellable: *cancellable, + }, + Trampoline::ThreadSuspend { + instance, + cancellable, + } => info::Trampoline::ThreadSuspend { + instance: *instance, + cancellable: *cancellable, + }, + Trampoline::ThreadResumeLater => info::Trampoline::ThreadResumeLater, + Trampoline::ThreadYieldTo { + instance, + cancellable, + } => info::Trampoline::ThreadYieldTo { + instance: *instance, + cancellable: *cancellable, + }, }; let i1 = self.trampolines.push(*signature); let i2 = self.trampoline_defs.push(trampoline); diff --git a/crates/environ/src/component/info.rs b/crates/environ/src/component/info.rs index d825c9a51e03..28e363c69913 100644 --- a/crates/environ/src/component/info.rs +++ b/crates/environ/src/component/info.rs @@ -1116,6 +1116,46 @@ pub enum Trampoline { /// Which slot to update. slot: u32, }, + + /// Intrinsic used to implement the `thread.index` component model builtin. + ThreadIndex, + + /// Intrinsic used to implement the `thread.new_indirect` component model builtin. + ThreadNewIndirect { + /// The specific component instance which is calling the intrinsic. + instance: RuntimeComponentInstanceIndex, + /// The type index for the start function of the thread. + start_func_ty_idx: ComponentTypeIndex, + /// The index of the table that stores the start function. + start_func_table_idx: RuntimeTableIndex, + }, + + /// Intrinsic used to implement the `thread.switch-to` component model builtin. + ThreadSwitchTo { + /// The specific component instance which is calling the intrinsic. + instance: RuntimeComponentInstanceIndex, + /// If `true`, indicates the caller instance maybe reentered. + cancellable: bool, + }, + + /// Intrinsic used to implement the `thread.suspend` component model builtin. + ThreadSuspend { + /// The specific component instance which is calling the intrinsic. + instance: RuntimeComponentInstanceIndex, + /// If `true`, indicates the caller instance maybe reentered. + cancellable: bool, + }, + + /// Intrinsic used to implement the `thread.resume-later` component model builtin. + ThreadResumeLater, + + /// Intrinsic used to implement the `thread.yield-to` component model builtin. + ThreadYieldTo { + /// The specific component instance which is calling the intrinsic. + instance: RuntimeComponentInstanceIndex, + /// If `true`, indicates the caller instance maybe reentered. + cancellable: bool, + }, } impl Trampoline { @@ -1181,6 +1221,12 @@ impl Trampoline { ErrorContextTransfer => format!("error-context-transfer"), ContextGet { .. } => format!("context-get"), ContextSet { .. } => format!("context-set"), + ThreadIndex => format!("thread-index"), + ThreadNewIndirect { .. } => format!("thread-new-indirect"), + ThreadSwitchTo { .. } => format!("thread-switch-to"), + ThreadSuspend { .. } => format!("thread-suspend"), + ThreadResumeLater => format!("thread-resume-later"), + ThreadYieldTo { .. } => format!("thread-yield-to"), } } } diff --git a/crates/environ/src/component/translate.rs b/crates/environ/src/component/translate.rs index 9cb276f43a0b..f729fba676da 100644 --- a/crates/environ/src/component/translate.rs +++ b/crates/environ/src/component/translate.rs @@ -309,6 +309,29 @@ enum LocalInitializer<'data> { func: ModuleInternedTypeIndex, i: u32, }, + ThreadIndex { + func: ModuleInternedTypeIndex, + }, + ThreadNewIndirect { + func: ModuleInternedTypeIndex, + start_func_ty: ComponentTypeIndex, + start_func_table_index: TableIndex, + }, + ThreadSwitchTo { + func: ModuleInternedTypeIndex, + cancellable: bool, + }, + ThreadSuspend { + func: ModuleInternedTypeIndex, + cancellable: bool, + }, + ThreadResumeLater { + func: ModuleInternedTypeIndex, + }, + ThreadYieldTo { + func: ModuleInternedTypeIndex, + cancellable: bool, + }, // core wasm modules ModuleStatic(StaticModuleIndex, ComponentCoreModuleTypeId), @@ -1089,24 +1112,42 @@ impl<'a, 'data> Translator<'a, 'data> { core_func_index += 1; LocalInitializer::ContextSet { i, func } } - wasmparser::CanonicalFunction::ThreadIndex => { - bail!("unimplemented `thread.index`"); + let func = self.core_func_signature(core_func_index)?; + core_func_index += 1; + LocalInitializer::ThreadIndex { func } } - wasmparser::CanonicalFunction::ThreadNewIndirect { .. } => { - bail!("unimplemented `thread.new-indirect`"); + wasmparser::CanonicalFunction::ThreadNewIndirect { + func_ty_index, + table_index, + } => { + let func = self.core_func_signature(core_func_index)?; + core_func_index += 1; + LocalInitializer::ThreadNewIndirect { + func, + start_func_ty: ComponentTypeIndex::from_u32(func_ty_index), + start_func_table_index: TableIndex::from_u32(table_index), + } } - wasmparser::CanonicalFunction::ThreadSwitchTo { .. } => { - bail!("unimplemented `thread.switch-to`"); + wasmparser::CanonicalFunction::ThreadSwitchTo { cancellable } => { + let func = self.core_func_signature(core_func_index)?; + core_func_index += 1; + LocalInitializer::ThreadSwitchTo { func, cancellable } } - wasmparser::CanonicalFunction::ThreadSuspend { .. } => { - bail!("unimplemented `thread.suspend`"); + wasmparser::CanonicalFunction::ThreadSuspend { cancellable } => { + let func = self.core_func_signature(core_func_index)?; + core_func_index += 1; + LocalInitializer::ThreadSuspend { func, cancellable } } wasmparser::CanonicalFunction::ThreadResumeLater => { - bail!("unimplemented `thread.resume-later`"); + let func = self.core_func_signature(core_func_index)?; + core_func_index += 1; + LocalInitializer::ThreadResumeLater { func } } - wasmparser::CanonicalFunction::ThreadYieldTo { .. } => { - bail!("unimplemented `thread.yield-to`"); + wasmparser::CanonicalFunction::ThreadYieldTo { cancellable } => { + let func = self.core_func_signature(core_func_index)?; + core_func_index += 1; + LocalInitializer::ThreadYieldTo { func, cancellable } } }; self.result.initializers.push(init); diff --git a/crates/environ/src/component/translate/adapt.rs b/crates/environ/src/component/translate/adapt.rs index 64c9e1a9c6e4..d9c7a474f96d 100644 --- a/crates/environ/src/component/translate/adapt.rs +++ b/crates/environ/src/component/translate/adapt.rs @@ -345,6 +345,7 @@ fn fact_import_to_core_def( fact::Import::ErrorContextTransfer => { simple_intrinsic(dfg::Trampoline::ErrorContextTransfer) } + fact::Import::ThreadIndex => simple_intrinsic(dfg::Trampoline::ThreadIndex), } } diff --git a/crates/environ/src/component/translate/inline.rs b/crates/environ/src/component/translate/inline.rs index 3dea3e40cb6d..227224589dea 100644 --- a/crates/environ/src/component/translate/inline.rs +++ b/crates/environ/src/component/translate/inline.rs @@ -1109,7 +1109,73 @@ impl<'a> Inliner<'a> { )); frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); } + ThreadIndex { func } => { + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::ThreadIndex)); + frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); + } + ThreadNewIndirect { + func, + start_func_table_index, + start_func_ty, + } => { + let table_export = frame.tables[*start_func_table_index] + .clone() + .map_index(|i| match i { + EntityIndex::Table(i) => i, + _ => unreachable!(), + }); + let table_id = self.result.tables.push(table_export); + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::ThreadNewIndirect { + instance: frame.instance, + start_func_ty_idx: *start_func_ty, + start_func_table_id: table_id, + }, + )); + frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); + } + ThreadSwitchTo { func, cancellable } => { + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::ThreadSwitchTo { + instance: frame.instance, + cancellable: *cancellable, + }, + )); + frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); + } + ThreadSuspend { func, cancellable } => { + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::ThreadSuspend { + instance: frame.instance, + cancellable: *cancellable, + }, + )); + frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); + } + ThreadResumeLater { func } => { + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::ThreadResumeLater)); + frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); + } + ThreadYieldTo { func, cancellable } => { + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::ThreadYieldTo { + instance: frame.instance, + cancellable: *cancellable, + }, + )); + frame.funcs.push((*func, dfg::CoreDef::Trampoline(index))); + } ModuleStatic(idx, ty) => { frame.modules.push(ModuleDef::Static(*idx, *ty)); } diff --git a/crates/environ/src/fact.rs b/crates/environ/src/fact.rs index d2a8689f2ab4..9e97dc4274e7 100644 --- a/crates/environ/src/fact.rs +++ b/crates/environ/src/fact.rs @@ -870,6 +870,8 @@ pub enum Import { /// An intrinisic used by FACT-generated modules to (partially or entirely) transfer /// ownership of an `error-context`. ErrorContextTransfer, + /// Get the index of the current thread. + ThreadIndex, } impl Options { diff --git a/crates/environ/src/trap_encoding.rs b/crates/environ/src/trap_encoding.rs index 15efa99b40fa..939a6f5c515b 100644 --- a/crates/environ/src/trap_encoding.rs +++ b/crates/environ/src/trap_encoding.rs @@ -89,6 +89,11 @@ pub enum Trap { /// triggering a trap instead. CannotEnterComponent, + /// When the `component-model` feature is enabled this trap represents a + /// scenario where one component tried to call an import at a time when it + /// was not legal to do so. + CannotLeaveComponent, + /// Async-lifted export failed to produce a result by calling `task.return` /// before returning `STATUS_DONE` and/or after all host tasks completed. NoAsyncResult, @@ -107,11 +112,6 @@ pub enum Trap { /// that all host tasks have completed and any/all host-owned stream/future /// handles have been dropped. AsyncDeadlock, - - /// When the `component-model` feature is enabled this trap represents a - /// scenario where a component instance tried to call an import or intrinsic - /// when it wasn't allowed to, e.g. from a post-return function. - CannotLeaveComponent, // if adding a variant here be sure to update the `check!` macro below } @@ -184,12 +184,12 @@ impl fmt::Display for Trap { AllocationTooLarge => "allocation size too large", CastFailure => "cast failure", CannotEnterComponent => "cannot enter component instance", + CannotLeaveComponent => "cannot leave component instance", NoAsyncResult => "async-lifted export failed to produce a result", UnhandledTag => "unhandled tag", ContinuationAlreadyConsumed => "continuation already consumed", DisabledOpcode => "pulley opcode disabled at compile time was executed", AsyncDeadlock => "deadlock detected: event loop cannot make further progress", - CannotLeaveComponent => "cannot leave component instance", }; write!(f, "wasm trap: {desc}") } diff --git a/crates/fuzzing/src/generators/config.rs b/crates/fuzzing/src/generators/config.rs index a72a68cc459e..63cced7174ba 100644 --- a/crates/fuzzing/src/generators/config.rs +++ b/crates/fuzzing/src/generators/config.rs @@ -137,6 +137,7 @@ impl Config { component_model_async, component_model_async_builtins, component_model_async_stackful, + component_model_threading, component_model_error_context, component_model_gc, simd, @@ -159,6 +160,7 @@ impl Config { component_model_async_builtins.unwrap_or(false); self.module_config.component_model_async_stackful = component_model_async_stackful.unwrap_or(false); + self.module_config.component_model_threading = component_model_threading.unwrap_or(false); self.module_config.component_model_error_context = component_model_error_context.unwrap_or(false); self.module_config.component_model_gc = component_model_gc.unwrap_or(false); @@ -283,6 +285,7 @@ impl Config { Some(self.module_config.component_model_async_builtins); cfg.wasm.component_model_async_stackful = Some(self.module_config.component_model_async_stackful); + cfg.wasm.component_model_threading = Some(self.module_config.component_model_threading); cfg.wasm.component_model_error_context = Some(self.module_config.component_model_error_context); cfg.wasm.component_model_gc = Some(self.module_config.component_model_gc); diff --git a/crates/fuzzing/src/generators/module.rs b/crates/fuzzing/src/generators/module.rs index 7b65c92c4c31..942ff9fad882 100644 --- a/crates/fuzzing/src/generators/module.rs +++ b/crates/fuzzing/src/generators/module.rs @@ -19,6 +19,7 @@ pub struct ModuleConfig { pub component_model_async: bool, pub component_model_async_builtins: bool, pub component_model_async_stackful: bool, + pub component_model_threading: bool, pub component_model_error_context: bool, pub component_model_gc: bool, pub legacy_exceptions: bool, @@ -70,6 +71,7 @@ impl<'a> Arbitrary<'a> for ModuleConfig { component_model_async: false, component_model_async_builtins: false, component_model_async_stackful: false, + component_model_threading: false, component_model_error_context: false, component_model_gc: false, legacy_exceptions: false, diff --git a/crates/misc/component-async-tests/tests/scenario/coop_threads.rs b/crates/misc/component-async-tests/tests/scenario/coop_threads.rs new file mode 100644 index 000000000000..d9ce940b2482 --- /dev/null +++ b/crates/misc/component-async-tests/tests/scenario/coop_threads.rs @@ -0,0 +1,19 @@ +use anyhow::Result; + +use super::util::test_run; + +// No-op function; we only test this by composing it in `async_coop_threads_caller` +#[allow( + dead_code, + reason = "here only to make the `assert_test_exists` macro happy" +)] +pub fn async_coop_threads_callee() {} + +#[tokio::test] +pub async fn async_coop_threads_caller() -> Result<()> { + test_run(&[ + test_programs_artifacts::ASYNC_COOP_THREADS_CALLER_COMPONENT, + test_programs_artifacts::ASYNC_COOP_THREADS_CALLEE_COMPONENT, + ]) + .await +} diff --git a/crates/misc/component-async-tests/tests/scenario/mod.rs b/crates/misc/component-async-tests/tests/scenario/mod.rs index e3481c18bf58..aab3fd3f69b2 100644 --- a/crates/misc/component-async-tests/tests/scenario/mod.rs +++ b/crates/misc/component-async-tests/tests/scenario/mod.rs @@ -2,6 +2,7 @@ mod util; pub mod backpressure; pub mod borrowing; +pub mod coop_threads; pub mod error_context; pub mod post_return; pub mod read_resource_stream; diff --git a/crates/misc/component-async-tests/tests/scenario/util.rs b/crates/misc/component-async-tests/tests/scenario/util.rs index 2ee4577cd25c..780f34531193 100644 --- a/crates/misc/component-async-tests/tests/scenario/util.rs +++ b/crates/misc/component-async-tests/tests/scenario/util.rs @@ -37,6 +37,7 @@ pub fn config() -> Config { config.wasm_component_model_async(true); config.wasm_component_model_async_builtins(true); config.wasm_component_model_async_stackful(true); + config.wasm_component_model_threading(true); config.wasm_component_model_error_context(true); config.async_support(true); config diff --git a/crates/misc/component-async-tests/tests/test_all.rs b/crates/misc/component-async-tests/tests/test_all.rs index 1c4c76e35d50..dfc3eee2f0fb 100644 --- a/crates/misc/component-async-tests/tests/test_all.rs +++ b/crates/misc/component-async-tests/tests/test_all.rs @@ -13,6 +13,7 @@ mod scenario; use scenario::backpressure::{async_backpressure_callee, async_backpressure_caller}; use scenario::borrowing::{async_borrowing_callee, async_borrowing_caller}; +use scenario::coop_threads::{async_coop_threads_callee, async_coop_threads_caller}; use scenario::error_context::{ async_error_context, async_error_context_callee, async_error_context_caller, }; diff --git a/crates/misc/component-async-tests/wit/test.wit b/crates/misc/component-async-tests/wit/test.wit index 88d52af74602..a949a64d9ec3 100644 --- a/crates/misc/component-async-tests/wit/test.wit +++ b/crates/misc/component-async-tests/wit/test.wit @@ -348,3 +348,16 @@ world sleep-post-return-caller { world closed-stream-guest { export closed-stream; } + +interface coop { + get-index: async func() -> u32; +} + +world coop-threads-callee { + export coop; +} + +world coop-threads-caller { + import coop; + export run; +} \ No newline at end of file diff --git a/crates/test-programs/src/async_.rs b/crates/test-programs/src/async_.rs index 1ec58158d27d..96fe914883d1 100644 --- a/crates/test-programs/src/async_.rs +++ b/crates/test-programs/src/async_.rs @@ -114,6 +114,17 @@ pub unsafe fn context_set(_: u32) { unreachable!() } +#[cfg(target_arch = "wasm32")] +#[link(wasm_import_module = "$root")] +unsafe extern "C" { + #[link_name = "[thread-index]"] + pub fn thread_index() -> u32; +} +#[cfg(not(target_arch = "wasm32"))] +pub unsafe fn thread_index() -> u32 { + unreachable!() +} + #[cfg(target_arch = "wasm32")] #[link(wasm_import_module = "[export]$root")] unsafe extern "C" { diff --git a/crates/test-programs/src/bin/async_coop_threads_callee.rs b/crates/test-programs/src/bin/async_coop_threads_callee.rs new file mode 100644 index 000000000000..0b1c8b871f34 --- /dev/null +++ b/crates/test-programs/src/bin/async_coop_threads_callee.rs @@ -0,0 +1,25 @@ +mod bindings { + wit_bindgen::generate!({ + path: "../misc/component-async-tests/wit", + world: "coop-threads-callee", + }); + + use super::Component; + export!(Component); +} + +use { + bindings::exports::local::local::coop::Guest as CoopThreads, + test_programs::async_::thread_index, +}; + +struct Component; + +impl CoopThreads for Component { + async fn get_index() -> u32 { + unsafe { thread_index() } + } +} + +// Unused function; required since this file is built as a `bin`: +fn main() {} diff --git a/crates/test-programs/src/bin/async_coop_threads_caller.rs b/crates/test-programs/src/bin/async_coop_threads_caller.rs new file mode 100644 index 000000000000..12e00b1cca56 --- /dev/null +++ b/crates/test-programs/src/bin/async_coop_threads_caller.rs @@ -0,0 +1,25 @@ +mod bindings { + wit_bindgen::generate!({ + path: "../misc/component-async-tests/wit", + world: "coop-threads-caller", + }); + + use super::Component; + export!(Component); +} + +use { + crate::bindings::local::local::coop::get_index, + bindings::exports::local::local::run::Guest as Run, +}; + +struct Component; + +impl Run for Component { + async fn run() { + assert_eq!(get_index().await, 10) + } +} + +// Unused function; required since this file is built as a `bin`: +fn main() {} diff --git a/crates/test-util/src/wasmtime_wast.rs b/crates/test-util/src/wasmtime_wast.rs index feec31e2146e..e4f9a0497103 100644 --- a/crates/test-util/src/wasmtime_wast.rs +++ b/crates/test-util/src/wasmtime_wast.rs @@ -40,6 +40,7 @@ pub fn apply_test_config(config: &mut Config, test_config: &wast::TestConfig) { component_model_async, component_model_async_builtins, component_model_async_stackful, + component_model_threading, component_model_error_context, component_model_gc, nan_canonicalization, @@ -67,6 +68,7 @@ pub fn apply_test_config(config: &mut Config, test_config: &wast::TestConfig) { let component_model_async = component_model_async.unwrap_or(false); let component_model_async_builtins = component_model_async_builtins.unwrap_or(false); let component_model_async_stackful = component_model_async_stackful.unwrap_or(false); + let component_model_threading = component_model_threading.unwrap_or(false); let component_model_error_context = component_model_error_context.unwrap_or(false); let component_model_gc = component_model_gc.unwrap_or(false); let nan_canonicalization = nan_canonicalization.unwrap_or(false); @@ -102,6 +104,7 @@ pub fn apply_test_config(config: &mut Config, test_config: &wast::TestConfig) { .wasm_component_model_async(component_model_async) .wasm_component_model_async_builtins(component_model_async_builtins) .wasm_component_model_async_stackful(component_model_async_stackful) + .wasm_component_model_threading(component_model_threading) .wasm_component_model_error_context(component_model_error_context) .wasm_component_model_gc(component_model_gc) .wasm_exceptions(exceptions) diff --git a/crates/test-util/src/wast.rs b/crates/test-util/src/wast.rs index 7517f919984a..cefa50ac27ba 100644 --- a/crates/test-util/src/wast.rs +++ b/crates/test-util/src/wast.rs @@ -228,6 +228,7 @@ macro_rules! foreach_config_option { component_model_async component_model_async_builtins component_model_async_stackful + component_model_threading component_model_error_context component_model_gc simd diff --git a/crates/wasmtime/Cargo.toml b/crates/wasmtime/Cargo.toml index 81ccc0e5083a..b07d94d90f98 100644 --- a/crates/wasmtime/Cargo.toml +++ b/crates/wasmtime/Cargo.toml @@ -61,6 +61,7 @@ hashbrown = { workspace = true, features = ["default-hasher"] } bitflags = { workspace = true } futures = { workspace = true, features = ["alloc"], optional = true } bytes = { workspace = true, optional = true } +tracing-subscriber.workspace = true [target.'cfg(target_os = "windows")'.dependencies.windows-sys] workspace = true diff --git a/crates/wasmtime/src/config.rs b/crates/wasmtime/src/config.rs index ae61afcfa0f4..bcb20f3a36a4 100644 --- a/crates/wasmtime/src/config.rs +++ b/crates/wasmtime/src/config.rs @@ -1162,6 +1162,19 @@ impl Config { self } + /// This corresponds to the ๐Ÿงต emoji in the component model specification. + /// + /// Please note that Wasmtime's support for this feature is _very_ + /// incomplete. + /// + /// [proposal]: + /// https://github.com/WebAssembly/component-model/pull/557 + #[cfg(feature = "component-model-async")] + pub fn wasm_component_model_threading(&mut self, enable: bool) -> &mut Self { + self.wasm_feature(WasmFeatures::CM_THREADING, enable); + self + } + /// This corresponds to the ๐Ÿ“ emoji in the component model specification. /// /// Please note that Wasmtime's support for this feature is _very_ diff --git a/crates/wasmtime/src/runtime/.DS_Store b/crates/wasmtime/src/runtime/.DS_Store new file mode 100644 index 000000000000..070e0289cbc1 Binary files /dev/null and b/crates/wasmtime/src/runtime/.DS_Store differ diff --git a/crates/wasmtime/src/runtime/component/.DS_Store b/crates/wasmtime/src/runtime/component/.DS_Store new file mode 100644 index 000000000000..3c8a2529dec7 Binary files /dev/null and b/crates/wasmtime/src/runtime/component/.DS_Store differ diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 6201bf71141c..f37cedf76d6b 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -57,8 +57,8 @@ use crate::store::{StoreInner, StoreOpaque, StoreToken}; use crate::vm::component::{ CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState, }; -use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; -use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw}; +use crate::vm::{self, AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; +use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType}; use anyhow::{Context as _, Result, anyhow, bail}; use error_contexts::GlobalErrorContextRefCount; use futures::channel::oneshot; @@ -85,9 +85,9 @@ use table::{TableDebug, TableId}; use wasmtime_environ::component::{ CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, - RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex, - TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex, - TypeTupleIndex, + RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding, + TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, + TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex, }; pub use abort::JoinHandle; @@ -103,7 +103,7 @@ pub(crate) use futures_and_streams::{ mod abort; mod error_contexts; mod futures_and_streams; -mod table; +pub(crate) mod table; pub(crate) mod tls; /// Constant defined in the Component Model spec to indicate that the async @@ -595,14 +595,24 @@ enum SuspendReason { /// waitable set or task. Waiting { set: TableId, - task: TableId, + thread: TableId, }, /// The fiber has finished handling its most recent work item and is waiting /// for another (or to be dropped if it is no longer needed). NeedWork, /// The fiber is yielding and should be resumed once other tasks have had a /// chance to run. - Yielding { task: TableId }, + Yielding { + thread: TableId, + to: Option>, + cancellable: bool, + }, + /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`. + ExplicitlySuspending { + thread: TableId, + to: Option>, + cancellable: bool, + }, } /// Represents a pending call into guest code for a given guest task. @@ -618,14 +628,16 @@ enum GuestCallKind { }, /// Indicates that a new guest task call is pending and may be executed /// using the specified closure. - Start(Box Result<()> + Send + Sync>), + StartImplicit(Box Result<()> + Send + Sync>), + StartExplicit(Box Result<()> + Send + Sync>), } impl fmt::Debug for GuestCallKind { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::DeliverEvent { set } => f.debug_struct("DeliverEvent").field("set", set).finish(), - Self::Start(_) => f.debug_tuple("Start").finish(), + Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(), + Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(), } } } @@ -633,7 +645,7 @@ impl fmt::Debug for GuestCallKind { /// Represents a pending call into guest code for a given guest task. #[derive(Debug)] struct GuestCall { - task: TableId, + thread: TableId, kind: GuestCallKind, } @@ -648,11 +660,13 @@ impl GuestCall { /// - the call is for a not-yet started task and the (sub-)component /// instance to be called has backpressure enabled fn is_ready(&self, state: &mut ConcurrentState) -> Result { - let task_instance = state.get_mut(self.task)?.instance; + let task_instance = state.get_task_mut(self.thread)?.instance; let state = state.instance_state(task_instance); + let ready = match &self.kind { GuestCallKind::DeliverEvent { .. } => !state.do_not_enter, - GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure > 0), + GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0), + GuestCallKind::StartExplicit(_) => true, }; log::trace!( "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})", @@ -673,8 +687,8 @@ enum WorkerItem { /// or `CallbackCode.POLL`). #[derive(Debug)] struct PollParams { - /// Identifies the polling task. - task: TableId, + /// Identifies the polling thread. + thread: TableId, /// The waitable set being polled. set: TableId, } @@ -714,17 +728,18 @@ impl ComponentInstance { /// async-lifted export; otherwise, it was received from its callback. fn handle_callback_code( mut self: Pin<&mut Self>, - guest_task: TableId, + guest_thread: TableId, runtime_instance: RuntimeComponentInstanceIndex, code: u32, initial_call: bool, ) -> Result<()> { let (code, set) = unpack_callback_code(code); - log::trace!("received callback code from {guest_task:?}: {code} (set: {set})"); + log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})"); let state = self.as_mut().concurrent_state_mut(); - let task = state.get_mut(guest_task)?; + let task_id = state.get_task_id(guest_thread)?; + let task = state.get_mut(task_id)?; if task.lift_result.is_some() { if code == callback_code::EXIT { @@ -733,7 +748,7 @@ impl ComponentInstance { if initial_call { // Notify any current or future waiters that this subtask has // started. - Waitable::Guest(guest_task).set_event( + Waitable::Guest(task_id).set_event( state, Some(Event::Subtask { status: Status::Started, @@ -754,15 +769,15 @@ impl ComponentInstance { match code { callback_code::EXIT => { - let task = state.get_mut(guest_task)?; + let task = state.get_task_mut(guest_thread)?; match &task.caller { Caller::Host { remove_task_automatically, .. } => { if *remove_task_automatically { - log::trace!("handle_callback_code will delete task {guest_task:?}"); - Waitable::Guest(guest_task).delete_from(state)?; + log::trace!("handle_callback_code will delete task {:?}", task_id); + Waitable::Guest(task_id).delete_from(state)?; } } Caller::Guest { .. } => { @@ -772,13 +787,13 @@ impl ComponentInstance { } } callback_code::YIELD => { - // Push this task onto the "low priority" queue so it runs after - // any other tasks have had a chance to run. - let task = state.get_mut(guest_task)?; + // Push this thread onto the "low priority" queue so it runs after + // any other threads have had a chance to run. + let task = state.get_task_mut(guest_thread)?; assert!(task.event.is_none()); task.event = Some(Event::None); state.push_low_priority(WorkItem::GuestCall(GuestCall { - task: guest_task, + thread: guest_thread, kind: GuestCallKind::DeliverEvent { set: None }, })); } @@ -786,12 +801,12 @@ impl ComponentInstance { let set = get_set(self.as_mut(), set)?; let state = self.concurrent_state_mut(); - if state.get_mut(guest_task)?.event.is_some() + if state.get_task_mut(guest_thread)?.event.is_some() || !state.get_mut(set)?.ready.is_empty() { // An event is immediately available; deliver it ASAP. state.push_high_priority(WorkItem::GuestCall(GuestCall { - task: guest_task, + thread: guest_thread, kind: GuestCallKind::DeliverEvent { set: Some(set) }, })); } else { @@ -801,7 +816,7 @@ impl ComponentInstance { // We're polling, so just yield and check whether an // event has arrived after that. state.push_low_priority(WorkItem::Poll(PollParams { - task: guest_task, + thread: guest_thread, set, })); } @@ -812,12 +827,12 @@ impl ComponentInstance { // Here we also set `GuestTask::wake_on_cancel` // which allows `subtask.cancel` to interrupt the // wait. - let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set); + let old = state.get_mut(guest_thread)?.wake_on_cancel.replace(set); assert!(old.is_none()); let old = state .get_mut(set)? .waiting - .insert(guest_task, WaitMode::Callback); + .insert(guest_thread, WaitMode::Callback); assert!(old.is_none()); } _ => unreachable!(), @@ -1020,7 +1035,7 @@ impl Instance { ); assert!(state.high_priority.is_empty()); assert!(state.low_priority.is_empty()); - assert!(state.guest_task.is_none()); + assert!(state.guest_thread.is_none()); assert!(state.futures.get_mut().as_ref().unwrap().is_empty()); assert!( state @@ -1376,11 +1391,12 @@ impl Instance { self.run_on_worker(store, WorkerItem::GuestCall(call)) .await?; } else { - let task = state.get_mut(call.task)?; + let task_id = state.get_task_id(call.thread)?; + let task = state.get_mut(task_id)?; if !task.starting_sent { task.starting_sent = true; - if let GuestCallKind::Start(_) = &call.kind { - Waitable::Guest(call.task).set_event( + if let GuestCallKind::StartImplicit(_) = &call.kind { + Waitable::Guest(task_id).set_event( state, Some(Event::Subtask { status: Status::Starting, @@ -1389,22 +1405,22 @@ impl Instance { } } - let runtime_instance = state.get_mut(call.task)?.instance; + let runtime_instance = state.get_mut(task_id)?.instance; state .instance_state(runtime_instance) .pending - .insert(call.task, call.kind); + .insert(call.thread, call.kind); } } WorkItem::Poll(params) => { let state = self.concurrent_state_mut(store.0); - if state.get_mut(params.task)?.event.is_some() + if state.get_task_mut(params.thread)?.event.is_some() || !state.get_mut(params.set)?.ready.is_empty() { // There's at least one event immediately available; deliver // it to the guest ASAP. state.push_high_priority(WorkItem::GuestCall(GuestCall { - task: params.task, + thread: params.thread, kind: GuestCallKind::DeliverEvent { set: Some(params.set), }, @@ -1412,9 +1428,9 @@ impl Instance { } else { // There are no events immediately available; deliver // `Event::None` to the guest. - state.get_mut(params.task)?.event = Some(Event::None); + state.get_task_mut(params.thread)?.event = Some(Event::None); state.push_high_priority(WorkItem::GuestCall(GuestCall { - task: params.task, + thread: params.thread, kind: GuestCallKind::DeliverEvent { set: Some(params.set), }, @@ -1432,17 +1448,18 @@ impl Instance { /// Resume the specified fiber, giving it exclusive access to the specified /// store. async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> { - let old_task = self.concurrent_state_mut(store).guest_task; - log::trace!("resume_fiber: save current task {old_task:?}"); + let old_thread = self.concurrent_state_mut(store).guest_thread; + log::trace!("resume_fiber: save current thread {old_thread:?}"); let fiber = fiber::resolve_or_release(store, fiber).await?; let state = self.concurrent_state_mut(store); - state.guest_task = old_task; - log::trace!("resume_fiber: restore current task {old_task:?}"); + state.guest_thread = old_thread; + log::trace!("resume_fiber: restore current thread {old_thread:?}"); if let Some(mut fiber) = fiber { + log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason); // See the `SuspendReason` documentation for what each case means. match state.suspend_reason.take().unwrap() { SuspendReason::NeedWork => { @@ -1452,17 +1469,24 @@ impl Instance { fiber.dispose(store); } } - SuspendReason::Yielding { .. } => { + SuspendReason::Yielding { thread, .. } => { + state.get_mut(thread)?.state = GuestThreadState::Pending; state.push_low_priority(WorkItem::ResumeFiber(fiber)); } - SuspendReason::Waiting { set, task } => { + SuspendReason::ExplicitlySuspending { thread, .. } => { + // TODO do this earlier? + state.get_mut(thread)?.state = GuestThreadState::Suspended(fiber); + } + SuspendReason::Waiting { set, thread } => { let old = state .get_mut(set)? .waiting - .insert(task, WaitMode::Fiber(fiber)); + .insert(thread, WaitMode::Fiber(fiber)); assert!(old.is_none()); } - } + }; + } else { + log::trace!("resume_fiber: fiber has exited"); } Ok(()) @@ -1500,55 +1524,60 @@ impl Instance { fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> { match call.kind { GuestCallKind::DeliverEvent { set } => { + let task_id = self.concurrent_state_mut(store).get_task_id(call.thread)?; let (event, waitable) = self .id() .get_mut(store) - .get_event(call.task, set, true)? + .get_event(task_id, set, true)? .unwrap(); let state = self.concurrent_state_mut(store); - let task = state.get_mut(call.task)?; - let runtime_instance = task.instance; + let runtime_instance = state.get_mut(task_id)?.instance; let handle = waitable.map(|(_, v)| v).unwrap_or(0); log::trace!( "use callback to deliver event {event:?} to {:?} for {waitable:?}", - call.task, + call.thread, ); - let old_task = state.guest_task.replace(call.task); + let old_thread = state.guest_thread.replace(call.thread); log::trace!( - "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task", - call.task + "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread", + call.thread ); + let old_task = old_thread.map(|thread| state.get_task_id(thread).unwrap()); - self.maybe_push_call_context(store.store_opaque_mut(), call.task)?; + self.maybe_push_call_context(store.store_opaque_mut(), task_id, old_task)?; let state = self.concurrent_state_mut(store); state.enter_instance(runtime_instance); - let callback = state.get_mut(call.task)?.callback.take().unwrap(); + let callback = state.get_mut(task_id)?.callback.take().unwrap(); let code = callback(store, self, runtime_instance, event, handle)?; let state = self.concurrent_state_mut(store); - state.get_mut(call.task)?.callback = Some(callback); - + state.get_mut(task_id)?.callback = Some(callback); state.exit_instance(runtime_instance)?; - self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?; + self.maybe_pop_call_context(store.store_opaque_mut(), task_id, old_task)?; self.id().get_mut(store).handle_callback_code( - call.task, + call.thread, runtime_instance, code, false, )?; - self.concurrent_state_mut(store).guest_task = old_task; - log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task"); + self.concurrent_state_mut(store).guest_thread = old_thread; + log::trace!( + "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread" + ); + } + GuestCallKind::StartImplicit(fun) => { + fun(store, self)?; } - GuestCallKind::Start(fun) => { + GuestCallKind::StartExplicit(fun) => { fun(store, self)?; } } @@ -1564,17 +1593,29 @@ impl Instance { fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> { log::trace!("suspend fiber: {reason:?}"); - // If we're yielding or waiting on behalf of a guest task, we'll need to + let state = self.concurrent_state_mut(store); + + // If we're yielding or waiting on behalf of a guest thread, we'll need to // pop the call context which manages resource borrows before suspending // and then push it again once we've resumed. let task = match &reason { - SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task), + SuspendReason::Yielding { thread, .. } + | SuspendReason::Waiting { thread, .. } + | SuspendReason::ExplicitlySuspending { thread, .. } => { + Some(state.get_task_id(*thread)?) + } SuspendReason::NeedWork => None, }; - let old_guest_task = if let Some(task) = task { - self.maybe_pop_call_context(store, task)?; - self.concurrent_state_mut(store).guest_task + let old_guest_thread = if let Some(task) = task { + let old_thread = self.concurrent_state_mut(store).guest_thread; + let old_task = old_thread.map(|thread| { + self.concurrent_state_mut(store) + .get_task_id(thread) + .unwrap() + }); + self.maybe_pop_call_context(store, task, old_task)?; + old_thread } else { None }; @@ -1586,8 +1627,13 @@ impl Instance { store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?; if let Some(task) = task { - self.concurrent_state_mut(store).guest_task = old_guest_task; - self.maybe_push_call_context(store, task)?; + self.concurrent_state_mut(store).guest_thread = old_guest_thread; + let old_task = old_guest_thread.map(|thread| { + self.concurrent_state_mut(store) + .get_task_id(thread) + .unwrap() + }); + self.maybe_push_call_context(store, task, old_task)?; } Ok(()) @@ -1600,9 +1646,10 @@ impl Instance { self, store: &mut StoreOpaque, guest_task: TableId, + old_task: Option>, ) -> Result<()> { let task = self.concurrent_state_mut(store).get_mut(guest_task)?; - if task.lift_result.is_some() { + if Some(guest_task) != old_task && task.lift_result.is_some() { log::trace!("push call context for {guest_task:?}"); let call_context = task.call_context.take().unwrap(); store.component_resource_state().0.push(call_context); @@ -1617,12 +1664,14 @@ impl Instance { self, store: &mut StoreOpaque, guest_task: TableId, + old_task: Option>, ) -> Result<()> { - if self - .concurrent_state_mut(store) - .get_mut(guest_task)? - .lift_result - .is_some() + if Some(guest_task) != old_task + && self + .concurrent_state_mut(store) + .get_mut(guest_task)? + .lift_result + .is_some() { log::trace!("pop call context for {guest_task:?}"); let call_context = Some(store.component_resource_state().0.pop().unwrap()); @@ -1642,7 +1691,7 @@ impl Instance { unsafe fn queue_call( self, mut store: StoreContextMut, - guest_task: TableId, + guest_thread: TableId, callee: SendSyncPtr, param_count: usize, result_count: usize, @@ -1667,7 +1716,7 @@ impl Instance { /// the returned closure is called. unsafe fn make_call( store: StoreContextMut, - guest_task: TableId, + guest_thread: TableId, callee: SendSyncPtr, param_count: usize, result_count: usize, @@ -1683,7 +1732,9 @@ impl Instance { let token = StoreToken::new(store); move |store: &mut dyn VMStore, instance: Instance| { let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS]; - let task = instance.concurrent_state_mut(store).get_mut(guest_task)?; + let task = instance + .concurrent_state_mut(store) + .get_task_mut(guest_thread)?; let may_enter_after_call = task.call_post_return_automatically(); let lower = task.lower_params.take().unwrap(); @@ -1697,6 +1748,7 @@ impl Instance { if let Some(mut flags) = flags { flags.set_may_enter(false); } + log::trace!("calling {callee:p} from guest thread {guest_thread:?}"); crate::Func::call_unchecked_raw( &mut store, callee.as_non_null(), @@ -1721,7 +1773,7 @@ impl Instance { let call = unsafe { make_call( store.as_context_mut(), - guest_task, + guest_thread, callee, param_count, result_count, @@ -1731,21 +1783,30 @@ impl Instance { let callee_instance = self .concurrent_state_mut(store.0) - .get_mut(guest_task)? + .get_task_mut(guest_thread)? .instance; let fun = if callback.is_some() { assert!(async_); Box::new(move |store: &mut dyn VMStore, instance: Instance| { - let old_task = instance + let old_thread = instance .concurrent_state_mut(store) - .guest_task - .replace(guest_task); + .guest_thread + .replace(guest_thread); log::trace!( - "stackless call: replaced {old_task:?} with {guest_task:?} as current task" + "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread" ); + let old_task = old_thread.map(|thread| { + instance + .concurrent_state_mut(store) + .get_task_id(thread) + .unwrap() + }); + let task_id = instance + .concurrent_state_mut(store) + .get_task_id(guest_thread)?; - instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?; + instance.maybe_push_call_context(store.store_opaque_mut(), task_id, old_task)?; instance .concurrent_state_mut(store) @@ -1763,18 +1824,18 @@ impl Instance { .concurrent_state_mut(store) .exit_instance(callee_instance)?; - instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?; + instance.maybe_pop_call_context(store.store_opaque_mut(), task_id, old_task)?; let state = instance.concurrent_state_mut(store); - state.guest_task = old_task; - log::trace!("stackless call: restored {old_task:?} as current task"); + state.guest_thread = old_thread; + log::trace!("stackless call: restored {old_thread:?} as current thread"); // SAFETY: `wasmparser` will have validated that the callback // function returns a `i32` result. let code = unsafe { storage[0].assume_init() }.get_i32() as u32; instance.id().get_mut(store).handle_callback_code( - guest_task, + guest_thread, callee_instance, code, true, @@ -1786,17 +1847,26 @@ impl Instance { } else { let token = StoreToken::new(store.as_context_mut()); Box::new(move |store: &mut dyn VMStore, instance: Instance| { - let old_task = instance + let old_thread = instance .concurrent_state_mut(store) - .guest_task - .replace(guest_task); + .guest_thread + .replace(guest_thread); log::trace!( - "stackful call: replaced {old_task:?} with {guest_task:?} as current task", + "stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread", ); + let old_task = old_thread.map(|thread| { + self.concurrent_state_mut(store) + .get_task_id(thread) + .unwrap() + }); + + let task_id = instance + .concurrent_state_mut(store) + .get_task_id(guest_thread)?; let mut flags = instance.id().get(store).instance_flags(callee_instance); - instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?; + instance.maybe_push_call_context(store.store_opaque_mut(), task_id, old_task)?; // Unless this is a callback-less (i.e. stackful) // async-lifted export, we need to record that the instance @@ -1822,7 +1892,7 @@ impl Instance { // been called. if instance .concurrent_state_mut(store) - .get_mut(guest_task)? + .get_mut(task_id)? .lift_result .is_some() { @@ -1838,9 +1908,9 @@ impl Instance { let state = instance.concurrent_state_mut(store); state.exit_instance(callee_instance)?; - assert!(state.get_mut(guest_task)?.result.is_none()); + assert!(state.get_mut(task_id)?.result.is_none()); - state.get_mut(guest_task)?.lift_result.take().unwrap() + state.get_mut(task_id)?.lift_result.take().unwrap() }; // SAFETY: `result_count` represents the number of core Wasm @@ -1861,7 +1931,7 @@ impl Instance { if instance .concurrent_state_mut(store) - .get_mut(guest_task)? + .get_mut(task_id)? .call_post_return_automatically() { unsafe { @@ -1894,16 +1964,18 @@ impl Instance { instance.task_complete( store, - guest_task, + task_id, result, Status::Returned, post_return_arg, )?; } - instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?; + instance.maybe_pop_call_context(store.store_opaque_mut(), task_id, old_task)?; - let task = instance.concurrent_state_mut(store).get_mut(guest_task)?; + let state = instance.concurrent_state_mut(store); + state.get_mut(guest_thread)?.state = GuestThreadState::Completed; + let task = state.get_mut(task_id)?; match &task.caller { Caller::Host { @@ -1911,8 +1983,7 @@ impl Instance { .. } => { if *remove_task_automatically { - Waitable::Guest(guest_task) - .delete_from(instance.concurrent_state_mut(store))?; + //todo } } Caller::Guest { .. } => { @@ -1926,8 +1997,8 @@ impl Instance { self.concurrent_state_mut(store.0) .push_high_priority(WorkItem::GuestCall(GuestCall { - task: guest_task, - kind: GuestCallKind::Start(fun), + thread: guest_thread, + kind: GuestCallKind::StartImplicit(fun), })); Ok(()) @@ -1994,7 +2065,7 @@ impl Instance { let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap()); let token = StoreToken::new(store.as_context_mut()); let state = self.concurrent_state_mut(store.0); - let old_task = state.guest_task.take(); + let old_thread = state.guest_thread.take(); let new_task = GuestTask::new( state, Box::new(move |store, instance, dst| { @@ -2039,7 +2110,7 @@ impl Instance { } dst.copy_from_slice(&src[..dst.len()]); let state = instance.concurrent_state_mut(store.0); - let task = state.guest_task.unwrap(); + let task = state.get_task_id(state.guest_thread.unwrap())?; Waitable::Guest(task).set_event( state, Some(Event::Subtask { @@ -2071,9 +2142,9 @@ impl Instance { )?; } let state = instance.concurrent_state_mut(store.0); - let task = state.guest_task.unwrap(); + let thread = state.guest_thread.unwrap(); if sync_caller { - state.get_mut(task)?.sync_result = + state.get_task_mut(thread)?.sync_result = Some(if let ResultInfo::Stack { result_count } = &result_info { match result_count { 0 => None, @@ -2091,7 +2162,7 @@ impl Instance { string_encoding: StringEncoding::from_u8(string_encoding).unwrap(), }, Caller::Guest { - task: old_task.unwrap(), + thread: old_thread.unwrap(), instance: caller_instance, }, None, @@ -2099,19 +2170,24 @@ impl Instance { )?; let guest_task = state.push(new_task)?; + let new_thread = GuestThread::new_implicit(guest_task); + let guest_thread = state.push(new_thread)?; + state.get_mut(guest_task)?.threads.push(guest_thread); - if let Some(old_task) = old_task { + if let Some(old_thread) = old_thread { if !state.may_enter(guest_task) { bail!(crate::Trap::CannotEnterComponent); } - state.get_mut(old_task)?.subtasks.insert(guest_task); + state.get_task_mut(old_thread)?.subtasks.insert(guest_task); }; - // Make the new task the current one so that `Self::start_call` knows + // Make the new thread the current one so that `Self::start_call` knows // which one to start. - state.guest_task = Some(guest_task); - log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}"); + state.guest_thread = Some(guest_thread); + log::trace!( + "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}" + ); Ok(()) } @@ -2179,7 +2255,8 @@ impl Instance { let token = StoreToken::new(store.as_context_mut()); let async_caller = storage.is_none(); let state = self.concurrent_state_mut(store.0); - let guest_task = state.guest_task.unwrap(); + let guest_thread = state.guest_thread.unwrap(); + let guest_task = state.get_task_id(guest_thread)?; let may_enter_after_call = state.get_mut(guest_task)?.call_post_return_automatically(); let callee = SendSyncPtr::new(NonNull::new(callee).unwrap()); let param_count = usize::try_from(param_count).unwrap(); @@ -2211,7 +2288,7 @@ impl Instance { } let Caller::Guest { - task: caller, + thread: caller, instance: runtime_instance, } = &task.caller else { @@ -2234,7 +2311,7 @@ impl Instance { unsafe { self.queue_call( store.as_context_mut(), - guest_task, + guest_thread, callee, param_count, result_count, @@ -2251,7 +2328,7 @@ impl Instance { // the subtask... let guest_waitable = Waitable::Guest(guest_task); let old_set = guest_waitable.common(state)?.set; - let set = state.get_mut(caller)?.sync_call_set; + let set = state.get_task_mut(caller)?.sync_call_set; guest_waitable.join(state, Some(set))?; // ... and suspend this fiber temporarily while we wait for it to start. @@ -2270,7 +2347,13 @@ impl Instance { // before committing to such an optimization. And again, we'd need to // update the spec to allow that. let (status, waitable) = loop { - self.suspend(store.0, SuspendReason::Waiting { set, task: caller })?; + self.suspend( + store.0, + SuspendReason::Waiting { + set, + thread: caller, + }, + )?; let state = self.concurrent_state_mut(store.0); @@ -2326,9 +2409,9 @@ impl Instance { } } - // Reset the current task to point to the caller as it resumes control. - state.guest_task = Some(caller); - log::trace!("popped current task {guest_task:?}; new task is {caller:?}"); + // Reset the current thread to point to the caller as it resumes control. + state.guest_thread = Some(caller); + log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}"); Ok(status.pack(waitable)) } @@ -2377,7 +2460,7 @@ impl Instance { ) -> Result> { let token = StoreToken::new(store.as_context_mut()); let state = self.concurrent_state_mut(store.0); - let caller = state.guest_task.unwrap(); + let caller = state.guest_thread.unwrap(); // Create an abortable future which hooks calls to poll and manages call // context state for the future. @@ -2519,7 +2602,7 @@ impl Instance { // If there is no current guest task set, that means the host function // was registered using e.g. `LinkerInstance::func_wrap`, in which case // it should complete immediately. - let Some(caller) = state.guest_task else { + let Some(caller) = state.guest_thread else { return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) { Poll::Ready(result) => result, Poll::Pending => { @@ -2531,7 +2614,7 @@ impl Instance { // Save any existing result stashed in `GuestTask::result` so we can // replace it with the new result. let old_result = state - .get_mut(caller) + .get_task_mut(caller) .with_context(|| format!("bad handle: {caller:?}"))? .result .take(); @@ -2550,7 +2633,7 @@ impl Instance { let result = future.await?; tls::get(move |store| { let state = self.concurrent_state_mut(store); - state.get_mut(caller)?.result = Some(Box::new(result) as _); + state.get_task_mut(caller)?.result = Some(Box::new(result) as _); Waitable::Host(task).set_event( state, @@ -2589,16 +2672,25 @@ impl Instance { let state = self.concurrent_state_mut(store); state.push_future(future); - let set = state.get_mut(caller)?.sync_call_set; + let set = state.get_task_mut(caller)?.sync_call_set; Waitable::Host(task).join(state, Some(set))?; - self.suspend(store, SuspendReason::Waiting { set, task: caller })?; + self.suspend( + store, + SuspendReason::Waiting { + set, + thread: caller, + }, + )?; } } // Retrieve and return the result. Ok(*mem::replace( - &mut self.concurrent_state_mut(store).get_mut(caller)?.result, + &mut self + .concurrent_state_mut(store) + .get_task_mut(caller)? + .result, old_result, ) .unwrap() @@ -2623,15 +2715,16 @@ impl Instance { data_model, .. } = *state.options(options); - let guest_task = state.guest_task.unwrap(); + let guest_thread = state.guest_thread.unwrap(); + let guest_task = state.get_task_id(guest_thread)?; let lift = state - .get_mut(guest_task)? + .get_task_mut(guest_thread)? .lift_result .take() .ok_or_else(|| { anyhow!("`task.return` or `task.cancel` called more than once for current task") })?; - assert!(state.get_mut(guest_task)?.result.is_none()); + assert!(state.get_task_mut(guest_thread)?.result.is_none()); let invalid = ty != lift.ty || string_encoding != lift.string_encoding @@ -2654,10 +2747,9 @@ impl Instance { bail!("invalid `task.return` signature and/or options for current task"); } - log::trace!("task.return for {guest_task:?}"); + log::trace!("task.return for {guest_thread:?}"); let result = (lift.lift)(store, self, storage)?; - self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0)) } @@ -2669,8 +2761,8 @@ impl Instance { ) -> Result<()> { self.id().get(store).check_may_leave(caller)?; let state = self.concurrent_state_mut(store); - let guest_task = state.guest_task.unwrap(); - let task = state.get_mut(guest_task)?; + let guest_thread = state.guest_thread.unwrap(); + let task = state.get_task_mut(guest_thread)?; if !task.cancel_sent { bail!("`task.cancel` called by task which has not been cancelled") } @@ -2680,8 +2772,9 @@ impl Instance { assert!(task.result.is_none()); - log::trace!("task.cancel for {guest_task:?}"); + log::trace!("task.cancel for {guest_thread:?}"); + let guest_task = state.get_task_id(guest_thread)?; self.task_complete( store, guest_task, @@ -2803,54 +2896,259 @@ impl Instance { ) } - /// Implements the `thread.yield` intrinsic. - pub(crate) fn thread_yield( + unsafe fn read_funcref_from_table( + self, + store: &mut dyn VMStore, + table_idx: RuntimeTableIndex, + func_idx: u64, + ) -> Result> { + let table_import = self.id().get_mut(store).runtime_table(table_idx); + let vmctx = table_import.vmctx.as_non_null(); + // SAFETY: `vmctx` is a valid pointer, and the `Instance` is + // located immediately before the `vmctx`. See `vm::Instance::sibling_vmctx`, + // which we can't call here as we don't have a `vm::Instance` to bind the lifetime to. + let mut instance_ptr = unsafe { + vmctx + .byte_sub(mem::size_of::()) + .cast::() + }; + // SAFETY: We just constructed `instance_ptr` from a valid pointer. This pointer won't leave + // this call, so we don't need a lifetime to bind it to. + let instance = unsafe { Pin::new_unchecked(instance_ptr.as_mut()) }; + let table = instance.get_defined_table_with_lazy_init(table_import.index, [func_idx]); + match table.get_func(func_idx) { + // SAFETY: The `VMFuncRef` is not null, properly aligned, and points to a valid + // `VMFuncRef` because it came from a `Table` in a `vm::Instance`. + // We clone it here so that we don't need to worry about its lifetime. + Ok(Some(func)) => Ok(func), + Ok(None) => Err(anyhow!("function index {func_idx} out of bounds")), + Err(e) => Err(anyhow!("failed to get function from table: {e}")), + } + } + + /// Implements the `thread.new_indirect` intrinsic. + pub(crate) fn thread_new_indirect( self, store: &mut dyn VMStore, caller: RuntimeComponentInstanceIndex, - cancellable: bool, - ) -> Result { + _func_ty_idx: TypeFuncIndex, // currently unused + start_func_table_idx: RuntimeTableIndex, + start_func_idx: u32, + context: i32, + ) -> Result { self.id().get(store).check_may_leave(caller)?; - self.waitable_check(store, cancellable, WaitableCheck::Yield) - .map(|_| { - if cancellable { - let state = self.concurrent_state_mut(store); - let task = state.guest_task.unwrap(); - if let Some(event) = state.get_mut(task).unwrap().event.take() { - assert!(matches!(event, Event::Cancelled)); - true - } else { - false - } + + log::trace!("creating new thread"); + + let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); + let funcref = unsafe { + self.read_funcref_from_table(store, start_func_table_idx, start_func_idx as u64) + }?; + if unsafe { funcref.as_ref().type_index } != start_func_ty.type_index() { + bail!( + "start function does not match expected type (currently only `(i32) -> ()` is supported)" + ); + } + + let state = self.concurrent_state_mut(store); + let current_thread = state.guest_thread.unwrap(); + let parent_task = state.get_task_id(current_thread)?; + + let new_thread = + GuestThread::new_explicit(parent_task, start_func_table_idx, start_func_idx, context); + let thread_id = state.push(new_thread)?; + + log::trace!("new thread with id {thread_id:?} created"); + + Ok(thread_id.rep()) + } + + fn start_thread( + &self, + store: &mut StoreContextMut, + thread: TableId, + start_func_table_idx: RuntimeTableIndex, + start_func_idx: u32, + context: i32, + high_priority: bool, + ) -> Result<()> { + let guest_thread = self.concurrent_state_mut(store.0).guest_thread.unwrap(); + let task_id = self + .concurrent_state_mut(store.0) + .get_task_id(guest_thread)?; + log::trace!("starting thread {task_id:?}:{thread:?}"); + let callee = unsafe { + self.read_funcref_from_table(store.0, start_func_table_idx, start_func_idx as u64)? + }; + + let token = StoreToken::new(store.as_context_mut()); + let callee = SendSyncPtr::new(callee); + let start_func = Box::new( + move |store: &mut dyn VMStore, instance: Instance| -> Result<()> { + let old_thread = instance + .concurrent_state_mut(store) + .guest_thread + .replace(thread); + log::trace!( + "thread start: replaced {old_thread:?} with {thread:?} as current thread" + ); + + let mut store = token.as_context_mut(store); + + let params = [ValRaw::i32(context)]; + unsafe { + crate::Func::call_unchecked_raw( + &mut store, + callee.as_non_null(), + params.as_slice().into(), + )? + } + + let state = instance.concurrent_state_mut(store.0); + state.guest_thread = old_thread; + log::trace!("thread start: restored {old_thread:?} as current thread"); + + Ok(()) + }, + ); + let guest_call = WorkItem::GuestCall(GuestCall { + thread, + kind: GuestCallKind::StartExplicit(start_func), + }); + if high_priority { + self.concurrent_state_mut(store.0) + .push_high_priority(guest_call); + } else { + self.concurrent_state_mut(store.0) + .push_low_priority(guest_call); + } + + Ok(()) + } + + pub(crate) fn resume_suspended_thread( + self, + mut store: StoreContextMut, + thread_id: TableId, + high_priority: bool, + ) -> Result<()> { + let state = self.concurrent_state_mut(store.0); + let thread = state.get_mut(thread_id)?; + + match mem::replace(&mut thread.state, GuestThreadState::Running) { + GuestThreadState::NotStartedExplicit { + start_func_table_idx, + start_func_idx, + context, + } => { + self.start_thread( + &mut store, + thread_id, + start_func_table_idx, + start_func_idx, + context, + high_priority, + )?; + } + GuestThreadState::Suspended(fiber) => { + let task_id = state.get_task_id(thread_id)?; + log::trace!("resuming thread {task_id:?}:{thread_id:?} that was suspended"); + if high_priority { + self.concurrent_state_mut(store.0) + .push_high_priority(WorkItem::ResumeFiber(fiber)); } else { - false + self.concurrent_state_mut(store.0) + .push_low_priority(WorkItem::ResumeFiber(fiber)); } - }) + } + _ => { + bail!("cannot resume thread which is not suspended"); + } + } + Ok(()) + } + + fn pending_cancellation(&self, store: &mut StoreOpaque) -> bool { + let state = self.concurrent_state_mut(store); + let thread = state.guest_thread.unwrap(); + if let Some(event) = state.get_task_mut(thread).unwrap().event.take() { + assert!(matches!(event, Event::Cancelled)); + true + } else { + false + } + } + + /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`, + /// and `thread.switch-to` intrinsics. + pub(crate) fn suspension_intrinsic( + self, + mut store: StoreContextMut, + caller: RuntimeComponentInstanceIndex, + cancellable: bool, + yielding: bool, + to_thread: Option>, + ) -> Result { + // There could be a pending cancellation from a previous uncancellable wait + if cancellable && self.pending_cancellation(store.0) { + return Ok(true); + } + + self.id().get(store.0).check_may_leave(caller)?; + + if let Some(thread) = to_thread { + self.resume_suspended_thread(store.as_context_mut(), thread, true)?; + } + + let guest_thread = self.concurrent_state_mut(store.0).guest_thread.unwrap(); + let reason = if yielding { + SuspendReason::Yielding { + thread: guest_thread, + to: to_thread, + cancellable, + } + } else { + SuspendReason::ExplicitlySuspending { + thread: guest_thread, + to: to_thread, + cancellable, + } + }; + + self.suspend(store.0, reason)?; + + Ok(cancellable && self.pending_cancellation(store.0)) } - /// Helper function for the `waitable-set.wait`, `waitable-set.poll`, and - /// `yield` intrinsics. + /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics. fn waitable_check( self, store: &mut dyn VMStore, cancellable: bool, check: WaitableCheck, ) -> Result { - let guest_task = self.concurrent_state_mut(store).guest_task.unwrap(); + let guest_thread = self.concurrent_state_mut(store).guest_thread.unwrap(); let (wait, set) = match &check { WaitableCheck::Wait(params) => (true, Some(params.set)), WaitableCheck::Poll(params) => (false, Some(params.set)), - WaitableCheck::Yield => (false, None), }; - // First, suspend this fiber, allowing any other tasks to run. - self.suspend(store, SuspendReason::Yielding { task: guest_task })?; + log::trace!("waitable check for {guest_thread:?}; set {set:?}"); + // First, suspend this fiber, allowing any other threads to run. + self.suspend( + store, + SuspendReason::Yielding { + thread: guest_thread, + to: None, + cancellable, + }, + )?; - log::trace!("waitable check for {guest_task:?}; set {set:?}"); + log::trace!("waitable check for {guest_thread:?}; set {set:?}"); let state = self.concurrent_state_mut(store); - let task = state.get_mut(guest_task)?; + let task = state.get_task_mut(guest_thread)?; if wait && task.callback.is_some() { bail!("cannot call `task.wait` from async-lifted export with callback"); @@ -2866,7 +3164,7 @@ impl Instance { && state.get_mut(set)?.ready.is_empty() { if cancellable { - let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set); + let old = state.get_mut(guest_thread)?.wake_on_cancel.replace(set); assert!(old.is_none()); } @@ -2874,22 +3172,22 @@ impl Instance { store, SuspendReason::Waiting { set, - task: guest_task, + thread: guest_thread, }, )?; } } - log::trace!("waitable check for {guest_task:?}; set {set:?}, part two"); + log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two"); let result = match check { // Deliver any pending events to the guest and return. WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => { - let event = self.id().get_mut(store).get_event( - guest_task, - Some(params.set), - cancellable, - )?; + let task_id = self.concurrent_state_mut(store).get_task_id(guest_thread)?; + let event = + self.id() + .get_mut(store) + .get_event(task_id, Some(params.set), cancellable)?; let (ordinal, handle, result) = if wait { let (event, waitable) = event.unwrap(); @@ -2903,7 +3201,8 @@ impl Instance { (ordinal, handle, result) } else { log::trace!( - "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}", + "no events ready to deliver via waitable-set.poll to {:?}; set {:?}", + task_id, params.set ); let (ordinal, result) = Event::None.parts(); @@ -2920,7 +3219,6 @@ impl Instance { options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes()); Ok(ordinal) } - WaitableCheck::Yield => Ok(0), }; result @@ -2969,7 +3267,7 @@ impl Instance { return Ok(Status::ReturnCancelled as u32); } } else { - let caller = concurrent_state.guest_task.unwrap(); + let caller = concurrent_state.guest_thread.unwrap(); let guest_task = TableId::::new(rep); let task = concurrent_state.get_mut(guest_task)?; if task.lower_params.is_some() { @@ -2979,14 +3277,19 @@ impl Instance { // Not yet started; cancel and remove from pending let callee_instance = task.instance; - let kind = concurrent_state + /* todo + if !concurrent_state .instance_state(callee_instance) .pending - .remove(&guest_task); - - if kind.is_none() { + .iter() + .any(|(thread, _)| thread.task == guest_task) + { bail!("`subtask.cancel` called after terminal status delivered"); } + concurrent_state + .instance_state(callee_instance) + .pending + .retain(|thread, _| thread.task != guest_task);*/ return Ok(Status::StartCancelled as u32); } else if task.lift_result.is_some() { @@ -2998,22 +3301,37 @@ impl Instance { // `Event::Cancelled` if it was already cancelled), but that's // okay -- this should supersede the previous state. task.event = Some(Event::Cancelled); - if let Some(set) = task.wake_on_cancel.take() { - let item = match concurrent_state - .get_mut(set)? - .waiting - .remove(&guest_task) + for thread in task.threads.clone() { + if let Some(set) = concurrent_state + .get_mut(thread) .unwrap() + .wake_on_cancel + .take() { - WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), - WaitMode::Callback => WorkItem::GuestCall(GuestCall { - task: guest_task, - kind: GuestCallKind::DeliverEvent { set: None }, - }), - }; - concurrent_state.push_high_priority(item); + let item = match concurrent_state + .get_mut(set)? + .waiting + .remove(&thread) + .unwrap() + { + WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), + WaitMode::Callback => WorkItem::GuestCall(GuestCall { + thread: thread, + kind: GuestCallKind::DeliverEvent { set: None }, + }), + }; + concurrent_state.push_high_priority(item); - self.suspend(store, SuspendReason::Yielding { task: caller })?; + self.suspend( + store, + SuspendReason::Yielding { + thread: caller, + to: None, + cancellable: false, + }, + )?; + break; + } } let concurrent_state = self.concurrent_state_mut(store); @@ -3043,11 +3361,17 @@ impl Instance { fn wait_for_event(self, store: &mut dyn VMStore, waitable: Waitable) -> Result<()> { let state = self.concurrent_state_mut(store); - let caller = state.guest_task.unwrap(); + let caller = state.guest_thread.unwrap(); let old_set = waitable.common(state)?.set; - let set = state.get_mut(caller)?.sync_call_set; + let set = state.get_task_mut(caller)?.sync_call_set; waitable.join(state, Some(set))?; - self.suspend(store, SuspendReason::Waiting { set, task: caller })?; + self.suspend( + store, + SuspendReason::Waiting { + set, + thread: caller, + }, + )?; let state = self.concurrent_state_mut(store); waitable.join(state, old_set) } @@ -3262,6 +3586,23 @@ pub trait VMComponentAsyncStore { err_ctx_handle: u32, debug_msg_address: u32, ) -> Result<()>; + + /// The `thread.yield`, `thread.yield-to`, `thread.suspend`, and `thread.switch-to` intrinsics. + fn suspension_intrinsic( + &mut self, + instance: Instance, + caller: RuntimeComponentInstanceIndex, + cancellable: bool, + yielding: bool, + to_thread: Option>, + ) -> Result; + + /// The `thread.resume-later` intrinsic. + fn thread_resume_later( + &mut self, + instance: Instance, + thread: TableId, + ) -> Result<()>; } /// SAFETY: See trait docs. @@ -3557,6 +3898,31 @@ impl VMComponentAsyncStore for StoreInner { debug_msg_address, ) } + + fn suspension_intrinsic( + &mut self, + instance: Instance, + caller: RuntimeComponentInstanceIndex, + cancellable: bool, + yielding: bool, + to_thread: Option>, + ) -> Result { + instance.suspension_intrinsic( + StoreContextMut(self), + caller, + cancellable, + yielding, + to_thread, + ) + } + + fn thread_resume_later( + &mut self, + instance: Instance, + thread: TableId, + ) -> Result<()> { + instance.resume_suspended_thread(StoreContextMut(self), thread, false) + } } type HostTaskFuture = Pin> + Send + 'static>>; @@ -3613,10 +3979,10 @@ enum Caller { /// If true, call `post-return` function (if any) automatically. call_post_return_automatically: bool, }, - /// Another guest task called the guest task + /// Another guest thread called the guest task Guest { /// The id of the caller - task: TableId, + thread: TableId, /// The instance to use to enforce reentrance rules. /// /// Note that this might not be the same as the instance the caller task @@ -3635,8 +4001,67 @@ struct LiftResult { string_encoding: StringEncoding, } +enum GuestThreadState { + NotStartedImplicit, + NotStartedExplicit { + start_func_table_idx: RuntimeTableIndex, + start_func_idx: u32, + context: i32, + }, + Running, + Suspended(StoreFiber<'static>), + Pending, + Completed, +} +pub(crate) struct GuestThread { + /// Context-local state used to implement the `context.{get,set}` + /// intrinsics. + context: [u32; 2], + /// The owning guest task. + parent_task: TableId, + /// If present, indicates that the thread is currently waiting on the + /// specified set but may be cancelled and woken immediately. + wake_on_cancel: Option>, + state: GuestThreadState, +} + +impl GuestThread { + fn new_implicit(parent_task: TableId) -> Self { + Self { + context: [0; 2], + parent_task: parent_task, + wake_on_cancel: None, + state: GuestThreadState::NotStartedImplicit, + } + } + + fn new_explicit( + parent_task: TableId, + start_func_table_idx: RuntimeTableIndex, + start_func_idx: u32, + context: i32, + ) -> Self { + Self { + context: [0; 2], + parent_task: parent_task, + wake_on_cancel: None, + state: GuestThreadState::NotStartedExplicit { + start_func_table_idx, + start_func_idx, + context, + }, + } + } +} + +impl TableDebug for GuestThread { + fn type_name() -> &'static str { + "GuestThread" + } +} + /// Represents a pending guest task. -struct GuestTask { +pub(crate) struct GuestTask { /// See `WaitableCommon` common: WaitableCommon, /// Closure to lower the parameters passed to this task. @@ -3663,9 +4088,6 @@ struct GuestTask { /// Whether or not we've sent a `Status::Starting` event to any current or /// future waiters for this waitable. starting_sent: bool, - /// Context-local state used to implement the `context.{get,set}` - /// intrinsics. - context: [u32; 2], /// Pending guest subtasks created by this task (directly or indirectly). /// /// This is used to re-parent subtasks which are still running when their @@ -3682,13 +4104,12 @@ struct GuestTask { /// If present, a pending `Event::None` or `Event::Cancelled` to be /// delivered to this task. event: Option, - /// If present, indicates that the task is currently waiting on the - /// specified set but may be cancelled and woken immediately. - wake_on_cancel: Option>, /// The `ExportIndex` of the guest function being called, if known. function_index: Option, /// Whether or not the task has exited. exited: bool, + /// Threads belonging to this task + threads: Vec>, } impl GuestTask { @@ -3713,14 +4134,13 @@ impl GuestTask { sync_result: None, cancel_sent: false, starting_sent: false, - context: [0u32; 2], subtasks: HashSet::new(), sync_call_set, instance: component_instance, event: None, - wake_on_cancel: None, function_index: None, exited: false, + threads: Vec::new(), }) } @@ -3743,10 +4163,10 @@ impl GuestTask { // Reparent any pending subtasks to the caller. match &self.caller { Caller::Guest { - task, + thread, instance: runtime_instance, } => { - let task_mut = state.get_mut(*task)?; + let task_mut = state.get_task_mut(*thread)?; let present = task_mut.subtasks.remove(&me); assert!(present); @@ -3756,7 +4176,7 @@ impl GuestTask { for subtask in &self.subtasks { state.get_mut(*subtask)?.caller = Caller::Guest { - task: *task, + thread: *thread, instance: *runtime_instance, }; } @@ -3914,14 +4334,14 @@ impl Waitable { fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> { if let Some(set) = self.common(state)?.set { state.get_mut(set)?.ready.insert(*self); - if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() { - let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take(); + if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() { + let wake_on_cancel = state.get_mut(thread)?.wake_on_cancel.take(); assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set)); let item = match mode { WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), WaitMode::Callback => WorkItem::GuestCall(GuestCall { - task, + thread, kind: GuestCallKind::DeliverEvent { set: Some(set) }, }), }; @@ -4015,8 +4435,8 @@ impl fmt::Debug for Waitable { struct WaitableSet { /// Which waitables in this set have pending events, if any. ready: BTreeSet, - /// Which guest tasks are currently waiting on this set, if any. - waiting: BTreeMap, WaitMode>, + /// Which guest threads are currently waiting on this set, if any. + waiting: BTreeMap, WaitMode>, } impl TableDebug for WaitableSet { @@ -4072,14 +4492,15 @@ struct InstanceState { do_not_enter: bool, /// Pending calls for this instance which require `Self::backpressure` to be /// `true` and/or `Self::do_not_enter` to be false before they can proceed. - pending: BTreeMap, GuestCallKind>, + pending: BTreeMap, GuestCallKind>, } /// Represents the Component Model Async state of a top-level component instance /// (i.e. a `super::ComponentInstance`). pub struct ConcurrentState { - /// The currently running guest task, if any. - guest_task: Option>, + /// The currently running guest thread, if any. + guest_thread: Option>, + /// The set of pending host and background tasks, if any. /// /// See `ComponentInstance::poll_until` for where we temporarily take this @@ -4095,7 +4516,7 @@ pub struct ConcurrentState { instance_states: HashMap, /// The "high priority" work queue for this instance's event loop. high_priority: Vec, - /// The "high priority" work queue for this instance's event loop. + /// The "low priority" work queue for this instance's event loop. low_priority: Vec, /// A place to stash the reason a fiber is suspending so that the code which /// resumed it will know under what conditions the fiber should be resumed @@ -4131,7 +4552,7 @@ pub struct ConcurrentState { impl ConcurrentState { pub(crate) fn new(component: &Component) -> Self { Self { - guest_task: None, + guest_thread: None, table: AlwaysMut::new(ResourceTable::new()), futures: AlwaysMut::new(Some(FuturesUnordered::new())), instance_states: HashMap::new(), @@ -4221,6 +4642,22 @@ impl ConcurrentState { self.table.get_mut().get_mut(&Resource::from(id)) } + fn get_task_id( + &mut self, + id: TableId, + ) -> Result, ResourceTableError> { + let thread = self.get_mut(id)?; + Ok(thread.parent_task) + } + + fn get_task_mut( + &mut self, + id: TableId, + ) -> Result<&mut GuestTask, ResourceTableError> { + self.get_task_id(id) + .and_then(|task_id| self.get_mut(task_id)) + } + pub fn add_child( &mut self, child: TableId, @@ -4285,16 +4722,18 @@ impl ConcurrentState { // that task or an ancestor of that task, in which case this would be a // constant time check. loop { - match &self.get_mut(guest_task).unwrap().caller { + let next_thread = match &self.get_mut(guest_task).unwrap().caller { Caller::Host { .. } => break true, - Caller::Guest { task, instance } => { + Caller::Guest { thread, instance } => { if *instance == guest_instance { break false; } else { - guest_task = *task; + *thread } } - } + }; + let task = self.get_task_id(next_thread).unwrap(); + guest_task = task; } } @@ -4318,14 +4757,14 @@ impl ConcurrentState { /// /// See `GuestCall::is_ready` for details. fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { - for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() { - let call = GuestCall { task, kind }; + for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() { + let call = GuestCall { thread, kind }; if call.is_ready(self)? { self.push_high_priority(WorkItem::GuestCall(call)); } else { self.instance_state(instance) .pending - .insert(call.task, call.kind); + .insert(call.thread, call.kind); } } @@ -4354,20 +4793,28 @@ impl ConcurrentState { /// Implements the `context.get` intrinsic. pub(crate) fn context_get(&mut self, slot: u32) -> Result { - let task = self.guest_task.unwrap(); - let val = self.get_mut(task)?.context[usize::try_from(slot).unwrap()]; - log::trace!("context_get {task:?} slot {slot} val {val:#x}"); + let thread = self.guest_thread.unwrap(); + let val = self.get_mut(thread)?.context[usize::try_from(slot).unwrap()]; + log::trace!("context_get {thread:?} slot {slot} val {val:#x}"); Ok(val) } /// Implements the `context.set` intrinsic. pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { - let task = self.guest_task.unwrap(); - log::trace!("context_set {task:?} slot {slot} val {val:#x}"); - self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val; + let thread = self.guest_thread.unwrap(); + log::trace!("context_set {thread:?} slot {slot} val {val:#x}"); + self.get_mut(thread)?.context[usize::try_from(slot).unwrap()] = val; Ok(()) } + /// Implements the `thread.index` intrinsic. + pub(crate) fn thread_index(&self) -> Result { + Ok(self + .guest_thread + .ok_or_else(|| anyhow!("no current thread"))? + .rep()) + } + fn options(&self, options: OptionsIndex) -> &CanonicalOptions { &self.component.env_component().options[options] } @@ -4457,7 +4904,6 @@ struct WaitableCheckParams { enum WaitableCheck { Wait(WaitableCheckParams), Poll(WaitableCheckParams), - Yield, } /// Represents a guest task called from the host, prepared using `prepare_call`. @@ -4466,6 +4912,8 @@ pub(crate) struct PreparedCall { handle: Func, /// The guest task created by `prepare_call` task: TableId, + /// The guest thread created by `prepare_call` + thread: TableId, /// The number of lowered core Wasm parameters to pass to the call. param_count: usize, /// The `oneshot::Receiver` to which the result of the call will be @@ -4539,7 +4987,7 @@ pub(crate) fn prepare_call( let token = StoreToken::new(store.as_context_mut()); let state = handle.instance().concurrent_state_mut(store.0); - assert!(state.guest_task.is_none()); + assert!(state.guest_thread.is_none()); let (tx, rx) = oneshot::channel(); let (exit_tx, exit_rx) = oneshot::channel(); @@ -4594,10 +5042,13 @@ pub(crate) fn prepare_call( task.function_index = Some(handle.index()); let task = state.push(task)?; + let thread = state.push(GuestThread::new_implicit(task))?; + state.get_mut(task)?.threads.push(thread); Ok(PreparedCall { handle, task, + thread, param_count, rx, exit_rx, @@ -4617,14 +5068,14 @@ pub(crate) fn queue_call( ) -> Result)>> + Send + 'static + use> { let PreparedCall { handle, - task, + thread, param_count, rx, exit_rx, .. } = prepared; - queue_call0(store.as_context_mut(), handle, task, param_count)?; + queue_call0(store.as_context_mut(), handle, thread, param_count)?; Ok(checked( handle.instance(), @@ -4641,7 +5092,7 @@ pub(crate) fn queue_call( fn queue_call0( store: StoreContextMut, handle: Func, - guest_task: TableId, + guest_thread: TableId, param_count: usize, ) -> Result<()> { let (options, flags, _ty, raw_options) = handle.abi_info(store.0); @@ -4651,7 +5102,7 @@ fn queue_call0( let callback = options.callback(); let post_return = handle.post_return_core_func(store.0); - log::trace!("queueing call {guest_task:?}"); + log::trace!("queueing call {guest_thread:?}"); let instance_flags = if callback.is_none() { None @@ -4665,7 +5116,7 @@ fn queue_call0( unsafe { instance.queue_call( store, - guest_task, + guest_thread, SendSyncPtr::new(callee), param_count, 1, diff --git a/crates/wasmtime/src/runtime/component/func/typed.rs b/crates/wasmtime/src/runtime/component/func/typed.rs index e86aea10ccdd..faa10f0cd57e 100644 --- a/crates/wasmtime/src/runtime/component/func/typed.rs +++ b/crates/wasmtime/src/runtime/component/func/typed.rs @@ -193,7 +193,7 @@ where let ptr = SendSyncPtr::from(NonNull::from(¶ms).cast::()); let prepared = - self.prepare_call(store.as_context_mut(), false, false, move |cx, ty, dst| { + self.prepare_call(store.as_context_mut(), true, false, move |cx, ty, dst| { // SAFETY: The goal here is to get `Params`, a non-`'static` // value, to live long enough to the lowering of the // parameters. We're guaranteed that `Params` lives in the @@ -222,7 +222,7 @@ where impl<'a, T> Drop for RemoveOnDrop<'a, T> { fn drop(&mut self) { - self.task.remove(self.store.as_context_mut()).unwrap(); + // self.task.remove(self.store.as_context_mut()).unwrap(); } } diff --git a/crates/wasmtime/src/runtime/gc/.DS_Store b/crates/wasmtime/src/runtime/gc/.DS_Store new file mode 100644 index 000000000000..9429fe79751d Binary files /dev/null and b/crates/wasmtime/src/runtime/gc/.DS_Store differ diff --git a/crates/wasmtime/src/runtime/vm/.DS_Store b/crates/wasmtime/src/runtime/vm/.DS_Store new file mode 100644 index 000000000000..f6f88603b159 Binary files /dev/null and b/crates/wasmtime/src/runtime/vm/.DS_Store differ diff --git a/crates/wasmtime/src/runtime/vm/component.rs b/crates/wasmtime/src/runtime/vm/component.rs index a8fc564d0ea2..3ead19db94bf 100644 --- a/crates/wasmtime/src/runtime/vm/component.rs +++ b/crates/wasmtime/src/runtime/vm/component.rs @@ -693,7 +693,12 @@ impl ComponentInstance { let offset = self.offsets.runtime_table(i); // SAFETY: see above unsafe { - *self.as_mut().vmctx_plus_offset_mut(offset) = INVALID_PTR; + *self.as_mut().vmctx_plus_offset_mut::( + offset + offset_of!(VMTableImport, from) as u32, + ) = INVALID_PTR; + *self.as_mut().vmctx_plus_offset_mut::( + offset + offset_of!(VMTableImport, vmctx) as u32, + ) = INVALID_PTR; } } } diff --git a/crates/wasmtime/src/runtime/vm/component/libcalls.rs b/crates/wasmtime/src/runtime/vm/component/libcalls.rs index 4346c56a1239..2b68ee4c083e 100644 --- a/crates/wasmtime/src/runtime/vm/component/libcalls.rs +++ b/crates/wasmtime/src/runtime/vm/component/libcalls.rs @@ -1,6 +1,8 @@ //! Implementation of string transcoding required by the component model. use crate::component::Instance; +#[cfg(feature = "component-model-async")] +use crate::component::concurrent::table::TableId; use crate::prelude::*; #[cfg(feature = "component-model-async")] use crate::runtime::component::concurrent::ResourcePair; @@ -808,10 +810,12 @@ fn thread_yield( caller_instance: u32, cancellable: u8, ) -> Result { - instance.thread_yield( - store, + store.component_async_store().suspension_intrinsic( + instance, RuntimeComponentInstanceIndex::from_u32(caller_instance), cancellable != 0, + true, + None, ) } @@ -1361,3 +1365,85 @@ fn context_set( val, ) } + +#[cfg(feature = "component-model-async")] +fn thread_index(store: &mut dyn VMStore, instance: Instance) -> Result { + instance.concurrent_state_mut(store).thread_index() +} + +#[cfg(feature = "component-model-async")] +fn thread_new_indirect( + store: &mut dyn VMStore, + instance: Instance, + caller: u32, + func_ty_id: u32, + func_table_idx: u32, + func_idx: u32, + context: u32, +) -> Result { + instance.thread_new_indirect( + store, + RuntimeComponentInstanceIndex::from_u32(caller), + TypeFuncIndex::from_u32(func_ty_id), + RuntimeTableIndex::from_u32(func_table_idx), + func_idx, + context as i32, + ) +} + +#[cfg(feature = "component-model-async")] +fn thread_switch_to( + store: &mut dyn VMStore, + instance: Instance, + caller: u32, + cancellable: u8, + thread_idx: u32, +) -> Result { + store.component_async_store().suspension_intrinsic( + instance, + RuntimeComponentInstanceIndex::from_u32(caller), + cancellable != 0, + false, + Some(TableId::new(thread_idx)), + ) +} + +#[cfg(feature = "component-model-async")] +fn thread_suspend( + store: &mut dyn VMStore, + instance: Instance, + caller: u32, + cancellable: u8, +) -> Result { + store.component_async_store().suspension_intrinsic( + instance, + RuntimeComponentInstanceIndex::from_u32(caller), + cancellable != 0, + false, + None, + ) +} + +#[cfg(feature = "component-model-async")] +fn thread_resume_later(store: &mut dyn VMStore, instance: Instance, thread_idx: u32) -> Result<()> { + store + .component_async_store() + .thread_resume_later(instance, TableId::new(thread_idx)) +} + +#[cfg(feature = "component-model-async")] +fn thread_yield_to( + store: &mut dyn VMStore, + instance: Instance, + caller_instance: u32, + cancellable: u8, + thread_idx: u32, +) -> Result { + store.component_async_store().suspension_intrinsic( + instance, + RuntimeComponentInstanceIndex::from_u32(caller_instance), + cancellable != 0, + true, + Some(TableId::new(thread_idx)), + ) +} diff --git a/crates/wasmtime/src/runtime/vm/sys/.DS_Store b/crates/wasmtime/src/runtime/vm/sys/.DS_Store new file mode 100644 index 000000000000..0e4f80fb96aa Binary files /dev/null and b/crates/wasmtime/src/runtime/vm/sys/.DS_Store differ diff --git a/tests/all/async_functions.rs b/tests/all/async_functions.rs index 06a657fd1ef3..3dbeb856cf66 100644 --- a/tests/all/async_functions.rs +++ b/tests/all/async_functions.rs @@ -819,10 +819,10 @@ async fn non_stacky_async_activations() -> Result<()> { &engine, r#" (module $m2 - (import "" "yield" (func $yield)) + (import "" "thread-yield" (func $thread-yield)) (func $run_async (export "run_async") - call $yield + call $thread-yield ) ) "#, @@ -852,7 +852,7 @@ async fn non_stacky_async_activations() -> Result<()> { let mut store2 = Store::new(caller.engine(), ()); let mut linker2 = Linker::<()>::new(caller.engine()); linker2 - .func_wrap_async("", "yield", { + .func_wrap_async("", "thread-yield", { let stacks = stacks.clone(); move |caller, _: ()| { let stacks = stacks.clone(); diff --git a/tests/all/component_model.rs b/tests/all/component_model.rs index 3bc1891df876..42389811b5bf 100644 --- a/tests/all/component_model.rs +++ b/tests/all/component_model.rs @@ -19,6 +19,7 @@ mod nested; mod post_return; mod resources; mod strings; +mod threading; #[test] #[cfg_attr(miri, ignore)] diff --git a/tests/all/component_model/async.rs b/tests/all/component_model/async.rs index 18b509b000cb..dee8ed514501 100644 --- a/tests/all/component_model/async.rs +++ b/tests/all/component_model/async.rs @@ -112,32 +112,32 @@ async fn resume_separate_thread() -> Result<()> { let component = format!( r#" (component - (import "yield" (func $yield (result (list u8)))) + (import "thread-yield" (func $thread-yield (result (list u8)))) (core module $libc (memory (export "memory") 1) {REALLOC_AND_FREE} ) (core instance $libc (instantiate $libc)) - (core func $yield + (core func $thread-yield (canon lower - (func $yield) + (func $thread-yield) (memory $libc "memory") (realloc (func $libc "realloc")) ) ) (core module $m - (import "" "yield" (func $yield (param i32))) + (import "" "thread-yield" (func $thread-yield (param i32))) (import "libc" "memory" (memory 0)) (func $start i32.const 8 - call $yield + call $thread-yield ) (start $start) ) (core instance (instantiate $m - (with "" (instance (export "yield" (func $yield)))) + (with "" (instance (export "thread-yield" (func $thread-yield)))) (with "libc" (instance $libc)) )) ) @@ -147,7 +147,7 @@ async fn resume_separate_thread() -> Result<()> { let mut linker = Linker::new(&engine); linker .root() - .func_wrap_async("yield", |_: StoreContextMut<()>, _: ()| { + .func_wrap_async("thread-yield", |_: StoreContextMut<()>, _: ()| { Box::new(async { tokio::task::yield_now().await; Ok((vec![1u8, 2u8],)) diff --git a/tests/all/component_model/func.rs b/tests/all/component_model/func.rs index 158cc9adfa70..55d0b49cf7fe 100644 --- a/tests/all/component_model/func.rs +++ b/tests/all/component_model/func.rs @@ -910,6 +910,7 @@ async fn async_reentrance() -> Result<()> { let mut config = Config::new(); config.wasm_component_model_async(true); + config.wasm_component_model_async_stackful(true); config.async_support(true); let engine = &Engine::new(&config)?; let component = Component::new(&engine, component)?; @@ -1040,6 +1041,7 @@ async fn task_return_trap(component: &str, substring: &str) -> Result<()> { let mut config = Config::new(); config.wasm_component_model_async(true); config.wasm_component_model_async_stackful(true); + config.wasm_component_model_threading(true); config.async_support(true); let engine = &Engine::new(&config)?; let component = Component::new(&engine, component)?; diff --git a/tests/all/component_model/threading.rs b/tests/all/component_model/threading.rs new file mode 100644 index 000000000000..f64af0c6b7d1 --- /dev/null +++ b/tests/all/component_model/threading.rs @@ -0,0 +1,137 @@ +use anyhow::Result; +use wasmtime::component::types::ComponentItem; +use wasmtime::component::{Component, Linker, Type}; +use wasmtime::{Config, Engine, Module, Precompiled, Store}; + +#[tokio::test] +async fn threads() -> Result<()> { + use std::io::IsTerminal; + use tracing_subscriber::{EnvFilter, FmtSubscriber}; + let builder = FmtSubscriber::builder() + .with_writer(std::io::stderr) + .with_env_filter(EnvFilter::from_env("WASMTIME_LOG")) + .with_ansi(std::io::stderr().is_terminal()) + .init(); + let mut config = Config::new(); + config.async_support(true); + config.wasm_component_model_async(true); + config.wasm_component_model_threading(true); + config.wasm_component_model_async_stackful(true); + config.wasm_component_model_async_builtins(true); + let engine = Engine::new(&config)?; + let component = Component::new( + &engine, + r#";;! component_model_async = true +;;! component_model_threading = true + +;; Tests for basic functioning of all threading builtins with the implicit thread + one explicit thread +;; Switches between threads using all of the different threading intrinsics. + +(component + ;; Defines the table for the thread start function + (core module $libc + (table (export "__indirect_function_table") 1 funcref)) + ;; Defines the thread start function and a function that calls thread.new_indirect + (core module $m + ;; Import the threading builtins and the table from libc + (import "" "thread.new_indirect" (func $thread-new-indirect (param i32 i32) (result i32))) + (import "" "thread.suspend" (func $thread-suspend (result i32))) + (import "" "thread.yield-to" (func $thread-yield-to (param i32) (result i32))) + (import "" "thread.switch-to" (func $thread-switch-to (param i32) (result i32))) + (import "" "thread.yield" (func $thread-yield (result i32))) + (import "" "thread.index" (func $thread-index (result i32))) + (import "" "thread.resume-later" (func $thread-resume-later (param i32))) + (import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref)) + + ;; A global that we will set from the spawned thread + (global $g (mut i32) (i32.const 0)) + (global $main-thread-index (mut i32) (i32.const 0)) + + ;; The thread entry point, which sets the global to incrementing values starting from the context value + (func $thread-start (param i32) + ;; Set the global to the context value + (global.set $g (local.get 0)) + ;; The main thread switched to us, so is no longer scheduled, so we explicitly schedule it + (call $thread-resume-later (global.get $main-thread-index)) + ;; Yield back to the main thread (since that is the only other one) + (drop (call $thread-yield) + ;; Increment the global + (global.set $g (i32.add (global.get $g) (i32.const 1))) + ;; The main thread will have explicitly requested suspension, so yield to it directly + (drop (call $thread-yield-to (global.get $main-thread-index))) + ;; Increment the global again + (global.set $g (i32.add (global.get $g) (i32.const 1))) + ;; Reschedule the main thread so that it runs after we exit + (call $thread-resume-later (global.get $main-thread-index)))) + (export "thread-start" (func $thread-start)) + + ;; Initialize the function table with our thread-start function; this will be + ;; used by thread.new_indirect + (elem (table $indirect-function-table) (i32.const 0) func $thread-start) + + ;; The main entry point, which spawns a new thread to run `thread-start`, passing 42 + ;; as the context value, and then yields to it + (func (export "run") (result i32) + ;; Store the main thread's index for the spawned thread to yield to + (global.set $main-thread-index (call $thread-index)) + ;; Create a new thread, which starts suspended, and switch to it + (drop + (call $thread-switch-to + (call $thread-new-indirect (i32.const 0) (i32.const 42)))) + ;; After the thread yields back to us, check that the global was set to 42 + (if (i32.ne (global.get $g) (i32.const 42)) (then unreachable)) + ;; Suspend ourselves, which will cause the spawned thread to run + (drop (call $thread-suspend)) + ;; The spawned thread will resume us after incrementing the global, so check that it is now 43 + (if (i32.ne (global.get $g) (i32.const 43)) (then unreachable)) + ;; Suspend again, which will cause the spawned thread to run again + (drop (call $thread-suspend)) + ;; The spawned thread will reschedule us before it exits, so when we resume here the global should be 44 + (if (i32.ne (global.get $g) (i32.const 44)) (then unreachable)) + ;; Return success + (i32.const 42))) + + ;; Instantiate the libc module to get the table + (core instance $libc (instantiate $libc)) + ;; Get access to `thread.new_indirect` that uses the table from libc + (core type $start-func-ty (func (param i32))) + (alias core export $libc "__indirect_function_table" (core table $indirect-function-table)) + + (core func $thread-new-indirect + (canon thread.new_indirect $start-func-ty (table $indirect-function-table))) + (core func $thread-yield (canon thread.yield)) + (core func $thread-index (canon thread.index)) + (core func $thread-yield-to (canon thread.yield-to)) + (core func $thread-resume-later (canon thread.resume-later)) + (core func $thread-switch-to (canon thread.switch-to)) + (core func $thread-suspend (canon thread.suspend)) + + ;; Instantiate the main module + (core instance $i ( + instantiate $m + (with "" (instance + (export "thread.new_indirect" (func $thread-new-indirect)) + (export "thread.index" (func $thread-index)) + (export "thread.yield-to" (func $thread-yield-to)) + (export "thread.yield" (func $thread-yield)) + (export "thread.switch-to" (func $thread-switch-to)) + (export "thread.suspend" (func $thread-suspend)) + (export "thread.resume-later" (func $thread-resume-later)))) + (with "libc" (instance $libc)))) + + ;; Export the main entry point + (func (export "run") (result u32) (canon lift (core func $i "run")))) + "#, + )? + .serialize()?; + + let component = unsafe { Component::deserialize(&engine, &component)? }; + let mut store = Store::new(&engine, ()); + let instance = Linker::new(&engine) + .instantiate_async(&mut store, &component) + .await?; + let func = instance.get_typed_func::<(), (u32,)>(&mut store, "run")?; + assert_eq!(func.call_async(&mut store, ()).await?, (42,)); + + Ok(()) +} diff --git a/tests/all/pooling_allocator.rs b/tests/all/pooling_allocator.rs index fddd26bf64e8..7f9412a725e3 100644 --- a/tests/all/pooling_allocator.rs +++ b/tests/all/pooling_allocator.rs @@ -920,7 +920,7 @@ async fn total_stacks_limit() -> Result<()> { let mut linker = Linker::new(&engine); linker.func_new_async( "async", - "yield", + "thread-yield", FuncType::new(&engine, [], []), |_caller, _params, _results| { Box::new(async { @@ -934,9 +934,9 @@ async fn total_stacks_limit() -> Result<()> { &engine, r#" (module - (import "async" "yield" (func $yield)) + (import "async" "thread-yield" (func $thread-yield)) (func (export "run") - call $yield + call $thread-yield ) (func $empty) diff --git a/tests/all/pulley.rs b/tests/all/pulley.rs index eea59a97f107..aa656f41ac2c 100644 --- a/tests/all/pulley.rs +++ b/tests/all/pulley.rs @@ -85,8 +85,9 @@ fn provenance_test_config() -> Config { config.memory_guard_size(0); config.signals_based_traps(false); config.wasm_component_model_async(true); - config.wasm_component_model_async_stackful(true); config.wasm_component_model_async_builtins(true); + config.wasm_component_model_async_stackful(true); + config.wasm_component_model_threading(true); config.wasm_component_model_error_context(true); config } diff --git a/tests/misc_testsuite/component-model-async/fused.wast b/tests/misc_testsuite/component-model-async/fused.wast index 00e8a8d9d3d2..3ced2e1ed228 100644 --- a/tests/misc_testsuite/component-model-async/fused.wast +++ b/tests/misc_testsuite/component-model-async/fused.wast @@ -1,5 +1,6 @@ ;;! component_model_async = true ;;! component_model_async_stackful = true +;;! component_model_threading = true ;;! reference_types = true ;;! gc_types = true ;;! multi_memory = true diff --git a/tests/misc_testsuite/component-model-async/futures.wast b/tests/misc_testsuite/component-model-async/futures.wast index ec5c3457c7d0..062b53a55dcc 100644 --- a/tests/misc_testsuite/component-model-async/futures.wast +++ b/tests/misc_testsuite/component-model-async/futures.wast @@ -1,5 +1,6 @@ ;;! component_model_async = true ;;! component_model_async_builtins = true +;;! component_model_threading = true ;; future.new (component diff --git a/tests/misc_testsuite/component-model-async/lift.wast b/tests/misc_testsuite/component-model-async/lift.wast index f1175b0e89a4..d7682a33a7ca 100644 --- a/tests/misc_testsuite/component-model-async/lift.wast +++ b/tests/misc_testsuite/component-model-async/lift.wast @@ -1,5 +1,6 @@ ;;! component_model_async = true ;;! component_model_async_stackful = true +;;! component_model_threading = true ;; async lift; no callback (component diff --git a/tests/misc_testsuite/component-model-async/streams.wast b/tests/misc_testsuite/component-model-async/streams.wast index fa0b2e02c0a6..3e3f500642f1 100644 --- a/tests/misc_testsuite/component-model-async/streams.wast +++ b/tests/misc_testsuite/component-model-async/streams.wast @@ -1,5 +1,6 @@ ;;! component_model_async = true ;;! component_model_async_builtins = true +;;! component_model_threading = true ;; stream.new (component diff --git a/tests/misc_testsuite/component-model-async/task-builtins.wast b/tests/misc_testsuite/component-model-async/task-builtins.wast index fa8025fd8c45..fa327b08a387 100644 --- a/tests/misc_testsuite/component-model-async/task-builtins.wast +++ b/tests/misc_testsuite/component-model-async/task-builtins.wast @@ -1,5 +1,6 @@ ;;! component_model_async = true ;;! component_model_async_stackful = true +;;! component_model_threading = true ;; backpressure.set (component @@ -59,13 +60,13 @@ (core instance $i (instantiate $m (with "" (instance (export "waitable-set.poll" (func $waitable-set-poll)))))) ) -;; yield +;; thread.yield (component (core module $m - (import "" "yield" (func $yield (result i32))) + (import "" "thread.yield" (func $thread-yield (result i32))) ) - (core func $yield (canon thread.yield cancellable)) - (core instance $i (instantiate $m (with "" (instance (export "yield" (func $yield)))))) + (core func $thread-yield (canon thread.yield cancellable)) + (core instance $i (instantiate $m (with "" (instance (export "thread.yield" (func $thread-yield)))))) ) ;; subtask.drop diff --git a/tests/misc_testsuite/component-model-threading/many-threads-indexed.wast b/tests/misc_testsuite/component-model-threading/many-threads-indexed.wast new file mode 100644 index 000000000000..852b7f00c982 --- /dev/null +++ b/tests/misc_testsuite/component-model-threading/many-threads-indexed.wast @@ -0,0 +1,126 @@ +;;! component_model_async = true +;;! component_model_threading = true + +;; Spawns 5 threads, makes them write their indices into a buffer out-of-order, then yields to them in-order, +;; ensuring that the yield order is as-expected. + +;; More concretely: +;; Thread Index | Assigned Number +;; 1 | 16 +;; 2 | 12 +;; 3 | 04 +;; 4 | 00 +;; 5 | 08 + +;; After all threads have spawned and written their indices to the byte position given by their assigned number, +;; the buffer state will be: +;; 4 3 5 2 1 + +;; The main thread will then yield to these threads in the order that they are stored in the buffer, +;; and they will write their assigned number into the buffer, after the indices. +;; After all threads have been yielded to, the buffer contents will be: +;; 4 3 5 2 1 0 4 8 12 16 + +;; The main thread then ensures that the assigned numbers have been written into the correct locations. + +(component + ;; Defines the table for the thread start function + (core module $libc + (table (export "__indirect_function_table") 1 funcref)) + ;; Defines the thread start function and a function that calls thread.new_indirect + (core module $m + ;; Import the threading builtins and the table from libc + (import "" "thread.new_indirect" (func $thread-new-indirect (param i32 i32) (result i32))) + (import "" "thread.suspend" (func $thread-suspend (result i32))) + (import "" "thread.yield-to" (func $thread-yield-to (param i32) (result i32))) + (import "" "thread.switch-to" (func $thread-switch-to (param i32) (result i32))) + (import "" "thread.yield" (func $thread-yield (result i32))) + (import "" "thread.index" (func $thread-index (result i32))) + (import "" "thread.resume-later" (func $thread-resume-later (param i32))) + (import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref)) + + ;; A memory block that threads will write their thread indexes and assigned values into + (memory 1) + + ;; A global that points to the next memory index to write into + ;; We initialize this to 20 (threads * 4 bytes of storage per thread) + (global $g (mut i32) (i32.const 20)) + + ;; The thread entry point, which writes the thread's index into memory at the assigned location, + ;; suspends back to the main thread, then writes the assigned value into memory + (func $thread-start (param i32) + ;; Store the thread index into the assigned location + (i32.store (local.get 0) (call $thread-index)) + (drop (call $thread-suspend)) + (i32.store (global.get $g) (local.get 0)) + (global.set $g + (i32.add (global.get $g) (i32.const 4)))) + (export "thread-start" (func $thread-start)) + + ;; Initialize the function table with our thread-start function; this will be + ;; used by thread.new_indirect + (elem (table $indirect-function-table) (i32.const 0) func $thread-start) + + (func $new-thread (param i32) + (drop + (call $thread-yield-to + (call $thread-new-indirect (i32.const 0) (local.get 0))))) + + ;; The main entry point + (func (export "run") (result i32) + ;; Spawn 5 new threads with assigned numbers + (call $new-thread (i32.const 16)) + (call $new-thread (i32.const 12)) + (call $new-thread (i32.const 4)) + (call $new-thread (i32.const 0)) + (call $new-thread (i32.const 8)) + + ;; Yield to all threads in ascending order of assigned number + (drop (call $thread-yield-to (i32.load (i32.const 0)))) + (drop (call $thread-yield-to (i32.load (i32.const 4)))) + (drop (call $thread-yield-to (i32.load (i32.const 8)))) + (drop (call $thread-yield-to (i32.load (i32.const 12)))) + (drop (call $thread-yield-to (i32.load (i32.const 16)))) + + ;; Ensure all assigned numbers have been written to the buffer in order + (if (i32.ne (i32.load (i32.const 20)) (i32.const 0)) (then unreachable)) + (if (i32.ne (i32.load (i32.const 24)) (i32.const 4)) (then unreachable)) + (if (i32.ne (i32.load (i32.const 28)) (i32.const 8)) (then unreachable)) + (if (i32.ne (i32.load (i32.const 32)) (i32.const 12)) (then unreachable)) + (if (i32.ne (i32.load (i32.const 36)) (i32.const 16)) (then unreachable)) + + ;; Sentinel value + (i32.const 42))) + + ;; Instantiate the libc module to get the table + (core instance $libc (instantiate $libc)) + ;; Get access to `thread.new_indirect` that uses the table from libc + (core type $start-func-ty (func (param i32))) + (alias core export $libc "__indirect_function_table" (core table $indirect-function-table)) + + (core func $thread-new-indirect + (canon thread.new_indirect $start-func-ty (table $indirect-function-table))) + (core func $thread-yield (canon thread.yield)) + (core func $thread-index (canon thread.index)) + (core func $thread-yield-to (canon thread.yield-to)) + (core func $thread-resume-later (canon thread.resume-later)) + (core func $thread-switch-to (canon thread.switch-to)) + (core func $thread-suspend (canon thread.suspend)) + + ;; Instantiate the main module + (core instance $i ( + instantiate $m + (with "" (instance + (export "thread.new_indirect" (func $thread-new-indirect)) + (export "thread.index" (func $thread-index)) + (export "thread.yield-to" (func $thread-yield-to)) + (export "thread.yield" (func $thread-yield)) + (export "thread.switch-to" (func $thread-switch-to)) + (export "thread.suspend" (func $thread-suspend)) + (export "thread.resume-later" (func $thread-resume-later)))) + (with "libc" (instance $libc)))) + + ;; Export the main entry point + (func (export "run") (result u32) (canon lift (core func $i "run")))) + +(assert_return (invoke "run") (u32.const 42)) \ No newline at end of file diff --git a/tests/misc_testsuite/component-model-threading/stackful-cancellation.wast b/tests/misc_testsuite/component-model-threading/stackful-cancellation.wast new file mode 100644 index 000000000000..4eaa0abad99b --- /dev/null +++ b/tests/misc_testsuite/component-model-threading/stackful-cancellation.wast @@ -0,0 +1,385 @@ +;;! component_model_async = true +;;! component_model_async_stackful = true +;;! component_model_async_builtins = true +;;! component_model_threading = true +;;! reference_types = true + +;; Tests that cancellation works with the async threading intrinsics. +;; Consists of two components, C and D. C implements functions that mix cancellable and uncancellable yields and suspensions. +;; D calls these functions and cancels the resulting subtasks, ensuring that cancellation is only seen when expected. + +;; -- Component C -- + +;; `run-yield`: Yields twice, first with an uncancellable yield, then with a cancellable yield. +;; The caller cancels the subtask during the first yield, and ensures that the cancellation only takes effect +;; on the second yield. + +;; `run-yield-to`: Yields twice to a spawned thread, first with an uncancellable yield, then with a cancellable yield. +;; A complication is that we can't guarantee that if the spawned thread yields, the supertask will be scheduled to +;; cancel the subtask before the subtask's implicit thread is rescheduled. To handle this, the subtask's implicit +;; thread first waits on a future to be written by the supertask, then yields to the spawned thread. + +;; `run-suspend`: More complex, because executing an uncancellable suspension requires another +;; thread in the same subtask to explicitly wake it up. This is done by the subtask spawning a new thread that +;; waits on a future to be written by the supertask, and then resumes the main thread once that happens. +;; After setting up this thread, `run-suspend` performs an uncancellable suspend, then a cancellable suspend. +;; The caller cancels the subtask during the first suspend, writes to the future to make the spawned thread +;; resume the implicit thread, and ensures that the cancellation only takes effect on the second suspend. + +;; `run-switch-to`: Similar to `run-suspend`, but uses `thread.switch-to` instead of `thread.suspend`. + +;; -- Component D -- + +;; `run-test`: Calls one of the functions in C based on a test id, cancels the resulting subtask, and ensures that +;; cancellation is only seen when expected. + +;; `run`: Calls `run-test` for each of the functions in C. + +(component + (component $C + (type $FT (future)) + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + ;; Defines the table for the thread start functions, of which there are two + (core module $libc + (table (export "__indirect_function_table") 2 funcref)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "task.cancel" (func $task-cancel)) + (import "" "thread.new_indirect" (func $thread-new-indirect (param i32 i32) (result i32))) + (import "" "thread.suspend" (func $thread-suspend (result i32))) + (import "" "thread.suspend-cancellable" (func $thread-suspend-cancellable (result i32))) + (import "" "thread.yield-to" (func $thread-yield-to (param i32) (result i32))) + (import "" "thread.yield-to-cancellable" (func $thread-yield-to-cancellable (param i32) (result i32))) + (import "" "thread.switch-to" (func $thread-switch-to (param i32) (result i32))) + (import "" "thread.switch-to-cancellable" (func $thread-switch-to-cancellable (param i32) (result i32))) + (import "" "thread.yield" (func $thread-yield (result i32))) + (import "" "thread.yield-cancellable" (func $thread-yield-cancellable (result i32))) + (import "" "thread.index" (func $thread-index (result i32))) + (import "" "thread.resume-later" (func $thread-resume-later (param i32))) + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "libc" "__indirect_function_table" (table $indirect-function-table 2 funcref)) + + ;; Indices into the function table for the thread start functions + (global $wake-from-suspend-ftbl-idx i32 (i32.const 0)) + (global $just-yield-ftbl-idx i32 (i32.const 1)) + + (func (export "run-yield") + ;; Yield back to the caller, who will attempt to cancel us, but we won't see it + ;; because we're using an uncancellable yield + (if (i32.ne (call $thread-yield) (i32.const 0)) (then unreachable)) + ;; Yield back to the caller again. This time, we should receive the cancellation immediately. + (if (i32.ne (call $thread-yield-cancellable) (i32.const 1)) (then unreachable)) + (call $task-cancel) + ) + + (func $wait-for-future-write (param i32) + (local $ret i32) + ;; Perform a future.read, which will block, waiting for the supertask to write + (local.set $ret (call $future.read (local.get 0) (i32.const 0xba5eba11))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + ) + + (func $wake-from-suspend (param i32) + ;; Extract the thread index and future to wait on from the argument structure + (local $thread-index i32) (local $future i32) + (local.set $thread-index (i32.load offset=0 (local.get 0))) + (local.set $future (i32.load offset=4 (local.get 0))) + + ;; Wait for the supertask to signal us to wake up suspended thread. + (call $wait-for-future-write (local.get $future)) + ;; Resume the main thread, which is suspended in an uncancellable suspend + (call $thread-resume-later (local.get $thread-index)) + ) + + (func $just-yield (param $explicit-thread-idx i32) + ;; Yield nondeterministically, either back to the supertask, who will then wait on cancellation to be acknowledged, + ;; or to the implicit thread, who will acknowledge the cancellation. + (if (i32.ne (call $thread-yield) (i32.const 0)) (then unreachable)) + ) + + ;; Initialize the function table that will be used by thread.new_indirect + (elem (table $indirect-function-table) (i32.const 0 (; wake-from-suspend-ftbl-idx ;)) func $wake-from-suspend) + (elem (table $indirect-function-table) (i32.const 1 (; just-yield-ftbl-idx ;)) func $just-yield) + + (func (export "run-yield-to") (param $futr i32) + (local $thread-index i32) + ;; Spawn a new thread that will wake us up from our uncancellable suspend; we'll switch to it next + (local.set $thread-index + (call $thread-new-indirect (global.get $just-yield-ftbl-idx) (call $thread-index))) + + ;; We can't guarantee that the supertask will be scheduled to cancel us before we're rescheduled, so we first + ;; wait on the future to be written, then yield to the spawned thread. This means that cancellation will be + ;; sent while we're waiting on the future rather than at the yield point, but the cancel will still be pending + ;; when we reach the yield point, so it should still be ignored by the uncancellable yield and only take effect + ;; when we reach the second, cancellable yield. + (call $wait-for-future-write (local.get $futr)) + + ;; Yield to the spawned thread uncancellably. We should eventually be rescheduled without being notified + ;; of the pending cancellation. + (if (i32.ne (call $thread-yield-to (local.get $thread-index)) (i32.const 0)) (then unreachable)) + ;; Yield to the spawned thread again. This time we should see the cancellation immediately. + (if (i32.ne (call $thread-yield-to-cancellable (local.get $thread-index)) (i32.const 1)) (then unreachable)) + (call $task-cancel) + ) + + (func (export "run-suspend") (param $futr i32) + ;; Set up the arguments for the wake-for-suspend thread start function. + ;; It expects a pointer to a structure containing the thread index to resume + ;; and the future to wait on before resuming it. + (local $wake-from-suspend-argp i32) + (local.set $wake-from-suspend-argp (i32.const 4)) + (i32.store offset=0 (local.get $wake-from-suspend-argp) (call $thread-index)) + (i32.store offset=4 (local.get $wake-from-suspend-argp) (local.get $futr)) + + ;; Spawn a new thread that will wake us up from our uncancellable suspend and schedule + ;; it to resume after we suspend. + (call $thread-resume-later + (call $thread-new-indirect (global.get $wake-from-suspend-ftbl-idx) (local.get $wake-from-suspend-argp))) + + ;; Request suspension. We will not be woken up by cancellation, because this is an uncancellable + ;; suspend. We will be woken up by the other thread we spawned above, which will be resumed after + ;; the supertask cancels our subtask. + (if (i32.ne (call $thread-suspend) (i32.const 0)) (then unreachable)) + ;; Request suspension again. This time we should see the cancellation immediately. + (if (i32.ne (call $thread-suspend-cancellable) (i32.const 1)) (then unreachable)) + (call $task-cancel) + ) + + (func (export "run-switch-to") (param $futr i32) + (local $thread-index i32) + ;; Set up the arguments for the wake-for-suspend thread start function. + ;; It expects a pointer to a structure containing the thread index to resume + ;; and the future to wait on before resuming it. + (local $wake-from-suspend-argp i32) + (local.set $wake-from-suspend-argp (i32.const 4)) + (i32.store offset=0 (local.get $wake-from-suspend-argp) (call $thread-index)) + (i32.store offset=4 (local.get $wake-from-suspend-argp) (local.get $futr)) + + ;; Spawn a new thread that will wake us up from our uncancellable suspend; we'll switch to it next + (local.set $thread-index + (call $thread-new-indirect (global.get $wake-from-suspend-ftbl-idx) (local.get $wake-from-suspend-argp))) + + ;; Request suspension by switching to the spawned thread. + ;; We will not be woken up by cancellation, because this is an uncancellable suspend. + ;; We will be woken up by the other thread we spawned above, which will be resumed after + ;; the supertask cancels our subtask. + (if (i32.ne (call $thread-switch-to (local.get $thread-index)) (i32.const 0)) (then unreachable)) + ;; Request suspension again. This time we should see the cancellation immediately. + (if (i32.ne (call $thread-switch-to-cancellable (local.get $thread-index)) (i32.const 1)) (then unreachable)) + (call $task-cancel) + ) + ) + + ;; Instantiate the libc module to get the table + (core instance $libc (instantiate $libc)) + ;; Get access to `thread.new_indirect` that uses the table from libc + (core type $start-func-ty (func (param i32))) + (alias core export $libc "__indirect_function_table" (core table $indirect-function-table)) + + (core func $task-cancel (canon task.cancel)) + (core func $thread-new-indirect + (canon thread.new_indirect $start-func-ty (table $indirect-function-table))) + (core func $thread-yield (canon thread.yield)) + (core func $thread-yield-cancellable (canon thread.yield cancellable)) + (core func $thread-index (canon thread.index)) + (core func $thread-yield-to (canon thread.yield-to)) + (core func $thread-yield-to-cancellable (canon thread.yield-to cancellable)) + (core func $thread-resume-later (canon thread.resume-later)) + (core func $thread-switch-to (canon thread.switch-to)) + (core func $thread-switch-to-cancellable (canon thread.switch-to cancellable)) + (core func $thread-suspend (canon thread.suspend)) + (core func $thread-suspend-cancellable (canon thread.suspend cancellable)) + (core func $future.read (canon future.read $FT (memory $memory "mem"))) + (core func $waitable-set.new (canon waitable-set.new)) + (core func $waitable.join (canon waitable.join)) + (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem"))) + + ;; Instantiate the main module + (core instance $cm ( + instantiate $CM + (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.cancel" (func $task-cancel)) + (export "thread.new_indirect" (func $thread-new-indirect)) + (export "thread.index" (func $thread-index)) + (export "thread.yield-to" (func $thread-yield-to)) + (export "thread.yield-to-cancellable" (func $thread-yield-to-cancellable)) + (export "thread.yield" (func $thread-yield)) + (export "thread.yield-cancellable" (func $thread-yield-cancellable)) + (export "thread.switch-to" (func $thread-switch-to)) + (export "thread.switch-to-cancellable" (func $thread-switch-to-cancellable)) + (export "thread.suspend" (func $thread-suspend)) + (export "thread.suspend-cancellable" (func $thread-suspend-cancellable)) + (export "thread.resume-later" (func $thread-resume-later)) + (export "future.read" (func $future.read)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "waitable-set.new" (func $waitable-set.new)))) + (with "libc" (instance $libc)))) + + (func (export "run-yield") (result u32) (canon lift (core func $cm "run-yield") async)) + (func (export "run-yield-to") (param "fut" $FT) (result u32) (canon lift (core func $cm "run-yield-to") async)) + (func (export "run-suspend") (param "fut" $FT) (result u32) (canon lift (core func $cm "run-suspend") async)) + (func (export "run-switch-to") (param "fut" $FT) (result u32) (canon lift (core func $cm "run-switch-to") async)) + ) + + (component $D + (type $FT (future)) + (import "run-yield" (func $run-yield (result u32))) + (import "run-yield-to" (func $run-yield-to (param "fut" $FT) (result u32))) + (import "run-suspend" (func $run-suspend (param "fut" $FT) (result u32))) + (import "run-switch-to" (func $run-switch-to (param "fut" $FT) (result u32))) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $DM + (import "" "mem" (memory 1)) + (import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32))) + (import "" "run-yield" (func $run-yield (param i32) (result i32))) + (import "" "run-yield-to" (func $run-yield-to (param i32 i32) (result i32))) + (import "" "run-suspend" (func $run-suspend (param i32 i32) (result i32))) + (import "" "run-switch-to" (func $run-switch-to (param i32 i32) (result i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.new" (func $future.new (result i64))) + (import "" "future.write" (func $future.write (param i32 i32) (result i32))) + (import "" "thread.yield" (func $thread-yield (result i32))) + + (func $run-test (param $test-id i32) (result i32) + (local $ret i32) (local $subtask i32) + (local $ws i32) (local $event_code i32) + (local $run-retp i32) (local $wait-retp i32) + (local $ret64 i64) (local $futr i32) (local $futw i32) + + ;; Set up return value storage for run-suspend/switch-to and waitable-set.wait + (local.set $run-retp (i32.const 4)) + (local.set $wait-retp (i32.const 8)) + (i32.store (local.get $run-retp) (i32.const 0xbad0bad0)) + (i32.store (local.get $wait-retp) (i32.const 0xbad0bad0)) + + ;; Create a future that the subtask may wait on + (local.set $ret64 (call $future.new)) + (local.set $futr (i32.wrap_i64 (local.get $ret64))) + (local.set $futw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + + ;; Calling run-suspend/switch-to will start the thread, which will suspend. + ;; This is basically a switch statement: + ;; 0: run-yield + ;; 1: run-yield-to + ;; 2: run-suspend + ;; 3: run-switch-to + (if (i32.eq (local.get $test-id) (i32.const 0)) + (then (local.set $ret (call $run-yield (local.get $run-retp)))) + (else (if (i32.eq (local.get $test-id) (i32.const 1)) + (then (local.set $ret (call $run-yield-to (local.get $futr) (local.get $run-retp)))) + (else (if (i32.eq (local.get $test-id) (i32.const 2)) + (then (local.set $ret (call $run-suspend (local.get $futr) (local.get $run-retp)))) + (else (if (i32.eq (local.get $test-id) (i32.const 3)) + (then (local.set $ret (call $run-switch-to (local.get $futr) (local.get $run-retp)))) + (else unreachable)))))))) + + ;; Ensure that the thread started + (if (i32.ne (i32.and (local.get $ret) (i32.const 0xF)) (i32.const 1 (; STARTED ;))) + (then unreachable)) + ;; Extract the subtask index + (local.set $subtask (i32.shr_u (local.get $ret) (i32.const 4))) + ;; Cancel the subtask, which should block, because the initial suspend/yield is uncancellable + (local.set $ret (call $subtask.cancel (local.get $subtask))) + ;; Ensure the cancellation blocked + (if (i32.ne (local.get $ret) (i32.const -1 (; BLOCKED ;))) + (then unreachable)) + + ;; If we're not testing run-yield, the subtask is expecting a write to our future, so write to it + (if (i32.ne (local.get $test-id) (i32.const 0)) + (then + (local.set $ret (call $future.write (local.get $futw) (i32.const 0xdeadbeef))) + ;; The write should succeed + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)))) + + ;; Wait on the subtask, which will eventually progress to a cancellable yield/suspend and acknowledge the cancellation + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $subtask) (local.get $ws)) + (local.set $event_code (call $waitable-set.wait (local.get $ws) (local.get $wait-retp))) + ;; Ensure we got the subtask event + (if (i32.ne (local.get $event_code) (i32.const 1 (; SUBTASK ;))) + (then unreachable)) + ;; Ensure the subtask index matches + (if (i32.ne (local.get $subtask) (i32.load (local.get $wait-retp))) + (then unreachable)) + ;; Ensure the subtask was cancelled before it returned + (if (i32.ne (i32.const 4 (; CANCELLED_BEFORE_RETURNED=4 | (0<<4) ;)) + (i32.load offset=4 (local.get $wait-retp))) + (then unreachable)) + + ;; Return success + (i32.const 42) + ) + + (func $run (export "run") (result i32) + ;; test-id 0: run-yield + (if (i32.ne (call $run-test (i32.const 0)) (i32.const 42)) + (then unreachable)) + + ;; test-id 1: run-yield-to + (if (i32.ne (call $run-test (i32.const 1)) (i32.const 42)) + (then unreachable)) + + ;; test-id 2: run-suspend + (if (i32.ne (call $run-test (i32.const 2)) (i32.const 42)) + (then unreachable)) + + ;; test-id 3: run-switch-to + (if (i32.ne (call $run-test (i32.const 3)) (i32.const 42)) + (then unreachable)) + + ;; Return success + (i32.const 42) + ) + ) + + (core func $waitable-set.new (canon waitable-set.new)) + (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem"))) + (core func $waitable.join (canon waitable.join)) + (core func $subtask.cancel (canon subtask.cancel async)) + (core func $future.new (canon future.new $FT)) + (core func $future.write (canon future.write $FT (memory $memory "mem"))) + (core func $thread.yield (canon thread.yield)) + (canon lower (func $run-yield) async (memory $memory "mem") (core func $run-yield')) + (canon lower (func $run-suspend) async (memory $memory "mem") (core func $run-suspend')) + (canon lower (func $run-switch-to) async (memory $memory "mem") (core func $run-switch-to')) + (canon lower (func $run-yield-to) async (memory $memory "mem") (core func $run-yield-to')) + (core instance $dm (instantiate $DM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "run-yield" (func $run-yield')) + (export "run-suspend" (func $run-suspend')) + (export "run-switch-to" (func $run-switch-to')) + (export "run-yield-to" (func $run-yield-to')) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "subtask.cancel" (func $subtask.cancel)) + (export "future.new" (func $future.new)) + (export "future.write" (func $future.write)) + (export "thread.yield" (func $thread.yield)) + )))) + (func (export "run") (result u32) (canon lift (core func $dm "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D + (with "run-yield" (func $c "run-yield")) + (with "run-yield-to" (func $c "run-yield-to")) + (with "run-suspend" (func $c "run-suspend")) + (with "run-switch-to" (func $c "run-switch-to")) + )) + (func (export "run") (alias export $d "run")) +) + +(assert_return (invoke "run") (u32.const 42)) \ No newline at end of file diff --git a/tests/misc_testsuite/component-model-threading/threading-builtins.wast b/tests/misc_testsuite/component-model-threading/threading-builtins.wast new file mode 100644 index 000000000000..2402f4bb505c --- /dev/null +++ b/tests/misc_testsuite/component-model-threading/threading-builtins.wast @@ -0,0 +1,102 @@ +;;! component_model_async = true +;;! component_model_threading = true + +;; Tests for basic functioning of all threading builtins with the implicit thread + one explicit thread +;; Switches between threads using all of the different threading intrinsics. + +(component + ;; Defines the table for the thread start function + (core module $libc + (table (export "__indirect_function_table") 1 funcref)) + ;; Defines the thread start function and a function that calls thread.new_indirect + (core module $m + ;; Import the threading builtins and the table from libc + (import "" "thread.new_indirect" (func $thread-new-indirect (param i32 i32) (result i32))) + (import "" "thread.suspend" (func $thread-suspend (result i32))) + (import "" "thread.yield-to" (func $thread-yield-to (param i32) (result i32))) + (import "" "thread.switch-to" (func $thread-switch-to (param i32) (result i32))) + (import "" "thread.yield" (func $thread-yield (result i32))) + (import "" "thread.index" (func $thread-index (result i32))) + (import "" "thread.resume-later" (func $thread-resume-later (param i32))) + (import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref)) + + ;; A global that we will set from the spawned thread + (global $g (mut i32) (i32.const 0)) + (global $main-thread-index (mut i32) (i32.const 0)) + + ;; The thread entry point, which sets the global to incrementing values starting from the context value + (func $thread-start (param i32) + ;; Set the global to the context value + (global.set $g (local.get 0)) + ;; The main thread switched to us, so is no longer scheduled, so we explicitly schedule it + (call $thread-resume-later (global.get $main-thread-index)) + ;; Yield back to the main thread (since that is the only other one) + (drop (call $thread-yield) + ;; Increment the global + (global.set $g (i32.add (global.get $g) (i32.const 1))) + ;; The main thread will have explicitly requested suspension, so yield to it directly + (drop (call $thread-yield-to (global.get $main-thread-index))) + ;; Increment the global again + (global.set $g (i32.add (global.get $g) (i32.const 1))) + ;; Reschedule the main thread so that it runs after we exit + (call $thread-resume-later (global.get $main-thread-index)))) + (export "thread-start" (func $thread-start)) + + ;; Initialize the function table with our thread-start function; this will be + ;; used by thread.new_indirect + (elem (table $indirect-function-table) (i32.const 0) func $thread-start) + + ;; The main entry point, which spawns a new thread to run `thread-start`, passing 42 + ;; as the context value, and then yields to it + (func (export "run") (result i32) + ;; Store the main thread's index for the spawned thread to yield to + (global.set $main-thread-index (call $thread-index)) + ;; Create a new thread, which starts suspended, and switch to it + (drop + (call $thread-switch-to + (call $thread-new-indirect (i32.const 0) (i32.const 42)))) + ;; After the thread yields back to us, check that the global was set to 42 + (if (i32.ne (global.get $g) (i32.const 42)) (then unreachable)) + ;; Suspend ourselves, which will cause the spawned thread to run + (drop (call $thread-suspend)) + ;; The spawned thread will resume us after incrementing the global, so check that it is now 43 + (if (i32.ne (global.get $g) (i32.const 43)) (then unreachable)) + ;; Suspend again, which will cause the spawned thread to run again + (drop (call $thread-suspend)) + ;; The spawned thread will reschedule us before it exits, so when we resume here the global should be 44 + (if (i32.ne (global.get $g) (i32.const 44)) (then unreachable)) + ;; Return success + (i32.const 42))) + + ;; Instantiate the libc module to get the table + (core instance $libc (instantiate $libc)) + ;; Get access to `thread.new_indirect` that uses the table from libc + (core type $start-func-ty (func (param i32))) + (alias core export $libc "__indirect_function_table" (core table $indirect-function-table)) + + (core func $thread-new-indirect + (canon thread.new_indirect $start-func-ty (table $indirect-function-table))) + (core func $thread-yield (canon thread.yield)) + (core func $thread-index (canon thread.index)) + (core func $thread-yield-to (canon thread.yield-to)) + (core func $thread-resume-later (canon thread.resume-later)) + (core func $thread-switch-to (canon thread.switch-to)) + (core func $thread-suspend (canon thread.suspend)) + + ;; Instantiate the main module + (core instance $i ( + instantiate $m + (with "" (instance + (export "thread.new_indirect" (func $thread-new-indirect)) + (export "thread.index" (func $thread-index)) + (export "thread.yield-to" (func $thread-yield-to)) + (export "thread.yield" (func $thread-yield)) + (export "thread.switch-to" (func $thread-switch-to)) + (export "thread.suspend" (func $thread-suspend)) + (export "thread.resume-later" (func $thread-resume-later)))) + (with "libc" (instance $libc)))) + + ;; Export the main entry point + (func (export "run") (result u32) (canon lift (core func $i "run")))) + +(assert_return (invoke "run") (u32.const 42)) \ No newline at end of file diff --git a/tests/wasi_testsuite/wasi-common b/tests/wasi_testsuite/wasi-common index c11cd6bbee3e..2fec29ea6de1 160000 --- a/tests/wasi_testsuite/wasi-common +++ b/tests/wasi_testsuite/wasi-common @@ -1 +1 @@ -Subproject commit c11cd6bbee3e209431415262f9701e42e9fe050a +Subproject commit 2fec29ea6de1244c124f7fe3bfe9f2946113f66e