diff --git a/datafusion/functions/src/datetime/common.rs b/datafusion/functions/src/datetime/common.rs index 65f9c9323925..fa1819e499dd 100644 --- a/datafusion/functions/src/datetime/common.rs +++ b/datafusion/functions/src/datetime/common.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::str::FromStr; use std::sync::Arc; +use arrow::array::timezone::Tz; use arrow::array::{ Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray, StringArrayType, StringViewArray, @@ -25,7 +27,7 @@ use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::DataType; use chrono::format::{parse, Parsed, StrftimeItems}; use chrono::LocalResult::Single; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, FixedOffset, LocalResult, NaiveDateTime, TimeZone, Utc}; use datafusion_common::cast::as_generic_string_array; use datafusion_common::{ @@ -42,6 +44,167 @@ pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result { string_to_timestamp_nanos(s).map_err(|e| e.into()) } +#[derive(Clone, Copy)] +enum ConfiguredZone { + Named(Tz), + Offset(FixedOffset), +} + +#[derive(Clone)] +pub(crate) struct ConfiguredTimeZone { + repr: Arc, + zone: ConfiguredZone, +} + +impl ConfiguredTimeZone { + pub(crate) fn utc() -> Self { + Self { + repr: Arc::from("+00:00"), + zone: ConfiguredZone::Offset(FixedOffset::east_opt(0).unwrap()), + } + } + + pub(crate) fn parse(tz: &str) -> Result { + if tz.trim().is_empty() { + return Ok(Self::utc()); + } + + if let Ok(named) = Tz::from_str(tz) { + return Ok(Self { + repr: Arc::from(tz), + zone: ConfiguredZone::Named(named), + }); + } + + if let Some(offset) = parse_fixed_offset(tz) { + return Ok(Self { + repr: Arc::from(tz), + zone: ConfiguredZone::Offset(offset), + }); + } + + Err(exec_datafusion_err!( + "Invalid execution timezone '{tz}'. Please provide an IANA timezone name (e.g. 'America/New_York') or an offset in the form '+HH:MM'." + )) + } + + fn timestamp_from_naive(&self, naive: &NaiveDateTime) -> Result { + match self.zone { + ConfiguredZone::Named(tz) => { + local_datetime_to_timestamp(tz.from_local_datetime(naive), &self.repr) + } + ConfiguredZone::Offset(offset) => { + local_datetime_to_timestamp(offset.from_local_datetime(naive), &self.repr) + } + } + } + + fn datetime_from_formatted(&self, s: &str, format: &str) -> Result> { + let datetime = match self.zone { + ConfiguredZone::Named(tz) => { + string_to_datetime_formatted(&tz, s, format)?.with_timezone(&Utc) + } + ConfiguredZone::Offset(offset) => { + string_to_datetime_formatted(&offset, s, format)?.with_timezone(&Utc) + } + }; + Ok(datetime) + } +} + +fn parse_fixed_offset(tz: &str) -> Option { + let tz = tz.trim(); + if tz.eq_ignore_ascii_case("utc") || tz.eq_ignore_ascii_case("z") { + return FixedOffset::east_opt(0); + } + + let (sign, rest) = if let Some(rest) = tz.strip_prefix('+') { + (1, rest) + } else if let Some(rest) = tz.strip_prefix('-') { + (-1, rest) + } else { + return None; + }; + + let (hours, minutes) = if let Some((hours, minutes)) = rest.split_once(':') { + (hours, minutes) + } else if rest.len() == 4 { + rest.split_at(2) + } else { + return None; + }; + + let hours: i32 = hours.parse().ok()?; + let minutes: i32 = minutes.parse().ok()?; + if hours > 23 || minutes > 59 { + return None; + } + + let total_minutes = hours * 60 + minutes; + let total_seconds = sign * total_minutes * 60; + FixedOffset::east_opt(total_seconds) +} + +fn local_datetime_to_timestamp( + result: LocalResult>, + tz_repr: &str, +) -> Result { + match result { + Single(dt) => datetime_to_timestamp(dt.with_timezone(&Utc)), + LocalResult::Ambiguous(dt1, dt2) => Err(exec_datafusion_err!( + "The local time '{:?}' is ambiguous in timezone '{tz_repr}' (also corresponds to '{:?}').", + dt1.naive_local(), + dt2.naive_local() + )), + LocalResult::None => Err(exec_datafusion_err!( + "The local time is invalid in timezone '{tz_repr}'." + )), + } +} + +fn datetime_to_timestamp(datetime: DateTime) -> Result { + datetime + .timestamp_nanos_opt() + .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}")) +} + +fn timestamp_to_naive(value: i64) -> Result { + let secs = value.div_euclid(1_000_000_000); + let nanos = value.rem_euclid(1_000_000_000) as u32; + DateTime::::from_timestamp(secs, nanos) + .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}")) + .map(|dt| dt.naive_utc()) +} + +fn has_explicit_timezone(value: &str) -> bool { + if DateTime::parse_from_rfc3339(value).is_ok() { + return true; + } + + if let Some(pos) = value.rfind(|c| ['T', ' '].contains(&c)) { + let tail = &value[pos + 1..]; + tail.contains('Z') + || tail.contains('z') + || tail.contains('+') + || tail.contains('-') + } else { + false + } +} + +pub(crate) fn string_to_timestamp_nanos_with_timezone( + timezone: &ConfiguredTimeZone, + s: &str, +) -> Result { + let ts = string_to_timestamp_nanos_shim(s)?; + if has_explicit_timezone(s) { + Ok(ts) + } else { + let naive = timestamp_to_naive(ts)?; + timezone.timestamp_from_naive(&naive) + } +} + /// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View] /// /// [Utf8]: DataType::Utf8 @@ -142,6 +305,7 @@ pub(crate) fn string_to_datetime_formatted( /// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html /// #[inline] +#[allow(dead_code)] pub(crate) fn string_to_timestamp_nanos_formatted( s: &str, format: &str, @@ -153,6 +317,15 @@ pub(crate) fn string_to_timestamp_nanos_formatted( .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}")) } +pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone( + timezone: &ConfiguredTimeZone, + s: &str, + format: &str, +) -> Result { + let datetime = timezone.datetime_from_formatted(s, format)?; + datetime_to_timestamp(datetime) +} + /// Accepts a string with a `chrono` format and converts it to a /// millisecond precision timestamp. /// diff --git a/datafusion/functions/src/datetime/to_timestamp.rs b/datafusion/functions/src/datetime/to_timestamp.rs index dcd52aa07be3..2aca2b725ee0 100644 --- a/datafusion/functions/src/datetime/to_timestamp.rs +++ b/datafusion/functions/src/datetime/to_timestamp.rs @@ -304,7 +304,11 @@ impl ScalarUDFImpl for ToTimestampFunc { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { - let args = args.args; + let datafusion_expr::ScalarFunctionArgs { + args, + config_options, + .. + } = args; if args.is_empty() { return exec_err!( "to_timestamp function requires 1 or more arguments, got {}", @@ -341,7 +345,13 @@ impl ScalarUDFImpl for ToTimestampFunc { args[0].cast_to(&Timestamp(Nanosecond, Some(tz)), None) } Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::(&args, "to_timestamp") + let timezone = + ConfiguredTimeZone::parse(&config_options.execution.time_zone)?; + to_timestamp_impl::( + &args, + "to_timestamp", + &timezone, + ) } Decimal128(_, _) => { match &args[0] { @@ -398,7 +408,11 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { - let args = args.args; + let datafusion_expr::ScalarFunctionArgs { + args, + config_options, + .. + } = args; if args.is_empty() { return exec_err!( "to_timestamp_seconds function requires 1 or more arguments, got {}", @@ -417,7 +431,13 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc { } Timestamp(_, Some(tz)) => args[0].cast_to(&Timestamp(Second, Some(tz)), None), Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::(&args, "to_timestamp_seconds") + let timezone = + ConfiguredTimeZone::parse(&config_options.execution.time_zone)?; + to_timestamp_impl::( + &args, + "to_timestamp_seconds", + &timezone, + ) } other => { exec_err!( @@ -453,7 +473,11 @@ impl ScalarUDFImpl for ToTimestampMillisFunc { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { - let args = args.args; + let datafusion_expr::ScalarFunctionArgs { + args, + config_options, + .. + } = args; if args.is_empty() { return exec_err!( "to_timestamp_millis function requires 1 or more arguments, got {}", @@ -473,10 +497,15 @@ impl ScalarUDFImpl for ToTimestampMillisFunc { Timestamp(_, Some(tz)) => { args[0].cast_to(&Timestamp(Millisecond, Some(tz)), None) } - Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::( - &args, - "to_timestamp_millis", - ), + Utf8View | LargeUtf8 | Utf8 => { + let timezone = + ConfiguredTimeZone::parse(&config_options.execution.time_zone)?; + to_timestamp_impl::( + &args, + "to_timestamp_millis", + &timezone, + ) + } other => { exec_err!( "Unsupported data type {} for function to_timestamp_millis", @@ -511,7 +540,11 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { - let args = args.args; + let datafusion_expr::ScalarFunctionArgs { + args, + config_options, + .. + } = args; if args.is_empty() { return exec_err!( "to_timestamp_micros function requires 1 or more arguments, got {}", @@ -531,10 +564,15 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc { Timestamp(_, Some(tz)) => { args[0].cast_to(&Timestamp(Microsecond, Some(tz)), None) } - Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::( - &args, - "to_timestamp_micros", - ), + Utf8View | LargeUtf8 | Utf8 => { + let timezone = + ConfiguredTimeZone::parse(&config_options.execution.time_zone)?; + to_timestamp_impl::( + &args, + "to_timestamp_micros", + &timezone, + ) + } other => { exec_err!( "Unsupported data type {} for function to_timestamp_micros", @@ -569,7 +607,11 @@ impl ScalarUDFImpl for ToTimestampNanosFunc { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { - let args = args.args; + let datafusion_expr::ScalarFunctionArgs { + args, + config_options, + .. + } = args; if args.is_empty() { return exec_err!( "to_timestamp_nanos function requires 1 or more arguments, got {}", @@ -590,7 +632,13 @@ impl ScalarUDFImpl for ToTimestampNanosFunc { args[0].cast_to(&Timestamp(Nanosecond, Some(tz)), None) } Utf8View | LargeUtf8 | Utf8 => { - to_timestamp_impl::(&args, "to_timestamp_nanos") + let timezone = + ConfiguredTimeZone::parse(&config_options.execution.time_zone)?; + to_timestamp_impl::( + &args, + "to_timestamp_nanos", + &timezone, + ) } other => { exec_err!( @@ -617,6 +665,7 @@ fn return_type_for(arg: &DataType, unit: TimeUnit) -> DataType { fn to_timestamp_impl>( args: &[ColumnarValue], name: &str, + timezone: &ConfiguredTimeZone, ) -> Result { let factor = match T::UNIT { Second => 1_000_000_000, @@ -626,17 +675,30 @@ fn to_timestamp_impl>( }; match args.len() { - 1 => handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / factor), - name, - ), - n if n >= 2 => handle_multiple::( - args, - string_to_timestamp_nanos_formatted, - |n| n / factor, - name, - ), + 1 => { + let timezone = timezone.clone(); + handle::( + args, + move |s| { + string_to_timestamp_nanos_with_timezone(&timezone, s) + .map(|n| n / factor) + }, + name, + ) + } + n if n >= 2 => { + let timezone = timezone.clone(); + handle_multiple::( + args, + move |s, format| { + string_to_timestamp_nanos_formatted_with_timezone( + &timezone, s, format, + ) + }, + |n| n / factor, + name, + ) + } _ => exec_err!("Unsupported 0 argument count for function {name}"), } } @@ -645,6 +707,7 @@ fn to_timestamp_impl>( mod tests { use std::sync::Arc; + use crate::datetime::common::ConfiguredTimeZone; use arrow::array::types::Int64Type; use arrow::array::{ Array, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray, @@ -660,27 +723,44 @@ mod tests { use super::*; fn to_timestamp(args: &[ColumnarValue]) -> Result { - to_timestamp_impl::(args, "to_timestamp") + let timezone = ConfiguredTimeZone::utc(); + to_timestamp_impl::(args, "to_timestamp", &timezone) } /// to_timestamp_millis SQL function fn to_timestamp_millis(args: &[ColumnarValue]) -> Result { - to_timestamp_impl::(args, "to_timestamp_millis") + let timezone = ConfiguredTimeZone::utc(); + to_timestamp_impl::( + args, + "to_timestamp_millis", + &timezone, + ) } /// to_timestamp_micros SQL function fn to_timestamp_micros(args: &[ColumnarValue]) -> Result { - to_timestamp_impl::(args, "to_timestamp_micros") + let timezone = ConfiguredTimeZone::utc(); + to_timestamp_impl::( + args, + "to_timestamp_micros", + &timezone, + ) } /// to_timestamp_nanos SQL function fn to_timestamp_nanos(args: &[ColumnarValue]) -> Result { - to_timestamp_impl::(args, "to_timestamp_nanos") + let timezone = ConfiguredTimeZone::utc(); + to_timestamp_impl::( + args, + "to_timestamp_nanos", + &timezone, + ) } /// to_timestamp_seconds SQL function fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result { - to_timestamp_impl::(args, "to_timestamp_seconds") + let timezone = ConfiguredTimeZone::utc(); + to_timestamp_impl::(args, "to_timestamp_seconds", &timezone) } #[test] @@ -751,6 +831,65 @@ mod tests { Ok(()) } + #[test] + fn to_timestamp_respects_execution_timezone() -> Result<()> { + let udf = ToTimestampFunc::new(); + let field = Field::new("arg", Utf8, true).into(); + + let mut options = ConfigOptions::default(); + options.execution.time_zone = "-05:00".into(); + + let args = datafusion_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "2020-09-08T13:42:29".to_string(), + )))], + arg_fields: vec![field], + number_rows: 1, + return_field: Field::new("f", Timestamp(Nanosecond, None), true).into(), + config_options: Arc::new(options), + }; + + let result = udf.invoke_with_args(args)?; + let ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), None)) = + result + else { + panic!("expected scalar timestamp"); + }; + + let expected = string_to_timestamp_nanos_shim("2020-09-08T18:42:29Z")?; + assert_eq!(value, expected); + Ok(()) + } + + #[test] + fn to_timestamp_formats_respect_timezone() -> Result<()> { + let timezone = ConfiguredTimeZone::parse("Asia/Tokyo")?; + let args = vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "03:59:00.123456789 05-17-2023".to_string(), + ))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "%H:%M:%S%.f %m-%d-%Y".to_string(), + ))), + ]; + + let result = to_timestamp_impl::( + &args, + "to_timestamp", + &timezone, + )?; + + let ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), None)) = + result + else { + panic!("expected scalar timestamp"); + }; + + let expected = string_to_timestamp_nanos_shim("2023-05-16T18:59:00.123456789Z")?; + assert_eq!(value, expected); + Ok(()) + } + #[test] fn to_timestamp_invalid_input_type() -> Result<()> { // pass the wrong type of input array to to_timestamp and test