22use std:: { ops:: ControlFlow , sync:: Arc } ;
33
44use arrow:: {
5- array:: { Array , ArrayRef , BooleanBuilder , Float64Builder , Int64Builder , StringBuilder } ,
6- datatypes:: DataType ,
5+ array:: {
6+ Array , ArrayRef , BooleanBuilder , Float64Builder , Int64Builder , StringBuilder ,
7+ TimestampMicrosecondBuilder ,
8+ } ,
9+ datatypes:: { DataType , TimeUnit } ,
710} ;
11+ use chrono:: { DateTime , Datelike , NaiveDate , TimeZone , Timelike , Utc } ;
812use datafusion_common:: {
9- cast:: { as_boolean_array, as_float64_array, as_int64_array, as_string_array} ,
13+ cast:: {
14+ as_boolean_array, as_float64_array, as_int64_array, as_string_array,
15+ as_timestamp_microsecond_array,
16+ } ,
1017 error:: Result as DataFusionResult ,
1118 exec_datafusion_err, exec_err,
1219} ;
1320use pyo3:: {
1421 Bound , BoundObject , IntoPyObjectExt , PyAny , Python ,
15- types:: { PyAnyMethods , PyInt , PyNone } ,
22+ types:: {
23+ PyAnyMethods , PyDateAccess , PyDateTime , PyInt , PyNone , PyStringMethods , PyTimeAccess ,
24+ PyTzInfoAccess ,
25+ } ,
1626} ;
1727
1828use crate :: {
@@ -38,6 +48,7 @@ impl PythonType {
3848 pub ( crate ) fn data_type ( & self ) -> DataType {
3949 match self {
4050 Self :: Bool => DataType :: Boolean ,
51+ Self :: DateTime => DataType :: Timestamp ( TimeUnit :: Microsecond , None ) ,
4152 Self :: Float => DataType :: Float64 ,
4253 Self :: Int => DataType :: Int64 ,
4354 Self :: String => DataType :: Utf8 ,
@@ -68,6 +79,53 @@ impl PythonType {
6879
6980 Ok ( Box :: new ( it) )
7081 }
82+ Self :: DateTime => {
83+ let array = as_timestamp_microsecond_array ( array) ?;
84+ if let Some ( tz) = array. timezone ( ) {
85+ return exec_err ! ( "expected no time zone but got {tz}" ) ;
86+ }
87+
88+ let it = array. into_iter ( ) . map ( move |maybe_val| {
89+ maybe_val
90+ . map ( |val| {
91+ let dt = DateTime :: from_timestamp_micros ( val) . ok_or_else ( || exec_datafusion_err ! ( "cannot create DateTime object from microsecond timestamp: {val}" ) ) ?;
92+
93+ PyDateTime :: new (
94+ py,
95+ dt. year ( ) ,
96+ dt
97+ . month ( )
98+ . try_into ( )
99+ . map_err ( |e| exec_datafusion_err ! ( "month out of range: {e}" ) ) ?,
100+ dt
101+ . day ( )
102+ . try_into ( )
103+ . map_err ( |e| exec_datafusion_err ! ( "day out of range: {e}" ) ) ?,
104+ dt
105+ . hour ( )
106+ . try_into ( )
107+ . map_err ( |e| exec_datafusion_err ! ( "hour out of range: {e}" ) ) ?,
108+ dt
109+ . minute ( )
110+ . try_into ( )
111+ . map_err ( |e| exec_datafusion_err ! ( "minute out of range: {e}" ) ) ?,
112+ dt
113+ . second ( )
114+ . try_into ( )
115+ . map_err ( |e| exec_datafusion_err ! ( "second out of range: {e}" ) ) ?,
116+ dt. timestamp_subsec_micros ( ) ,
117+ None ,
118+ ) . map_err ( |e| {
119+ exec_datafusion_err ! ( "cannot create PyDateTime: {e}" )
120+ } ) ?. into_bound_py_any ( py) . map_err ( |e| {
121+ exec_datafusion_err ! ( "cannot convert PyDateTime to any: {e}" )
122+ } )
123+ } )
124+ . transpose ( )
125+ } ) ;
126+
127+ Ok ( Box :: new ( it) )
128+ }
71129 Self :: Float => {
72130 let array = as_float64_array ( array) ?;
73131
@@ -128,6 +186,7 @@ impl PythonType {
128186 fn python_to_arrow < ' py > ( & self , num_rows : usize ) -> Box < dyn ArrayBuilder < ' py > > {
129187 match self {
130188 Self :: Bool => Box :: new ( BooleanBuilder :: with_capacity ( num_rows) ) ,
189+ Self :: DateTime => Box :: new ( TimestampMicrosecondBuilder :: with_capacity ( num_rows) ) ,
131190 Self :: Float => Box :: new ( Float64Builder :: with_capacity ( num_rows) ) ,
132191 Self :: Int => Box :: new ( Int64Builder :: with_capacity ( num_rows) ) ,
133192 Self :: String => Box :: new ( StringBuilder :: with_capacity ( num_rows, 1024 ) ) ,
@@ -316,3 +375,50 @@ impl<'py> ArrayBuilder<'py> for StringBuilder {
316375 Arc :: new ( self . finish ( ) )
317376 }
318377}
378+
379+ impl < ' py > ArrayBuilder < ' py > for TimestampMicrosecondBuilder {
380+ fn push ( & mut self , val : Bound < ' py , PyAny > ) -> DataFusionResult < ( ) > {
381+ let val = val. downcast_exact :: < PyDateTime > ( ) . map_err ( |_| {
382+ exec_datafusion_err ! ( "expected `datetime` but got {}" , py_representation( & val) )
383+ } ) ?;
384+ if let Some ( tzinfo) = val. get_tzinfo ( ) {
385+ let s = tzinfo
386+ . str ( )
387+ . and_then ( |name| name. to_str ( ) . map ( |s| s. to_owned ( ) ) )
388+ . unwrap_or_else ( |_| "<unknown>" . to_owned ( ) ) ;
389+ return exec_err ! ( "expected no tzinfo, got {s}" ) ;
390+ }
391+ let val =
392+ NaiveDate :: from_ymd_opt ( val. get_year ( ) , val. get_month ( ) . into ( ) , val. get_day ( ) . into ( ) )
393+ . ok_or_else ( || {
394+ exec_datafusion_err ! (
395+ "cannot create NaiveDate based on year-month-day of {}" ,
396+ py_representation( val)
397+ )
398+ } ) ?
399+ . and_hms_micro_opt (
400+ val. get_hour ( ) . into ( ) ,
401+ val. get_minute ( ) . into ( ) ,
402+ val. get_second ( ) . into ( ) ,
403+ val. get_microsecond ( ) ,
404+ )
405+ . ok_or_else ( || {
406+ exec_datafusion_err ! (
407+ "cannot create NaiveDateTime based on hour-minute-second-microsecond of {}" ,
408+ py_representation( val)
409+ )
410+ } ) ?;
411+ let val = Utc . from_utc_datetime ( & val) ;
412+ let val = val. timestamp_micros ( ) ;
413+ self . append_value ( val) ;
414+ Ok ( ( ) )
415+ }
416+
417+ fn skip ( & mut self ) {
418+ self . append_null ( ) ;
419+ }
420+
421+ fn finish ( & mut self ) -> ArrayRef {
422+ Arc :: new ( self . finish ( ) )
423+ }
424+ }
0 commit comments