Skip to content

Support in-memory I/O APIs #493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 58 additions & 101 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use anyhow::{anyhow, Result};
use rust_embed::RustEmbed;
use std::path::PathBuf;
use std::string::String;
use std::{collections::HashSet, path::PathBuf};
use wasmtime::{AsContextMut, Config, Engine, Linker, Module, ResourceLimiter, Store};
use wasmtime_wasi::pipe::{MemoryInputPipe, MemoryOutputPipe};
use wasmtime_wasi::preview1::WasiP1Ctx;
use wasmtime_wasi::{I32Exit, WasiCtxBuilder};
use wasmtime_wasi::I32Exit;

use crate::function_run_result::FunctionRunResult;
use crate::io::{IOHandler, OutputAndLogs};
use crate::{BytesContainer, BytesContainerType};

#[derive(Clone)]
Expand All @@ -16,44 +15,12 @@ pub struct ProfileOpts {
pub out: PathBuf,
}

#[derive(RustEmbed)]
#[folder = "providers/"]
struct StandardProviders;

pub fn uses_msgpack_provider(module: &Module) -> bool {
module.imports().map(|i| i.module()).any(|module| {
module.starts_with("shopify_function_v") || module == "shopify_functions_javy_v2"
})
}

fn import_modules<T>(
module: &Module,
engine: &Engine,
linker: &mut Linker<T>,
mut store: &mut Store<T>,
) {
let imported_modules: HashSet<String> =
module.imports().map(|i| i.module().to_string()).collect();

imported_modules.iter().for_each(|module_name| {
let provider_path = format!("{module_name}.wasm");
let imported_module_bytes = StandardProviders::get(&provider_path);

if let Some(bytes) = imported_module_bytes {
let imported_module = Module::from_binary(engine, &bytes.data)
.unwrap_or_else(|_| panic!("Failed to load module {module_name}"));

let imported_module_instance = linker
.instantiate(&mut store, &imported_module)
.expect("Failed to instantiate imported instance");

linker
.instance(&mut store, module_name, imported_module_instance)
.expect("Failed to import module");
}
});
}

pub struct FunctionRunParams<'a> {
pub function_path: PathBuf,
pub input: BytesContainer,
Expand All @@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX;
const MAXIMUM_MEMORIES: usize = 2; // 1 for the module, 1 for Javy's provider

struct FunctionContext {
wasi: WasiP1Ctx,
wasi: Option<WasiP1Ctx>,
limiter: MemoryLimiter,
}

impl FunctionContext {
fn new(wasi: WasiP1Ctx) -> Self {
fn new(wasi: Option<WasiP1Ctx>) -> Self {
Self {
wasi,
limiter: Default::default(),
Expand Down Expand Up @@ -128,85 +95,75 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
module,
} = params;

let input_stream = MemoryInputPipe::new(input.raw.clone());
let output_stream = MemoryOutputPipe::new(usize::MAX);
let error_stream = MemoryOutputPipe::new(usize::MAX);
let mut io_handler = IOHandler::new(module, input.clone());

let memory_usage: u64;
let instructions: u64;
let mut error_logs: String = String::new();
let mut module_result: Result<(), anyhow::Error>;
let profile_data: Option<String>;

{
let mut linker = Linker::new(&engine);
let mut linker = Linker::new(&engine);
let wasi = io_handler.wasi();
if wasi.is_some() {
wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| {
&mut ctx.wasi
ctx.wasi.as_mut().expect("Should have WASI context")
})?;
deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?;
let mut wasi_builder = WasiCtxBuilder::new();
wasi_builder.stdin(input_stream);
wasi_builder.stdout(output_stream.clone());
wasi_builder.stderr(error_stream.clone());
deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder);
let wasi = wasi_builder.build_p1();
let function_context = FunctionContext::new(wasi);
let mut store = Store::new(&engine, function_context);
store.limiter(|s| &mut s.limiter);
store.set_fuel(STARTING_FUEL)?;
store.set_epoch_deadline(1);

import_modules(&module, &engine, &mut linker, &mut store);

linker.module(&mut store, "Function", &module)?;
let instance = linker.instantiate(&mut store, &module)?;

let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?;

(module_result, profile_data) = if let Some(profile_opts) = profile_opts {
let (result, profile_data) = wasmprof::ProfilerBuilder::new(&mut store)
.frequency(profile_opts.interval)
.weight_unit(wasmprof::WeightUnit::Fuel)
.profile(|store| func.call(store.as_context_mut(), ()));

(
result,
Some(profile_data.into_collapsed_stacks().to_string()),
)
} else {
(func.call(store.as_context_mut(), ()), None)
};

// modules may exit with a specific exit code, an exit code of 0 is considered success but is reported as
// a GuestFault by wasmtime, so we need to map it to a success result. Any other exit code is considered
// a failure.
module_result = module_result.or_else(|error| match error.downcast_ref::<I32Exit>() {
Some(I32Exit(0)) => Ok(()),
Some(I32Exit(code)) => Err(anyhow!("module exited with code: {}", code)),
None => Err(error),
});
}

memory_usage = store.data().max_memory_bytes() as u64 / 1024;
instructions = STARTING_FUEL.saturating_sub(store.get_fuel().unwrap_or_default());
let function_context = FunctionContext::new(wasi);
let mut store = Store::new(&engine, function_context);
store.limiter(|s| &mut s.limiter);

match module_result {
Ok(_) => {}
Err(ref e) => {
error_logs = e.to_string();
}
}
io_handler.initialize(&engine, &mut linker, &mut store)?;

store.set_fuel(STARTING_FUEL)?;
store.set_epoch_deadline(1);

let instance = linker.instantiate(&mut store, io_handler.module())?;

let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?;

(module_result, profile_data) = if let Some(profile_opts) = profile_opts {
let (result, profile_data) = wasmprof::ProfilerBuilder::new(&mut store)
.frequency(profile_opts.interval)
.weight_unit(wasmprof::WeightUnit::Fuel)
.profile(|store| func.call(store.as_context_mut(), ()));

(
result,
Some(profile_data.into_collapsed_stacks().to_string()),
)
} else {
(func.call(store.as_context_mut(), ()), None)
};

let mut logs = error_stream
.try_into_inner()
.expect("Log stream reference still exists");
// modules may exit with a specific exit code, an exit code of 0 is considered success but is reported as
// a GuestFault by wasmtime, so we need to map it to a success result. Any other exit code is considered
// a failure.
module_result = module_result.or_else(|error| match error.downcast_ref::<I32Exit>() {
Some(I32Exit(0)) => Ok(()),
Some(I32Exit(code)) => Err(anyhow!("module exited with code: {}", code)),
None => Err(error),
});

let memory_usage = store.data().max_memory_bytes() as u64 / 1024;
let instructions = STARTING_FUEL.saturating_sub(store.get_fuel().unwrap_or_default());

match module_result {
Ok(_) => {}
Err(ref e) => {
error_logs = e.to_string();
}
}

let OutputAndLogs {
output: raw_output,
mut logs,
} = io_handler.finalize(store)?;

logs.extend_from_slice(error_logs.as_bytes());

let output_codec = input.codec;
let raw_output = output_stream
.try_into_inner()
.expect("Output stream reference still exists");
let output = BytesContainer::new(
BytesContainerType::Output,
output_codec,
Expand Down
Loading
Loading