Skip to content

Commit a4ac4d9

Browse files
authored
Merge pull request #358 from influxdata/crepererum/sub-str-overhead-bench
feat: extend `udf_overhead` benchmark w/ `sub-str` example
2 parents 28bfb94 + 4074d79 commit a4ac4d9

File tree

2 files changed

+272
-93
lines changed

2 files changed

+272
-93
lines changed

host/benches/udf_overhead.rs

Lines changed: 200 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,14 @@
7979
use std::{hint::black_box, io::Write, sync::Arc};
8080

8181
use arrow::{
82-
array::Int64Array,
82+
array::{ArrayRef, GenericStringBuilder, Int64Array, StringArray},
8383
datatypes::{DataType, Field},
8484
};
85-
use datafusion_common::{Result as DataFusionResult, cast::as_int64_array, config::ConfigOptions};
85+
use datafusion_common::{
86+
Result as DataFusionResult,
87+
cast::{as_int64_array, as_string_array},
88+
config::ConfigOptions,
89+
};
8690
use datafusion_execution::memory_pool::UnboundedMemoryPool;
8791
use datafusion_expr::{
8892
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
@@ -162,6 +166,75 @@ impl AsyncScalarUDFImpl for AddOne {
162166
}
163167
}
164168

169+
/// UDF that implements "sub str".
170+
#[derive(Debug, PartialEq, Eq, Hash)]
171+
pub(crate) struct SubStr {
172+
/// Signature of the UDF.
173+
///
174+
/// We store this here because [`ScalarUDFImpl::signature`] requires us to return a reference.
175+
signature: Signature,
176+
}
177+
178+
impl Default for SubStr {
179+
fn default() -> Self {
180+
Self {
181+
signature: Signature::uniform(1, vec![DataType::Utf8], Volatility::Immutable),
182+
}
183+
}
184+
}
185+
186+
impl ScalarUDFImpl for SubStr {
187+
fn as_any(&self) -> &dyn std::any::Any {
188+
self
189+
}
190+
191+
fn name(&self) -> &str {
192+
"sub_str"
193+
}
194+
195+
fn signature(&self) -> &Signature {
196+
&self.signature
197+
}
198+
199+
fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> {
200+
Ok(DataType::Utf8)
201+
}
202+
203+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
204+
unimplemented!()
205+
}
206+
}
207+
208+
#[async_trait]
209+
impl AsyncScalarUDFImpl for SubStr {
210+
async fn invoke_async_with_args(
211+
&self,
212+
args: ScalarFunctionArgs,
213+
) -> DataFusionResult<ColumnarValue> {
214+
let ScalarFunctionArgs {
215+
args,
216+
arg_fields: _,
217+
number_rows: _,
218+
return_field: _,
219+
config_options: _,
220+
} = args;
221+
222+
let ColumnarValue::Array(array) = &args[0] else {
223+
unreachable!()
224+
};
225+
let array = as_string_array(array)?;
226+
227+
// perform calculation
228+
let array = array
229+
.iter()
230+
.map(|s| s.and_then(|s| s.split(".").nth(1).map(|s| s.to_owned())))
231+
.collect::<StringArray>();
232+
233+
// create output
234+
Ok(ColumnarValue::Array(Arc::new(array)))
235+
}
236+
}
237+
165238
/// Compile the WASM component outside of Valgrind, because otherwise the setup step takes like 3+min per benchmark.
166239
fn build_wasm_module(binary: &[u8]) -> WasmComponentPrecompiled {
167240
let mut child = std::process::Command::new(env!("CARGO_BIN_EXE_compile"))
@@ -198,6 +271,13 @@ fn build_wasm_module(binary: &[u8]) -> WasmComponentPrecompiled {
198271
res.unwrap()
199272
}
200273

274+
#[derive(Debug, Clone, Copy)]
275+
enum Payload {
276+
AddOne,
277+
SubStr,
278+
}
279+
280+
#[derive(Debug, Clone, Copy)]
201281
enum Mode {
202282
Native,
203283
Wasm,
@@ -217,21 +297,28 @@ struct SetupLeftovers {
217297
}
218298

219299
impl Setup {
220-
fn new(mode: Mode, batch_size: usize, num_batches: usize) -> Self {
300+
fn new(payload: Payload, mode: Mode, batch_size: usize, num_batches: usize) -> Self {
221301
let mut config_options = ConfigOptions::default();
222302
config_options.execution.batch_size = batch_size;
223303
let config_options = Arc::new(config_options);
224304

225305
let rt = tokio::runtime::Builder::new_current_thread()
306+
.enable_time()
226307
.build()
227308
.unwrap();
228309

229310
let udf = match mode {
230-
Mode::Native => Arc::new(AddOne::default()) as Arc<dyn AsyncScalarUDFImpl>,
311+
Mode::Native => match payload {
312+
Payload::AddOne => Arc::new(AddOne::default()) as Arc<dyn AsyncScalarUDFImpl>,
313+
Payload::SubStr => Arc::new(SubStr::default()) as Arc<dyn AsyncScalarUDFImpl>,
314+
},
231315
Mode::Wasm => {
232316
let udf = rt.block_on(async {
233-
let component =
234-
build_wasm_module(datafusion_udf_wasm_bundle::BIN_EXAMPLE_ADD_ONE);
317+
let binary = match payload {
318+
Payload::AddOne => datafusion_udf_wasm_bundle::BIN_EXAMPLE_ADD_ONE,
319+
Payload::SubStr => datafusion_udf_wasm_bundle::BIN_EXAMPLE_SUB_STR,
320+
};
321+
let component = build_wasm_module(binary);
235322

236323
WasmScalarUdf::new(
237324
&component,
@@ -252,16 +339,30 @@ impl Setup {
252339
let udf = rt.block_on(async {
253340
let component = build_wasm_module(datafusion_udf_wasm_bundle::BIN_PYTHON);
254341

342+
let code = match payload {
343+
Payload::AddOne => {
344+
"
345+
def add_one(a: int) -> int:
346+
return a + 1
347+
"
348+
}
349+
Payload::SubStr => {
350+
"
351+
def add_one(a: str) -> str | None:
352+
try:
353+
return a.split('.')[1]
354+
except IndexError:
355+
return None
356+
"
357+
}
358+
};
359+
255360
WasmScalarUdf::new(
256361
&component,
257362
&Default::default(),
258363
Handle::current(),
259364
&(Arc::new(UnboundedMemoryPool::default()) as _),
260-
"
261-
def add_one(a: int) -> int:
262-
return a + 1
263-
"
264-
.to_owned(),
365+
code.to_owned(),
265366
)
266367
.await
267368
.unwrap()
@@ -273,14 +374,60 @@ def add_one(a: int) -> int:
273374
}
274375
};
275376

276-
let mut input_gen = (0..).map(|x| (x % 2 == 0).then_some(x as i64));
277-
let arg_field = Arc::new(Field::new("a", DataType::Int64, true));
278-
let return_field = Arc::new(Field::new("r", DataType::Int64, true));
377+
let mut array_gen: Box<dyn FnMut() -> ArrayRef> = match payload {
378+
Payload::AddOne => {
379+
let mut input_gen = (0..).map(|x| (x % 2 == 0).then_some(x as i64));
380+
381+
Box::new(move || Arc::new(Int64Array::from_iter((&mut input_gen).take(batch_size))))
382+
}
383+
Payload::SubStr => {
384+
let mut char_gen = ('a'..='z').cycle();
385+
// The cost model says that the cost per row should be roughly constant. Hence, we shall use a constant string length.
386+
const SUB_STRING_LEN: usize = 97;
387+
let mut string_gen =
388+
move || (&mut char_gen).take(SUB_STRING_LEN).collect::<String>();
389+
390+
let mut input_gen = (0..).map(move |x| match x % 4 {
391+
0 => None,
392+
1 => Some(string_gen()),
393+
2 => Some(format!("{}.{}", string_gen(), string_gen())),
394+
3 => Some(format!(
395+
"{}.{}.{}",
396+
string_gen(),
397+
string_gen(),
398+
string_gen()
399+
)),
400+
_ => unreachable!(),
401+
});
402+
403+
Box::new(move || {
404+
// Collect strings first so we can allocate the string array with exact capacities. This is
405+
// important so that our cost model makes sense.
406+
let data = (&mut input_gen).take(batch_size).collect::<Vec<_>>();
407+
let mut builder = GenericStringBuilder::<i32>::with_capacity(
408+
data.len(),
409+
data.iter()
410+
.map(|maybe_str| {
411+
maybe_str.as_ref().map(|s| s.len()).unwrap_or_default()
412+
})
413+
.sum(),
414+
);
415+
builder.extend(data);
416+
Arc::new(builder.finish())
417+
})
418+
}
419+
};
420+
421+
let dt = match payload {
422+
Payload::AddOne => DataType::Int64,
423+
Payload::SubStr => DataType::Utf8,
424+
};
425+
let arg_field = Arc::new(Field::new("a", dt.clone(), true));
426+
let return_field = Arc::new(Field::new("r", dt, true));
427+
279428
let batch_args = (0..num_batches)
280429
.map(|_| ScalarFunctionArgs {
281-
args: vec![ColumnarValue::Array(Arc::new(Int64Array::from_iter(
282-
(&mut input_gen).take(batch_size),
283-
)))],
430+
args: vec![ColumnarValue::Array(array_gen())],
284431
arg_fields: vec![Arc::clone(&arg_field)],
285432
number_rows: batch_size,
286433
return_field: Arc::clone(&return_field),
@@ -318,38 +465,46 @@ mod actual_benchmark {
318465

319466
/// Instantiate benchmarks for given mode.
320467
macro_rules! impl_benchmark {
321-
($mode:ident, $bench_name:ident) => {
468+
($payload:ident, $mode:ident, $bench_name:ident) => {
322469
#[library_benchmark(setup = Setup::new, teardown=drop)]
323-
#[bench::batchsize_0_batches_0(Mode::$mode, 0, 0)]
324-
#[bench::batchsize_0_batches_1(Mode::$mode, 0, 1)]
325-
#[bench::batchsize_0_batches_2(Mode::$mode, 0, 2)]
326-
#[bench::batchsize_0_batches_3(Mode::$mode, 0, 3)]
327-
#[bench::batchsize_8192_batches_0(Mode::$mode, 8192, 0)]
328-
#[bench::batchsize_8192_batches_1(Mode::$mode, 8192, 1)]
329-
#[bench::batchsize_8192_batches_2(Mode::$mode, 8192, 2)]
330-
#[bench::batchsize_8192_batches_3(Mode::$mode, 8192, 3)]
331-
#[bench::batchsize_16384_batches_0(Mode::$mode, 16384, 0)]
332-
#[bench::batchsize_16384_batches_1(Mode::$mode, 16384, 1)]
333-
#[bench::batchsize_16384_batches_2(Mode::$mode, 16384, 2)]
334-
#[bench::batchsize_16384_batches_3(Mode::$mode, 16384, 3)]
335-
#[bench::batchsize_24576_batches_0(Mode::$mode, 24576, 0)]
336-
#[bench::batchsize_24576_batches_1(Mode::$mode, 24576, 1)]
337-
#[bench::batchsize_24576_batches_2(Mode::$mode, 24576, 2)]
338-
#[bench::batchsize_24576_batches_3(Mode::$mode, 24576, 3)]
470+
#[bench::batchsize_0_batches_0(Payload::$payload, Mode::$mode, 0, 0)]
471+
#[bench::batchsize_0_batches_1(Payload::$payload, Mode::$mode, 0, 1)]
472+
#[bench::batchsize_0_batches_2(Payload::$payload, Mode::$mode, 0, 2)]
473+
#[bench::batchsize_0_batches_3(Payload::$payload, Mode::$mode, 0, 3)]
474+
#[bench::batchsize_8192_batches_0(Payload::$payload, Mode::$mode, 8192, 0)]
475+
#[bench::batchsize_8192_batches_1(Payload::$payload, Mode::$mode, 8192, 1)]
476+
#[bench::batchsize_8192_batches_2(Payload::$payload, Mode::$mode, 8192, 2)]
477+
#[bench::batchsize_8192_batches_3(Payload::$payload, Mode::$mode, 8192, 3)]
478+
#[bench::batchsize_16384_batches_0(Payload::$payload, Mode::$mode, 16384, 0)]
479+
#[bench::batchsize_16384_batches_1(Payload::$payload, Mode::$mode, 16384, 1)]
480+
#[bench::batchsize_16384_batches_2(Payload::$payload, Mode::$mode, 16384, 2)]
481+
#[bench::batchsize_16384_batches_3(Payload::$payload, Mode::$mode, 16384, 3)]
482+
#[bench::batchsize_24576_batches_0(Payload::$payload, Mode::$mode, 24576, 0)]
483+
#[bench::batchsize_24576_batches_1(Payload::$payload, Mode::$mode, 24576, 1)]
484+
#[bench::batchsize_24576_batches_2(Payload::$payload, Mode::$mode, 24576, 2)]
485+
#[bench::batchsize_24576_batches_3(Payload::$payload, Mode::$mode, 24576, 3)]
339486
fn $bench_name(setup: Setup) -> SetupLeftovers {
340487
setup.run()
341488
}
342489
};
343490
}
344491

345-
impl_benchmark!(Native, bench_native);
346-
impl_benchmark!(Wasm, bench_wasm);
347-
impl_benchmark!(Python, bench_python);
492+
impl_benchmark!(AddOne, Native, bench_addone_native);
493+
impl_benchmark!(AddOne, Wasm, bench_addone_wasm);
494+
impl_benchmark!(AddOne, Python, bench_addone_python);
495+
impl_benchmark!(SubStr, Native, bench_substr_native);
496+
impl_benchmark!(SubStr, Wasm, bench_substr_wasm);
497+
impl_benchmark!(SubStr, Python, bench_substr_python);
348498

349499
library_benchmark_group!(
350500
name = add_one;
351501
compare_by_id = true;
352-
benchmarks = bench_native, bench_wasm, bench_python
502+
benchmarks = bench_addone_native, bench_addone_wasm, bench_addone_python
503+
);
504+
library_benchmark_group!(
505+
name = sub_str;
506+
compare_by_id = true;
507+
benchmarks = bench_substr_native, bench_substr_wasm, bench_substr_python
353508
);
354509

355510
main!(
@@ -360,7 +515,7 @@ mod actual_benchmark {
360515
"--trace-children=no",
361516
]);
362517
;
363-
library_benchmark_groups = add_one
518+
library_benchmark_groups = add_one, sub_str
364519
);
365520

366521
// re-export `main`
@@ -375,14 +530,12 @@ fn main() {
375530
let batch_size = 2;
376531
let num_batches = 1;
377532

378-
println!("native");
379-
Setup::new(Mode::Native, batch_size, num_batches).run();
380-
381-
println!("wasm");
382-
Setup::new(Mode::Wasm, batch_size, num_batches).run();
383-
384-
println!("python");
385-
Setup::new(Mode::Python, batch_size, num_batches).run();
533+
for payload in [Payload::AddOne, Payload::SubStr] {
534+
for mode in [Mode::Native, Mode::Wasm, Mode::Python] {
535+
println!("payload={payload:?} mode={mode:?}");
536+
Setup::new(payload, mode, batch_size, num_batches).run();
537+
}
538+
}
386539
} else {
387540
actual_benchmark::pub_main();
388541
}

0 commit comments

Comments
 (0)