Skip to content

Commit 13af4bc

Browse files
committed
feat: allow lazy compilation of WASM component during query
Technically, the WASM component only needs to be compiled when a query actually uses it. Exploiting this property is helpful for our integration into InfluxDB, where many tests don't need the Python UDF and can therefore run without the rather expensive WASM compilation step.
1 parent c586f10 commit 13af4bc

File tree

2 files changed

+80
-19
lines changed

2 files changed

+80
-19
lines changed

query/src/lib.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![allow(unused_crate_dependencies)]
33

44
use std::collections::HashMap;
5+
use std::pin::Pin;
56

67
use datafusion::execution::TaskContext;
78
use datafusion_common::{DataFusionError, Result as DataFusionResult};
@@ -17,12 +18,74 @@ use crate::format::UdfCodeFormatter;
1718
/// Module for UDF code formatting implementations
1819
pub mod format;
1920

21+
/// Inner type of [`ComponentFn`].
22+
///
23+
/// This is deliberately NOT exposed directly to the user because:
24+
///
25+
/// - it's a rather complex type
26+
/// - embedding it into a struct allows us to implement some convenience methods
27+
type ComponentFnInner<'a> = Box<
28+
dyn Fn() -> Pin<Box<dyn Future<Output = &'a WasmComponentPrecompiled> + Send + 'a>> + Sync + 'a,
29+
>;
30+
31+
/// A type-erased async function that returns a [`WasmComponentPrecompiled`].
32+
pub struct ComponentFn<'a>(ComponentFnInner<'a>);
33+
34+
impl<'a> ComponentFn<'a> {
35+
/// Create function from pre-compiled component.
36+
pub fn eager(component: &'a WasmComponentPrecompiled) -> Self {
37+
Self(Box::new(move || Box::pin(async move { component })))
38+
}
39+
40+
/// Create function lazily.
41+
///
42+
/// # Example
43+
/// You can either use an async closure, in which case `'a` can be a local lifetime:
44+
///
45+
/// ```
46+
/// # use datafusion_udf_wasm_query::ComponentFn;
47+
/// let component = ComponentFn::lazy(async || {
48+
/// todo!("obtain &WasmComponentPrecompiled")
49+
/// });
50+
/// ```
51+
///
52+
/// or some existing async function, in which case `'a` must be `'static`:
53+
///
54+
/// ```
55+
/// # use datafusion_udf_wasm_host::WasmComponentPrecompiled;
56+
/// # use datafusion_udf_wasm_query::ComponentFn;
57+
/// async fn get_component() -> &'static WasmComponentPrecompiled {
58+
/// todo!("obtain &'static WasmComponentPrecompiled")
59+
/// }
60+
///
61+
/// let component = ComponentFn::lazy(get_component);
62+
/// ```
63+
pub fn lazy<F, Fut>(f: F) -> Self
64+
where
65+
F: Fn() -> Fut + Sync + 'a,
66+
Fut: Future<Output = &'a WasmComponentPrecompiled> + Send + 'a,
67+
{
68+
Self(Box::new(move || Box::pin(f())))
69+
}
70+
71+
/// Get underlying component by calling the wrapped function and awaiting the future it returns.
72+
pub async fn get(&self) -> &'a WasmComponentPrecompiled {
73+
(self.0)().await
74+
}
75+
}
76+
77+
impl<'a> std::fmt::Debug for ComponentFn<'a> {
78+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79+
f.debug_struct("ComponentFn").finish_non_exhaustive()
80+
}
81+
}
82+
2083
/// Represents a supported UDF language with its associated WASM component
2184
/// and code formatter.
2285
#[derive(Debug)]
2386
pub struct Lang<'a> {
2487
/// Async function that yields the pre-compiled WASM component for the language
25-
pub component: &'a WasmComponentPrecompiled,
88+
pub component: ComponentFn<'a>,
2689
/// Code formatter for the language
2790
pub formatter: Box<dyn UdfCodeFormatter>,
2891
}
@@ -81,7 +144,13 @@ impl<'a> UdfQueryParser<'a> {
81144
for code in blocks {
82145
let code = lang.formatter.format(code);
83146
udfs.extend(
84-
WasmScalarUdf::new(lang.component, permissions, io_rt.clone(), code).await?,
147+
WasmScalarUdf::new(
148+
lang.component.get().await,
149+
permissions,
150+
io_rt.clone(),
151+
code,
152+
)
153+
.await?,
85154
);
86155
}
87156
}

query/tests/integration.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use datafusion::{
1414
use datafusion_common::{Result as DataFusionResult, test_util::batches_to_string};
1515
use datafusion_udf_wasm_host::WasmPermissions;
1616
use datafusion_udf_wasm_query::{
17-
Lang, ParsedQuery, UdfQueryParser,
17+
ComponentFn, Lang, ParsedQuery, UdfQueryParser,
1818
format::{NoOpFormatter, StripIndentationFormatter},
1919
};
2020
use tokio::runtime::Handle;
@@ -53,13 +53,12 @@ SELECT add_one(1);
5353
"#;
5454

5555
let ctx = SessionContext::new();
56-
let component = python_component().await;
5756
let formatter = Box::new(NoOpFormatter);
5857

5958
let parser = UdfQueryParser::new(HashMap::from_iter([(
6059
"python".to_string(),
6160
Lang {
62-
component,
61+
component: ComponentFn::lazy(python_component),
6362
formatter,
6463
},
6564
)]));
@@ -109,13 +108,12 @@ SELECT add_one(1), multiply_two(3);
109108
"#;
110109

111110
let ctx = SessionContext::new();
112-
let component = python_component().await;
113111
let formatter = Box::new(NoOpFormatter);
114112

115113
let parser = UdfQueryParser::new(HashMap::from_iter([(
116114
"python".to_string(),
117115
Lang {
118-
component,
116+
component: ComponentFn::lazy(python_component),
119117
formatter,
120118
},
121119
)]));
@@ -161,13 +159,12 @@ SELECT add_one(1), multiply_two(3);
161159
"#;
162160

163161
let ctx = SessionContext::new();
164-
let component = python_component().await;
165162
let formatter = Box::new(NoOpFormatter);
166163

167164
let parser = UdfQueryParser::new(HashMap::from_iter([(
168165
"python".to_string(),
169166
Lang {
170-
component,
167+
component: ComponentFn::lazy(python_component),
171168
formatter,
172169
},
173170
)]));
@@ -207,13 +204,12 @@ SELECT add_one(1)
207204
"#;
208205

209206
let ctx = SessionContext::new();
210-
let component = python_component().await;
211207
let formatter = Box::new(NoOpFormatter);
212208

213209
let parser = UdfQueryParser::new(HashMap::from_iter([(
214210
"python".to_string(),
215211
Lang {
216-
component,
212+
component: ComponentFn::lazy(python_component),
217213
formatter,
218214
},
219215
)]));
@@ -248,13 +244,12 @@ EXPLAIN SELECT add_one(1);
248244
"#;
249245

250246
let ctx = SessionContext::new();
251-
let component = python_component().await;
252247
let formatter = Box::new(NoOpFormatter);
253248

254249
let parser = UdfQueryParser::new(HashMap::from_iter([(
255250
"python".to_string(),
256251
Lang {
257-
component,
252+
component: ComponentFn::lazy(python_component),
258253
formatter,
259254
},
260255
)]));
@@ -304,13 +299,12 @@ async fn test_strip_indentation_everything_indented() {
304299
let query = query_lines.join("\n");
305300

306301
let ctx = SessionContext::new();
307-
let component = python_component().await;
308302
let formatter = Box::new(StripIndentationFormatter);
309303

310304
let parser = UdfQueryParser::new(HashMap::from_iter([(
311305
"python".to_string(),
312306
Lang {
313-
component,
307+
component: ComponentFn::lazy(python_component),
314308
formatter,
315309
},
316310
)]));
@@ -355,13 +349,12 @@ async fn test_strip_indentation_empty_lines_not_indented() {
355349
let query = query_lines.join("\n");
356350

357351
let ctx = SessionContext::new();
358-
let component = python_component().await;
359352
let formatter = Box::new(StripIndentationFormatter);
360353

361354
let parser = UdfQueryParser::new(HashMap::from_iter([(
362355
"python".to_string(),
363356
Lang {
364-
component,
357+
component: ComponentFn::lazy(python_component),
365358
formatter,
366359
},
367360
)]));
@@ -405,13 +398,12 @@ async fn test_strip_indentation_python_further_indented() {
405398
let query = query_lines.join("\n");
406399

407400
let ctx = SessionContext::new();
408-
let component = python_component().await;
409401
let formatter = Box::new(StripIndentationFormatter);
410402

411403
let parser = UdfQueryParser::new(HashMap::from_iter([(
412404
"python".to_string(),
413405
Lang {
414-
component,
406+
component: ComponentFn::lazy(python_component),
415407
formatter,
416408
},
417409
)]));

0 commit comments

Comments
 (0)