22//!
33//!
44//! [DataFusion]: https://datafusion.apache.org/
5- use std:: { any:: Any , io :: Cursor , ops:: DerefMut , sync:: Arc } ;
5+ use std:: { any:: Any , ops:: DerefMut , sync:: Arc } ;
66
77use arrow:: datatypes:: DataType ;
88use datafusion_common:: { DataFusionError , Result as DataFusionResult } ;
99use datafusion_expr:: { ColumnarValue , ScalarFunctionArgs , ScalarUDFImpl , Signature } ;
10- use tempfile:: TempDir ;
1110use tokio:: sync:: Mutex ;
1211use wasmtime:: {
1312 Engine , Store ,
1413 component:: { Component , ResourceAny } ,
1514} ;
16- use wasmtime_wasi:: {
17- DirPerms , FilePerms , ResourceTable , WasiCtx , WasiCtxView , WasiView , p2:: pipe:: MemoryOutputPipe ,
18- } ;
15+ use wasmtime_wasi:: { ResourceTable , WasiCtx , WasiCtxView , WasiView , p2:: pipe:: MemoryOutputPipe } ;
1916use wasmtime_wasi_http:: {
2017 HttpResult , WasiHttpCtx , WasiHttpView ,
2118 bindings:: http:: types:: ErrorCode as HttpErrorCode ,
@@ -25,12 +22,12 @@ use wasmtime_wasi_http::{
2522
2623use crate :: {
2724 bindings:: exports:: datafusion_udf_wasm:: udf:: types as wit_types,
28- error:: DataFusionResultExt ,
25+ error:: { DataFusionResultExt , WasmToDataFusionResultExt } ,
2926 http:: { HttpRequestValidator , RejectAllHttpRequests } ,
3027 linker:: link,
3128 tokio_helpers:: async_in_sync_context,
29+ vfs:: { VfsCtxView , VfsState , VfsView } ,
3230} ;
33- use crate :: { error:: WasmToDataFusionResultExt , tokio_helpers:: blocking_io} ;
3431
3532// unused-crate-dependencies false positives
3633#[ cfg( test) ]
@@ -46,13 +43,14 @@ mod error;
4643pub mod http;
4744mod linker;
4845mod tokio_helpers;
46+ mod vfs;
4947
5048/// State of the WASM payload.
5149struct WasmStateImpl {
52- /// Temporary directory that holds the root filesystem .
50+ /// Virtual filesystem for the WASM payload .
5351 ///
54- /// This filesystem is provided to the payload as read-only .
55- root : TempDir ,
52+ /// This filesystem is provided to the payload in memory with read-write support .
53+ vfs_state : VfsState ,
5654
5755 /// A limited buffer for stderr.
5856 ///
@@ -75,15 +73,15 @@ struct WasmStateImpl {
7573impl std:: fmt:: Debug for WasmStateImpl {
7674 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
7775 let Self {
78- root ,
76+ vfs_state ,
7977 stderr,
8078 wasi_ctx : _,
8179 wasi_http_ctx : _,
8280 resource_table,
8381 http_validator,
8482 } = self ;
8583 f. debug_struct ( "WasmStateImpl" )
86- . field ( "root " , root )
84+ . field ( "vfs_state " , vfs_state )
8785 . field ( "stderr" , stderr)
8886 . field ( "wasi_ctx" , & "<WASI_CTX>" )
8987 . field ( "resource_table" , resource_table)
@@ -136,6 +134,15 @@ impl WasiHttpView for WasmStateImpl {
136134 }
137135}
138136
137+ impl VfsView for WasmStateImpl {
138+ fn vfs ( & mut self ) -> VfsCtxView < ' _ > {
139+ VfsCtxView {
140+ table : & mut self . resource_table ,
141+ vfs_state : & mut self . vfs_state ,
142+ }
143+ }
144+ }
145+
139146/// Pre-compiled WASM component.
140147///
141148/// The pre-compilation is stateless and can be used to [create](WasmScalarUdf::new) multiple instances that do not share
@@ -266,21 +273,13 @@ impl WasmScalarUdf {
266273 ) -> DataFusionResult < Vec < Self > > {
267274 let WasmComponentPrecompiled { engine, component } = component;
268275
269- // TODO: we need an in-mem file system for this, see
270- // - https://github.com/bytecodealliance/wasmtime/issues/8963
271- // - https://github.com/Timmmm/wasmtime_fs_demo
272- let root = blocking_io ( TempDir :: new)
273- . await
274- . map_err ( DataFusionError :: IoError ) ?;
276+ // Create in-memory VFS
277+ let vfs_state = VfsState :: new ( ) ;
275278
276279 let stderr = MemoryOutputPipe :: new ( 1024 ) ;
277- let wasi_ctx = WasiCtx :: builder ( )
278- . stderr ( stderr. clone ( ) )
279- . preopened_dir ( root. path ( ) , "/" , DirPerms :: READ , FilePerms :: READ )
280- . context ( "pre-open root dir" , None ) ?
281- . build ( ) ;
280+ let wasi_ctx = WasiCtx :: builder ( ) . stderr ( stderr. clone ( ) ) . build ( ) ;
282281 let state = WasmStateImpl {
283- root ,
282+ vfs_state ,
284283 stderr,
285284 wasi_ctx,
286285 wasi_http_ctx : WasiHttpCtx :: new ( ) ,
@@ -291,7 +290,7 @@ impl WasmScalarUdf {
291290 . await
292291 . context ( "link WASM components" , None ) ?;
293292
294- // fill root FS
293+ // Populate VFS from tar archive
295294 let root_data = bindings
296295 . datafusion_udf_wasm_udf_types ( )
297296 . call_root_fs_tar ( & mut store)
@@ -301,13 +300,11 @@ impl WasmScalarUdf {
301300 Some ( & store. data ( ) . stderr . contents ( ) ) ,
302301 ) ?;
303302 if let Some ( root_data) = root_data {
304- let root_path = store. data ( ) . root . path ( ) . to_owned ( ) ;
305- blocking_io ( move || {
306- let mut a = tar:: Archive :: new ( Cursor :: new ( root_data) ) ;
307- a. unpack ( root_path)
308- } )
309- . await
310- . map_err ( DataFusionError :: IoError ) ?;
303+ store
304+ . data_mut ( )
305+ . vfs_state
306+ . populate_from_tar ( & root_data)
307+ . map_err ( DataFusionError :: IoError ) ?;
311308 }
312309
313310 let udf_resources = bindings
0 commit comments