diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs
index e061f852637ca..03310a7bde193 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -693,10 +693,14 @@ pub mod datafusion_strsim {
     }
 
     /// Calculates the minimum number of insertions, deletions, and substitutions
-    /// required to change one sequence into the other.
-    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
+    /// required to change one sequence into the other, using a reusable cache buffer.
+    ///
+    /// This is the generic implementation that works with any iterator types.
+    /// The `cache` buffer will be resized as needed and reused across calls.
+    fn generic_levenshtein_with_buffer<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
         a: &'a Iter1,
         b: &'b Iter2,
+        cache: &mut Vec<usize>,
     ) -> usize
     where
         &'a Iter1: IntoIterator<Item = Elem1>,
@@ -709,7 +713,9 @@ pub mod datafusion_strsim {
             return b_len;
         }
 
-        let mut cache: Vec<usize> = (1..b_len + 1).collect();
+        // Refill with 1..=b_len; `clear` keeps the allocation, so capacity is reused.
+        cache.clear();
+        cache.extend(1..=b_len);
 
         let mut result = 0;
 
@@ -729,6 +735,21 @@ pub mod datafusion_strsim {
         result
     }
 
+    /// Calculates the minimum number of insertions, deletions, and substitutions
+    /// required to change one sequence into the other.
+    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
+        a: &'a Iter1,
+        b: &'b Iter2,
+    ) -> usize
+    where
+        &'a Iter1: IntoIterator<Item = Elem1>,
+        &'b Iter2: IntoIterator<Item = Elem2>,
+        Elem1: PartialEq<Elem2>,
+    {
+        let mut cache = Vec::new();
+        generic_levenshtein_with_buffer(a, b, &mut cache)
+    }
+
     /// Calculates the minimum number of insertions, deletions, and substitutions
     /// required to change one string into the other.
     ///
@@ -741,6 +762,19 @@ pub mod datafusion_strsim {
         generic_levenshtein(&StringWrapper(a), &StringWrapper(b))
     }
 
+    /// Calculates the Levenshtein distance between two strings using a
+    /// caller-provided scratch buffer.
+    ///
+    /// Returns the same value as [`levenshtein`], but avoids allocating a new
+    /// `Vec` for each call, improving performance when computing many distances
+    /// (for example, one per row of an array).
+    ///
+    /// Any existing contents of `cache` are ignored: the buffer is cleared and
+    /// refilled as needed, and its allocation is reused across calls.
+    pub fn levenshtein_with_buffer(a: &str, b: &str, cache: &mut Vec<usize>) -> usize {
+        generic_levenshtein_with_buffer(&StringWrapper(a), &StringWrapper(b), cache)
+    }
+
     /// Calculates the normalized Levenshtein distance between two strings.
     /// The normalized distance is a value between 0.0 and 1.0, where 1.0 indicates
     /// that the strings are identical and 0.0 indicates no similarity.
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index 765f5d865a60e..6391ad57d83a7 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -264,3 +264,8 @@ required-features = ["string_expressions"]
 harness = false
 name = "ends_with"
 required-features = ["string_expressions"]
+
+[[bench]]
+harness = false
+name = "levenshtein"
+required-features = ["string_expressions"]
diff --git a/datafusion/functions/benches/levenshtein.rs b/datafusion/functions/benches/levenshtein.rs
new file mode 100644
index 0000000000000..19f81b6cafcb3
--- /dev/null
+++ b/datafusion/functions/benches/levenshtein.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::array::OffsetSizeTrait;
+use arrow::datatypes::{DataType, Field};
+use arrow::util::bench_util::create_string_array_with_len;
+use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
+use datafusion_common::DataFusionError;
+use datafusion_common::config::ConfigOptions;
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
+use datafusion_functions::string;
+use std::hint::black_box;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Builds the two `size`-row string columns passed to `levenshtein` as arguments.
+fn create_args<O: OffsetSizeTrait>(size: usize, str_len: usize) -> Vec<ColumnarValue> {
+    // NOTE(review): if arrow's generator is seeded, both columns are equal — confirm.
+    let string1_array = Arc::new(create_string_array_with_len::<O>(size, 0.1, str_len));
+    let string2_array = Arc::new(create_string_array_with_len::<O>(size, 0.1, str_len));
+    vec![
+        ColumnarValue::Array(string1_array),
+        ColumnarValue::Array(string2_array),
+    ]
+}
+
+/// Runs the `levenshtein` UDF once over `args`, declaring an `Int32` return field.
+fn invoke_levenshtein_with_args(
+    args: Vec<ColumnarValue>,
+    number_rows: usize,
+) -> Result<ColumnarValue, DataFusionError> {
+    let arg_fields = args
+        .iter()
+        .enumerate()
+        .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into())
+        .collect::<Vec<_>>();
+
+    string::levenshtein().invoke_with_args(ScalarFunctionArgs {
+        args,
+        arg_fields,
+        number_rows,
+        return_field: Field::new("f", DataType::Int32, true).into(),
+        config_options: Arc::new(ConfigOptions::default()),
+    })
+}
+
+/// Benchmarks `levenshtein` for every (batch size, string length) combination below.
+fn criterion_benchmark(c: &mut Criterion) {
+    for size in [1024, 4096] {
+        let mut group = c.benchmark_group(format!("levenshtein size={size}"));
+        group.sampling_mode(SamplingMode::Flat);
+        group.sample_size(10);
+        group.measurement_time(Duration::from_secs(10));
+        for str_len in [8, 32] {
+            let args = create_args::<i32>(size, str_len);
+            group.bench_function(
+                format!("levenshtein_string [size={size}, str_len={str_len}]"),
+                |b| {
+                    b.iter(|| {
+                        let args_cloned = args.clone();
+                        black_box(invoke_levenshtein_with_args(args_cloned, size))
+                    })
+                },
+            );
+        }
+        group.finish();
+    }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches); diff --git a/datafusion/functions/src/string/levenshtein.rs b/datafusion/functions/src/string/levenshtein.rs index f46c674cd7f0c..4fedd0d3ebad0 100644 --- a/datafusion/functions/src/string/levenshtein.rs +++ b/datafusion/functions/src/string/levenshtein.rs @@ -151,12 +151,18 @@ fn levenshtein(args: &[ArrayRef]) -> Result { DataType::Utf8View => { let str1_array = as_string_view_array(&str1)?; let str2_array = as_string_view_array(&str2)?; + + // Reusable buffer to avoid allocating for each row + let mut cache = Vec::new(); + let result = str1_array .iter() .zip(str2_array.iter()) .map(|(string1, string2)| match (string1, string2) { (Some(string1), Some(string2)) => { - Some(datafusion_strsim::levenshtein(string1, string2) as i32) + Some(datafusion_strsim::levenshtein_with_buffer( + string1, string2, &mut cache, + ) as i32) } _ => None, }) @@ -166,12 +172,18 @@ fn levenshtein(args: &[ArrayRef]) -> Result { DataType::Utf8 => { let str1_array = as_generic_string_array::(&str1)?; let str2_array = as_generic_string_array::(&str2)?; + + // Reusable buffer to avoid allocating for each row + let mut cache = Vec::new(); + let result = str1_array .iter() .zip(str2_array.iter()) .map(|(string1, string2)| match (string1, string2) { (Some(string1), Some(string2)) => { - Some(datafusion_strsim::levenshtein(string1, string2) as i32) + Some(datafusion_strsim::levenshtein_with_buffer( + string1, string2, &mut cache, + ) as i32) } _ => None, }) @@ -181,12 +193,18 @@ fn levenshtein(args: &[ArrayRef]) -> Result { DataType::LargeUtf8 => { let str1_array = as_generic_string_array::(&str1)?; let str2_array = as_generic_string_array::(&str2)?; + + // Reusable buffer to avoid allocating for each row + let mut cache = Vec::new(); + let result = str1_array .iter() .zip(str2_array.iter()) .map(|(string1, string2)| match (string1, string2) { (Some(string1), Some(string2)) => { - Some(datafusion_strsim::levenshtein(string1, string2) as i64) + 
Some(datafusion_strsim::levenshtein_with_buffer( + string1, string2, &mut cache, + ) as i64) } _ => None, })