Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 174 additions & 1 deletion datafusion/functions/src/datetime/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::str::FromStr;
use std::sync::Arc;

use arrow::array::timezone::Tz;
use arrow::array::{
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
StringArrayType, StringViewArray,
Expand All @@ -25,7 +27,7 @@ use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use chrono::format::{parse, Parsed, StrftimeItems};
use chrono::LocalResult::Single;
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, FixedOffset, LocalResult, NaiveDateTime, TimeZone, Utc};

use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{
Expand All @@ -42,6 +44,167 @@ pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
string_to_timestamp_nanos(s).map_err(|e| e.into())
}

#[derive(Clone, Copy)]
enum ConfiguredZone {
Named(Tz),
Offset(FixedOffset),
}

#[derive(Clone)]
pub(crate) struct ConfiguredTimeZone {
repr: Arc<str>,
zone: ConfiguredZone,
}

impl ConfiguredTimeZone {
pub(crate) fn utc() -> Self {
Self {
repr: Arc::from("+00:00"),
zone: ConfiguredZone::Offset(FixedOffset::east_opt(0).unwrap()),
}
}

pub(crate) fn parse(tz: &str) -> Result<Self> {
if tz.trim().is_empty() {
return Ok(Self::utc());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we may want to change this to allow for None - see the start of discussion @ #17993 (comment) and #18017 (comment)

}

if let Ok(named) = Tz::from_str(tz) {
return Ok(Self {
repr: Arc::from(tz),
zone: ConfiguredZone::Named(named),
});
}

if let Some(offset) = parse_fixed_offset(tz) {
return Ok(Self {
repr: Arc::from(tz),
zone: ConfiguredZone::Offset(offset),
});
}

Err(exec_datafusion_err!(
"Invalid execution timezone '{tz}'. Please provide an IANA timezone name (e.g. 'America/New_York') or an offset in the form '+HH:MM'."
))
}

fn timestamp_from_naive(&self, naive: &NaiveDateTime) -> Result<i64> {
match self.zone {
ConfiguredZone::Named(tz) => {
local_datetime_to_timestamp(tz.from_local_datetime(naive), &self.repr)
}
ConfiguredZone::Offset(offset) => {
local_datetime_to_timestamp(offset.from_local_datetime(naive), &self.repr)
}
}
}

fn datetime_from_formatted(&self, s: &str, format: &str) -> Result<DateTime<Utc>> {
let datetime = match self.zone {
ConfiguredZone::Named(tz) => {
string_to_datetime_formatted(&tz, s, format)?.with_timezone(&Utc)
}
ConfiguredZone::Offset(offset) => {
string_to_datetime_formatted(&offset, s, format)?.with_timezone(&Utc)
}
};
Ok(datetime)
}
}

fn parse_fixed_offset(tz: &str) -> Option<FixedOffset> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is a private fn

error[E0603]: function `parse_fixed_offset` is private
  --> datafusion/functions/src/datetime/common.rs:18:29
   |
18 | use arrow::array::timezone::parse_fixed_offset;
   |                             ^^^^^^^^^^^^^^^^^^ private function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll file a ticket to make that pub.

let tz = tz.trim();
if tz.eq_ignore_ascii_case("utc") || tz.eq_ignore_ascii_case("z") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tz::from_str(tz) doesn't account for these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double checked to confirm that it does not handle lower case:

    fn parse_fixed_offset_accepts_lowercase_and_z() -> Result<()> {
        use std::str::FromStr;

        assert!(!Tz::from_str("utc").is_err());
        ConfiguredTimeZone::parse("utc")?; // succeeds via parse_fixed_offset fallback
        ConfiguredTimeZone::parse("Z")?; // succeeds via parse_fixed_offset fallback
        Ok(())
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. While I expect it's to spec I can't see a reason why that impl shouldn't be case insensitive. I'll file a ticket for that as well.

return FixedOffset::east_opt(0);
}

let (sign, rest) = if let Some(rest) = tz.strip_prefix('+') {
(1, rest)
} else if let Some(rest) = tz.strip_prefix('-') {
(-1, rest)
} else {
return None;
};

let (hours, minutes) = if let Some((hours, minutes)) = rest.split_once(':') {
(hours, minutes)
} else if rest.len() == 4 {
rest.split_at(2)
} else {
return None;
};

let hours: i32 = hours.parse().ok()?;
let minutes: i32 = minutes.parse().ok()?;
if hours > 23 || minutes > 59 {
return None;
}

let total_minutes = hours * 60 + minutes;
let total_seconds = sign * total_minutes * 60;
FixedOffset::east_opt(total_seconds)
}

fn local_datetime_to_timestamp<T: TimeZone>(
result: LocalResult<DateTime<T>>,
tz_repr: &str,
) -> Result<i64> {
match result {
Single(dt) => datetime_to_timestamp(dt.with_timezone(&Utc)),
LocalResult::Ambiguous(dt1, dt2) => Err(exec_datafusion_err!(
"The local time '{:?}' is ambiguous in timezone '{tz_repr}' (also corresponds to '{:?}').",
dt1.naive_local(),
dt2.naive_local()
)),
LocalResult::None => Err(exec_datafusion_err!(
"The local time is invalid in timezone '{tz_repr}'."
)),
}
}

fn datetime_to_timestamp(datetime: DateTime<Utc>) -> Result<i64> {
datetime
.timestamp_nanos_opt()
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
}

fn timestamp_to_naive(value: i64) -> Result<NaiveDateTime> {
let secs = value.div_euclid(1_000_000_000);
let nanos = value.rem_euclid(1_000_000_000) as u32;
DateTime::<Utc>::from_timestamp(secs, nanos)
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
.map(|dt| dt.naive_utc())
}

fn has_explicit_timezone(value: &str) -> bool {
if DateTime::parse_from_rfc3339(value).is_ok() {
return true;
}

if let Some(pos) = value.rfind(|c| ['T', ' '].contains(&c)) {
let tail = &value[pos + 1..];
tail.contains('Z')
|| tail.contains('z')
|| tail.contains('+')
|| tail.contains('-')
} else {
false
}
}

pub(crate) fn string_to_timestamp_nanos_with_timezone(
timezone: &ConfiguredTimeZone,
s: &str,
) -> Result<i64> {
let ts = string_to_timestamp_nanos_shim(s)?;
if has_explicit_timezone(s) {
Ok(ts)
} else {
let naive = timestamp_to_naive(ts)?;
timezone.timestamp_from_naive(&naive)
}
}

/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
///
/// [Utf8]: DataType::Utf8
Expand Down Expand Up @@ -142,6 +305,7 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
///
#[inline]
#[allow(dead_code)]
pub(crate) fn string_to_timestamp_nanos_formatted(
s: &str,
format: &str,
Expand All @@ -153,6 +317,15 @@ pub(crate) fn string_to_timestamp_nanos_formatted(
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
}

pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
timezone: &ConfiguredTimeZone,
s: &str,
format: &str,
) -> Result<i64, DataFusionError> {
let datetime = timezone.datetime_from_formatted(s, format)?;
datetime_to_timestamp(datetime)
}

/// Accepts a string with a `chrono` format and converts it to a
/// millisecond precision timestamp.
///
Expand Down
Loading