Skip to content

Commit 9d797ac

Browse files
committed
Support in-memory I/O APIs
1 parent 3309eb3 commit 9d797ac

File tree

3 files changed

+270
-101
lines changed

3 files changed

+270
-101
lines changed

src/engine.rs

Lines changed: 58 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use anyhow::{anyhow, Result};
2-
use rust_embed::RustEmbed;
2+
use std::path::PathBuf;
33
use std::string::String;
4-
use std::{collections::HashSet, path::PathBuf};
54
use wasmtime::{AsContextMut, Config, Engine, Linker, Module, ResourceLimiter, Store};
6-
use wasmtime_wasi::pipe::{MemoryInputPipe, MemoryOutputPipe};
75
use wasmtime_wasi::preview1::WasiP1Ctx;
8-
use wasmtime_wasi::{I32Exit, WasiCtxBuilder};
6+
use wasmtime_wasi::I32Exit;
97

108
use crate::function_run_result::FunctionRunResult;
9+
use crate::io::{IOHandler, OutputAndLogs};
1110
use crate::{BytesContainer, BytesContainerType};
1211

1312
#[derive(Clone)]
@@ -16,44 +15,12 @@ pub struct ProfileOpts {
1615
pub out: PathBuf,
1716
}
1817

19-
#[derive(RustEmbed)]
20-
#[folder = "providers/"]
21-
struct StandardProviders;
22-
2318
pub fn uses_msgpack_provider(module: &Module) -> bool {
2419
module.imports().map(|i| i.module()).any(|module| {
2520
module.starts_with("shopify_function_v") || module == "shopify_functions_javy_v2"
2621
})
2722
}
2823

29-
fn import_modules<T>(
30-
module: &Module,
31-
engine: &Engine,
32-
linker: &mut Linker<T>,
33-
mut store: &mut Store<T>,
34-
) {
35-
let imported_modules: HashSet<String> =
36-
module.imports().map(|i| i.module().to_string()).collect();
37-
38-
imported_modules.iter().for_each(|module_name| {
39-
let provider_path = format!("{module_name}.wasm");
40-
let imported_module_bytes = StandardProviders::get(&provider_path);
41-
42-
if let Some(bytes) = imported_module_bytes {
43-
let imported_module = Module::from_binary(engine, &bytes.data)
44-
.unwrap_or_else(|_| panic!("Failed to load module {module_name}"));
45-
46-
let imported_module_instance = linker
47-
.instantiate(&mut store, &imported_module)
48-
.expect("Failed to instantiate imported instance");
49-
50-
linker
51-
.instance(&mut store, module_name, imported_module_instance)
52-
.expect("Failed to import module");
53-
}
54-
});
55-
}
56-
5724
pub struct FunctionRunParams<'a> {
5825
pub function_path: PathBuf,
5926
pub input: BytesContainer,
@@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX;
6835
const MAXIMUM_MEMORIES: usize = 2; // 1 for the module, 1 for Javy's provider
6936

7037
struct FunctionContext {
71-
wasi: WasiP1Ctx,
38+
wasi: Option<WasiP1Ctx>,
7239
limiter: MemoryLimiter,
7340
}
7441

7542
impl FunctionContext {
76-
fn new(wasi: WasiP1Ctx) -> Self {
43+
fn new(wasi: Option<WasiP1Ctx>) -> Self {
7744
Self {
7845
wasi,
7946
limiter: Default::default(),
@@ -128,85 +95,75 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
12895
module,
12996
} = params;
13097

131-
let input_stream = MemoryInputPipe::new(input.raw.clone());
132-
let output_stream = MemoryOutputPipe::new(usize::MAX);
133-
let error_stream = MemoryOutputPipe::new(usize::MAX);
98+
let mut io_handler = IOHandler::new(module, input.clone());
13499

135-
let memory_usage: u64;
136-
let instructions: u64;
137100
let mut error_logs: String = String::new();
138101
let mut module_result: Result<(), anyhow::Error>;
139102
let profile_data: Option<String>;
140103

141-
{
142-
let mut linker = Linker::new(&engine);
104+
let mut linker = Linker::new(&engine);
105+
let wasi = io_handler.wasi();
106+
if wasi.is_some() {
143107
wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| {
144-
&mut ctx.wasi
108+
ctx.wasi.as_mut().expect("Should have WASI context")
145109
})?;
146110
deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?;
147-
let mut wasi_builder = WasiCtxBuilder::new();
148-
wasi_builder.stdin(input_stream);
149-
wasi_builder.stdout(output_stream.clone());
150-
wasi_builder.stderr(error_stream.clone());
151-
deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder);
152-
let wasi = wasi_builder.build_p1();
153-
let function_context = FunctionContext::new(wasi);
154-
let mut store = Store::new(&engine, function_context);
155-
store.limiter(|s| &mut s.limiter);
156-
store.set_fuel(STARTING_FUEL)?;
157-
store.set_epoch_deadline(1);
158-
159-
import_modules(&module, &engine, &mut linker, &mut store);
160-
161-
linker.module(&mut store, "Function", &module)?;
162-
let instance = linker.instantiate(&mut store, &module)?;
163-
164-
let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?;
165-
166-
(module_result, profile_data) = if let Some(profile_opts) = profile_opts {
167-
let (result, profile_data) = wasmprof::ProfilerBuilder::new(&mut store)
168-
.frequency(profile_opts.interval)
169-
.weight_unit(wasmprof::WeightUnit::Fuel)
170-
.profile(|store| func.call(store.as_context_mut(), ()));
171-
172-
(
173-
result,
174-
Some(profile_data.into_collapsed_stacks().to_string()),
175-
)
176-
} else {
177-
(func.call(store.as_context_mut(), ()), None)
178-
};
179-
180-
// modules may exit with a specific exit code, an exit code of 0 is considered success but is reported as
181-
// a GuestFault by wasmtime, so we need to map it to a success result. Any other exit code is considered
182-
// a failure.
183-
module_result = module_result.or_else(|error| match error.downcast_ref::<I32Exit>() {
184-
Some(I32Exit(0)) => Ok(()),
185-
Some(I32Exit(code)) => Err(anyhow!("module exited with code: {}", code)),
186-
None => Err(error),
187-
});
111+
}
188112

189-
memory_usage = store.data().max_memory_bytes() as u64 / 1024;
190-
instructions = STARTING_FUEL.saturating_sub(store.get_fuel().unwrap_or_default());
113+
let function_context = FunctionContext::new(wasi);
114+
let mut store = Store::new(&engine, function_context);
115+
store.limiter(|s| &mut s.limiter);
191116

192-
match module_result {
193-
Ok(_) => {}
194-
Err(ref e) => {
195-
error_logs = e.to_string();
196-
}
197-
}
117+
io_handler.initialize(&engine, &mut linker, &mut store)?;
118+
119+
store.set_fuel(STARTING_FUEL)?;
120+
store.set_epoch_deadline(1);
121+
122+
let instance = linker.instantiate(&mut store, io_handler.module())?;
123+
124+
let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?;
125+
126+
(module_result, profile_data) = if let Some(profile_opts) = profile_opts {
127+
let (result, profile_data) = wasmprof::ProfilerBuilder::new(&mut store)
128+
.frequency(profile_opts.interval)
129+
.weight_unit(wasmprof::WeightUnit::Fuel)
130+
.profile(|store| func.call(store.as_context_mut(), ()));
131+
132+
(
133+
result,
134+
Some(profile_data.into_collapsed_stacks().to_string()),
135+
)
136+
} else {
137+
(func.call(store.as_context_mut(), ()), None)
198138
};
199139

200-
let mut logs = error_stream
201-
.try_into_inner()
202-
.expect("Log stream reference still exists");
140+
// modules may exit with a specific exit code, an exit code of 0 is considered success but is reported as
141+
// a GuestFault by wasmtime, so we need to map it to a success result. Any other exit code is considered
142+
// a failure.
143+
module_result = module_result.or_else(|error| match error.downcast_ref::<I32Exit>() {
144+
Some(I32Exit(0)) => Ok(()),
145+
Some(I32Exit(code)) => Err(anyhow!("module exited with code: {}", code)),
146+
None => Err(error),
147+
});
148+
149+
let memory_usage = store.data().max_memory_bytes() as u64 / 1024;
150+
let instructions = STARTING_FUEL.saturating_sub(store.get_fuel().unwrap_or_default());
151+
152+
match module_result {
153+
Ok(_) => {}
154+
Err(ref e) => {
155+
error_logs = e.to_string();
156+
}
157+
}
158+
159+
let OutputAndLogs {
160+
output: raw_output,
161+
mut logs,
162+
} = io_handler.finalize(store)?;
203163

204164
logs.extend_from_slice(error_logs.as_bytes());
205165

206166
let output_codec = input.codec;
207-
let raw_output = output_stream
208-
.try_into_inner()
209-
.expect("Output stream reference still exists");
210167
let output = BytesContainer::new(
211168
BytesContainerType::Output,
212169
output_codec,

0 commit comments

Comments
 (0)