@@ -20,7 +20,7 @@ use std::ops::Add;
2020use std:: sync:: Arc ;
2121
2222use arrow:: array:: timezone:: Tz ;
23- use arrow:: array:: { Array , ArrayRef , PrimitiveBuilder } ;
23+ use arrow:: array:: { ArrayRef , PrimitiveBuilder } ;
2424use arrow:: datatypes:: DataType :: Timestamp ;
2525use arrow:: datatypes:: TimeUnit :: { Microsecond , Millisecond , Nanosecond , Second } ;
2626use arrow:: datatypes:: {
@@ -31,11 +31,12 @@ use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
3131
3232use datafusion_common:: cast:: as_primitive_array;
3333use datafusion_common:: {
34- exec_err, internal_datafusion_err, plan_err , utils:: take_function_args, Result ,
34+ exec_err, internal_datafusion_err, internal_err , utils:: take_function_args, Result ,
3535 ScalarValue ,
3636} ;
3737use datafusion_expr:: {
38- ColumnarValue , Documentation , ScalarUDFImpl , Signature , Volatility ,
38+ Coercion , ColumnarValue , Documentation , ScalarUDFImpl , Signature , TypeSignatureClass ,
39+ Volatility ,
3940} ;
4041use datafusion_macros:: user_doc;
4142
@@ -111,133 +112,163 @@ impl Default for ToLocalTimeFunc {
111112impl ToLocalTimeFunc {
112113 pub fn new ( ) -> Self {
113114 Self {
114- signature : Signature :: user_defined ( Volatility :: Immutable ) ,
115+ signature : Signature :: coercible (
116+ vec ! [ Coercion :: new_exact( TypeSignatureClass :: Timestamp ) ] ,
117+ Volatility :: Immutable ,
118+ ) ,
115119 }
116120 }
121+ }
117122
118- fn to_local_time ( & self , args : & [ ColumnarValue ] ) -> Result < ColumnarValue > {
119- let [ time_value] = take_function_args ( self . name ( ) , args) ?;
123+ impl ScalarUDFImpl for ToLocalTimeFunc {
124+ fn as_any ( & self ) -> & dyn Any {
125+ self
126+ }
120127
121- let arg_type = time_value. data_type ( ) ;
122- match arg_type {
123- Timestamp ( _, None ) => {
124- // if no timezone specified, just return the input
125- Ok ( time_value. clone ( ) )
126- }
127- // If has timezone, adjust the underlying time value. The current time value
128- // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
129- // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
130- // for more details.
131- //
132- // Then remove the timezone in return type, i.e. return None
133- Timestamp ( _, Some ( timezone) ) => {
134- let tz: Tz = timezone. parse ( ) ?;
135-
136- match time_value {
137- ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
138- Some ( ts) ,
139- Some ( _) ,
140- ) ) => {
141- let adjusted_ts =
142- adjust_to_local_time :: < TimestampNanosecondType > ( * ts, tz) ?;
143- Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
144- Some ( adjusted_ts) ,
145- None ,
146- ) ) )
147- }
148- ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond (
149- Some ( ts) ,
150- Some ( _) ,
151- ) ) => {
152- let adjusted_ts =
153- adjust_to_local_time :: < TimestampMicrosecondType > ( * ts, tz) ?;
154- Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond (
155- Some ( adjusted_ts) ,
156- None ,
157- ) ) )
158- }
159- ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond (
160- Some ( ts) ,
161- Some ( _) ,
162- ) ) => {
163- let adjusted_ts =
164- adjust_to_local_time :: < TimestampMillisecondType > ( * ts, tz) ?;
165- Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond (
166- Some ( adjusted_ts) ,
167- None ,
168- ) ) )
169- }
170- ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond (
171- Some ( ts) ,
172- Some ( _) ,
173- ) ) => {
174- let adjusted_ts =
175- adjust_to_local_time :: < TimestampSecondType > ( * ts, tz) ?;
176- Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond (
177- Some ( adjusted_ts) ,
178- None ,
179- ) ) )
180- }
181- ColumnarValue :: Array ( array) => {
182- fn transform_array < T : ArrowTimestampType > (
183- array : & ArrayRef ,
184- tz : Tz ,
185- ) -> Result < ColumnarValue > {
186- let mut builder = PrimitiveBuilder :: < T > :: new ( ) ;
187-
188- let primitive_array = as_primitive_array :: < T > ( array) ?;
189- for ts_opt in primitive_array. iter ( ) {
190- match ts_opt {
191- None => builder. append_null ( ) ,
192- Some ( ts) => {
193- let adjusted_ts: i64 =
194- adjust_to_local_time :: < T > ( ts, tz) ?;
195- builder. append_value ( adjusted_ts)
196- }
197- }
198- }
199-
200- Ok ( ColumnarValue :: Array ( Arc :: new ( builder. finish ( ) ) ) )
201- }
202-
203- match array. data_type ( ) {
204- Timestamp ( _, None ) => {
205- // if no timezone specified, just return the input
206- Ok ( time_value. clone ( ) )
207- }
208- Timestamp ( Nanosecond , Some ( _) ) => {
209- transform_array :: < TimestampNanosecondType > ( array, tz)
210- }
211- Timestamp ( Microsecond , Some ( _) ) => {
212- transform_array :: < TimestampMicrosecondType > ( array, tz)
213- }
214- Timestamp ( Millisecond , Some ( _) ) => {
215- transform_array :: < TimestampMillisecondType > ( array, tz)
216- }
217- Timestamp ( Second , Some ( _) ) => {
218- transform_array :: < TimestampSecondType > ( array, tz)
219- }
220- _ => {
221- exec_err ! ( "to_local_time function requires timestamp argument in array, got {:?}" , array. data_type( ) )
222- }
223- }
224- }
225- _ => {
226- exec_err ! (
227- "to_local_time function requires timestamp argument, got {:?}" ,
228- time_value. data_type( )
229- )
230- }
231- }
232- }
233- _ => {
234- exec_err ! (
235- "to_local_time function requires timestamp argument, got {:?}" ,
236- arg_type
237- )
128+ fn name ( & self ) -> & str {
129+ "to_local_time"
130+ }
131+
132+ fn signature ( & self ) -> & Signature {
133+ & self . signature
134+ }
135+
136+ fn return_type ( & self , arg_types : & [ DataType ] ) -> Result < DataType > {
137+ match & arg_types[ 0 ] {
138+ DataType :: Null => Ok ( Timestamp ( Nanosecond , None ) ) ,
139+ Timestamp ( timeunit, _) => Ok ( Timestamp ( * timeunit, None ) ) ,
140+ dt => internal_err ! (
141+ "The to_local_time function can only accept timestamp as the arg, got {dt}"
142+ ) ,
143+ }
144+ }
145+
146+ fn invoke_with_args (
147+ & self ,
148+ args : datafusion_expr:: ScalarFunctionArgs ,
149+ ) -> Result < ColumnarValue > {
150+ let [ time_value] = take_function_args ( self . name ( ) , & args. args ) ?;
151+ to_local_time ( time_value)
152+ }
153+
154+ fn documentation ( & self ) -> Option < & Documentation > {
155+ self . doc ( )
156+ }
157+ }
158+
159+ fn transform_array < T : ArrowTimestampType > (
160+ array : & ArrayRef ,
161+ tz : Tz ,
162+ ) -> Result < ColumnarValue > {
163+ let primitive_array = as_primitive_array :: < T > ( array) ?;
164+ let mut builder = PrimitiveBuilder :: < T > :: with_capacity ( primitive_array. len ( ) ) ;
165+ for ts_opt in primitive_array. iter ( ) {
166+ match ts_opt {
167+ None => builder. append_null ( ) ,
168+ Some ( ts) => {
169+ let adjusted_ts: i64 = adjust_to_local_time :: < T > ( ts, tz) ?;
170+ builder. append_value ( adjusted_ts)
238171 }
239172 }
240173 }
174+
175+ Ok ( ColumnarValue :: Array ( Arc :: new ( builder. finish ( ) ) ) )
176+ }
177+
178+ fn to_local_time ( time_value : & ColumnarValue ) -> Result < ColumnarValue > {
179+ let arg_type = time_value. data_type ( ) ;
180+
181+ let tz: Tz = match & arg_type {
182+ Timestamp ( _, Some ( timezone) ) => timezone. parse ( ) ?,
183+ Timestamp ( _, None ) => {
184+ // if no timezone specified, just return the input
185+ return Ok ( time_value. clone ( ) ) ;
186+ }
187+ DataType :: Null => {
188+ return Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
189+ None , None ,
190+ ) ) ) ;
191+ }
192+ dt => {
193+ return internal_err ! (
194+ "to_local_time function requires timestamp argument, got {dt}"
195+ )
196+ }
197+ } ;
198+
199+ // If has timezone, adjust the underlying time value. The current time value
200+ // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
201+ // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
202+ // for more details.
203+ //
204+ // Then remove the timezone in return type, i.e. return None
205+ match time_value {
206+ ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond ( None , Some ( _) ) ) => Ok (
207+ ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond ( None , None ) ) ,
208+ ) ,
209+ ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond ( None , Some ( _) ) ) => Ok (
210+ ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond ( None , None ) ) ,
211+ ) ,
212+ ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond ( None , Some ( _) ) ) => Ok (
213+ ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond ( None , None ) ) ,
214+ ) ,
215+ ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond ( None , Some ( _) ) ) => Ok (
216+ ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond ( None , None ) ) ,
217+ ) ,
218+ ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond ( Some ( ts) , Some ( _) ) ) => {
219+ let adjusted_ts = adjust_to_local_time :: < TimestampNanosecondType > ( * ts, tz) ?;
220+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
221+ Some ( adjusted_ts) ,
222+ None ,
223+ ) ) )
224+ }
225+ ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond ( Some ( ts) , Some ( _) ) ) => {
226+ let adjusted_ts = adjust_to_local_time :: < TimestampMicrosecondType > ( * ts, tz) ?;
227+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond (
228+ Some ( adjusted_ts) ,
229+ None ,
230+ ) ) )
231+ }
232+ ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond ( Some ( ts) , Some ( _) ) ) => {
233+ let adjusted_ts = adjust_to_local_time :: < TimestampMillisecondType > ( * ts, tz) ?;
234+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond (
235+ Some ( adjusted_ts) ,
236+ None ,
237+ ) ) )
238+ }
239+ ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond ( Some ( ts) , Some ( _) ) ) => {
240+ let adjusted_ts = adjust_to_local_time :: < TimestampSecondType > ( * ts, tz) ?;
241+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond (
242+ Some ( adjusted_ts) ,
243+ None ,
244+ ) ) )
245+ }
246+ ColumnarValue :: Array ( array)
247+ if matches ! ( array. data_type( ) , Timestamp ( Nanosecond , Some ( _) ) ) =>
248+ {
249+ transform_array :: < TimestampNanosecondType > ( array, tz)
250+ }
251+ ColumnarValue :: Array ( array)
252+ if matches ! ( array. data_type( ) , Timestamp ( Microsecond , Some ( _) ) ) =>
253+ {
254+ transform_array :: < TimestampMicrosecondType > ( array, tz)
255+ }
256+ ColumnarValue :: Array ( array)
257+ if matches ! ( array. data_type( ) , Timestamp ( Millisecond , Some ( _) ) ) =>
258+ {
259+ transform_array :: < TimestampMillisecondType > ( array, tz)
260+ }
261+ ColumnarValue :: Array ( array)
262+ if matches ! ( array. data_type( ) , Timestamp ( Second , Some ( _) ) ) =>
263+ {
264+ transform_array :: < TimestampSecondType > ( array, tz)
265+ }
266+ _ => {
267+ internal_err ! (
268+ "to_local_time function requires timestamp argument, got {arg_type}"
269+ )
270+ }
271+ }
241272}
242273
243274/// This function converts a timestamp with a timezone to a timestamp without a timezone.
@@ -343,68 +374,6 @@ fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
343374 }
344375}
345376
346- impl ScalarUDFImpl for ToLocalTimeFunc {
347- fn as_any ( & self ) -> & dyn Any {
348- self
349- }
350-
351- fn name ( & self ) -> & str {
352- "to_local_time"
353- }
354-
355- fn signature ( & self ) -> & Signature {
356- & self . signature
357- }
358-
359- fn return_type ( & self , arg_types : & [ DataType ] ) -> Result < DataType > {
360- let [ time_value] = take_function_args ( self . name ( ) , arg_types) ?;
361-
362- match time_value {
363- Timestamp ( timeunit, _) => Ok ( Timestamp ( * timeunit, None ) ) ,
364- _ => exec_err ! (
365- "The to_local_time function can only accept timestamp as the arg, got {:?}" , time_value
366- )
367- }
368- }
369-
370- fn invoke_with_args (
371- & self ,
372- args : datafusion_expr:: ScalarFunctionArgs ,
373- ) -> Result < ColumnarValue > {
374- let [ time_value] = take_function_args ( self . name ( ) , args. args ) ?;
375-
376- self . to_local_time ( std:: slice:: from_ref ( & time_value) )
377- }
378-
379- fn coerce_types ( & self , arg_types : & [ DataType ] ) -> Result < Vec < DataType > > {
380- if arg_types. len ( ) != 1 {
381- return plan_err ! (
382- "to_local_time function requires 1 argument, got {:?}" ,
383- arg_types. len( )
384- ) ;
385- }
386-
387- let first_arg = arg_types[ 0 ] . clone ( ) ;
388- match & first_arg {
389- DataType :: Null => Ok ( vec ! [ Timestamp ( Nanosecond , None ) ] ) ,
390- Timestamp ( Nanosecond , timezone) => {
391- Ok ( vec ! [ Timestamp ( Nanosecond , timezone. clone( ) ) ] )
392- }
393- Timestamp ( Microsecond , timezone) => {
394- Ok ( vec ! [ Timestamp ( Microsecond , timezone. clone( ) ) ] )
395- }
396- Timestamp ( Millisecond , timezone) => {
397- Ok ( vec ! [ Timestamp ( Millisecond , timezone. clone( ) ) ] )
398- }
399- Timestamp ( Second , timezone) => Ok ( vec ! [ Timestamp ( Second , timezone. clone( ) ) ] ) ,
400- _ => plan_err ! ( "The to_local_time function can only accept Timestamp as the arg got {first_arg}" ) ,
401- }
402- }
403- fn documentation ( & self ) -> Option < & Documentation > {
404- self . doc ( )
405- }
406- }
407-
408377#[ cfg( test) ]
409378mod tests {
410379 use std:: sync:: Arc ;
0 commit comments