diff --git a/rust/cubesql/cubesql/src/compile/engine/udf/common.rs b/rust/cubesql/cubesql/src/compile/engine/udf/common.rs index 096ea843f7f0e..e75583b667099 100644 --- a/rust/cubesql/cubesql/src/compile/engine/udf/common.rs +++ b/rust/cubesql/cubesql/src/compile/engine/udf/common.rs @@ -1,10 +1,12 @@ use std::{ any::type_name, + convert::TryInto, + mem::swap, sync::{Arc, LazyLock}, thread, }; -use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime, NaiveTime}; +use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use datafusion::{ arrow::{ array::{ @@ -21,6 +23,7 @@ use datafusion::{ IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit, TimestampNanosecondType, UInt32Type, }, + temporal_conversions::timestamp_ns_to_datetime, }, error::{DataFusionError, Result}, execution::context::SessionContext, @@ -3752,6 +3755,130 @@ pub fn create_pg_get_indexdef_udf() -> ScalarUDF { ) } +pub fn create_age_udf() -> ScalarUDF { + let fun = make_scalar_function(move |args: &[ArrayRef]| match args.len() { + 1 => { + let older_dates = + downcast_primitive_arg!(args[0], "older_date", TimestampNanosecondType); + let current_date = Utc::now().date_naive().and_time(NaiveTime::default()); + + let result = older_dates + .iter() + .map(|older_date| { + older_date + .map(|older_date| { + let older_date = timestamp_ns_to_datetime(older_date); + timestamp_difference_to_interval_month_day_nano( + current_date, + older_date, + ) + }) + .transpose() + }) + .collect::>()?; + + Ok(Arc::new(result) as ArrayRef) + } + 2 => { + let newer_dates = + downcast_primitive_arg!(args[0], "newer_date", TimestampNanosecondType); + let older_dates = + downcast_primitive_arg!(args[1], "older_date", TimestampNanosecondType); + + let result = newer_dates + .iter() + .zip(older_dates) + .map(|dates| match dates { + (Some(newer_date), Some(older_date)) => { + let newer_date = timestamp_ns_to_datetime(newer_date); + let older_date = timestamp_ns_to_datetime(older_date); + timestamp_difference_to_interval_month_day_nano(newer_date, older_date) + .map(Some) + } + _ => Ok(None), + }) + .collect::>()?; + + Ok(Arc::new(result) as ArrayRef) + } + _ => Err(DataFusionError::Execution( + "AGE function requires 1 or 2 arguments".to_string(), + )), + }); + + let return_type: ReturnTypeFunction = + Arc::new(move |_| Ok(Arc::new(DataType::Interval(IntervalUnit::MonthDayNano)))); + + ScalarUDF::new( + "age", + &Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Timestamp(TimeUnit::Nanosecond, None)]), + TypeSignature::Exact(vec![ + DataType::Timestamp(TimeUnit::Nanosecond, None), + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]), + ], + // NOTE: volatility should be `Stable` but we have no access + // to `query_execution_start_time` + Volatility::Volatile, + ), + &return_type, + &fun, + ) +} + +fn timestamp_difference_to_interval_month_day_nano( + newer_date: NaiveDateTime, + older_date: NaiveDateTime, +) -> Result { + if newer_date == older_date { + return Ok(0); + } + let mut newer_date = newer_date; + let mut older_date = older_date; + let reverse = if older_date > newer_date { + swap(&mut newer_date, &mut older_date); + true + } else { + false + }; + + let years = newer_date.year() - older_date.year(); + let mut months: i32 = (newer_date.month() as i32) - (older_date.month() as i32); + months += years * 12; + if newer_date.day() < older_date.day() + || (newer_date.day() == older_date.day() && newer_date.time() < older_date.time()) + { + months -= 1; + } + + let offset_older_date = older_date + .checked_add_months(Months::new(months as u32)) + .ok_or_else(|| DataFusionError::Execution("Cannot add months to date".to_string()))?; + let duration = newer_date - offset_older_date; + let mut days: i32 = duration + .num_days() + .try_into() + .map_err(|_| DataFusionError::Execution("Cannot convert days to i32".to_string()))?; + + let offset_older_date = offset_older_date + .checked_add_days(Days::new(days as u64)) + .ok_or_else(|| DataFusionError::Execution("Cannot add days to date".to_string()))?; + let duration = newer_date - offset_older_date; + let mut nanos = duration.num_nanoseconds().ok_or_else(|| { + DataFusionError::Execution("Cannot convert duration to nanoseconds".to_string()) + })?; + + if reverse { + months = -months; + days = -days; + nanos = -nanos; + } + let result = IntervalMonthDayNanoType::make_value(months, days, nanos); + Ok(result) +} + pub fn create_udf_stub( name: &'static str, type_signature: TypeSignature, @@ -3987,13 +4114,6 @@ pub fn register_fun_stubs(mut ctx: SessionContext) -> SessionContext { // NOTE: lack of "rettyp" implies "type of first arg" register_fun_stub!(udf, "acosd", tsig = [Float64], rettyp = Float64); register_fun_stub!(udf, "acosh", tsig = [Float64], rettyp = Float64); - register_fun_stub!( - udf, - "age", - tsigs = [[Timestamp], [Timestamp, Timestamp],], - rettyp = Interval, - vol = Stable - ); register_fun_stub!(udf, "asind", tsig = [Float64], rettyp = Float64); register_fun_stub!(udf, "asinh", tsig = [Float64], rettyp = Float64); register_fun_stub!(udf, "atan2", tsig = [Float64, Float64], rettyp = Float64); diff --git a/rust/cubesql/cubesql/src/compile/query_engine.rs b/rust/cubesql/cubesql/src/compile/query_engine.rs index d9dedc0ffe76f..5c43baeebb1a7 100644 --- a/rust/cubesql/cubesql/src/compile/query_engine.rs +++ b/rust/cubesql/cubesql/src/compile/query_engine.rs @@ -512,6 +512,7 @@ impl QueryEngine for SqlQueryEngine { ctx.register_udf(create_to_regtype_udf()); ctx.register_udf(create_pg_get_indexdef_udf()); ctx.register_udf(create_inet_server_addr_udf()); + ctx.register_udf(create_age_udf()); // udaf ctx.register_udaf(create_measure_udaf()); diff --git a/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__age.snap b/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__age.snap new file mode 100644 index 0000000000000..a62f3b2d3b670 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__age.snap @@ -0,0 +1,9 @@ +--- +source: cubesql/src/compile/test/test_udfs.rs +expression: "execute_query(r#\"SELECT AGE('2025-06-09T10:09:45'::timestamp, DATE '2023-08-10') AS age;\"#.to_string(),\nDatabaseProtocol::PostgreSQL).await?" +--- ++---------------------------------------------------+ +| age | ++---------------------------------------------------+ +| 1 years 9 mons 30 days 10 hours 9 mins 45.00 secs | ++---------------------------------------------------+ diff --git a/rust/cubesql/cubesql/src/compile/test/test_udfs.rs b/rust/cubesql/cubesql/src/compile/test/test_udfs.rs index e4e58ca0815af..37e884adef88b 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_udfs.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_udfs.rs @@ -1310,3 +1310,20 @@ async fn test_extension_udf_xirr() -> Result<(), CubeError> { Ok(()) } + +#[tokio::test] +async fn test_age() -> Result<(), CubeError> { + init_testing_logger(); + + insta::assert_snapshot!( + "age", + execute_query( + r#"SELECT AGE('2025-06-09T10:09:45'::timestamp, DATE '2023-08-10') AS age;"# + .to_string(), + DatabaseProtocol::PostgreSQL + ) + .await? + ); + + Ok(()) +}