Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/functions/src/unicode/substr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ fn substr(args: &[ArrayRef]) -> Result<ArrayRef> {
// `get_true_start_end('Hi🌏', 1, None) -> (0, 6)`
// `get_true_start_end('Hi🌏', 1, 1) -> (0, 1)`
// `get_true_start_end('Hi🌏', -10, 2) -> (0, 0)`
fn get_true_start_end(
pub fn get_true_start_end(
input: &str,
start: i64,
count: Option<u64>,
Expand Down Expand Up @@ -235,7 +235,7 @@ fn get_true_start_end(
// string, such as `substr(long_str_with_1k_chars, 1, 32)`.
// In such case the overhead of ASCII-validation may not be worth it, so
// skip the validation for short prefix for now.
fn enable_ascii_fast_path<'a, V: StringArrayType<'a>>(
pub fn enable_ascii_fast_path<'a, V: StringArrayType<'a>>(
string_array: &V,
start: &Int64Array,
count: Option<&Int64Array>,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/spark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ sha1 = "0.10"
url = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
criterion = { workspace = true }

[[bench]]
Expand All @@ -74,3 +75,7 @@ name = "hex"
[[bench]]
harness = false
name = "slice"

[[bench]]
harness = false
name = "substring"
207 changes: 207 additions & 0 deletions datafusion/spark/benches/substring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@cht42 cht42 Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a performance comparison between the DataFusion built-in `substr` and the new Spark `substring` expression. The performance is comparable.

Benchmark Spark substring Time DF substring Time
SHORTER THAN 12/substr_string_view [size=1024, strlen=12] [9.9232 µs 10.026 µs 10.152 µs] [7.2600 µs 7.2906 µs 7.3209 µs]
SHORTER THAN 12/substr_string [size=1024, strlen=12] [7.6418 µs 7.7211 µs 7.8093 µs] [8.4374 µs 9.0174 µs 10.083 µs]
SHORTER THAN 12/substr_large_string [size=1024, strlen=12] [8.1936 µs 8.2795 µs 8.3950 µs] [8.8816 µs 8.9169 µs 8.9539 µs]
LONGER THAN 12/substr_string_view [size=1024, count=64, strlen=128] [14.973 µs 15.169 µs 15.429 µs] [12.647 µs 12.770 µs 12.904 µs]
LONGER THAN 12/substr_string [size=1024, count=64, strlen=128] [12.132 µs 12.183 µs 12.233 µs] [13.968 µs 14.022 µs 14.074 µs]
LONGER THAN 12/substr_large_string [size=1024, count=64, strlen=128] [12.259 µs 12.317 µs 12.379 µs] [13.995 µs 14.109 µs 14.280 µs]
SRC_LEN > 12, SUB_LEN < 12/substr_string_view [size=1024, count=6, strlen=128] [22.497 µs 22.631 µs 22.793 µs] [14.784 µs 14.901 µs 15.075 µs]
SRC_LEN > 12, SUB_LEN < 12/substr_string [size=1024, count=6, strlen=128] [20.755 µs 20.850 µs 20.978 µs] [15.804 µs 15.861 µs 15.914 µs]
SRC_LEN > 12, SUB_LEN < 12/substr_large_string [size=1024, count=6, strlen=128] [20.794 µs 21.045 µs 21.459 µs] [15.771 µs 15.816 µs 15.860 µs]
SHORTER THAN 12/substr_string_view [size=4096, strlen=12] [38.334 µs 38.569 µs 38.934 µs] [26.812 µs 27.032 µs 27.307 µs]
SHORTER THAN 12/substr_string [size=4096, strlen=12] [27.456 µs 27.548 µs 27.640 µs] [32.104 µs 32.449 µs 32.802 µs]
SHORTER THAN 12/substr_large_string [size=4096, strlen=12] [30.323 µs 30.777 µs 31.286 µs] [34.284 µs 34.514 µs 34.858 µs]
LONGER THAN 12/substr_string_view [size=4096, count=64, strlen=128] [55.663 µs 56.315 µs 57.106 µs] [47.091 µs 47.321 µs 47.639 µs]
LONGER THAN 12/substr_string [size=4096, count=64, strlen=128] [40.263 µs 40.768 µs 41.399 µs] [51.885 µs 52.049 µs 52.253 µs]
LONGER THAN 12/substr_large_string [size=4096, count=64, strlen=128] [43.083 µs 43.261 µs 43.452 µs] [51.839 µs 52.048 µs 52.348 µs]
SRC_LEN > 12, SUB_LEN < 12/substr_string_view [size=4096, count=6, strlen=128] [84.649 µs 85.064 µs 85.500 µs] [55.237 µs 55.607 µs 56.018 µs]
SRC_LEN > 12, SUB_LEN < 12/substr_string [size=4096, count=6, strlen=128] [76.146 µs 77.393 µs 79.422 µs] [60.728 µs 60.998 µs 61.235 µs]
SRC_LEN > 12, SUB_LEN < 12/substr_large_string [size=4096, count=6, strlen=128] [76.974 µs 77.166 µs 77.365 µs] [60.319 µs 60.630 µs 60.976 µs]

// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate criterion;

use arrow::array::{ArrayRef, Int64Array, OffsetSizeTrait};
use arrow::datatypes::{DataType, Field};
use arrow::util::bench_util::{
create_string_array_with_len, create_string_view_array_with_len,
};
use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
use datafusion_common::DataFusionError;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use datafusion_spark::function::string::substring;
use std::hint::black_box;
use std::sync::Arc;

/// Builds the `[strings, start]` argument columns for a two-argument
/// `substring` call (no explicit count).
///
/// * `size` - number of rows in every generated column.
/// * `str_len` - string length handed to the arrow bench-util generator.
/// * `start_half_way` - when `true` every start position is `str_len / 2`,
///   otherwise every start position is `1`.
/// * `force_view_types` - when `true` the string column is produced by
///   `create_string_view_array_with_len`; otherwise by
///   `create_string_array_with_len::<O>`, where `O` selects the offset width.
///
/// Returns the string column followed by the start column.
fn create_args_without_count<O: OffsetSizeTrait>(
    size: usize,
    str_len: usize,
    start_half_way: bool,
    force_view_types: bool,
) -> Vec<ColumnarValue> {
    // Every row shares the same start position, so a repeated value suffices.
    let start = if start_half_way {
        (str_len / 2) as i64
    } else {
        1
    };
    let start_array: ArrayRef = Arc::new(Int64Array::from(vec![start; size]));

    // NOTE(review): `0.1` is presumably the generators' null density and the
    // trailing `false` the "mixed" flag — confirm against arrow's bench_util.
    let string_array: ArrayRef = if force_view_types {
        Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false))
    } else {
        Arc::new(create_string_array_with_len::<O>(size, 0.1, str_len))
    };

    vec![
        ColumnarValue::Array(string_array),
        ColumnarValue::Array(start_array),
    ]
}

/// Builds the `[strings, start, count]` argument columns for a three-argument
/// `substring` call.
///
/// * `size` - number of rows in every generated column.
/// * `str_len` - string length handed to the arrow bench-util generator.
/// * `count_max` - requested substring length; clamped to `str_len`.
/// * `force_view_types` - when `true` the string column is produced by
///   `create_string_view_array_with_len`; otherwise by
///   `create_string_array_with_len::<O>`, where `O` selects the offset width.
///
/// Every row uses start position `1` and the same clamped count. Returns the
/// string column, then the start column, then the count column.
fn create_args_with_count<O: OffsetSizeTrait>(
    size: usize,
    str_len: usize,
    count_max: usize,
    force_view_types: bool,
) -> Vec<ColumnarValue> {
    let start_array: ArrayRef = Arc::new(Int64Array::from(vec![1i64; size]));

    // Never ask for more characters than the source strings are given.
    let count = count_max.min(str_len) as i64;
    let count_array: ArrayRef = Arc::new(Int64Array::from(vec![count; size]));

    // NOTE(review): `0.1` is presumably the generators' null density and the
    // trailing `false` the "mixed" flag — confirm against arrow's bench_util.
    let string_array: ArrayRef = if force_view_types {
        Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false))
    } else {
        Arc::new(create_string_array_with_len::<O>(size, 0.1, str_len))
    };

    vec![
        ColumnarValue::Array(string_array),
        ColumnarValue::Array(start_array),
        ColumnarValue::Array(count_array),
    ]
}

/// Invokes the Spark `substring` UDF once over `args`.
///
/// One nullable `Field` named `arg_{idx}` is synthesized per argument from
/// that argument's data type, and the return field is declared as a nullable
/// `Utf8View` field named `f`. Default `ConfigOptions` are used.
///
/// * `args` - the argument columns; ownership is handed to the UDF call.
/// * `number_rows` - forwarded unchanged as `ScalarFunctionArgs::number_rows`.
///
/// Returns whatever `invoke_with_args` returns, including its error.
fn invoke_substr_with_args(
    args: Vec<ColumnarValue>,
    number_rows: usize,
) -> Result<ColumnarValue, DataFusionError> {
    // Collected into owned fields, so the borrow of `args` ends here and the
    // vector can be moved into the call below.
    let arg_fields = args
        .iter()
        .enumerate()
        .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into())
        .collect::<Vec<_>>();

    substring().invoke_with_args(ScalarFunctionArgs {
        // Move the owned vector instead of cloning it: callers already clone
        // per iteration, so a second clone here only adds noise to the timing.
        args,
        arg_fields,
        number_rows,
        return_field: Field::new("f", DataType::Utf8View, true).into(),
        config_options: Arc::new(ConfigOptions::default()),
    })
}

/// Registers the Spark `substring` benchmarks with Criterion.
///
/// For each batch size (1024 and 4096 rows) three groups are run, each with a
/// string-view, a string (`i32` offsets) and a large-string (`i64` offsets)
/// input column:
///
/// * `SHORTER THAN 12` - source length 12, no count argument.
/// * `LONGER THAN 12` - source length 128, start 1, count 64.
/// * `SRC_LEN > 12, SUB_LEN < 12` - source length 128, start 1, count 6.
///
/// Every group uses flat sampling with a sample size of 10. The argument
/// vector is cloned inside the timed closure for every iteration.
fn criterion_benchmark(c: &mut Criterion) {
    for size in [1024, 4096] {
        // string_len = 12, no count (see `create_args_without_count`)
        let len = 12;
        let cases = [
            (
                "substr_string_view",
                create_args_without_count::<i32>(size, len, true, true),
            ),
            // NOTE(review): this case starts at position 1 while its two
            // siblings start half way through the string — confirm intent.
            (
                "substr_string",
                create_args_without_count::<i32>(size, len, false, false),
            ),
            (
                "substr_large_string",
                create_args_without_count::<i64>(size, len, true, false),
            ),
        ];

        let mut group = c.benchmark_group("SHORTER THAN 12");
        group.sampling_mode(SamplingMode::Flat);
        group.sample_size(10);
        for (kind, args) in &cases {
            group.bench_function(format!("{kind} [size={size}, strlen={len}]"), |b| {
                b.iter(|| black_box(invoke_substr_with_args(args.clone(), size)))
            });
        }
        group.finish();

        // string_len = 128, start=1; first count=64 (substring_len=64), then
        // count=6 (substring_len=6)
        let len = 128;
        for (group_name, count) in [("LONGER THAN 12", 64), ("SRC_LEN > 12, SUB_LEN < 12", 6)] {
            let cases = [
                (
                    "substr_string_view",
                    create_args_with_count::<i32>(size, len, count, true),
                ),
                (
                    "substr_string",
                    create_args_with_count::<i32>(size, len, count, false),
                ),
                (
                    "substr_large_string",
                    create_args_with_count::<i64>(size, len, count, false),
                ),
            ];

            let mut group = c.benchmark_group(group_name);
            group.sampling_mode(SamplingMode::Flat);
            group.sample_size(10);
            for (kind, args) in &cases {
                group.bench_function(
                    format!("{kind} [size={size}, count={count}, strlen={len}]"),
                    |b| b.iter(|| black_box(invoke_substr_with_args(args.clone(), size))),
                );
            }
            group.finish();
        }
    }
}

// Collect `criterion_benchmark` into a Criterion group named `benches`.
criterion_group!(benches, criterion_benchmark);
// Generate the benchmark binary's entry point, which runs the `benches` group
// (the bench target is declared with `harness = false`).
criterion_main!(benches);
8 changes: 8 additions & 0 deletions datafusion/spark/src/function/string/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod length;
pub mod like;
pub mod luhn_check;
pub mod space;
pub mod substring;

use datafusion_expr::ScalarUDF;
use datafusion_functions::make_udf_function;
Expand All @@ -40,6 +41,7 @@ make_udf_function!(like::SparkLike, like);
make_udf_function!(luhn_check::SparkLuhnCheck, luhn_check);
make_udf_function!(format_string::FormatStringFunc, format_string);
make_udf_function!(space::SparkSpace, space);
make_udf_function!(substring::SparkSubstring, substring);

pub mod expr_fn {
use datafusion_functions::export_functions;
Expand Down Expand Up @@ -90,6 +92,11 @@ pub mod expr_fn {
strfmt args
));
export_functions!((space, "Returns a string consisting of n spaces.", arg1));
export_functions!((
substring,
"Returns the substring from string str starting at position pos with length len.",
str pos
));
}

pub fn functions() -> Vec<Arc<ScalarUDF>> {
Expand All @@ -104,5 +111,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
luhn_check(),
format_string(),
space(),
substring(),
]
}
Loading