@@ -19,16 +19,18 @@ use std::any::Any;
1919use std:: sync:: Arc ;
2020
2121use crate :: datetime:: common:: * ;
22- use arrow:: array:: Float64Array ;
2322use arrow:: array:: timezone:: Tz ;
23+ use arrow:: array:: {
24+ Array , Decimal128Array , Float16Array , Float32Array , Float64Array ,
25+ TimestampNanosecondArray ,
26+ } ;
2427use arrow:: datatypes:: DataType :: * ;
2528use arrow:: datatypes:: TimeUnit :: { Microsecond , Millisecond , Nanosecond , Second } ;
2629use arrow:: datatypes:: {
2730 ArrowTimestampType , DataType , TimestampMicrosecondType , TimestampMillisecondType ,
2831 TimestampNanosecondType , TimestampSecondType ,
2932} ;
3033use datafusion_common:: config:: ConfigOptions ;
31- use datafusion_common:: format:: DEFAULT_CAST_OPTIONS ;
3234use datafusion_common:: { Result , ScalarType , ScalarValue , exec_err} ;
3335use datafusion_expr:: {
3436 ColumnarValue , Documentation , ScalarUDF , ScalarUDFImpl , Signature , Volatility ,
@@ -325,6 +327,45 @@ impl_to_timestamp_constructors!(ToTimestampMillisFunc);
325327impl_to_timestamp_constructors ! ( ToTimestampMicrosFunc ) ;
326328impl_to_timestamp_constructors ! ( ToTimestampNanosFunc ) ;
327329
330+ fn decimal_to_nanoseconds ( value : i128 , scale : i8 ) -> i64 {
331+ let nanos_exponent = 9_i16 - scale as i16 ;
332+ let timestamp_nanos = if nanos_exponent >= 0 {
333+ value * 10_i128 . pow ( nanos_exponent as u32 )
334+ } else {
335+ value / 10_i128 . pow ( nanos_exponent. unsigned_abs ( ) as u32 )
336+ } ;
337+ timestamp_nanos as i64
338+ }
339+
340+ fn decimal128_to_timestamp_nanos (
341+ arg : & ColumnarValue ,
342+ tz : Option < Arc < str > > ,
343+ ) -> Result < ColumnarValue > {
344+ match arg {
345+ ColumnarValue :: Scalar ( ScalarValue :: Decimal128 ( Some ( value) , _, scale) ) => {
346+ let timestamp_nanos = decimal_to_nanoseconds ( * value, * scale) ;
347+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
348+ Some ( timestamp_nanos) ,
349+ tz,
350+ ) ) )
351+ }
352+ ColumnarValue :: Scalar ( ScalarValue :: Decimal128 ( None , _, _) ) => Ok (
353+ ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond ( None , tz) ) ,
354+ ) ,
355+ ColumnarValue :: Array ( arr) => {
356+ let decimal_arr = downcast_arg ! ( arr, Decimal128Array ) ;
357+ let scale = decimal_arr. scale ( ) ;
358+ let result: TimestampNanosecondArray = decimal_arr
359+ . iter ( )
360+ . map ( |v| v. map ( |val| decimal_to_nanoseconds ( val, scale) ) )
361+ . collect ( ) ;
362+ let result = result. with_timezone_opt ( tz) ;
363+ Ok ( ColumnarValue :: Array ( Arc :: new ( result) ) )
364+ }
365+ _ => exec_err ! ( "Invalid Decimal128 value for to_timestamp" ) ,
366+ }
367+ }
368+
328369/// to_timestamp SQL function
329370///
330371/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
@@ -380,48 +421,39 @@ impl ScalarUDFImpl for ToTimestampFunc {
380421 let tz = self . timezone . clone ( ) ;
381422
382423 match args[ 0 ] . data_type ( ) {
383- Int32 | Int64 => args[ 0 ]
424+ Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => args[ 0 ]
384425 . cast_to ( & Timestamp ( Second , None ) , None ) ?
385426 . cast_to ( & Timestamp ( Nanosecond , tz) , None ) ,
386427 Null | Timestamp ( _, _) => args[ 0 ] . cast_to ( & Timestamp ( Nanosecond , tz) , None ) ,
428+ Float16 => {
429+ let arr = args[ 0 ] . to_array ( 1 ) ?;
430+ let f16_arr = downcast_arg ! ( & arr, Float16Array ) ;
431+ let result: TimestampNanosecondArray =
432+ f16_arr. unary ( |x| ( x. to_f64 ( ) * 1_000_000_000.0 ) as i64 ) ;
433+ Ok ( ColumnarValue :: Array ( Arc :: new ( result. with_timezone_opt ( tz) ) ) )
434+ }
435+ Float32 => {
436+ let arr = args[ 0 ] . to_array ( 1 ) ?;
437+ let f32_arr = downcast_arg ! ( & arr, Float32Array ) ;
438+ let result: TimestampNanosecondArray =
439+ f32_arr. unary ( |x| ( x as f64 * 1_000_000_000.0 ) as i64 ) ;
440+ Ok ( ColumnarValue :: Array ( Arc :: new ( result. with_timezone_opt ( tz) ) ) )
441+ }
387442 Float64 => {
388- let rescaled = arrow:: compute:: kernels:: numeric:: mul (
389- & args[ 0 ] . to_array ( 1 ) ?,
390- & arrow:: array:: Scalar :: new ( Float64Array :: from ( vec ! [
391- 1_000_000_000f64 ,
392- ] ) ) ,
393- ) ?;
394- Ok ( ColumnarValue :: Array ( arrow:: compute:: cast_with_options (
395- & rescaled,
396- & Timestamp ( Nanosecond , tz) ,
397- & DEFAULT_CAST_OPTIONS ,
398- ) ?) )
443+ let arr = args[ 0 ] . to_array ( 1 ) ?;
444+ let f64_arr = downcast_arg ! ( & arr, Float64Array ) ;
445+ let result: TimestampNanosecondArray =
446+ f64_arr. unary ( |x| ( x * 1_000_000_000.0 ) as i64 ) ;
447+ Ok ( ColumnarValue :: Array ( Arc :: new ( result. with_timezone_opt ( tz) ) ) )
448+ }
449+ Decimal32 ( _, _) | Decimal64 ( _, _) | Decimal256 ( _, _) => {
450+ let arg = args[ 0 ] . cast_to ( & Decimal128 ( 38 , 9 ) , None ) ?;
451+ decimal128_to_timestamp_nanos ( & arg, tz)
399452 }
453+ Decimal128 ( _, _) => decimal128_to_timestamp_nanos ( & args[ 0 ] , tz) ,
400454 Utf8View | LargeUtf8 | Utf8 => {
401455 to_timestamp_impl :: < TimestampNanosecondType > ( & args, "to_timestamp" , & tz)
402456 }
403- Decimal128 ( _, _) => {
404- match & args[ 0 ] {
405- ColumnarValue :: Scalar ( ScalarValue :: Decimal128 (
406- Some ( value) ,
407- _,
408- scale,
409- ) ) => {
410- // Convert decimal to seconds and nanoseconds
411- let scale_factor = 10_i128 . pow ( * scale as u32 ) ;
412- let seconds = value / scale_factor;
413- let fraction = value % scale_factor;
414- let nanos = ( fraction * 1_000_000_000 ) / scale_factor;
415- let timestamp_nanos = seconds * 1_000_000_000 + nanos;
416-
417- Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
418- Some ( timestamp_nanos as i64 ) ,
419- tz,
420- ) ) )
421- }
422- _ => exec_err ! ( "Invalid decimal value" ) ,
423- }
424- }
425457 other => {
426458 exec_err ! ( "Unsupported data type {other} for function to_timestamp" )
427459 }
@@ -473,9 +505,23 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc {
473505 let tz = self . timezone . clone ( ) ;
474506
475507 match args[ 0 ] . data_type ( ) {
476- Null | Int32 | Int64 | Timestamp ( _, _) | Decimal128 ( _, _) => {
477- args[ 0 ] . cast_to ( & Timestamp ( Second , tz) , None )
478- }
508+ Null
509+ | Int8
510+ | Int16
511+ | Int32
512+ | Int64
513+ | UInt8
514+ | UInt16
515+ | UInt32
516+ | UInt64
517+ | Timestamp ( _, _)
518+ | Decimal32 ( _, _)
519+ | Decimal64 ( _, _)
520+ | Decimal128 ( _, _)
521+ | Decimal256 ( _, _) => args[ 0 ] . cast_to ( & Timestamp ( Second , tz) , None ) ,
522+ Float16 | Float32 | Float64 => args[ 0 ]
523+ . cast_to ( & Int64 , None ) ?
524+ . cast_to ( & Timestamp ( Second , tz) , None ) ,
479525 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl :: < TimestampSecondType > (
480526 & args,
481527 "to_timestamp_seconds" ,
@@ -533,9 +579,25 @@ impl ScalarUDFImpl for ToTimestampMillisFunc {
533579 }
534580
535581 match args[ 0 ] . data_type ( ) {
536- Null | Int32 | Int64 | Timestamp ( _, _) => {
582+ Null
583+ | Int8
584+ | Int16
585+ | Int32
586+ | Int64
587+ | UInt8
588+ | UInt16
589+ | UInt32
590+ | UInt64
591+ | Timestamp ( _, _)
592+ | Decimal32 ( _, _)
593+ | Decimal64 ( _, _)
594+ | Decimal128 ( _, _)
595+ | Decimal256 ( _, _) => {
537596 args[ 0 ] . cast_to ( & Timestamp ( Millisecond , self . timezone . clone ( ) ) , None )
538597 }
598+ Float16 | Float32 | Float64 => args[ 0 ]
599+ . cast_to ( & Int64 , None ) ?
600+ . cast_to ( & Timestamp ( Millisecond , self . timezone . clone ( ) ) , None ) ,
539601 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl :: < TimestampMillisecondType > (
540602 & args,
541603 "to_timestamp_millis" ,
@@ -593,9 +655,25 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc {
593655 }
594656
595657 match args[ 0 ] . data_type ( ) {
596- Null | Int32 | Int64 | Timestamp ( _, _) => {
658+ Null
659+ | Int8
660+ | Int16
661+ | Int32
662+ | Int64
663+ | UInt8
664+ | UInt16
665+ | UInt32
666+ | UInt64
667+ | Timestamp ( _, _)
668+ | Decimal32 ( _, _)
669+ | Decimal64 ( _, _)
670+ | Decimal128 ( _, _)
671+ | Decimal256 ( _, _) => {
597672 args[ 0 ] . cast_to ( & Timestamp ( Microsecond , self . timezone . clone ( ) ) , None )
598673 }
674+ Float16 | Float32 | Float64 => args[ 0 ]
675+ . cast_to ( & Int64 , None ) ?
676+ . cast_to ( & Timestamp ( Microsecond , self . timezone . clone ( ) ) , None ) ,
599677 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl :: < TimestampMicrosecondType > (
600678 & args,
601679 "to_timestamp_micros" ,
@@ -653,9 +731,25 @@ impl ScalarUDFImpl for ToTimestampNanosFunc {
653731 }
654732
655733 match args[ 0 ] . data_type ( ) {
656- Null | Int32 | Int64 | Timestamp ( _, _) => {
734+ Null
735+ | Int8
736+ | Int16
737+ | Int32
738+ | Int64
739+ | UInt8
740+ | UInt16
741+ | UInt32
742+ | UInt64
743+ | Timestamp ( _, _)
744+ | Decimal32 ( _, _)
745+ | Decimal64 ( _, _)
746+ | Decimal128 ( _, _)
747+ | Decimal256 ( _, _) => {
657748 args[ 0 ] . cast_to ( & Timestamp ( Nanosecond , self . timezone . clone ( ) ) , None )
658749 }
750+ Float16 | Float32 | Float64 => args[ 0 ]
751+ . cast_to ( & Int64 , None ) ?
752+ . cast_to ( & Timestamp ( Nanosecond , self . timezone . clone ( ) ) , None ) ,
659753 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl :: < TimestampNanosecondType > (
660754 & args,
661755 "to_timestamp_nanos" ,
@@ -1735,4 +1829,23 @@ mod tests {
17351829 assert_contains ! ( actual, expected) ;
17361830 }
17371831 }
1832+
1833+ #[ test]
1834+ fn test_decimal_to_nanoseconds_negative_scale ( ) {
1835+ // scale -2: internal value 5 represents 5 * 10^2 = 500 seconds
1836+ let nanos = decimal_to_nanoseconds ( 5 , -2 ) ;
1837+ assert_eq ! ( nanos, 500_000_000_000 ) ; // 500 seconds in nanoseconds
1838+
1839+ // scale -1: internal value 10 represents 10 * 10^1 = 100 seconds
1840+ let nanos = decimal_to_nanoseconds ( 10 , -1 ) ;
1841+ assert_eq ! ( nanos, 100_000_000_000 ) ;
1842+
1843+ // scale 0: internal value 5 represents 5 seconds
1844+ let nanos = decimal_to_nanoseconds ( 5 , 0 ) ;
1845+ assert_eq ! ( nanos, 5_000_000_000 ) ;
1846+
1847+ // scale 3: internal value 1500 represents 1.5 seconds
1848+ let nanos = decimal_to_nanoseconds ( 1500 , 3 ) ;
1849+ assert_eq ! ( nanos, 1_500_000_000 ) ;
1850+ }
17381851}
0 commit comments