Skip to content

Commit 0c1dcc7

Browse files
authored
Merge pull request #168 from influxdata/crepererum/lazy-component
feat: allow lazy compilation of WASM component during query
2 parents 08decd8 + 13af4bc commit 0c1dcc7

File tree

2 files changed

+80
-19
lines changed

2 files changed

+80
-19
lines changed

query/src/lib.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![allow(unused_crate_dependencies)]
33

44
use std::collections::HashMap;
5+
use std::pin::Pin;
56

67
use datafusion::execution::TaskContext;
78
use datafusion_common::{DataFusionError, Result as DataFusionResult};
@@ -17,12 +18,74 @@ use crate::format::UdfCodeFormatter;
1718
/// Module for UDF code formatting implementations
1819
pub mod format;
1920

21+
/// Inner type of [`ComponentFn`].
22+
///
23+
/// This is deliberately NOT exposed directly to the user because:
24+
///
25+
/// - it's a rather complex type
26+
/// - embedding it into a struct allows us to implement some convenience methods
27+
type ComponentFnInner<'a> = Box<
28+
dyn Fn() -> Pin<Box<dyn Future<Output = &'a WasmComponentPrecompiled> + Send + 'a>> + Sync + 'a,
29+
>;
30+
31+
/// A type-erased async function that returns a [`WasmComponentPrecompiled`].
32+
pub struct ComponentFn<'a>(ComponentFnInner<'a>);
33+
34+
impl<'a> ComponentFn<'a> {
35+
/// Create function from pre-compiled component.
36+
pub fn eager(component: &'a WasmComponentPrecompiled) -> Self {
37+
Self(Box::new(move || Box::pin(async move { component })))
38+
}
39+
40+
/// Create function lazily.
41+
///
42+
/// # Example
43+
/// You can either use an async closure, in which case `'a` can be a local lifetime:
44+
///
45+
/// ```
46+
/// # use datafusion_udf_wasm_query::ComponentFn;
47+
/// let component = ComponentFn::lazy(async || {
48+
/// todo!("create Arc<WasmComponentPrecompiled>")
49+
/// });
50+
/// ```
51+
///
52+
/// or some existing async function, in which `'a` must be `'static`:
53+
///
54+
/// ```
55+
/// # use datafusion_udf_wasm_host::WasmComponentPrecompiled;
56+
/// # use datafusion_udf_wasm_query::ComponentFn;
57+
/// async fn get_component() -> &'static WasmComponentPrecompiled {
58+
/// todo!("create Arc<WasmComponentPrecompiled>")
59+
/// }
60+
///
61+
/// let component = ComponentFn::lazy(get_component);
62+
/// ```
63+
pub fn lazy<F, Fut>(f: F) -> Self
64+
where
65+
F: Fn() -> Fut + Sync + 'a,
66+
Fut: Future<Output = &'a WasmComponentPrecompiled> + Send + 'a,
67+
{
68+
Self(Box::new(move || Box::pin(f())))
69+
}
70+
71+
/// Get underlying component.
72+
pub async fn get(&self) -> &'a WasmComponentPrecompiled {
73+
(self.0)().await
74+
}
75+
}
76+
77+
impl<'a> std::fmt::Debug for ComponentFn<'a> {
78+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79+
f.debug_struct("ComponentFn").finish_non_exhaustive()
80+
}
81+
}
82+
2083
/// Represents a supported UDF language with its associated WASM component
2184
/// and code formatter.
2285
#[derive(Debug)]
2386
pub struct Lang<'a> {
2487
/// Pre-compiled WASM component for the language
25-
pub component: &'a WasmComponentPrecompiled,
88+
pub component: ComponentFn<'a>,
2689
/// Code formatter for the language
2790
pub formatter: Box<dyn UdfCodeFormatter>,
2891
}
@@ -81,7 +144,13 @@ impl<'a> UdfQueryParser<'a> {
81144
for code in blocks {
82145
let code = lang.formatter.format(code);
83146
udfs.extend(
84-
WasmScalarUdf::new(lang.component, permissions, io_rt.clone(), code).await?,
147+
WasmScalarUdf::new(
148+
lang.component.get().await,
149+
permissions,
150+
io_rt.clone(),
151+
code,
152+
)
153+
.await?,
85154
);
86155
}
87156
}

query/tests/integration.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use datafusion::{
1414
use datafusion_common::{Result as DataFusionResult, test_util::batches_to_string};
1515
use datafusion_udf_wasm_host::WasmPermissions;
1616
use datafusion_udf_wasm_query::{
17-
Lang, ParsedQuery, UdfQueryParser,
17+
ComponentFn, Lang, ParsedQuery, UdfQueryParser,
1818
format::{NoOpFormatter, StripIndentationFormatter},
1919
};
2020
use tokio::runtime::Handle;
@@ -53,13 +53,12 @@ SELECT add_one(1);
5353
"#;
5454

5555
let ctx = SessionContext::new();
56-
let component = python_component().await;
5756
let formatter = Box::new(NoOpFormatter);
5857

5958
let parser = UdfQueryParser::new(HashMap::from_iter([(
6059
"python".to_string(),
6160
Lang {
62-
component,
61+
component: ComponentFn::lazy(python_component),
6362
formatter,
6463
},
6564
)]));
@@ -109,13 +108,12 @@ SELECT add_one(1), multiply_two(3);
109108
"#;
110109

111110
let ctx = SessionContext::new();
112-
let component = python_component().await;
113111
let formatter = Box::new(NoOpFormatter);
114112

115113
let parser = UdfQueryParser::new(HashMap::from_iter([(
116114
"python".to_string(),
117115
Lang {
118-
component,
116+
component: ComponentFn::lazy(python_component),
119117
formatter,
120118
},
121119
)]));
@@ -161,13 +159,12 @@ SELECT add_one(1), multiply_two(3);
161159
"#;
162160

163161
let ctx = SessionContext::new();
164-
let component = python_component().await;
165162
let formatter = Box::new(NoOpFormatter);
166163

167164
let parser = UdfQueryParser::new(HashMap::from_iter([(
168165
"python".to_string(),
169166
Lang {
170-
component,
167+
component: ComponentFn::lazy(python_component),
171168
formatter,
172169
},
173170
)]));
@@ -207,13 +204,12 @@ SELECT add_one(1)
207204
"#;
208205

209206
let ctx = SessionContext::new();
210-
let component = python_component().await;
211207
let formatter = Box::new(NoOpFormatter);
212208

213209
let parser = UdfQueryParser::new(HashMap::from_iter([(
214210
"python".to_string(),
215211
Lang {
216-
component,
212+
component: ComponentFn::lazy(python_component),
217213
formatter,
218214
},
219215
)]));
@@ -248,13 +244,12 @@ EXPLAIN SELECT add_one(1);
248244
"#;
249245

250246
let ctx = SessionContext::new();
251-
let component = python_component().await;
252247
let formatter = Box::new(NoOpFormatter);
253248

254249
let parser = UdfQueryParser::new(HashMap::from_iter([(
255250
"python".to_string(),
256251
Lang {
257-
component,
252+
component: ComponentFn::lazy(python_component),
258253
formatter,
259254
},
260255
)]));
@@ -304,13 +299,12 @@ async fn test_strip_indentation_everything_indented() {
304299
let query = query_lines.join("\n");
305300

306301
let ctx = SessionContext::new();
307-
let component = python_component().await;
308302
let formatter = Box::new(StripIndentationFormatter);
309303

310304
let parser = UdfQueryParser::new(HashMap::from_iter([(
311305
"python".to_string(),
312306
Lang {
313-
component,
307+
component: ComponentFn::lazy(python_component),
314308
formatter,
315309
},
316310
)]));
@@ -355,13 +349,12 @@ async fn test_strip_indentation_empty_lines_not_indented() {
355349
let query = query_lines.join("\n");
356350

357351
let ctx = SessionContext::new();
358-
let component = python_component().await;
359352
let formatter = Box::new(StripIndentationFormatter);
360353

361354
let parser = UdfQueryParser::new(HashMap::from_iter([(
362355
"python".to_string(),
363356
Lang {
364-
component,
357+
component: ComponentFn::lazy(python_component),
365358
formatter,
366359
},
367360
)]));
@@ -405,13 +398,12 @@ async fn test_strip_indentation_python_further_indented() {
405398
let query = query_lines.join("\n");
406399

407400
let ctx = SessionContext::new();
408-
let component = python_component().await;
409401
let formatter = Box::new(StripIndentationFormatter);
410402

411403
let parser = UdfQueryParser::new(HashMap::from_iter([(
412404
"python".to_string(),
413405
Lang {
414-
component,
406+
component: ComponentFn::lazy(python_component),
415407
formatter,
416408
},
417409
)]));

0 commit comments

Comments
 (0)