11use std:: {
22 any:: type_name,
3+ convert:: TryInto ,
4+ mem:: swap,
35 sync:: { Arc , LazyLock } ,
46 thread,
57} ;
68
7- use chrono:: { Datelike , Days , Duration , Months , NaiveDate , NaiveDateTime , NaiveTime } ;
9+ use chrono:: { Datelike , Days , Duration , Months , NaiveDate , NaiveDateTime , NaiveTime , Utc } ;
810use datafusion:: {
911 arrow:: {
1012 array:: {
@@ -21,6 +23,7 @@ use datafusion::{
2123 IntervalDayTimeType , IntervalMonthDayNanoType , IntervalUnit , IntervalYearMonthType ,
2224 TimeUnit , TimestampNanosecondType , UInt32Type ,
2325 } ,
26+ temporal_conversions:: timestamp_ns_to_datetime,
2427 } ,
2528 error:: { DataFusionError , Result } ,
2629 execution:: context:: SessionContext ,
@@ -3752,6 +3755,130 @@ pub fn create_pg_get_indexdef_udf() -> ScalarUDF {
37523755 )
37533756}
37543757
3758+ pub fn create_age_udf ( ) -> ScalarUDF {
3759+ let fun = make_scalar_function ( move |args : & [ ArrayRef ] | match args. len ( ) {
3760+ 1 => {
3761+ let older_dates =
3762+ downcast_primitive_arg ! ( args[ 0 ] , "older_date" , TimestampNanosecondType ) ;
3763+ let current_date = Utc :: now ( ) . date_naive ( ) . and_time ( NaiveTime :: default ( ) ) ;
3764+
3765+ let result = older_dates
3766+ . iter ( )
3767+ . map ( |older_date| {
3768+ older_date
3769+ . map ( |older_date| {
3770+ let older_date = timestamp_ns_to_datetime ( older_date) ;
3771+ timestamp_difference_to_interval_month_day_nano (
3772+ current_date,
3773+ older_date,
3774+ )
3775+ } )
3776+ . transpose ( )
3777+ } )
3778+ . collect :: < Result < IntervalMonthDayNanoArray > > ( ) ?;
3779+
3780+ Ok ( Arc :: new ( result) as ArrayRef )
3781+ }
3782+ 2 => {
3783+ let newer_dates =
3784+ downcast_primitive_arg ! ( args[ 0 ] , "newer_date" , TimestampNanosecondType ) ;
3785+ let older_dates =
3786+ downcast_primitive_arg ! ( args[ 1 ] , "older_date" , TimestampNanosecondType ) ;
3787+
3788+ let result = newer_dates
3789+ . iter ( )
3790+ . zip ( older_dates)
3791+ . map ( |dates| match dates {
3792+ ( Some ( newer_date) , Some ( older_date) ) => {
3793+ let newer_date = timestamp_ns_to_datetime ( newer_date) ;
3794+ let older_date = timestamp_ns_to_datetime ( older_date) ;
3795+ timestamp_difference_to_interval_month_day_nano ( newer_date, older_date)
3796+ . map ( Some )
3797+ }
3798+ _ => Ok ( None ) ,
3799+ } )
3800+ . collect :: < Result < IntervalMonthDayNanoArray > > ( ) ?;
3801+
3802+ Ok ( Arc :: new ( result) as ArrayRef )
3803+ }
3804+ _ => Err ( DataFusionError :: Execution (
3805+ "AGE function requires 1 or 2 arguments" . to_string ( ) ,
3806+ ) ) ,
3807+ } ) ;
3808+
3809+ let return_type: ReturnTypeFunction =
3810+ Arc :: new ( move |_| Ok ( Arc :: new ( DataType :: Interval ( IntervalUnit :: MonthDayNano ) ) ) ) ;
3811+
3812+ ScalarUDF :: new (
3813+ "age" ,
3814+ & Signature :: one_of (
3815+ vec ! [
3816+ TypeSignature :: Exact ( vec![ DataType :: Timestamp ( TimeUnit :: Nanosecond , None ) ] ) ,
3817+ TypeSignature :: Exact ( vec![
3818+ DataType :: Timestamp ( TimeUnit :: Nanosecond , None ) ,
3819+ DataType :: Timestamp ( TimeUnit :: Nanosecond , None ) ,
3820+ ] ) ,
3821+ ] ,
3822+ // NOTE: volatility should be `Stable` but we have no access
3823+ // to `query_execution_start_time`
3824+ Volatility :: Volatile ,
3825+ ) ,
3826+ & return_type,
3827+ & fun,
3828+ )
3829+ }
3830+
3831+ fn timestamp_difference_to_interval_month_day_nano (
3832+ newer_date : NaiveDateTime ,
3833+ older_date : NaiveDateTime ,
3834+ ) -> Result < i128 > {
3835+ if newer_date == older_date {
3836+ return Ok ( 0 ) ;
3837+ }
3838+ let mut newer_date = newer_date;
3839+ let mut older_date = older_date;
3840+ let reverse = if older_date > newer_date {
3841+ swap ( & mut newer_date, & mut older_date) ;
3842+ true
3843+ } else {
3844+ false
3845+ } ;
3846+
3847+ let years = newer_date. year ( ) - older_date. year ( ) ;
3848+ let mut months: i32 = ( newer_date. month ( ) as i32 ) - ( older_date. month ( ) as i32 ) ;
3849+ months += years * 12 ;
3850+ if newer_date. day ( ) < older_date. day ( )
3851+ || ( newer_date. day ( ) == older_date. day ( ) && newer_date. time ( ) < older_date. time ( ) )
3852+ {
3853+ months -= 1 ;
3854+ }
3855+
3856+ let offset_older_date = older_date
3857+ . checked_add_months ( Months :: new ( months as u32 ) )
3858+ . ok_or_else ( || DataFusionError :: Execution ( "Cannot add months to date" . to_string ( ) ) ) ?;
3859+ let duration = newer_date - offset_older_date;
3860+ let mut days: i32 = duration
3861+ . num_days ( )
3862+ . try_into ( )
3863+ . map_err ( |_| DataFusionError :: Execution ( "Cannot convert days to i32" . to_string ( ) ) ) ?;
3864+
3865+ let offset_older_date = offset_older_date
3866+ . checked_add_days ( Days :: new ( days as u64 ) )
3867+ . ok_or_else ( || DataFusionError :: Execution ( "Cannot add days to date" . to_string ( ) ) ) ?;
3868+ let duration = newer_date - offset_older_date;
3869+ let mut nanos = duration. num_nanoseconds ( ) . ok_or_else ( || {
3870+ DataFusionError :: Execution ( "Cannot convert duration to nanoseconds" . to_string ( ) )
3871+ } ) ?;
3872+
3873+ if reverse {
3874+ months = -months;
3875+ days = -days;
3876+ nanos = -nanos;
3877+ }
3878+ let result = IntervalMonthDayNanoType :: make_value ( months, days, nanos) ;
3879+ Ok ( result)
3880+ }
3881+
37553882pub fn create_udf_stub (
37563883 name : & ' static str ,
37573884 type_signature : TypeSignature ,
@@ -3987,13 +4114,6 @@ pub fn register_fun_stubs(mut ctx: SessionContext) -> SessionContext {
39874114 // NOTE: lack of "rettyp" implies "type of first arg"
39884115 register_fun_stub ! ( udf, "acosd" , tsig = [ Float64 ] , rettyp = Float64 ) ;
39894116 register_fun_stub ! ( udf, "acosh" , tsig = [ Float64 ] , rettyp = Float64 ) ;
3990- register_fun_stub ! (
3991- udf,
3992- "age" ,
3993- tsigs = [ [ Timestamp ] , [ Timestamp , Timestamp ] , ] ,
3994- rettyp = Interval ,
3995- vol = Stable
3996- ) ;
39974117 register_fun_stub ! ( udf, "asind" , tsig = [ Float64 ] , rettyp = Float64 ) ;
39984118 register_fun_stub ! ( udf, "asinh" , tsig = [ Float64 ] , rettyp = Float64 ) ;
39994119 register_fun_stub ! ( udf, "atan2" , tsig = [ Float64 , Float64 ] , rettyp = Float64 ) ;
0 commit comments