Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 200 additions & 84 deletions datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,28 @@ use std::str::FromStr;
use std::sync::Arc;

use arrow::array::temporal_conversions::{
as_datetime_with_timezone, timestamp_ns_to_datetime,
MICROSECONDS, MILLISECONDS, NANOSECONDS, as_datetime_with_timezone,
timestamp_ns_to_datetime,
};
use arrow::array::timezone::Tz;
use arrow::array::types::{
ArrowTimestampType, TimestampMicrosecondType, TimestampMillisecondType,
ArrowTimestampType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow::array::{Array, ArrayRef, PrimitiveArray};
use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8, Utf8View};
use arrow::datatypes::DataType::{self, Time32, Time64, Timestamp};
use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second};
use datafusion_common::cast::as_primitive_array;
use datafusion_common::types::{NativeType, logical_date, logical_string};
use datafusion_common::{
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, plan_err,
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
};
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, TIMEZONE_WILDCARD, Volatility,
ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
use datafusion_macros::user_doc;

use chrono::{
Expand Down Expand Up @@ -116,16 +119,30 @@ impl DateTruncGranularity {
/// Returns true when `is_fine_granularity` holds for this granularity, or
/// when it is `Hour` or `Day`.
///
/// NOTE(review): the `_utc` suffix suggests the extra `Hour`/`Day` cases are
/// only meant for values without a timezone offset; confirm against callers.
fn is_fine_granularity_utc(&self) -> bool {
    if self.is_fine_granularity() {
        return true;
    }
    matches!(self, Self::Hour | Self::Day)
}

/// Reports whether this granularity can be applied to Time types.
///
/// Time types don't have date components, so day/week/month/quarter/year
/// are rejected; only hour, minute, second, millisecond and microsecond
/// are accepted.
fn valid_for_time(&self) -> bool {
    // Whole-second-or-coarser clock units.
    let clock_unit = matches!(self, Self::Hour | Self::Minute | Self::Second);
    // Sub-second units.
    let subsecond_unit = matches!(self, Self::Millisecond | Self::Microsecond);
    clock_unit || subsecond_unit
}
}

#[user_doc(
doc_section(label = "Time and Date Functions"),
description = "Truncates a timestamp value to a specified precision.",
description = "Truncates a timestamp or time value to a specified precision.",
syntax_example = "date_trunc(precision, expression)",
argument(
name = "precision",
description = r#"Time precision to truncate to. The following precisions are supported:

For Timestamp types:
- year / YEAR
- quarter / QUARTER
- month / MONTH
Expand All @@ -136,11 +153,18 @@ impl DateTruncGranularity {
- second / SECOND
- millisecond / MILLISECOND
- microsecond / MICROSECOND

For Time types (hour, minute, second, millisecond, microsecond only):
- hour / HOUR
- minute / MINUTE
- second / SECOND
- millisecond / MILLISECOND
- microsecond / MICROSECOND
"#
),
argument(
name = "expression",
description = "Time expression to operate on. Can be a constant, column, or function."
description = "Timestamp or time expression to operate on. Can be a constant, column, or function."
)
)]
#[derive(Debug, PartialEq, Eq, Hash)]
Expand All @@ -160,45 +184,21 @@ impl DateTruncFunc {
Self {
signature: Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8View, Timestamp(Nanosecond, None)]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8View, Timestamp(Microsecond, None)]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
TypeSignature::Coercible(vec![
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
Coercion::new_implicit(
TypeSignatureClass::Timestamp,
// Allow implicit cast from string and date to timestamp for backward compatibility
vec![
TypeSignatureClass::Native(logical_string()),
TypeSignatureClass::Native(logical_date()),
],
NativeType::Timestamp(Nanosecond, None),
),
]),
Exact(vec![
Utf8View,
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8View, Timestamp(Millisecond, None)]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![Utf8View, Timestamp(Second, None)]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
]),
Exact(vec![
Utf8View,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
TypeSignature::Coercible(vec![
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
Coercion::new_exact(TypeSignatureClass::Time),
]),
],
Volatility::Immutable,
Expand All @@ -222,17 +222,10 @@ impl ScalarUDFImpl for DateTruncFunc {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[1] {
Timestamp(Nanosecond, None) | Utf8 | DataType::Date32 | Null => {
Ok(Timestamp(Nanosecond, None))
}
Timestamp(Nanosecond, tz_opt) => Ok(Timestamp(Nanosecond, tz_opt.clone())),
Timestamp(Microsecond, tz_opt) => Ok(Timestamp(Microsecond, tz_opt.clone())),
Timestamp(Millisecond, tz_opt) => Ok(Timestamp(Millisecond, tz_opt.clone())),
Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
_ => plan_err!(
"The date_trunc function can only accept timestamp as the second arg."
),
if arg_types[1].is_null() {
Ok(Timestamp(Nanosecond, None))
} else {
Ok(arg_types[1].clone())
}
}

Expand All @@ -248,6 +241,9 @@ impl ScalarUDFImpl for DateTruncFunc {
{
v.to_lowercase()
} else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = granularity
{
v.to_lowercase()
} else if let ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(v))) = granularity
{
v.to_lowercase()
} else {
Expand All @@ -256,6 +252,15 @@ impl ScalarUDFImpl for DateTruncFunc {

let granularity = DateTruncGranularity::from_str(&granularity_str)?;

// Check upfront if granularity is valid for Time types
let is_time_type = matches!(array.data_type(), Time64(_) | Time32(_));
if is_time_type && !granularity.valid_for_time() {
return exec_err!(
"date_trunc does not support '{}' granularity for Time types. Valid values are: hour, minute, second, millisecond, microsecond",
granularity_str
);
}

fn process_array<T: ArrowTimestampType>(
array: &dyn Array,
granularity: DateTruncGranularity,
Expand Down Expand Up @@ -303,6 +308,10 @@ impl ScalarUDFImpl for DateTruncFunc {
}

Ok(match array {
ColumnarValue::Scalar(ScalarValue::Null) => {
// NULL input returns NULL timestamp
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, None))
}
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
process_scalar::<TimestampNanosecondType>(v, granularity, tz_opt)?
}
Expand All @@ -315,40 +324,77 @@ impl ScalarUDFImpl for DateTruncFunc {
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
process_scalar::<TimestampSecondType>(v, granularity, tz_opt)?
}
ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => {
let truncated = v.map(|val| truncate_time_nanos(val, granularity));
ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(truncated))
}
ColumnarValue::Scalar(ScalarValue::Time64Microsecond(v)) => {
let truncated = v.map(|val| truncate_time_micros(val, granularity));
ColumnarValue::Scalar(ScalarValue::Time64Microsecond(truncated))
}
ColumnarValue::Scalar(ScalarValue::Time32Millisecond(v)) => {
let truncated = v.map(|val| truncate_time_millis(val, granularity));
ColumnarValue::Scalar(ScalarValue::Time32Millisecond(truncated))
}
ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => {
let truncated = v.map(|val| truncate_time_secs(val, granularity));
ColumnarValue::Scalar(ScalarValue::Time32Second(truncated))
}
ColumnarValue::Array(array) => {
let array_type = array.data_type();
if let Timestamp(unit, tz_opt) = array_type {
match unit {
Second => process_array::<TimestampSecondType>(
array,
granularity,
tz_opt,
)?,
Millisecond => process_array::<TimestampMillisecondType>(
array,
granularity,
tz_opt,
)?,
Microsecond => process_array::<TimestampMicrosecondType>(
array,
granularity,
tz_opt,
)?,
Nanosecond => process_array::<TimestampNanosecondType>(
array,
granularity,
tz_opt,
)?,
match array_type {
Timestamp(Second, tz_opt) => {
process_array::<TimestampSecondType>(array, granularity, tz_opt)?
}
Timestamp(Millisecond, tz_opt) => process_array::<
TimestampMillisecondType,
>(
array, granularity, tz_opt
)?,
Timestamp(Microsecond, tz_opt) => process_array::<
TimestampMicrosecondType,
>(
array, granularity, tz_opt
)?,
Timestamp(Nanosecond, tz_opt) => process_array::<
TimestampNanosecondType,
>(
array, granularity, tz_opt
)?,
Time64(Nanosecond) => {
let arr = as_primitive_array::<Time64NanosecondType>(array)?;
let result: PrimitiveArray<Time64NanosecondType> =
arr.unary(|v| truncate_time_nanos(v, granularity));
ColumnarValue::Array(Arc::new(result))
}
Time64(Microsecond) => {
let arr = as_primitive_array::<Time64MicrosecondType>(array)?;
let result: PrimitiveArray<Time64MicrosecondType> =
arr.unary(|v| truncate_time_micros(v, granularity));
ColumnarValue::Array(Arc::new(result))
}
Time32(Millisecond) => {
let arr = as_primitive_array::<Time32MillisecondType>(array)?;
let result: PrimitiveArray<Time32MillisecondType> =
arr.unary(|v| truncate_time_millis(v, granularity));
ColumnarValue::Array(Arc::new(result))
}
Time32(Second) => {
let arr = as_primitive_array::<Time32SecondType>(array)?;
let result: PrimitiveArray<Time32SecondType> =
arr.unary(|v| truncate_time_secs(v, granularity));
ColumnarValue::Array(Arc::new(result))
}
_ => {
return exec_err!(
"second argument of `date_trunc` is an unsupported array type: {array_type}"
);
}
} else {
return exec_err!(
"second argument of `date_trunc` is an unsupported array type: {array_type}"
);
}
}
_ => {
return exec_err!(
"second argument of `date_trunc` must be timestamp scalar or array"
"second argument of `date_trunc` must be timestamp, time scalar or array"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"second argument of `date_trunc` must be timestamp, time scalar or array"
"second argument of `date_trunc` must be a timestamp/time scalar or an array"

);
}
})
Expand All @@ -374,6 +420,76 @@ impl ScalarUDFImpl for DateTruncFunc {
}
}

// Unit sizes used when truncating Time values. Each group is expressed in the
// storage unit of one Time type and is derived from arrow's `NANOSECONDS`,
// `MICROSECONDS` and `MILLISECONDS` constants.

/// Nanoseconds in one microsecond.
const NANOS_PER_MICROSECOND: i64 = NANOSECONDS / MICROSECONDS;
/// Nanoseconds in one millisecond.
const NANOS_PER_MILLISECOND: i64 = NANOSECONDS / MILLISECONDS;
/// Nanoseconds in one second.
const NANOS_PER_SECOND: i64 = NANOSECONDS;
/// Nanoseconds in one minute.
const NANOS_PER_MINUTE: i64 = NANOS_PER_SECOND * 60;
/// Nanoseconds in one hour.
const NANOS_PER_HOUR: i64 = NANOS_PER_MINUTE * 60;

/// Microseconds in one millisecond.
const MICROS_PER_MILLISECOND: i64 = MICROSECONDS / MILLISECONDS;
/// Microseconds in one second.
const MICROS_PER_SECOND: i64 = MICROSECONDS;
/// Microseconds in one minute.
const MICROS_PER_MINUTE: i64 = MICROS_PER_SECOND * 60;
/// Microseconds in one hour.
const MICROS_PER_HOUR: i64 = MICROS_PER_MINUTE * 60;

/// Milliseconds in one second, as `i32` to match `truncate_time_millis`.
const MILLIS_PER_SECOND: i32 = MILLISECONDS as i32;
/// Milliseconds in one minute.
const MILLIS_PER_MINUTE: i32 = MILLIS_PER_SECOND * 60;
/// Milliseconds in one hour.
const MILLIS_PER_HOUR: i32 = MILLIS_PER_MINUTE * 60;

/// Seconds in one minute.
const SECS_PER_MINUTE: i32 = 60;
/// Seconds in one hour.
const SECS_PER_HOUR: i32 = SECS_PER_MINUTE * 60;

/// Truncates a time value expressed in nanoseconds to `granularity`.
///
/// The remainder of `value` modulo the unit size is subtracted, for hour,
/// minute, second, millisecond and microsecond. Any other granularity returns
/// `value` unchanged; those are expected to be rejected for Time inputs
/// before this function is reached.
fn truncate_time_nanos(value: i64, granularity: DateTruncGranularity) -> i64 {
    let unit = match granularity {
        DateTruncGranularity::Hour => NANOS_PER_HOUR,
        DateTruncGranularity::Minute => NANOS_PER_MINUTE,
        DateTruncGranularity::Second => NANOS_PER_SECOND,
        DateTruncGranularity::Millisecond => NANOS_PER_MILLISECOND,
        DateTruncGranularity::Microsecond => NANOS_PER_MICROSECOND,
        // Not a valid granularity for time: pass the value through untouched.
        _ => return value,
    };
    value - value % unit
}

/// Truncates a time value expressed in microseconds to `granularity`.
///
/// The remainder of `value` modulo the unit size is subtracted, for hour,
/// minute, second and millisecond. Microsecond is already the storage
/// precision, and every other granularity is not valid for time; both return
/// `value` unchanged.
fn truncate_time_micros(value: i64, granularity: DateTruncGranularity) -> i64 {
    let unit = match granularity {
        DateTruncGranularity::Hour => MICROS_PER_HOUR,
        DateTruncGranularity::Minute => MICROS_PER_MINUTE,
        DateTruncGranularity::Second => MICROS_PER_SECOND,
        DateTruncGranularity::Millisecond => MICROS_PER_MILLISECOND,
        // Already at microsecond precision: nothing to drop.
        DateTruncGranularity::Microsecond => return value,
        // Not a valid granularity for time: pass the value through untouched.
        _ => return value,
    };
    value - value % unit
}

/// Truncates a time value expressed in milliseconds to `granularity`.
///
/// The remainder of `value` modulo the unit size is subtracted, for hour,
/// minute and second. Millisecond and microsecond are at or below the storage
/// precision, and every other granularity is not valid for time; all of these
/// return `value` unchanged.
fn truncate_time_millis(value: i32, granularity: DateTruncGranularity) -> i32 {
    let unit = match granularity {
        DateTruncGranularity::Hour => MILLIS_PER_HOUR,
        DateTruncGranularity::Minute => MILLIS_PER_MINUTE,
        DateTruncGranularity::Second => MILLIS_PER_SECOND,
        // Already at millisecond precision: nothing to drop.
        DateTruncGranularity::Millisecond => return value,
        // Finer than the storage precision: nothing to drop.
        DateTruncGranularity::Microsecond => return value,
        // Not a valid granularity for time: pass the value through untouched.
        _ => return value,
    };
    value - value % unit
}

/// Truncates a time value expressed in seconds to `granularity`.
///
/// The remainder of `value` modulo the unit size is subtracted, for hour and
/// minute. Second, millisecond and microsecond are at or below the storage
/// precision, and every other granularity is not valid for time; all of these
/// return `value` unchanged.
fn truncate_time_secs(value: i32, granularity: DateTruncGranularity) -> i32 {
    let unit = match granularity {
        DateTruncGranularity::Hour => SECS_PER_HOUR,
        DateTruncGranularity::Minute => SECS_PER_MINUTE,
        // Already at second precision: nothing to drop.
        DateTruncGranularity::Second => return value,
        // Finer than the storage precision: nothing to drop.
        DateTruncGranularity::Millisecond => return value,
        DateTruncGranularity::Microsecond => return value,
        // Not a valid granularity for time: pass the value through untouched.
        _ => return value,
    };
    value - value % unit
}

fn _date_trunc_coarse<T>(
granularity: DateTruncGranularity,
value: Option<T>,
Expand Down
Loading