Skip to content

Commit c9d935b

Browse files
authored
Merge pull request #51 from influxdata/crepererum/split_wasm_init
refactor: split WASM init in two phases
2 parents a169f17 + 3301cff commit c9d935b

File tree

5 files changed

+68
-45
lines changed

5 files changed

+68
-45
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ future_not_send = "deny"
4343
todo = "deny"
4444
use_self = "deny"
4545
allow_attributes = "warn"
46+
undocumented_unsafe_blocks = "deny"
4647

4748
[workspace.lints.rustdoc]
4849
bare_urls = "deny"

host/src/compilation_cache.rs

Lines changed: 0 additions & 23 deletions
This file was deleted.

host/src/lib.rs

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@ use wasmtime_wasi::{
1414
};
1515

1616
use crate::{
17-
bindings::exports::datafusion_udf_wasm::udf::types as wit_types,
18-
compilation_cache::CompilationCache, error::DataFusionResultExt,
17+
bindings::exports::datafusion_udf_wasm::udf::types as wit_types, error::DataFusionResultExt,
1918
tokio_helpers::async_in_sync_context,
2019
};
2120
use crate::{error::WasmToDataFusionResultExt, tokio_helpers::blocking_io};
2221

2322
mod bindings;
24-
mod compilation_cache;
2523
mod conversion;
2624
mod error;
2725
mod tokio_helpers;
@@ -59,6 +57,54 @@ impl WasiView for WasmStateImpl {
5957
}
6058
}
6159

60+
/// Pre-compiled WASM component.
61+
///
62+
/// The pre-compilation is stateless and can be used to [create](WasmScalarUdf::new) multiple instances that do not share
63+
/// any state.
64+
pub struct WasmComponentPrecompiled {
65+
engine: Engine,
66+
component: Component,
67+
}
68+
69+
impl WasmComponentPrecompiled {
70+
pub async fn new(wasm_binary: Arc<[u8]>) -> DataFusionResult<Self> {
71+
tokio::task::spawn_blocking(move || {
72+
let engine = Engine::new(
73+
wasmtime::Config::new()
74+
.async_support(true)
75+
.memory_init_cow(true),
76+
)
77+
.context("create WASM engine", None)?;
78+
79+
let compiled_component = engine
80+
.precompile_component(&wasm_binary)
81+
.context("pre-compile component", None)?;
82+
83+
// SAFETY: the compiled version was produced by us with the same engine. This is NOT external/untrusted input.
84+
let component_res = unsafe { Component::deserialize(&engine, compiled_component) };
85+
let component = component_res.context("create WASM component", None)?;
86+
87+
Ok(Self { engine, component })
88+
})
89+
.await
90+
.map_err(|e| DataFusionError::External(Box::new(e)))?
91+
}
92+
}
93+
94+
impl std::fmt::Debug for WasmComponentPrecompiled {
95+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96+
let Self {
97+
engine,
98+
component: _,
99+
} = self;
100+
101+
f.debug_struct("WasmComponentPrecompiled")
102+
.field("engine", engine)
103+
.field("component", &"<COMPONENT>")
104+
.finish()
105+
}
106+
}
107+
62108
pub struct WasmScalarUdf {
63109
store: Arc<Mutex<Store<WasmStateImpl>>>,
64110
bindings: Arc<bindings::Datafusion>,
@@ -68,7 +114,15 @@ pub struct WasmScalarUdf {
68114
}
69115

70116
impl WasmScalarUdf {
71-
pub async fn new(wasm_binary: &[u8], source: String) -> DataFusionResult<Vec<Self>> {
117+
/// Create multiple UDFs from a single WASM VM.
118+
///
119+
/// UDFs bound to the same VM share state; however, calling this method multiple times will yield independent WASM VMs.
120+
pub async fn new(
121+
component: &WasmComponentPrecompiled,
122+
source: String,
123+
) -> DataFusionResult<Vec<Self>> {
124+
let WasmComponentPrecompiled { engine, component } = component;
125+
72126
// TODO: we need an in-mem file system for this, see
73127
// - https://github.com/bytecodealliance/wasmtime/issues/8963
74128
// - https://github.com/Timmmm/wasmtime_fs_demo
@@ -88,24 +142,13 @@ impl WasmScalarUdf {
88142
wasi_ctx,
89143
resource_table: ResourceTable::new(),
90144
};
145+
let mut store = Store::new(engine, state);
91146

92-
let engine = Engine::new(
93-
wasmtime::Config::new()
94-
.async_support(true)
95-
.enable_incremental_compilation(Arc::new(CompilationCache::default()))
96-
.context("enable incremental compilation", None)?,
97-
)
98-
.context("create WASM engine", None)?;
99-
let mut store = Store::new(&engine, state);
100-
101-
let component =
102-
Component::from_binary(&engine, wasm_binary).context("create WASM component", None)?;
103-
104-
let mut linker = Linker::new(&engine);
147+
let mut linker = Linker::new(engine);
105148
wasmtime_wasi::p2::add_to_linker_async(&mut linker).context("link WASI p2", None)?;
106149

107150
let bindings = Arc::new(
108-
bindings::Datafusion::instantiate_async(&mut store, &component, &linker)
151+
bindings::Datafusion::instantiate_async(&mut store, component, &linker)
109152
.await
110153
.context("initialize bindings", Some(&store.data().stderr.contents()))?,
111154
);

host/tests/integration_tests/python.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use arrow::datatypes::{DataType, Field};
44
use datafusion_common::{ScalarValue, assert_contains};
55
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
6-
use datafusion_udf_wasm_host::WasmScalarUdf;
6+
use datafusion_udf_wasm_host::{WasmComponentPrecompiled, WasmScalarUdf};
77

88
#[tokio::test(flavor = "multi_thread")]
99
async fn test() {
@@ -14,7 +14,8 @@ async fn test() {
1414
.await
1515
.unwrap();
1616

17-
let mut udfs = WasmScalarUdf::new(&data, "".to_owned()).await.unwrap();
17+
let component = WasmComponentPrecompiled::new(data.into()).await.unwrap();
18+
let mut udfs = WasmScalarUdf::new(&component, "".to_owned()).await.unwrap();
1819
assert_eq!(udfs.len(), 1);
1920
let udf = udfs.pop().unwrap();
2021

host/tests/integration_tests/rust.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use arrow::{
66
};
77
use datafusion_common::ScalarValue;
88
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
9-
use datafusion_udf_wasm_host::WasmScalarUdf;
9+
use datafusion_udf_wasm_host::{WasmComponentPrecompiled, WasmScalarUdf};
1010

1111
#[tokio::test(flavor = "multi_thread")]
1212
async fn test_add_one() {
@@ -17,7 +17,8 @@ async fn test_add_one() {
1717
.await
1818
.unwrap();
1919

20-
let mut udfs = WasmScalarUdf::new(&data, "".to_owned()).await.unwrap();
20+
let component = WasmComponentPrecompiled::new(data.into()).await.unwrap();
21+
let mut udfs = WasmScalarUdf::new(&component, "".to_owned()).await.unwrap();
2122
assert_eq!(udfs.len(), 1);
2223
let udf = udfs.pop().unwrap();
2324

0 commit comments

Comments
 (0)