diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index c973a5b37b..ea89c43204 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -76,6 +76,10 @@ harness = false name = "bloom_filter_agg" harness = false +[[bench]] +name = "padding" +harness = false + [[test]] name = "test_udf_registration" path = "tests/spark_expr_reg.rs" diff --git a/native/spark-expr/benches/padding.rs b/native/spark-expr/benches/padding.rs new file mode 100644 index 0000000000..cd9e28f2d7 --- /dev/null +++ b/native/spark-expr/benches/padding.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::builder::StringBuilder; +use arrow::array::ArrayRef; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::common::ScalarValue; +use datafusion::physical_plan::ColumnarValue; +use datafusion_comet_spark_expr::{spark_lpad, spark_rpad}; +use std::hint::black_box; +use std::sync::Arc; + +fn create_string_array(size: usize) -> ArrayRef { + let mut builder = StringBuilder::new(); + for i in 0..size { + if i % 10 == 0 { + builder.append_null(); + } else { + builder.append_value(format!("string{}", i % 100)); + } + } + Arc::new(builder.finish()) +} + +fn criterion_benchmark(c: &mut Criterion) { + let size = 8192; + let string_array = create_string_array(size); + + // lpad with default padding (space) + c.bench_function("spark_lpad: default padding", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(20))), + ]; + b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap())) + }); + + // lpad with custom padding character + c.bench_function("spark_lpad: custom padding", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(20))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("*".to_string()))), + ]; + b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap())) + }); + + // rpad with default padding (space) + c.bench_function("spark_rpad: default padding", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(20))), + ]; + b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap())) + }); + + // rpad with custom padding character + c.bench_function("spark_rpad: custom padding", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(20))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("*".to_string()))), + ]; + b.iter(|| 
black_box(spark_rpad(black_box(&args)).unwrap())) + }); + + // lpad with multi-character padding string + c.bench_function("spark_lpad: multi-char padding", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(20))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("abc".to_string()))), + ]; + b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap())) + }); + + // rpad with multi-character padding string + c.bench_function("spark_rpad: multi-char padding", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(20))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("abc".to_string()))), + ]; + b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap())) + }); + + // lpad with truncation (target length shorter than some strings) + c.bench_function("spark_lpad: with truncation", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(5))), + ]; + b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap())) + }); + + // rpad with truncation (target length shorter than some strings) + c.bench_function("spark_rpad: with truncation", |b| { + let args = vec![ + ColumnarValue::Array(Arc::clone(&string_array)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(5))), + ]; + b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap())) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs index 89485ddec4..000b4810e7 100644 --- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs +++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs @@ -194,6 +194,10 @@ fn spark_read_side_padding_internal( is_left_pad: bool, ) -> Result { let 
string_array = as_generic_string_array::(array)?; + + // Pre-compute pad characters once to avoid repeated iteration + let pad_chars: Vec = pad_string.chars().collect(); + match pad_type { ColumnarValue::Array(array_int) => { let int_pad_array = array_int.as_primitive::(); @@ -203,18 +207,24 @@ fn spark_read_side_padding_internal( string_array.len() * int_pad_array.len(), ); + // Reusable buffer to avoid per-element allocations + let mut buffer = String::with_capacity(pad_chars.len()); + for (string, length) in string_array.iter().zip(int_pad_array) { let length = length.unwrap(); match string { Some(string) => { if length >= 0 { - builder.append_value(add_padding_string( - string.parse().unwrap(), + buffer.clear(); + write_padded_string( + &mut buffer, + string, length as usize, truncate, - pad_string, + &pad_chars, is_left_pad, - )?) + ); + builder.append_value(&buffer); } else { builder.append_value(""); } @@ -232,15 +242,23 @@ fn spark_read_side_padding_internal( string_array.len() * length, ); + // Reusable buffer to avoid per-element allocations + let mut buffer = String::with_capacity(length); + for string in string_array.iter() { match string { - Some(string) => builder.append_value(add_padding_string( - string.parse().unwrap(), - length, - truncate, - pad_string, - is_left_pad, - )?), + Some(string) => { + buffer.clear(); + write_padded_string( + &mut buffer, + string, + length, + truncate, + &pad_chars, + is_left_pad, + ); + builder.append_value(&buffer); + } _ => builder.append_null(), } } @@ -249,44 +267,74 @@ fn spark_read_side_padding_internal( } } -fn add_padding_string( - string: String, +/// Writes a padded string to the provided buffer, avoiding allocations. +/// +/// The buffer is assumed to be cleared before calling this function. +/// Padding characters are written directly to the buffer without intermediate allocations. 
/// Appends `string`, padded or truncated to `length` characters, to `buffer`.
///
/// Lengths are counted in `char`s (Unicode scalar values), not bytes or
/// grapheme clusters, matching how Spark's `UTF8String` counts characters
/// (see https://stackoverflow.com/a/46290728).
///
/// * If `string` already has at least `length` chars, it is appended either
///   cut to its first `length` chars (`truncate == true`) or unchanged
///   (`truncate == false`).
/// * Otherwise the missing chars are taken from `pad_chars`, repeated
///   cyclically, and written before the string (`is_left_pad == true`) or
///   after it. An empty `pad_chars` adds no padding.
///
/// The function only appends; callers that reuse a buffer across rows must
/// clear it before each call.
#[inline]
fn write_padded_string(
    buffer: &mut String,
    string: &str,
    length: usize,
    truncate: bool,
    pad_chars: &[char],
    is_left_pad: bool,
) {
    let char_len = string.chars().count();

    if length <= char_len {
        let kept = if truncate {
            // Byte offset of the char at position `length`; when the string has
            // exactly `length` chars there is no such char and we keep it all.
            let end = string
                .char_indices()
                .nth(length)
                .map_or(string.len(), |(idx, _)| idx);
            &string[..end]
        } else {
            string
        };
        buffer.push_str(kept);
        return;
    }

    let pad_needed = length - char_len;
    if is_left_pad {
        write_padding_chars(buffer, pad_chars, pad_needed);
        buffer.push_str(string);
    } else {
        buffer.push_str(string);
        write_padding_chars(buffer, pad_chars, pad_needed);
    }
}

/// Appends `count` chars to `buffer`, cycling through `pad_chars` from its start.
///
/// An empty `pad_chars` writes nothing. `String::extend` reserves space from
/// the iterator's size hint, so the buffer grows at most once per call.
#[inline]
fn write_padding_chars(buffer: &mut String, pad_chars: &[char], count: usize) {
    match pad_chars {
        [] => {}
        // Common case: a single pad char needs no cycling state.
        [ch] => buffer.extend(std::iter::repeat(*ch).take(count)),
        _ => buffer.extend(pad_chars.iter().cycle().take(count)),
    }
}