Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
eed97cf
Implement timezone-aware handling for to_timestamp functions.
kosiew Oct 12, 2025
bb579d5
Merge branch 'apache:main' into timestamp-17998
Omega359 Dec 3, 2025
7c00e07
Implement timezone-aware handling for to_timestamp functions.
Omega359 Dec 3, 2025
5c4eaf7
Updates from code review.
Omega359 Dec 4, 2025
71fe6d3
Merge remote-tracking branch 'origin/timestamp-17998' into timestamp-…
Omega359 Dec 4, 2025
15096a6
Extract parsing of UTC to a static.
Omega359 Dec 5, 2025
9b3e417
Merge remote-tracking branch 'upstream/main' into timestamp-17998
Omega359 Dec 12, 2025
7dded42
Updates from merge.
Omega359 Dec 12, 2025
1ea061c
remove unused functions.
Omega359 Dec 12, 2025
7122aef
Merge branch 'main' into timestamp-17998
Omega359 Dec 15, 2025
e89d0bd
PR review feedback.
Omega359 Dec 17, 2025
f4e0934
PR review feedback.
Omega359 Dec 17, 2025
6d68731
Merge branch 'main' into timestamp-17998
Omega359 Dec 17, 2025
db10d2c
Merge branch 'main' into timestamp-17998
Omega359 Dec 18, 2025
4a91743
Add documentation.
Omega359 Dec 20, 2025
8d9cb2e
Refactored ScalarDataType out of the PR.
Omega359 Dec 20, 2025
64a29b4
Merge branch 'main' into timestamp-17998
Omega359 Dec 20, 2025
72f2a97
Merge branch 'main' into timestamp-17998
Omega359 Dec 21, 2025
1579ec3
Merge pull request #7
kosiew Dec 22, 2025
783b684
Small doc fix.
Omega359 Dec 22, 2025
5764925
Merge branch 'main' into timestamp-17998
Omega359 Dec 22, 2025
8bd8df0
Merge branch 'main' into timestamp-17998
Omega359 Dec 25, 2025
55845e0
Merge remote-tracking branch 'upstream/main' into timestamp-17998
Omega359 Dec 29, 2025
8742658
Merge branch 'main' into timestamp-17998
Omega359 Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ workspace = true
[features]
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
# enable datetime functions
datetime_expressions = []
datetime_expressions = ["chrono-tz"]
# Enable encoding by default so the doctests work. In general don't automatically enable all packages.
default = [
"datetime_expressions",
Expand Down Expand Up @@ -71,6 +71,7 @@ base64 = { version = "0.22", optional = true }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.8", optional = true }
chrono = { workspace = true }
chrono-tz = { version = "0.10.4", optional = true }
datafusion-common = { workspace = true }
datafusion-doc = { workspace = true }
datafusion-execution = { workspace = true }
Expand Down
24 changes: 17 additions & 7 deletions datafusion/functions/benches/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,21 @@ fn criterion_benchmark(c: &mut Criterion) {
Field::new("f", DataType::Timestamp(TimeUnit::Nanosecond, None), true).into();
let arg_field = Field::new("a", DataType::Utf8, false).into();
let arg_fields = vec![arg_field];
let config_options = Arc::new(ConfigOptions::default());
let mut options = ConfigOptions::default();
options.execution.time_zone = Some("UTC".into());
let config_options = Arc::new(options);

let to_timestamp_udf = to_timestamp(config_options.as_ref());

c.bench_function("to_timestamp_no_formats_utf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let arr_data = data();
let batch_len = arr_data.len();
let string_array = ColumnarValue::Array(Arc::new(arr_data) as ArrayRef);

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: vec![string_array.clone()],
arg_fields: arg_fields.clone(),
Expand All @@ -137,13 +142,14 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_no_formats_largeutf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let data = cast(&data(), &DataType::LargeUtf8).unwrap();
let batch_len = data.len();
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: vec![string_array.clone()],
arg_fields: arg_fields.clone(),
Expand All @@ -157,13 +163,14 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_no_formats_utf8view", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let data = cast(&data(), &DataType::Utf8View).unwrap();
let batch_len = data.len();
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: vec![string_array.clone()],
arg_fields: arg_fields.clone(),
Expand All @@ -177,6 +184,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_with_formats_utf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let (inputs, format1, format2, format3) = data_with_formats();
let batch_len = inputs.len();

Expand All @@ -196,7 +204,7 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
Expand All @@ -210,6 +218,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_with_formats_largeutf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let (inputs, format1, format2, format3) = data_with_formats();
let batch_len = inputs.len();

Expand Down Expand Up @@ -237,7 +246,7 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
Expand All @@ -251,6 +260,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_with_formats_utf8view", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let (inputs, format1, format2, format3) = data_with_formats();

let batch_len = inputs.len();
Expand Down Expand Up @@ -279,7 +289,7 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
Expand Down
153 changes: 121 additions & 32 deletions datafusion/functions/src/datetime/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,57 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use arrow::array::timezone::Tz;
use arrow::array::{
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
StringArrayType, StringViewArray,
};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use arrow::compute::DecimalCast;
use arrow::compute::kernels::cast_utils::string_to_datetime;
use arrow::datatypes::{DataType, TimeUnit};
use arrow_buffer::ArrowNativeType;
use chrono::LocalResult::Single;
use chrono::format::{Parsed, StrftimeItems, parse};
use chrono::{DateTime, TimeZone, Utc};

use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{
DataFusionError, Result, ScalarType, ScalarValue, exec_datafusion_err, exec_err,
unwrap_or_internal_err,
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
internal_datafusion_err, unwrap_or_internal_err,
};
use datafusion_expr::ColumnarValue;

/// Error message if nanosecond conversion request beyond supported interval
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";

/// Calls string_to_timestamp_nanos and converts the error type
pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
string_to_timestamp_nanos(s).map_err(|e| e.into())
static UTC: LazyLock<Tz> = LazyLock::new(|| "UTC".parse().expect("UTC is always valid"));

/// Converts a string representation of a date‑time into a timestamp expressed in
/// nanoseconds since the Unix epoch.
///
/// This helper is a thin wrapper around the more general `string_to_datetime`
/// function. It accepts an optional `timezone` which, if `None`, defaults to
/// Coordinated Universal Time (UTC). The string `s` must contain a valid
/// date‑time format that can be parsed by the underlying chrono parser.
///
/// # Return Value
///
/// * `Ok(i64)` – The number of nanoseconds since `1970‑01‑01T00:00:00Z`.
/// * `Err(DataFusionError)` – If the string cannot be parsed, the parsed
/// value is out of range (between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804)
/// or the parsed value does not correspond to an unambiguous time.
pub(crate) fn string_to_timestamp_nanos_with_timezone(
timezone: &Option<Tz>,
s: &str,
) -> Result<i64> {
let tz = timezone.as_ref().unwrap_or(&UTC);
let dt = string_to_datetime(tz, s)?;
let parsed = dt
.timestamp_nanos_opt()
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;

Ok(parsed)
}

/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
Expand Down Expand Up @@ -69,13 +95,12 @@ pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<
/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
/// relative to the provided `timezone`
///
/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled
///
/// * `2023-01-01 040506 America/Los_Angeles`
///
/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
/// will be returned
///
/// Note that parsing [IANA timezones] is not supported yet in chrono - <https://github.com/chronotope/chrono/issues/38>
/// and this implementation only supports named timezones at the end of the string preceded by a space.
///
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
/// [IANA timezones]: https://www.iana.org/time-zones
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
Expand All @@ -89,11 +114,55 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
)
};

let mut datetime_str = s;
let mut format = format;

// Manually handle the most common case of a named timezone at the end of the timestamp.
// Note that %+ handles 'Z' at the end of the string without a space. This code doesn't
// handle named timezones with no preceding space since that would require writing a
// custom parser (or switching to Jiff)
let tz: Option<chrono_tz::Tz> = if format.trim_end().ends_with(" %Z") {
// grab the string after the last space as the named timezone
if let Some((dt_str, timezone_name)) = datetime_str.trim_end().rsplit_once(' ') {
datetime_str = dt_str;

// attempt to parse the timezone name
let result: Result<chrono_tz::Tz, chrono_tz::ParseError> =
timezone_name.parse();
let Ok(tz) = result else {
return Err(err(&result.unwrap_err().to_string()));
};

// successfully parsed the timezone name, remove the ' %Z' from the format
format = &format[..format.len() - 3];

Some(tz)
} else {
None
}
} else if format.contains("%Z") {
return Err(err(
"'%Z' is only supported at the end of the format string preceded by a space",
));
} else {
None
};

let mut parsed = Parsed::new();
parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
parse(&mut parsed, datetime_str, StrftimeItems::new(format))
.map_err(|e| err(&e.to_string()))?;

// attempt to parse the string assuming it has a timezone
let dt = parsed.to_datetime();
let dt = match tz {
Some(tz) => {
// A timezone was manually parsed out, convert it to a fixed offset
match parsed.to_datetime_with_timezone(&tz) {
Ok(dt) => Ok(dt.fixed_offset()),
Err(e) => Err(e),
}
}
// default to parse the string assuming it has a timezone
None => parsed.to_datetime(),
};

if let Err(e) = &dt {
// no timezone or other failure, try without a timezone
Expand All @@ -115,7 +184,7 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
}

/// Accepts a string with a `chrono` format and converts it to a
/// nanosecond precision timestamp.
/// nanosecond precision timestamp relative to the provided `timezone`.
///
/// See [`chrono::format::strftime`] for the full set of supported formats.
///
Expand All @@ -141,19 +210,21 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
///
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
#[inline]
pub(crate) fn string_to_timestamp_nanos_formatted(
pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
timezone: &Option<Tz>,
s: &str,
format: &str,
) -> Result<i64, DataFusionError> {
string_to_datetime_formatted(&Utc, s, format)?
.naive_utc()
.and_utc()
let dt = string_to_datetime_formatted(timezone.as_ref().unwrap_or(&UTC), s, format)?;
let parsed = dt
.timestamp_nanos_opt()
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;

Ok(parsed)
}

/// Accepts a string with a `chrono` format and converts it to a
/// millisecond precision timestamp.
/// millisecond precision timestamp relative to the provided `timezone`.
///
/// See [`chrono::format::strftime`] for the full set of supported formats.
///
Expand All @@ -176,14 +247,14 @@ pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Res
.timestamp_millis())
}

pub(crate) fn handle<O, F, S>(
pub(crate) fn handle<O, F>(
args: &[ColumnarValue],
op: F,
name: &str,
dt: &DataType,
) -> Result<ColumnarValue>
where
O: ArrowPrimitiveType,
S: ScalarType<O::Native>,
F: Fn(&str) -> Result<O::Native>,
{
match &args[0] {
Expand All @@ -210,8 +281,13 @@ where
},
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
Some(a) => {
let result = a.as_ref().map(|x| op(x)).transpose()?;
Ok(ColumnarValue::Scalar(S::scalar(result)))
let result = a
.as_ref()
.map(|x| op(x))
.transpose()?
.and_then(|v| v.to_i64());
let s = scalar_value(dt, result)?;
Ok(ColumnarValue::Scalar(s))
}
_ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
},
Expand All @@ -221,15 +297,15 @@ where
// Given a function that maps a `&str`, `&str` to an arrow native type,
// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
// depending on the `args`'s variant.
pub(crate) fn handle_multiple<O, F, S, M>(
pub(crate) fn handle_multiple<O, F, M>(
args: &[ColumnarValue],
op: F,
op2: M,
name: &str,
dt: &DataType,
) -> Result<ColumnarValue>
where
O: ArrowPrimitiveType,
S: ScalarType<O::Native>,
F: Fn(&str, &str) -> Result<O::Native>,
M: Fn(O::Native) -> O::Native,
{
Expand Down Expand Up @@ -298,9 +374,9 @@ where
if let Some(s) = x {
match op(a, s.as_str()) {
Ok(r) => {
ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
op2(r),
)))));
let result = op2(r).to_i64();
let s = scalar_value(dt, result)?;
ret = Some(Ok(ColumnarValue::Scalar(s)));
break;
}
Err(e) => ret = Some(Err(e)),
Expand Down Expand Up @@ -454,3 +530,16 @@ where
// first map is the iterator, second is for the `Option<_>`
array.iter().map(|x| x.map(&op).transpose()).collect()
}

fn scalar_value(dt: &DataType, r: Option<i64>) -> Result<ScalarValue> {
match dt {
DataType::Date32 => Ok(ScalarValue::Date32(r.and_then(|v| v.to_i32()))),
DataType::Timestamp(u, tz) => match u {
TimeUnit::Second => Ok(ScalarValue::TimestampSecond(r, tz.clone())),
TimeUnit::Millisecond => Ok(ScalarValue::TimestampMillisecond(r, tz.clone())),
TimeUnit::Microsecond => Ok(ScalarValue::TimestampMicrosecond(r, tz.clone())),
TimeUnit::Nanosecond => Ok(ScalarValue::TimestampNanosecond(r, tz.clone())),
},
t => Err(internal_datafusion_err!("Unsupported data type: {t:?}")),
}
}
Loading