From 42efd4666cc44f62204ae3517067a8b67c8cc1ed Mon Sep 17 00:00:00 2001 From: Jeff Charles Date: Fri, 8 Aug 2025 16:01:56 -0400 Subject: [PATCH] Support in-memory I/O APIs --- src/engine.rs | 86 ++++++-------------- src/io.rs | 211 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 234 insertions(+), 64 deletions(-) create mode 100644 src/io.rs diff --git a/src/engine.rs b/src/engine.rs index f8d0370..48a01d0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,13 +1,12 @@ use anyhow::{anyhow, Result}; -use rust_embed::RustEmbed; +use std::path::PathBuf; use std::string::String; -use std::{collections::HashSet, path::PathBuf}; use wasmtime::{AsContextMut, Config, Engine, Linker, Module, ResourceLimiter, Store}; -use wasmtime_wasi::pipe::{MemoryInputPipe, MemoryOutputPipe}; use wasmtime_wasi::preview1::WasiP1Ctx; -use wasmtime_wasi::{I32Exit, WasiCtxBuilder}; +use wasmtime_wasi::I32Exit; use crate::function_run_result::FunctionRunResult; +use crate::io::{IOHandler, OutputAndLogs}; use crate::{BytesContainer, BytesContainerType}; #[derive(Clone)] @@ -16,44 +15,12 @@ pub struct ProfileOpts { pub out: PathBuf, } -#[derive(RustEmbed)] -#[folder = "providers/"] -struct StandardProviders; - pub fn uses_msgpack_provider(module: &Module) -> bool { module.imports().map(|i| i.module()).any(|module| { module.starts_with("shopify_function_v") || module == "shopify_functions_javy_v2" }) } -fn import_modules( - module: &Module, - engine: &Engine, - linker: &mut Linker, - mut store: &mut Store, -) { - let imported_modules: HashSet = - module.imports().map(|i| i.module().to_string()).collect(); - - imported_modules.iter().for_each(|module_name| { - let provider_path = format!("{module_name}.wasm"); - let imported_module_bytes = StandardProviders::get(&provider_path); - - if let Some(bytes) = imported_module_bytes { - let imported_module = Module::from_binary(engine, &bytes.data) - .unwrap_or_else(|_| panic!("Failed to load module {module_name}")); - - let imported_module_instance = linker - .instantiate(&mut store, &imported_module) - .expect("Failed to instantiate imported instance"); - - linker - .instance(&mut store, module_name, imported_module_instance) - .expect("Failed to import module"); - } - }); -} - pub struct FunctionRunParams<'a> { pub function_path: PathBuf, pub input: BytesContainer, @@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX; const MAXIMUM_MEMORIES: usize = 2; // 1 for the module, 1 for Javy's provider struct FunctionContext { - wasi: WasiP1Ctx, + wasi: Option, limiter: MemoryLimiter, } impl FunctionContext { - fn new(wasi: WasiP1Ctx) -> Self { + fn new(wasi: Option) -> Self { Self { wasi, limiter: Default::default(), @@ -128,33 +95,29 @@ pub fn run(params: FunctionRunParams) -> Result { module, } = params; - let input_stream = MemoryInputPipe::new(input.raw.clone()); - let output_stream = MemoryOutputPipe::new(usize::MAX); - let error_stream = MemoryOutputPipe::new(usize::MAX); + let mut io_handler = IOHandler::new(module, input.clone()); let mut error_logs: String = String::new(); let mut linker = Linker::new(&engine); - wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| { - &mut ctx.wasi - })?; - deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?; - let mut wasi_builder = WasiCtxBuilder::new(); - wasi_builder.stdin(input_stream); - wasi_builder.stdout(output_stream.clone()); - wasi_builder.stderr(error_stream.clone()); - deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder); - let wasi = wasi_builder.build_p1(); + let wasi = io_handler.wasi(); + if wasi.is_some() { + wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| { + ctx.wasi.as_mut().expect("Should have WASI context") + })?; + deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?; + } + let function_context = FunctionContext::new(wasi); let mut store = Store::new(&engine, function_context); store.limiter(|s| &mut s.limiter); + + io_handler.initialize(&engine, &mut linker, &mut store)?; + store.set_fuel(STARTING_FUEL)?; store.set_epoch_deadline(1); - import_modules(&module, &engine, &mut linker, &mut store); - - linker.module(&mut store, "Function", &module)?; - let instance = linker.instantiate(&mut store, &module)?; + let instance = linker.instantiate(&mut store, io_handler.module())?; let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?; @@ -163,7 +126,6 @@ pub fn run(params: FunctionRunParams) -> Result { .frequency(profile_opts.interval) .weight_unit(wasmprof::WeightUnit::Fuel) .profile(|store| func.call(store.as_context_mut(), ())); - ( result, Some(profile_data.into_collapsed_stacks().to_string()), @@ -191,18 +153,14 @@ pub fn run(params: FunctionRunParams) -> Result { } } - drop(store); - - let mut logs = error_stream - .try_into_inner() - .expect("Log stream reference still exists"); + let OutputAndLogs { + output: raw_output, + mut logs, + } = io_handler.finalize(store)?; logs.extend_from_slice(error_logs.as_bytes()); let output_codec = input.codec; - let raw_output = output_stream - .try_into_inner() - .expect("Output stream reference still exists"); let output = BytesContainer::new( BytesContainerType::Output, output_codec, diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..a5cf4e0 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,211 @@ +use std::collections::HashSet; + +use anyhow::{anyhow, Result}; +use rust_embed::RustEmbed; +use wasmtime::{AsContext, AsContextMut, Engine, Instance, Linker, Module, Store}; +use wasmtime_wasi::{ + pipe::{MemoryInputPipe, MemoryOutputPipe}, + preview1::WasiP1Ctx, + WasiCtxBuilder, +}; + +use crate::BytesContainer; + +#[derive(RustEmbed)] +#[folder = "providers/"] +struct StandardProviders; + +pub(crate) struct OutputAndLogs { + pub output: Vec, + pub logs: Vec, +} + +struct WasiIO { + output: MemoryOutputPipe, + logs: MemoryOutputPipe, +} + +enum IOStrategy { + Wasi(WasiIO), + Memory(Option), +} + +pub(crate) struct IOHandler { + strategy: IOStrategy, + module: Module, + input: BytesContainer, +} + +impl IOHandler { + pub(crate) fn new(module: Module, input: BytesContainer) -> Self { + Self { + strategy: if uses_mem_io(&module) { + IOStrategy::Memory(None) + } else { + IOStrategy::Wasi(WasiIO { + output: MemoryOutputPipe::new(usize::MAX), + logs: MemoryOutputPipe::new(usize::MAX), + }) + }, + module, + input, + } + } + + pub(crate) fn module(&self) -> &Module { + &self.module + } + + pub(crate) fn wasi(&self) -> Option { + match &self.strategy { + IOStrategy::Wasi(WasiIO { output, logs }) => { + let input_stream = MemoryInputPipe::new(self.input.raw.clone()); + let mut wasi_builder = WasiCtxBuilder::new(); + wasi_builder.stdin(input_stream); + wasi_builder.stdout(output.clone()); + wasi_builder.stderr(logs.clone()); + deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder); + Some(wasi_builder.build_p1()) + } + IOStrategy::Memory(_instance) => None, + } + } + + pub(crate) fn initialize( + &mut self, + engine: &Engine, + linker: &mut Linker, + store: &mut Store, + ) -> Result<()> { + let mem_io_instance = instantiate_imports(&self.module, engine, linker, store); + if let IOStrategy::Memory(ref mut instance) = self.strategy { + *instance = mem_io_instance; + } + + if let Some(instance) = mem_io_instance { + let input_offset = instance + .get_typed_func::(store.as_context_mut(), "initialize")? + .call(store.as_context_mut(), self.input.raw.len() as _)?; + instance + .get_memory(store.as_context_mut(), "memory") + .ok_or_else(|| anyhow!("Missing memory export named memory"))? + .write(store.as_context_mut(), input_offset as _, &self.input.raw)?; + } + Ok(()) + } + + pub(crate) fn finalize(self, mut store: Store) -> Result { + match self.strategy { + IOStrategy::Memory(instance) => { + let instance = instance.expect("Should have been defined in initialize"); + let results_offset = instance + .get_typed_func::<(), i32>(store.as_context_mut(), "finalize")? + .call(store.as_context_mut(), ())? + as usize; + + let memory = instance + .get_memory(store.as_context_mut(), "memory") + .ok_or_else(|| anyhow!("Missing memory export named memory"))?; + + let mut buf = [0; 24]; + memory.read(store.as_context(), results_offset, &mut buf)?; + + let output_offset = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize; + let output_len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; + let log_offset1 = u32::from_le_bytes(buf[8..12].try_into().unwrap()) as usize; + let log_len1 = u32::from_le_bytes(buf[12..16].try_into().unwrap()) as usize; + let log_offset2 = u32::from_le_bytes(buf[16..20].try_into().unwrap()) as usize; + let log_len2 = u32::from_le_bytes(buf[20..24].try_into().unwrap()) as usize; + + let mut output = vec![0; output_len]; + memory.read(store.as_context(), output_offset, &mut output)?; + + let mut logs = vec![0; log_len1]; + memory.read(store.as_context(), log_offset1, &mut logs)?; + + let mut logs2 = vec![0; log_len2]; + memory.read(store.as_context(), log_offset2, &mut logs2)?; + + logs.append(&mut logs2); + + Ok(OutputAndLogs { output, logs }) + } + IOStrategy::Wasi(WasiIO { output, logs }) => { + // Need to drop store to have only one reference to output and error streams. + drop(store); + + let output = output + .try_into_inner() + .expect("Should have only one reference to output stream at this point") + .to_vec(); + let logs = logs + .try_into_inner() + .expect("Should have only one reference to error stream at this point") + .to_vec(); + Ok(OutputAndLogs { output, logs }) + } + } + } +} + +// Whether a module imports a provider that uses in-memory buffers for I/O. +fn uses_mem_io(module: &Module) -> bool { + module.imports().map(|i| i.module()).any(is_mem_io_provider) +} + +// Whether a provider exports functions for working with in-memory buffers for I/O. +fn is_mem_io_provider(module: &str) -> bool { + let javy_plugin_version = module + .strip_prefix("shopify_functions_javy_v") + .map(|s| s.parse::()) + .and_then(|result| result.ok()); + if javy_plugin_version.is_some_and(|version| version >= 3) { + return true; + } + + let functions_provider_version = module + .strip_prefix("shopify_function_v") + .map(|s| s.parse::()) + .and_then(|result| result.ok()); + if functions_provider_version.is_some_and(|version| version >= 2) { + return true; + } + + false +} + +fn instantiate_imports( + module: &Module, + engine: &Engine, + linker: &mut Linker, + mut store: &mut Store, +) -> Option { + let imported_modules: HashSet = + module.imports().map(|i| i.module().to_string()).collect(); + + let mut mem_io_instance = None; + + imported_modules.iter().for_each(|module_name| { + let provider_path = format!("{module_name}.wasm"); + let imported_module_bytes = StandardProviders::get(&provider_path); + + if let Some(bytes) = imported_module_bytes { + let imported_module = Module::from_binary(engine, &bytes.data) + .unwrap_or_else(|_| panic!("Failed to load module {module_name}")); + + let imported_module_instance = linker + .instantiate(&mut store, &imported_module) + .expect("Failed to instantiate imported instance"); + + if is_mem_io_provider(module_name) { + mem_io_instance = Some(imported_module_instance); + } + + linker + .instance(&mut store, module_name, imported_module_instance) + .expect("Failed to import module"); + } + }); + + mem_io_instance +} diff --git a/src/lib.rs b/src/lib.rs index ecde0ba..d28e3cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod bluejay_schema_analyzer; pub mod container; pub mod engine; pub mod function_run_result; +mod io; pub mod scale_limits_analyzer; use clap::ValueEnum;