Skip to content

Commit ab76c24

Browse files
MazterQyouFrank-TXS
authored and committed
feat(cubesql): Support AGE function (cube-js#9734)
1 parent b35f9fd commit ab76c24

File tree

4 files changed

+155
-8
lines changed

4 files changed

+155
-8
lines changed

rust/cubesql/cubesql/src/compile/engine/udf/common.rs

Lines changed: 128 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use std::{
22
any::type_name,
3+
convert::TryInto,
4+
mem::swap,
35
sync::{Arc, LazyLock},
46
thread,
57
};
68

7-
use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime, NaiveTime};
9+
use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime, NaiveTime, Utc};
810
use datafusion::{
911
arrow::{
1012
array::{
@@ -21,6 +23,7 @@ use datafusion::{
2123
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType,
2224
TimeUnit, TimestampNanosecondType, UInt32Type,
2325
},
26+
temporal_conversions::timestamp_ns_to_datetime,
2427
},
2528
error::{DataFusionError, Result},
2629
execution::context::SessionContext,
@@ -3752,6 +3755,130 @@ pub fn create_pg_get_indexdef_udf() -> ScalarUDF {
37523755
)
37533756
}
37543757

3758+
pub fn create_age_udf() -> ScalarUDF {
3759+
let fun = make_scalar_function(move |args: &[ArrayRef]| match args.len() {
3760+
1 => {
3761+
let older_dates =
3762+
downcast_primitive_arg!(args[0], "older_date", TimestampNanosecondType);
3763+
let current_date = Utc::now().date_naive().and_time(NaiveTime::default());
3764+
3765+
let result = older_dates
3766+
.iter()
3767+
.map(|older_date| {
3768+
older_date
3769+
.map(|older_date| {
3770+
let older_date = timestamp_ns_to_datetime(older_date);
3771+
timestamp_difference_to_interval_month_day_nano(
3772+
current_date,
3773+
older_date,
3774+
)
3775+
})
3776+
.transpose()
3777+
})
3778+
.collect::<Result<IntervalMonthDayNanoArray>>()?;
3779+
3780+
Ok(Arc::new(result) as ArrayRef)
3781+
}
3782+
2 => {
3783+
let newer_dates =
3784+
downcast_primitive_arg!(args[0], "newer_date", TimestampNanosecondType);
3785+
let older_dates =
3786+
downcast_primitive_arg!(args[1], "older_date", TimestampNanosecondType);
3787+
3788+
let result = newer_dates
3789+
.iter()
3790+
.zip(older_dates)
3791+
.map(|dates| match dates {
3792+
(Some(newer_date), Some(older_date)) => {
3793+
let newer_date = timestamp_ns_to_datetime(newer_date);
3794+
let older_date = timestamp_ns_to_datetime(older_date);
3795+
timestamp_difference_to_interval_month_day_nano(newer_date, older_date)
3796+
.map(Some)
3797+
}
3798+
_ => Ok(None),
3799+
})
3800+
.collect::<Result<IntervalMonthDayNanoArray>>()?;
3801+
3802+
Ok(Arc::new(result) as ArrayRef)
3803+
}
3804+
_ => Err(DataFusionError::Execution(
3805+
"AGE function requires 1 or 2 arguments".to_string(),
3806+
)),
3807+
});
3808+
3809+
let return_type: ReturnTypeFunction =
3810+
Arc::new(move |_| Ok(Arc::new(DataType::Interval(IntervalUnit::MonthDayNano))));
3811+
3812+
ScalarUDF::new(
3813+
"age",
3814+
&Signature::one_of(
3815+
vec![
3816+
TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]),
3817+
TypeSignature::Exact(vec![
3818+
DataType::Timestamp(TimeUnit::Nanosecond, None),
3819+
DataType::Timestamp(TimeUnit::Nanosecond, None),
3820+
]),
3821+
],
3822+
// NOTE: volatility should be `Stable` but we have no access
3823+
// to `query_execution_start_time`
3824+
Volatility::Volatile,
3825+
),
3826+
&return_type,
3827+
&fun,
3828+
)
3829+
}
3830+
3831+
fn timestamp_difference_to_interval_month_day_nano(
3832+
newer_date: NaiveDateTime,
3833+
older_date: NaiveDateTime,
3834+
) -> Result<i128> {
3835+
if newer_date == older_date {
3836+
return Ok(0);
3837+
}
3838+
let mut newer_date = newer_date;
3839+
let mut older_date = older_date;
3840+
let reverse = if older_date > newer_date {
3841+
swap(&mut newer_date, &mut older_date);
3842+
true
3843+
} else {
3844+
false
3845+
};
3846+
3847+
let years = newer_date.year() - older_date.year();
3848+
let mut months: i32 = (newer_date.month() as i32) - (older_date.month() as i32);
3849+
months += years * 12;
3850+
if newer_date.day() < older_date.day()
3851+
|| (newer_date.day() == older_date.day() && newer_date.time() < older_date.time())
3852+
{
3853+
months -= 1;
3854+
}
3855+
3856+
let offset_older_date = older_date
3857+
.checked_add_months(Months::new(months as u32))
3858+
.ok_or_else(|| DataFusionError::Execution("Cannot add months to date".to_string()))?;
3859+
let duration = newer_date - offset_older_date;
3860+
let mut days: i32 = duration
3861+
.num_days()
3862+
.try_into()
3863+
.map_err(|_| DataFusionError::Execution("Cannot convert days to i32".to_string()))?;
3864+
3865+
let offset_older_date = offset_older_date
3866+
.checked_add_days(Days::new(days as u64))
3867+
.ok_or_else(|| DataFusionError::Execution("Cannot add days to date".to_string()))?;
3868+
let duration = newer_date - offset_older_date;
3869+
let mut nanos = duration.num_nanoseconds().ok_or_else(|| {
3870+
DataFusionError::Execution("Cannot convert duration to nanoseconds".to_string())
3871+
})?;
3872+
3873+
if reverse {
3874+
months = -months;
3875+
days = -days;
3876+
nanos = -nanos;
3877+
}
3878+
let result = IntervalMonthDayNanoType::make_value(months, days, nanos);
3879+
Ok(result)
3880+
}
3881+
37553882
pub fn create_udf_stub(
37563883
name: &'static str,
37573884
type_signature: TypeSignature,
@@ -3987,13 +4114,6 @@ pub fn register_fun_stubs(mut ctx: SessionContext) -> SessionContext {
39874114
// NOTE: lack of "rettyp" implies "type of first arg"
39884115
register_fun_stub!(udf, "acosd", tsig = [Float64], rettyp = Float64);
39894116
register_fun_stub!(udf, "acosh", tsig = [Float64], rettyp = Float64);
3990-
register_fun_stub!(
3991-
udf,
3992-
"age",
3993-
tsigs = [[Timestamp], [Timestamp, Timestamp],],
3994-
rettyp = Interval,
3995-
vol = Stable
3996-
);
39974117
register_fun_stub!(udf, "asind", tsig = [Float64], rettyp = Float64);
39984118
register_fun_stub!(udf, "asinh", tsig = [Float64], rettyp = Float64);
39994119
register_fun_stub!(udf, "atan2", tsig = [Float64, Float64], rettyp = Float64);

rust/cubesql/cubesql/src/compile/query_engine.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ impl QueryEngine for SqlQueryEngine {
512512
ctx.register_udf(create_to_regtype_udf());
513513
ctx.register_udf(create_pg_get_indexdef_udf());
514514
ctx.register_udf(create_inet_server_addr_udf());
515+
ctx.register_udf(create_age_udf());
515516

516517
// udaf
517518
ctx.register_udaf(create_measure_udaf());
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
source: cubesql/src/compile/test/test_udfs.rs
3+
expression: "execute_query(r#\"SELECT AGE('2025-06-09T10:09:45'::timestamp, DATE '2023-08-10') AS age;\"#.to_string(),\nDatabaseProtocol::PostgreSQL).await?"
4+
---
5+
+---------------------------------------------------+
6+
| age |
7+
+---------------------------------------------------+
8+
| 1 years 9 mons 30 days 10 hours 9 mins 45.00 secs |
9+
+---------------------------------------------------+

rust/cubesql/cubesql/src/compile/test/test_udfs.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,3 +1310,20 @@ async fn test_extension_udf_xirr() -> Result<(), CubeError> {
13101310

13111311
Ok(())
13121312
}
1313+
1314+
#[tokio::test]
1315+
async fn test_age() -> Result<(), CubeError> {
1316+
init_testing_logger();
1317+
1318+
insta::assert_snapshot!(
1319+
"age",
1320+
execute_query(
1321+
r#"SELECT AGE('2025-06-09T10:09:45'::timestamp, DATE '2023-08-10') AS age;"#
1322+
.to_string(),
1323+
DatabaseProtocol::PostgreSQL
1324+
)
1325+
.await?
1326+
);
1327+
1328+
Ok(())
1329+
}

0 commit comments

Comments
 (0)