Skip to content

Commit b5e4290

Browse files
authored
perf: Optimize lpad/rpad to remove unnecessary memory allocations per element (#2963)
1 parent a122a14 commit b5e4290

File tree

3 files changed

+204
-31
lines changed

3 files changed

+204
-31
lines changed

native/spark-expr/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ harness = false
7676
name = "bloom_filter_agg"
7777
harness = false
7878

79+
[[bench]]
80+
name = "padding"
81+
harness = false
82+
7983
[[test]]
8084
name = "test_udf_registration"
8185
path = "tests/spark_expr_reg.rs"
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::builder::StringBuilder;
19+
use arrow::array::ArrayRef;
20+
use criterion::{criterion_group, criterion_main, Criterion};
21+
use datafusion::common::ScalarValue;
22+
use datafusion::physical_plan::ColumnarValue;
23+
use datafusion_comet_spark_expr::{spark_lpad, spark_rpad};
24+
use std::hint::black_box;
25+
use std::sync::Arc;
26+
27+
fn create_string_array(size: usize) -> ArrayRef {
28+
let mut builder = StringBuilder::new();
29+
for i in 0..size {
30+
if i % 10 == 0 {
31+
builder.append_null();
32+
} else {
33+
builder.append_value(format!("string{}", i % 100));
34+
}
35+
}
36+
Arc::new(builder.finish())
37+
}
38+
39+
fn criterion_benchmark(c: &mut Criterion) {
40+
let size = 8192;
41+
let string_array = create_string_array(size);
42+
43+
// lpad with default padding (space)
44+
c.bench_function("spark_lpad: default padding", |b| {
45+
let args = vec![
46+
ColumnarValue::Array(Arc::clone(&string_array)),
47+
ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
48+
];
49+
b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
50+
});
51+
52+
// lpad with custom padding character
53+
c.bench_function("spark_lpad: custom padding", |b| {
54+
let args = vec![
55+
ColumnarValue::Array(Arc::clone(&string_array)),
56+
ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
57+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("*".to_string()))),
58+
];
59+
b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
60+
});
61+
62+
// rpad with default padding (space)
63+
c.bench_function("spark_rpad: default padding", |b| {
64+
let args = vec![
65+
ColumnarValue::Array(Arc::clone(&string_array)),
66+
ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
67+
];
68+
b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
69+
});
70+
71+
// rpad with custom padding character
72+
c.bench_function("spark_rpad: custom padding", |b| {
73+
let args = vec![
74+
ColumnarValue::Array(Arc::clone(&string_array)),
75+
ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
76+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("*".to_string()))),
77+
];
78+
b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
79+
});
80+
81+
// lpad with multi-character padding string
82+
c.bench_function("spark_lpad: multi-char padding", |b| {
83+
let args = vec![
84+
ColumnarValue::Array(Arc::clone(&string_array)),
85+
ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
86+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("abc".to_string()))),
87+
];
88+
b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
89+
});
90+
91+
// rpad with multi-character padding string
92+
c.bench_function("spark_rpad: multi-char padding", |b| {
93+
let args = vec![
94+
ColumnarValue::Array(Arc::clone(&string_array)),
95+
ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
96+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("abc".to_string()))),
97+
];
98+
b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
99+
});
100+
101+
// lpad with truncation (target length shorter than some strings)
102+
c.bench_function("spark_lpad: with truncation", |b| {
103+
let args = vec![
104+
ColumnarValue::Array(Arc::clone(&string_array)),
105+
ColumnarValue::Scalar(ScalarValue::Int32(Some(5))),
106+
];
107+
b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
108+
});
109+
110+
// rpad with truncation (target length shorter than some strings)
111+
c.bench_function("spark_rpad: with truncation", |b| {
112+
let args = vec![
113+
ColumnarValue::Array(Arc::clone(&string_array)),
114+
ColumnarValue::Scalar(ScalarValue::Int32(Some(5))),
115+
];
116+
b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
117+
});
118+
}
119+
120+
criterion_group!(benches, criterion_benchmark);
121+
criterion_main!(benches);

native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs

Lines changed: 79 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
194194
is_left_pad: bool,
195195
) -> Result<ColumnarValue, DataFusionError> {
196196
let string_array = as_generic_string_array::<T>(array)?;
197+
198+
// Pre-compute pad characters once to avoid repeated iteration
199+
let pad_chars: Vec<char> = pad_string.chars().collect();
200+
197201
match pad_type {
198202
ColumnarValue::Array(array_int) => {
199203
let int_pad_array = array_int.as_primitive::<Int32Type>();
@@ -203,18 +207,24 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
203207
string_array.len() * int_pad_array.len(),
204208
);
205209

210+
// Reusable buffer to avoid per-element allocations
211+
let mut buffer = String::with_capacity(pad_chars.len());
212+
206213
for (string, length) in string_array.iter().zip(int_pad_array) {
207214
let length = length.unwrap();
208215
match string {
209216
Some(string) => {
210217
if length >= 0 {
211-
builder.append_value(add_padding_string(
212-
string.parse().unwrap(),
218+
buffer.clear();
219+
write_padded_string(
220+
&mut buffer,
221+
string,
213222
length as usize,
214223
truncate,
215-
pad_string,
224+
&pad_chars,
216225
is_left_pad,
217-
)?)
226+
);
227+
builder.append_value(&buffer);
218228
} else {
219229
builder.append_value("");
220230
}
@@ -232,15 +242,23 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
232242
string_array.len() * length,
233243
);
234244

245+
// Reusable buffer to avoid per-element allocations
246+
let mut buffer = String::with_capacity(length);
247+
235248
for string in string_array.iter() {
236249
match string {
237-
Some(string) => builder.append_value(add_padding_string(
238-
string.parse().unwrap(),
239-
length,
240-
truncate,
241-
pad_string,
242-
is_left_pad,
243-
)?),
250+
Some(string) => {
251+
buffer.clear();
252+
write_padded_string(
253+
&mut buffer,
254+
string,
255+
length,
256+
truncate,
257+
&pad_chars,
258+
is_left_pad,
259+
);
260+
builder.append_value(&buffer);
261+
}
244262
_ => builder.append_null(),
245263
}
246264
}
@@ -249,44 +267,74 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
249267
}
250268
}
251269

252-
fn add_padding_string(
253-
string: String,
270+
/// Writes a padded string to the provided buffer, avoiding allocations.
271+
///
272+
/// The buffer is assumed to be cleared before calling this function.
273+
/// Padding characters are written directly to the buffer without intermediate allocations.
274+
#[inline]
275+
fn write_padded_string(
276+
buffer: &mut String,
277+
string: &str,
254278
length: usize,
255279
truncate: bool,
256-
pad_string: &str,
280+
pad_chars: &[char],
257281
is_left_pad: bool,
258-
) -> Result<String, DataFusionError> {
259-
// It looks Spark's UTF8String is closer to chars rather than graphemes
282+
) {
283+
// Spark's UTF8String uses char count, not grapheme count
260284
// https://stackoverflow.com/a/46290728
261285
let char_len = string.chars().count();
286+
262287
if length <= char_len {
263288
if truncate {
289+
// Find byte index for the truncation point
264290
let idx = string
265291
.char_indices()
266292
.nth(length)
267293
.map(|(i, _)| i)
268294
.unwrap_or(string.len());
269-
match string[..idx].parse() {
270-
Ok(string) => Ok(string),
271-
Err(err) => Err(DataFusionError::Internal(format!(
272-
"Failed adding padding string {} error {:}",
273-
string, err
274-
))),
275-
}
295+
buffer.push_str(&string[..idx]);
276296
} else {
277-
Ok(string)
297+
buffer.push_str(string);
278298
}
279299
} else {
280300
let pad_needed = length - char_len;
281-
let pad: String = pad_string.chars().cycle().take(pad_needed).collect();
282-
let mut result = String::with_capacity(string.len() + pad.len());
301+
283302
if is_left_pad {
284-
result.push_str(&pad);
285-
result.push_str(&string);
303+
// Write padding first, then string
304+
write_padding_chars(buffer, pad_chars, pad_needed);
305+
buffer.push_str(string);
286306
} else {
287-
result.push_str(&string);
288-
result.push_str(&pad);
307+
// Write string first, then padding
308+
buffer.push_str(string);
309+
write_padding_chars(buffer, pad_chars, pad_needed);
310+
}
311+
}
312+
}
313+
314+
/// Writes `count` characters from the cycling pad pattern directly to the buffer.
315+
#[inline]
316+
fn write_padding_chars(buffer: &mut String, pad_chars: &[char], count: usize) {
317+
if pad_chars.is_empty() {
318+
return;
319+
}
320+
321+
// Optimize for the common single-character padding case
322+
if pad_chars.len() == 1 {
323+
let ch = pad_chars[0];
324+
for _ in 0..count {
325+
buffer.push(ch);
326+
}
327+
} else {
328+
// Multi-character padding: cycle through pad_chars
329+
let mut remaining = count;
330+
while remaining > 0 {
331+
for &ch in pad_chars {
332+
if remaining == 0 {
333+
break;
334+
}
335+
buffer.push(ch);
336+
remaining -= 1;
337+
}
289338
}
290-
Ok(result)
291339
}
292340
}

0 commit comments

Comments
 (0)