Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 128 additions & 8 deletions rust/cubesql/cubesql/src/compile/engine/udf/common.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{
any::type_name,
convert::TryInto,
mem::swap,
sync::{Arc, LazyLock},
thread,
};

use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime, NaiveTime};
use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use datafusion::{
arrow::{
array::{
Expand All @@ -21,6 +23,7 @@ use datafusion::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType,
TimeUnit, TimestampNanosecondType, UInt32Type,
},
temporal_conversions::timestamp_ns_to_datetime,
},
error::{DataFusionError, Result},
execution::context::SessionContext,
Expand Down Expand Up @@ -3752,6 +3755,130 @@ pub fn create_pg_get_indexdef_udf() -> ScalarUDF {
)
}

pub fn create_age_udf() -> ScalarUDF {
let fun = make_scalar_function(move |args: &[ArrayRef]| match args.len() {
1 => {
let older_dates =
downcast_primitive_arg!(args[0], "older_date", TimestampNanosecondType);
let current_date = Utc::now().date_naive().and_time(NaiveTime::default());

let result = older_dates
.iter()
.map(|older_date| {
older_date
.map(|older_date| {
let older_date = timestamp_ns_to_datetime(older_date);
timestamp_difference_to_interval_month_day_nano(
current_date,
older_date,
)
})
.transpose()
})
.collect::<Result<IntervalMonthDayNanoArray>>()?;

Ok(Arc::new(result) as ArrayRef)
}
2 => {
let newer_dates =
downcast_primitive_arg!(args[0], "newer_date", TimestampNanosecondType);
let older_dates =
downcast_primitive_arg!(args[1], "older_date", TimestampNanosecondType);

let result = newer_dates
.iter()
.zip(older_dates)
.map(|dates| match dates {
(Some(newer_date), Some(older_date)) => {
let newer_date = timestamp_ns_to_datetime(newer_date);
let older_date = timestamp_ns_to_datetime(older_date);
timestamp_difference_to_interval_month_day_nano(newer_date, older_date)
.map(Some)
}
_ => Ok(None),
})
.collect::<Result<IntervalMonthDayNanoArray>>()?;

Ok(Arc::new(result) as ArrayRef)
}
_ => Err(DataFusionError::Execution(
"AGE function requires 1 or 2 arguments".to_string(),
)),
});

let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Interval(IntervalUnit::MonthDayNano))));

ScalarUDF::new(
"age",
&Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
],
// NOTE: volatility should be `Stable` but we have no access
// to `query_execution_start_time`
Volatility::Volatile,
),
&return_type,
&fun,
)
}

fn timestamp_difference_to_interval_month_day_nano(
newer_date: NaiveDateTime,
older_date: NaiveDateTime,
) -> Result<i128> {
if newer_date == older_date {
return Ok(0);
}
let mut newer_date = newer_date;
let mut older_date = older_date;
let reverse = if older_date > newer_date {
swap(&mut newer_date, &mut older_date);
true
} else {
false
};

let years = newer_date.year() - older_date.year();
let mut months: i32 = (newer_date.month() as i32) - (older_date.month() as i32);
months += years * 12;
if newer_date.day() < older_date.day()
|| (newer_date.day() == older_date.day() && newer_date.time() < older_date.time())
{
months -= 1;
}

let offset_older_date = older_date
.checked_add_months(Months::new(months as u32))
.ok_or_else(|| DataFusionError::Execution("Cannot add months to date".to_string()))?;
let duration = newer_date - offset_older_date;
let mut days: i32 = duration
.num_days()
.try_into()
.map_err(|_| DataFusionError::Execution("Cannot convert days to i32".to_string()))?;

let offset_older_date = offset_older_date
.checked_add_days(Days::new(days as u64))
.ok_or_else(|| DataFusionError::Execution("Cannot add days to date".to_string()))?;
let duration = newer_date - offset_older_date;
let mut nanos = duration.num_nanoseconds().ok_or_else(|| {
DataFusionError::Execution("Cannot convert duration to nanoseconds".to_string())
})?;

if reverse {
months = -months;
days = -days;
nanos = -nanos;
}
let result = IntervalMonthDayNanoType::make_value(months, days, nanos);
Ok(result)
}

pub fn create_udf_stub(
name: &'static str,
type_signature: TypeSignature,
Expand Down Expand Up @@ -3987,13 +4114,6 @@ pub fn register_fun_stubs(mut ctx: SessionContext) -> SessionContext {
// NOTE: lack of "rettyp" implies "type of first arg"
register_fun_stub!(udf, "acosd", tsig = [Float64], rettyp = Float64);
register_fun_stub!(udf, "acosh", tsig = [Float64], rettyp = Float64);
register_fun_stub!(
udf,
"age",
tsigs = [[Timestamp], [Timestamp, Timestamp],],
rettyp = Interval,
vol = Stable
);
register_fun_stub!(udf, "asind", tsig = [Float64], rettyp = Float64);
register_fun_stub!(udf, "asinh", tsig = [Float64], rettyp = Float64);
register_fun_stub!(udf, "atan2", tsig = [Float64, Float64], rettyp = Float64);
Expand Down
1 change: 1 addition & 0 deletions rust/cubesql/cubesql/src/compile/query_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ impl QueryEngine for SqlQueryEngine {
ctx.register_udf(create_to_regtype_udf());
ctx.register_udf(create_pg_get_indexdef_udf());
ctx.register_udf(create_inet_server_addr_udf());
ctx.register_udf(create_age_udf());

// udaf
ctx.register_udaf(create_measure_udaf());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
source: cubesql/src/compile/test/test_udfs.rs
expression: "execute_query(r#\"SELECT AGE('2025-06-09T10:09:45'::timestamp, DATE '2023-08-10') AS age;\"#.to_string(),\nDatabaseProtocol::PostgreSQL).await?"
---
+---------------------------------------------------+
| age |
+---------------------------------------------------+
| 1 years 9 mons 30 days 10 hours 9 mins 45.00 secs |
+---------------------------------------------------+
17 changes: 17 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_udfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1310,3 +1310,20 @@ async fn test_extension_udf_xirr() -> Result<(), CubeError> {

Ok(())
}

#[tokio::test]
async fn test_age() -> Result<(), CubeError> {
init_testing_logger();

insta::assert_snapshot!(
"age",
execute_query(
r#"SELECT AGE('2025-06-09T10:09:45'::timestamp, DATE '2023-08-10') AS age;"#
.to_string(),
DatabaseProtocol::PostgreSQL
)
.await?
);

Ok(())
}
Loading