Skip to content

Commit 4dac745

Browse files
authored
feat: Improve performance of date truncate (#2997)
1 parent e041a1e commit 4dac745

File tree

5 files changed

+164
-97
lines changed

5 files changed

+164
-97
lines changed

native/spark-expr/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ harness = false
8080
name = "padding"
8181
harness = false
8282

83+
[[bench]]
84+
name = "date_trunc"
85+
harness = false
86+
8387
[[bench]]
8488
name = "normalize_nan"
8589
harness = false
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, Date32Array};
19+
use criterion::{criterion_group, criterion_main, Criterion};
20+
use datafusion_comet_spark_expr::date_trunc_dyn;
21+
use std::sync::Arc;
22+
23+
fn criterion_benchmark(c: &mut Criterion) {
24+
let date_array = create_date_array();
25+
26+
let mut group = c.benchmark_group("date_trunc");
27+
28+
// Benchmark each truncation format
29+
for format in ["YEAR", "QUARTER", "MONTH", "WEEK"] {
30+
let array_ref: ArrayRef = Arc::new(date_array.clone());
31+
group.bench_function(format!("date_trunc_{}", format.to_lowercase()), |b| {
32+
b.iter(|| date_trunc_dyn(&array_ref, format.to_string()).unwrap());
33+
});
34+
}
35+
36+
group.finish();
37+
}
38+
39+
fn create_date_array() -> Date32Array {
40+
// Create 10000 dates spanning several years (more realistic workload)
41+
// Days since Unix epoch: range from 0 (1970-01-01) to ~19000 (2022)
42+
let dates: Vec<i32> = (0..10000).map(|i| (i * 2) % 19000).collect();
43+
Date32Array::from(dates)
44+
}
45+
46+
fn config() -> Criterion {
47+
Criterion::default()
48+
}
49+
50+
criterion_group! {
51+
name = benches;
52+
config = config();
53+
targets = criterion_benchmark
54+
}
55+
criterion_main!(benches);

native/spark-expr/src/kernels/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
//! Kernels
1919
2020
pub mod strings;
21-
pub(crate) mod temporal;
21+
pub mod temporal;

native/spark-expr/src/kernels/temporal.rs

Lines changed: 102 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717

1818
//! temporal kernels
1919
20-
use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Timelike, Utc};
20+
use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
2121

2222
use std::sync::Arc;
2323

2424
use arrow::array::{
2525
downcast_dictionary_array, downcast_temporal_array,
2626
temporal_conversions::*,
2727
timezone::Tz,
28-
types::{ArrowDictionaryKeyType, ArrowTemporalType, Date32Type, TimestampMicrosecondType},
28+
types::{ArrowDictionaryKeyType, ArrowTemporalType, TimestampMicrosecondType},
2929
ArrowNumericType,
3030
};
3131
use arrow::{
@@ -46,47 +46,57 @@ macro_rules! return_compute_error_with {
4646
// and the beginning of the Unix Epoch (1970-01-01)
4747
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;
4848

49-
// Copied from arrow_arith/temporal.rs with modification to the output datatype
50-
// Transforms a array of NaiveDate to an array of Date32 after applying an operation
51-
fn as_datetime_with_op<A: ArrayAccessor<Item = T::Native>, T: ArrowTemporalType, F>(
52-
iter: ArrayIter<A>,
53-
mut builder: PrimitiveBuilder<Date32Type>,
54-
op: F,
55-
) -> Date32Array
56-
where
57-
F: Fn(NaiveDateTime) -> i32,
58-
i64: From<T::Native>,
59-
{
60-
iter.into_iter().for_each(|value| {
61-
if let Some(value) = value {
62-
match as_datetime::<T>(i64::from(value)) {
63-
Some(dt) => builder.append_value(op(dt)),
64-
None => builder.append_null(),
65-
}
66-
} else {
67-
builder.append_null();
68-
}
69-
});
49+
// Optimized date truncation functions that work directly with days since epoch
50+
// These avoid the overhead of converting to/from NaiveDateTime
7051

71-
builder.finish()
52+
/// Convert days since Unix epoch to NaiveDate
53+
#[inline]
54+
fn days_to_date(days: i32) -> Option<NaiveDate> {
55+
NaiveDate::from_num_days_from_ce_opt(days + DAYS_TO_UNIX_EPOCH)
7256
}
7357

58+
/// Truncate date to first day of year - optimized version
59+
/// Uses ordinal (day of year) to avoid creating a new date
7460
#[inline]
75-
fn as_datetime_with_op_single<F>(
76-
value: Option<i32>,
77-
builder: &mut PrimitiveBuilder<Date32Type>,
78-
op: F,
79-
) where
80-
F: Fn(NaiveDateTime) -> i32,
81-
{
82-
if let Some(value) = value {
83-
match as_datetime::<Date32Type>(i64::from(value)) {
84-
Some(dt) => builder.append_value(op(dt)),
85-
None => builder.append_null(),
86-
}
87-
} else {
88-
builder.append_null();
89-
}
61+
fn trunc_days_to_year(days: i32) -> Option<i32> {
62+
let date = days_to_date(days)?;
63+
let day_of_year_offset = date.ordinal() as i32 - 1;
64+
Some(days - day_of_year_offset)
65+
}
66+
67+
/// Truncate date to first day of quarter - optimized version
68+
/// Computes offset from first day of quarter without creating a new date
69+
#[inline]
70+
fn trunc_days_to_quarter(days: i32) -> Option<i32> {
71+
let date = days_to_date(days)?;
72+
let month = date.month(); // 1-12
73+
let quarter = (month - 1) / 3; // 0-3
74+
let first_month_of_quarter = quarter * 3 + 1; // 1, 4, 7, or 10
75+
76+
// Find day of year for first day of quarter
77+
let first_day_of_quarter = NaiveDate::from_ymd_opt(date.year(), first_month_of_quarter, 1)?;
78+
let quarter_start_ordinal = first_day_of_quarter.ordinal() as i32;
79+
let current_ordinal = date.ordinal() as i32;
80+
81+
Some(days - (current_ordinal - quarter_start_ordinal))
82+
}
83+
84+
/// Truncate date to first day of month - optimized version
85+
/// Instead of creating a new date, just subtract day offset
86+
#[inline]
87+
fn trunc_days_to_month(days: i32) -> Option<i32> {
88+
let date = days_to_date(days)?;
89+
let day_offset = date.day() as i32 - 1;
90+
Some(days - day_offset)
91+
}
92+
93+
/// Truncate date to first day of week (Monday) - optimized version
94+
#[inline]
95+
fn trunc_days_to_week(days: i32) -> Option<i32> {
96+
let date = days_to_date(days)?;
97+
// weekday().num_days_from_monday() gives 0 for Monday, 1 for Tuesday, etc.
98+
let days_since_monday = date.weekday().num_days_from_monday() as i32;
99+
Some(days - days_since_monday)
90100
}
91101

92102
// Based on arrow_arith/temporal.rs:extract_component_from_datetime_array
@@ -143,11 +153,6 @@ where
143153
Ok(())
144154
}
145155

146-
#[inline]
147-
fn as_days_from_unix_epoch(dt: Option<NaiveDateTime>) -> i32 {
148-
dt.unwrap().num_days_from_ce() - DAYS_TO_UNIX_EPOCH
149-
}
150-
151156
// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
152157
#[inline]
153158
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
@@ -251,7 +256,7 @@ fn trunc_date_to_microsec<T: Timelike>(dt: T) -> Option<T> {
251256
/// array is an array of Date32 values. The array may be a dictionary array.
252257
///
253258
/// format is a scalar string specifying the format to apply to the timestamp value.
254-
pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
259+
pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
255260
match array.data_type().clone() {
256261
DataType::Dictionary(_, _) => {
257262
downcast_dictionary_array!(
@@ -282,41 +287,49 @@ where
282287
T: ArrowTemporalType + ArrowNumericType,
283288
i64: From<T::Native>,
284289
{
285-
let builder = Date32Builder::with_capacity(array.len());
286-
let iter = ArrayIter::new(array);
287290
match array.data_type() {
288-
DataType::Date32 => match format.to_uppercase().as_str() {
289-
"YEAR" | "YYYY" | "YY" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
290-
iter,
291-
builder,
292-
|dt| as_days_from_unix_epoch(trunc_date_to_year(dt)),
293-
)),
294-
"QUARTER" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
295-
iter,
296-
builder,
297-
|dt| as_days_from_unix_epoch(trunc_date_to_quarter(dt)),
298-
)),
299-
"MONTH" | "MON" | "MM" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
300-
iter,
301-
builder,
302-
|dt| as_days_from_unix_epoch(trunc_date_to_month(dt)),
303-
)),
304-
"WEEK" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
305-
iter,
306-
builder,
307-
|dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
308-
)),
309-
_ => Err(SparkError::Internal(format!(
310-
"Unsupported format: {format:?} for function 'date_trunc'"
311-
))),
312-
},
291+
DataType::Date32 => {
292+
// Use optimized path for Date32 that works directly with days
293+
date_trunc_date32(
294+
array
295+
.as_any()
296+
.downcast_ref::<Date32Array>()
297+
.expect("Date32 type mismatch"),
298+
format,
299+
)
300+
}
313301
dt => return_compute_error_with!(
314302
"Unsupported input type '{:?}' for function 'date_trunc'",
315303
dt
316304
),
317305
}
318306
}
319307

308+
/// Optimized date truncation for Date32 arrays
309+
/// Works directly with days since epoch instead of converting to/from NaiveDateTime
310+
fn date_trunc_date32(array: &Date32Array, format: String) -> Result<Date32Array, SparkError> {
311+
// Select the truncation function based on format
312+
let trunc_fn: fn(i32) -> Option<i32> = match format.to_uppercase().as_str() {
313+
"YEAR" | "YYYY" | "YY" => trunc_days_to_year,
314+
"QUARTER" => trunc_days_to_quarter,
315+
"MONTH" | "MON" | "MM" => trunc_days_to_month,
316+
"WEEK" => trunc_days_to_week,
317+
_ => {
318+
return Err(SparkError::Internal(format!(
319+
"Unsupported format: {format:?} for function 'date_trunc'"
320+
)))
321+
}
322+
};
323+
324+
// Apply truncation to each element
325+
let result: Date32Array = array
326+
.iter()
327+
.map(|opt_days| opt_days.and_then(trunc_fn))
328+
.collect();
329+
330+
Ok(result)
331+
}
332+
320333
///
321334
/// Implements the spark [TRUNC](https://spark.apache.org/docs/latest/api/sql/index.html#trunc)
322335
/// function where the specified format may be an array
@@ -410,29 +423,23 @@ macro_rules! date_trunc_array_fmt_helper {
410423
match $datatype {
411424
DataType::Date32 => {
412425
for (index, val) in iter.enumerate() {
413-
let op_result = match $formats.value(index).to_uppercase().as_str() {
414-
"YEAR" | "YYYY" | "YY" => {
415-
Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
416-
as_days_from_unix_epoch(trunc_date_to_year(dt))
417-
}))
418-
}
419-
"QUARTER" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
420-
as_days_from_unix_epoch(trunc_date_to_quarter(dt))
421-
})),
422-
"MONTH" | "MON" | "MM" => {
423-
Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
424-
as_days_from_unix_epoch(trunc_date_to_month(dt))
425-
}))
426-
}
427-
"WEEK" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
428-
as_days_from_unix_epoch(trunc_date_to_week(dt))
429-
})),
430-
_ => Err(SparkError::Internal(format!(
431-
"Unsupported format: {:?} for function 'date_trunc'",
432-
$formats.value(index)
433-
))),
434-
};
435-
op_result?
426+
let trunc_fn: fn(i32) -> Option<i32> =
427+
match $formats.value(index).to_uppercase().as_str() {
428+
"YEAR" | "YYYY" | "YY" => trunc_days_to_year,
429+
"QUARTER" => trunc_days_to_quarter,
430+
"MONTH" | "MON" | "MM" => trunc_days_to_month,
431+
"WEEK" => trunc_days_to_week,
432+
_ => {
433+
return Err(SparkError::Internal(format!(
434+
"Unsupported format: {:?} for function 'date_trunc'",
435+
$formats.value(index)
436+
)))
437+
}
438+
};
439+
match val.and_then(trunc_fn) {
440+
Some(days) => builder.append_value(days),
441+
None => builder.append_null(),
442+
}
436443
}
437444
Ok(builder.finish())
438445
}

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
mod error;
2323

24-
mod kernels;
24+
pub mod kernels;
25+
pub use kernels::temporal::date_trunc_dyn;
2526
mod static_invoke;
2627
pub use static_invoke::*;
2728

0 commit comments

Comments
 (0)