Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ harness = false
name = "padding"
harness = false

[[bench]]
name = "date_trunc"
harness = false

[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
55 changes: 55 additions & 0 deletions native/spark-expr/benches/date_trunc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Date32Array};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::date_trunc_dyn;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
let date_array = create_date_array();

let mut group = c.benchmark_group("date_trunc");

// Benchmark each truncation format
for format in ["YEAR", "QUARTER", "MONTH", "WEEK"] {
let array_ref: ArrayRef = Arc::new(date_array.clone());
group.bench_function(format!("date_trunc_{}", format.to_lowercase()), |b| {
b.iter(|| date_trunc_dyn(&array_ref, format.to_string()).unwrap());
});
}

group.finish();
}

fn create_date_array() -> Date32Array {
// Create 10000 dates spanning several years (more realistic workload)
// Days since Unix epoch: range from 0 (1970-01-01) to ~19000 (2022)
let dates: Vec<i32> = (0..10000).map(|i| (i * 2) % 19000).collect();
Date32Array::from(dates)
}

fn config() -> Criterion {
Criterion::default()
}

criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
criterion_main!(benches);
2 changes: 1 addition & 1 deletion native/spark-expr/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
//! Kernels

pub mod strings;
pub(crate) mod temporal;
pub mod temporal;
197 changes: 102 additions & 95 deletions native/spark-expr/src/kernels/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

//! temporal kernels

use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Timelike, Utc};
use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};

use std::sync::Arc;

use arrow::array::{
downcast_dictionary_array, downcast_temporal_array,
temporal_conversions::*,
timezone::Tz,
types::{ArrowDictionaryKeyType, ArrowTemporalType, Date32Type, TimestampMicrosecondType},
types::{ArrowDictionaryKeyType, ArrowTemporalType, TimestampMicrosecondType},
ArrowNumericType,
};
use arrow::{
Expand All @@ -46,47 +46,57 @@ macro_rules! return_compute_error_with {
// and the beginning of the Unix Epoch (1970-01-01)
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;

// Copied from arrow_arith/temporal.rs with modification to the output datatype
// Transforms a array of NaiveDate to an array of Date32 after applying an operation
fn as_datetime_with_op<A: ArrayAccessor<Item = T::Native>, T: ArrowTemporalType, F>(
iter: ArrayIter<A>,
mut builder: PrimitiveBuilder<Date32Type>,
op: F,
) -> Date32Array
where
F: Fn(NaiveDateTime) -> i32,
i64: From<T::Native>,
{
iter.into_iter().for_each(|value| {
if let Some(value) = value {
match as_datetime::<T>(i64::from(value)) {
Some(dt) => builder.append_value(op(dt)),
None => builder.append_null(),
}
} else {
builder.append_null();
}
});
// Optimized date truncation functions that work directly with days since epoch
// These avoid the overhead of converting to/from NaiveDateTime

builder.finish()
/// Convert days since Unix epoch to NaiveDate
#[inline]
fn days_to_date(days: i32) -> Option<NaiveDate> {
NaiveDate::from_num_days_from_ce_opt(days + DAYS_TO_UNIX_EPOCH)
}

/// Truncate date to first day of year - optimized version
/// Uses ordinal (day of year) to avoid creating a new date
#[inline]
fn as_datetime_with_op_single<F>(
value: Option<i32>,
builder: &mut PrimitiveBuilder<Date32Type>,
op: F,
) where
F: Fn(NaiveDateTime) -> i32,
{
if let Some(value) = value {
match as_datetime::<Date32Type>(i64::from(value)) {
Some(dt) => builder.append_value(op(dt)),
None => builder.append_null(),
}
} else {
builder.append_null();
}
fn trunc_days_to_year(days: i32) -> Option<i32> {
let date = days_to_date(days)?;
let day_of_year_offset = date.ordinal() as i32 - 1;
Some(days - day_of_year_offset)
}

/// Truncate date to first day of quarter - optimized version
/// Computes offset from first day of quarter without creating a new date
#[inline]
fn trunc_days_to_quarter(days: i32) -> Option<i32> {
let date = days_to_date(days)?;
let month = date.month(); // 1-12
let quarter = (month - 1) / 3; // 0-3
let first_month_of_quarter = quarter * 3 + 1; // 1, 4, 7, or 10

// Find day of year for first day of quarter
let first_day_of_quarter = NaiveDate::from_ymd_opt(date.year(), first_month_of_quarter, 1)?;
let quarter_start_ordinal = first_day_of_quarter.ordinal() as i32;
let current_ordinal = date.ordinal() as i32;

Some(days - (current_ordinal - quarter_start_ordinal))
}

/// Truncate date to first day of month - optimized version
/// Instead of creating a new date, just subtract day offset
#[inline]
fn trunc_days_to_month(days: i32) -> Option<i32> {
let date = days_to_date(days)?;
let day_offset = date.day() as i32 - 1;
Some(days - day_offset)
}

/// Truncate date to first day of week (Monday) - optimized version
#[inline]
fn trunc_days_to_week(days: i32) -> Option<i32> {
let date = days_to_date(days)?;
// weekday().num_days_from_monday() gives 0 for Monday, 1 for Tuesday, etc.
let days_since_monday = date.weekday().num_days_from_monday() as i32;
Some(days - days_since_monday)
}

// Based on arrow_arith/temporal.rs:extract_component_from_datetime_array
Expand Down Expand Up @@ -143,11 +153,6 @@ where
Ok(())
}

#[inline]
fn as_days_from_unix_epoch(dt: Option<NaiveDateTime>) -> i32 {
dt.unwrap().num_days_from_ce() - DAYS_TO_UNIX_EPOCH
}

// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
#[inline]
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
Expand Down Expand Up @@ -251,7 +256,7 @@ fn trunc_date_to_microsec<T: Timelike>(dt: T) -> Option<T> {
/// array is an array of Date32 values. The array may be a dictionary array.
///
/// format is a scalar string specifying the format to apply to the timestamp value.
pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
match array.data_type().clone() {
DataType::Dictionary(_, _) => {
downcast_dictionary_array!(
Expand Down Expand Up @@ -282,41 +287,49 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let builder = Date32Builder::with_capacity(array.len());
let iter = ArrayIter::new(array);
match array.data_type() {
DataType::Date32 => match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_year(dt)),
)),
"QUARTER" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_quarter(dt)),
)),
"MONTH" | "MON" | "MM" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_month(dt)),
)),
"WEEK" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
)),
_ => Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'date_trunc'"
))),
},
DataType::Date32 => {
// Use optimized path for Date32 that works directly with days
date_trunc_date32(
array
.as_any()
.downcast_ref::<Date32Array>()
.expect("Date32 type mismatch"),
format,
)
}
dt => return_compute_error_with!(
"Unsupported input type '{:?}' for function 'date_trunc'",
dt
),
}
}

/// Optimized date truncation for Date32 arrays
/// Works directly with days since epoch instead of converting to/from NaiveDateTime
fn date_trunc_date32(array: &Date32Array, format: String) -> Result<Date32Array, SparkError> {
// Select the truncation function based on format
let trunc_fn: fn(i32) -> Option<i32> = match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => trunc_days_to_year,
"QUARTER" => trunc_days_to_quarter,
"MONTH" | "MON" | "MM" => trunc_days_to_month,
"WEEK" => trunc_days_to_week,
_ => {
return Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'date_trunc'"
)))
}
};

// Apply truncation to each element
let result: Date32Array = array
.iter()
.map(|opt_days| opt_days.and_then(trunc_fn))
.collect();

Ok(result)
}

///
/// Implements the spark [TRUNC](https://spark.apache.org/docs/latest/api/sql/index.html#trunc)
/// function where the specified format may be an array
Expand Down Expand Up @@ -410,29 +423,23 @@ macro_rules! date_trunc_array_fmt_helper {
match $datatype {
DataType::Date32 => {
for (index, val) in iter.enumerate() {
let op_result = match $formats.value(index).to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => {
Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_year(dt))
}))
}
"QUARTER" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_quarter(dt))
})),
"MONTH" | "MON" | "MM" => {
Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_month(dt))
}))
}
"WEEK" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_week(dt))
})),
_ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'date_trunc'",
$formats.value(index)
))),
};
op_result?
let trunc_fn: fn(i32) -> Option<i32> =
match $formats.value(index).to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => trunc_days_to_year,
"QUARTER" => trunc_days_to_quarter,
"MONTH" | "MON" | "MM" => trunc_days_to_month,
"WEEK" => trunc_days_to_week,
_ => {
return Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'date_trunc'",
$formats.value(index)
)))
}
};
match val.and_then(trunc_fn) {
Some(days) => builder.append_value(days),
None => builder.append_null(),
}
}
Ok(builder.finish())
}
Expand Down
3 changes: 2 additions & 1 deletion native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

mod error;

mod kernels;
pub mod kernels;
pub use kernels::temporal::date_trunc_dyn;
mod static_invoke;
pub use static_invoke::*;

Expand Down
Loading