Skip to content

Commit ada0923

Browse files
Omega359 and kosiew authored
Respect execution timezone in to_timestamp and related functions (#19078)
## Which issue does this PR close? Closes #17998. Continuation of PR #18025 by @kosiew ## Rationale for this change Previously, the to_timestamp() family of functions (to_timestamp, to_timestamp_seconds, to_timestamp_millis, to_timestamp_micros, to_timestamp_nanos) always interpreted timezone-free (naïve) timestamps as UTC, ignoring the datafusion.execution.time_zone configuration option. This behavior caused inconsistencies when users configured a specific execution timezone and expected timestamp conversions to respect it. This PR introduces full timezone awareness to these functions so that: - Naïve timestamp strings are interpreted as being in the configured execution timezone, or UTC if the configured execution timezone is `None`. - All returned timestamps are in the execution timezone. ## What changes are included in this PR? Code, tests. ## Are these changes tested? Yes, via code tests and slt tests. ## Are there any user-facing changes? Yes: - to_timestamp() and its precision variants now respect datafusion.execution.time_zone when parsing timezone-free timestamps and return timestamps in the execution time zone. These changes make timestamp functions consistent with session timezone semantics and improve correctness for global workloads. --------- Co-authored-by: Siew Kam Onn <[email protected]>
1 parent fd79241 commit ada0923

File tree

12 files changed

+1159
-240
lines changed

12 files changed

+1159
-240
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/functions/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ workspace = true
4040
[features]
4141
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
4242
# enable datetime functions
43-
datetime_expressions = []
43+
datetime_expressions = ["chrono-tz"]
4444
# Enable encoding by default so the doctests work. In general don't automatically enable all packages.
4545
default = [
4646
"datetime_expressions",
@@ -71,6 +71,7 @@ base64 = { version = "0.22", optional = true }
7171
blake2 = { version = "^0.10.2", optional = true }
7272
blake3 = { version = "1.8", optional = true }
7373
chrono = { workspace = true }
74+
chrono-tz = { version = "0.10.4", optional = true }
7475
datafusion-common = { workspace = true }
7576
datafusion-doc = { workspace = true }
7677
datafusion-execution = { workspace = true }

datafusion/functions/benches/to_timestamp.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,21 @@ fn criterion_benchmark(c: &mut Criterion) {
114114
Field::new("f", DataType::Timestamp(TimeUnit::Nanosecond, None), true).into();
115115
let arg_field = Field::new("a", DataType::Utf8, false).into();
116116
let arg_fields = vec![arg_field];
117-
let config_options = Arc::new(ConfigOptions::default());
117+
let mut options = ConfigOptions::default();
118+
options.execution.time_zone = Some("UTC".into());
119+
let config_options = Arc::new(options);
120+
121+
let to_timestamp_udf = to_timestamp(config_options.as_ref());
118122

119123
c.bench_function("to_timestamp_no_formats_utf8", |b| {
124+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
120125
let arr_data = data();
121126
let batch_len = arr_data.len();
122127
let string_array = ColumnarValue::Array(Arc::new(arr_data) as ArrayRef);
123128

124129
b.iter(|| {
125130
black_box(
126-
to_timestamp()
131+
to_timestamp_udf
127132
.invoke_with_args(ScalarFunctionArgs {
128133
args: vec![string_array.clone()],
129134
arg_fields: arg_fields.clone(),
@@ -137,13 +142,14 @@ fn criterion_benchmark(c: &mut Criterion) {
137142
});
138143

139144
c.bench_function("to_timestamp_no_formats_largeutf8", |b| {
145+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
140146
let data = cast(&data(), &DataType::LargeUtf8).unwrap();
141147
let batch_len = data.len();
142148
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);
143149

144150
b.iter(|| {
145151
black_box(
146-
to_timestamp()
152+
to_timestamp_udf
147153
.invoke_with_args(ScalarFunctionArgs {
148154
args: vec![string_array.clone()],
149155
arg_fields: arg_fields.clone(),
@@ -157,13 +163,14 @@ fn criterion_benchmark(c: &mut Criterion) {
157163
});
158164

159165
c.bench_function("to_timestamp_no_formats_utf8view", |b| {
166+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
160167
let data = cast(&data(), &DataType::Utf8View).unwrap();
161168
let batch_len = data.len();
162169
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);
163170

164171
b.iter(|| {
165172
black_box(
166-
to_timestamp()
173+
to_timestamp_udf
167174
.invoke_with_args(ScalarFunctionArgs {
168175
args: vec![string_array.clone()],
169176
arg_fields: arg_fields.clone(),
@@ -177,6 +184,7 @@ fn criterion_benchmark(c: &mut Criterion) {
177184
});
178185

179186
c.bench_function("to_timestamp_with_formats_utf8", |b| {
187+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
180188
let (inputs, format1, format2, format3) = data_with_formats();
181189
let batch_len = inputs.len();
182190

@@ -196,7 +204,7 @@ fn criterion_benchmark(c: &mut Criterion) {
196204

197205
b.iter(|| {
198206
black_box(
199-
to_timestamp()
207+
to_timestamp_udf
200208
.invoke_with_args(ScalarFunctionArgs {
201209
args: args.clone(),
202210
arg_fields: arg_fields.clone(),
@@ -210,6 +218,7 @@ fn criterion_benchmark(c: &mut Criterion) {
210218
});
211219

212220
c.bench_function("to_timestamp_with_formats_largeutf8", |b| {
221+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
213222
let (inputs, format1, format2, format3) = data_with_formats();
214223
let batch_len = inputs.len();
215224

@@ -237,7 +246,7 @@ fn criterion_benchmark(c: &mut Criterion) {
237246

238247
b.iter(|| {
239248
black_box(
240-
to_timestamp()
249+
to_timestamp_udf
241250
.invoke_with_args(ScalarFunctionArgs {
242251
args: args.clone(),
243252
arg_fields: arg_fields.clone(),
@@ -251,6 +260,7 @@ fn criterion_benchmark(c: &mut Criterion) {
251260
});
252261

253262
c.bench_function("to_timestamp_with_formats_utf8view", |b| {
263+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
254264
let (inputs, format1, format2, format3) = data_with_formats();
255265

256266
let batch_len = inputs.len();
@@ -279,7 +289,7 @@ fn criterion_benchmark(c: &mut Criterion) {
279289

280290
b.iter(|| {
281291
black_box(
282-
to_timestamp()
292+
to_timestamp_udf
283293
.invoke_with_args(ScalarFunctionArgs {
284294
args: args.clone(),
285295
arg_fields: arg_fields.clone(),

datafusion/functions/src/datetime/common.rs

Lines changed: 121 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,57 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::Arc;
18+
use std::sync::{Arc, LazyLock};
1919

20+
use arrow::array::timezone::Tz;
2021
use arrow::array::{
2122
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
2223
StringArrayType, StringViewArray,
2324
};
24-
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
25-
use arrow::datatypes::DataType;
25+
use arrow::compute::DecimalCast;
26+
use arrow::compute::kernels::cast_utils::string_to_datetime;
27+
use arrow::datatypes::{DataType, TimeUnit};
28+
use arrow_buffer::ArrowNativeType;
2629
use chrono::LocalResult::Single;
2730
use chrono::format::{Parsed, StrftimeItems, parse};
2831
use chrono::{DateTime, TimeZone, Utc};
29-
3032
use datafusion_common::cast::as_generic_string_array;
3133
use datafusion_common::{
32-
DataFusionError, Result, ScalarType, ScalarValue, exec_datafusion_err, exec_err,
33-
unwrap_or_internal_err,
34+
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
35+
internal_datafusion_err, unwrap_or_internal_err,
3436
};
3537
use datafusion_expr::ColumnarValue;
3638

3739
/// Error message if nanosecond conversion request beyond supported interval
3840
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
3941

40-
/// Calls string_to_timestamp_nanos and converts the error type
41-
pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
42-
string_to_timestamp_nanos(s).map_err(|e| e.into())
42+
static UTC: LazyLock<Tz> = LazyLock::new(|| "UTC".parse().expect("UTC is always valid"));
43+
44+
/// Converts a string representation of a date‑time into a timestamp expressed in
45+
/// nanoseconds since the Unix epoch.
46+
///
47+
/// This helper is a thin wrapper around the more general `string_to_datetime`
48+
/// function. It accepts an optional `timezone` which, if `None`, defaults to
49+
/// Coordinated Universal Time (UTC). The string `s` must contain a valid
50+
/// date‑time format that can be parsed by the underlying chrono parser.
51+
///
52+
/// # Return Value
53+
///
54+
/// * `Ok(i64)` – The number of nanoseconds since `1970‑01‑01T00:00:00Z`.
55+
/// * `Err(DataFusionError)` – If the string cannot be parsed, the parsed
56+
/// value is out of range (between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804)
57+
/// or the parsed value does not correspond to an unambiguous time.
58+
pub(crate) fn string_to_timestamp_nanos_with_timezone(
59+
timezone: &Option<Tz>,
60+
s: &str,
61+
) -> Result<i64> {
62+
let tz = timezone.as_ref().unwrap_or(&UTC);
63+
let dt = string_to_datetime(tz, s)?;
64+
let parsed = dt
65+
.timestamp_nanos_opt()
66+
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
67+
68+
Ok(parsed)
4369
}
4470

4571
/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
@@ -69,13 +95,12 @@ pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<
6995
/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
7096
/// relative to the provided `timezone`
7197
///
72-
/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled
73-
///
74-
/// * `2023-01-01 040506 America/Los_Angeles`
75-
///
7698
/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
7799
/// will be returned
78100
///
101+
/// Note that parsing [IANA timezones] is not supported yet in chrono - <https://github.com/chronotope/chrono/issues/38>
102+
/// and this implementation only supports named timezones at the end of the string preceded by a space.
103+
///
79104
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
80105
/// [IANA timezones]: https://www.iana.org/time-zones
81106
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
@@ -89,11 +114,55 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
89114
)
90115
};
91116

117+
let mut datetime_str = s;
118+
let mut format = format;
119+
120+
// Manually handle the most common case of a named timezone at the end of the timestamp.
121+
// Note that %+ handles 'Z' at the end of the string without a space. This code doesn't
122+
// handle named timezones with no preceding space since that would require writing a
123+
// custom parser (or switching to Jiff)
124+
let tz: Option<chrono_tz::Tz> = if format.trim_end().ends_with(" %Z") {
125+
// grab the string after the last space as the named timezone
126+
if let Some((dt_str, timezone_name)) = datetime_str.trim_end().rsplit_once(' ') {
127+
datetime_str = dt_str;
128+
129+
// attempt to parse the timezone name
130+
let result: Result<chrono_tz::Tz, chrono_tz::ParseError> =
131+
timezone_name.parse();
132+
let Ok(tz) = result else {
133+
return Err(err(&result.unwrap_err().to_string()));
134+
};
135+
136+
// successfully parsed the timezone name, remove the ' %Z' from the format
137+
format = &format[..format.len() - 3];
138+
139+
Some(tz)
140+
} else {
141+
None
142+
}
143+
} else if format.contains("%Z") {
144+
return Err(err(
145+
"'%Z' is only supported at the end of the format string preceded by a space",
146+
));
147+
} else {
148+
None
149+
};
150+
92151
let mut parsed = Parsed::new();
93-
parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
152+
parse(&mut parsed, datetime_str, StrftimeItems::new(format))
153+
.map_err(|e| err(&e.to_string()))?;
94154

95-
// attempt to parse the string assuming it has a timezone
96-
let dt = parsed.to_datetime();
155+
let dt = match tz {
156+
Some(tz) => {
157+
// A timezone was manually parsed out, convert it to a fixed offset
158+
match parsed.to_datetime_with_timezone(&tz) {
159+
Ok(dt) => Ok(dt.fixed_offset()),
160+
Err(e) => Err(e),
161+
}
162+
}
163+
// default to parse the string assuming it has a timezone
164+
None => parsed.to_datetime(),
165+
};
97166

98167
if let Err(e) = &dt {
99168
// no timezone or other failure, try without a timezone
@@ -115,7 +184,7 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
115184
}
116185

117186
/// Accepts a string with a `chrono` format and converts it to a
118-
/// nanosecond precision timestamp.
187+
/// nanosecond precision timestamp relative to the provided `timezone`.
119188
///
120189
/// See [`chrono::format::strftime`] for the full set of supported formats.
121190
///
@@ -141,19 +210,21 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
141210
///
142211
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
143212
#[inline]
144-
pub(crate) fn string_to_timestamp_nanos_formatted(
213+
pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
214+
timezone: &Option<Tz>,
145215
s: &str,
146216
format: &str,
147217
) -> Result<i64, DataFusionError> {
148-
string_to_datetime_formatted(&Utc, s, format)?
149-
.naive_utc()
150-
.and_utc()
218+
let dt = string_to_datetime_formatted(timezone.as_ref().unwrap_or(&UTC), s, format)?;
219+
let parsed = dt
151220
.timestamp_nanos_opt()
152-
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
221+
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
222+
223+
Ok(parsed)
153224
}
154225

155226
/// Accepts a string with a `chrono` format and converts it to a
156-
/// millisecond precision timestamp.
227+
/// millisecond precision timestamp relative to the provided `timezone`.
157228
///
158229
/// See [`chrono::format::strftime`] for the full set of supported formats.
159230
///
@@ -176,14 +247,14 @@ pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Res
176247
.timestamp_millis())
177248
}
178249

179-
pub(crate) fn handle<O, F, S>(
250+
pub(crate) fn handle<O, F>(
180251
args: &[ColumnarValue],
181252
op: F,
182253
name: &str,
254+
dt: &DataType,
183255
) -> Result<ColumnarValue>
184256
where
185257
O: ArrowPrimitiveType,
186-
S: ScalarType<O::Native>,
187258
F: Fn(&str) -> Result<O::Native>,
188259
{
189260
match &args[0] {
@@ -210,8 +281,13 @@ where
210281
},
211282
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
212283
Some(a) => {
213-
let result = a.as_ref().map(|x| op(x)).transpose()?;
214-
Ok(ColumnarValue::Scalar(S::scalar(result)))
284+
let result = a
285+
.as_ref()
286+
.map(|x| op(x))
287+
.transpose()?
288+
.and_then(|v| v.to_i64());
289+
let s = scalar_value(dt, result)?;
290+
Ok(ColumnarValue::Scalar(s))
215291
}
216292
_ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
217293
},
@@ -221,15 +297,15 @@ where
221297
// Given a function that maps a `&str`, `&str` to an arrow native type,
222298
// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
223299
// depending on the `args`'s variant.
224-
pub(crate) fn handle_multiple<O, F, S, M>(
300+
pub(crate) fn handle_multiple<O, F, M>(
225301
args: &[ColumnarValue],
226302
op: F,
227303
op2: M,
228304
name: &str,
305+
dt: &DataType,
229306
) -> Result<ColumnarValue>
230307
where
231308
O: ArrowPrimitiveType,
232-
S: ScalarType<O::Native>,
233309
F: Fn(&str, &str) -> Result<O::Native>,
234310
M: Fn(O::Native) -> O::Native,
235311
{
@@ -298,9 +374,9 @@ where
298374
if let Some(s) = x {
299375
match op(a, s.as_str()) {
300376
Ok(r) => {
301-
ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
302-
op2(r),
303-
)))));
377+
let result = op2(r).to_i64();
378+
let s = scalar_value(dt, result)?;
379+
ret = Some(Ok(ColumnarValue::Scalar(s)));
304380
break;
305381
}
306382
Err(e) => ret = Some(Err(e)),
@@ -454,3 +530,16 @@ where
454530
// first map is the iterator, second is for the `Option<_>`
455531
array.iter().map(|x| x.map(&op).transpose()).collect()
456532
}
533+
534+
fn scalar_value(dt: &DataType, r: Option<i64>) -> Result<ScalarValue> {
535+
match dt {
536+
DataType::Date32 => Ok(ScalarValue::Date32(r.and_then(|v| v.to_i32()))),
537+
DataType::Timestamp(u, tz) => match u {
538+
TimeUnit::Second => Ok(ScalarValue::TimestampSecond(r, tz.clone())),
539+
TimeUnit::Millisecond => Ok(ScalarValue::TimestampMillisecond(r, tz.clone())),
540+
TimeUnit::Microsecond => Ok(ScalarValue::TimestampMicrosecond(r, tz.clone())),
541+
TimeUnit::Nanosecond => Ok(ScalarValue::TimestampNanosecond(r, tz.clone())),
542+
},
543+
t => Err(internal_datafusion_err!("Unsupported data type: {t:?}")),
544+
}
545+
}

0 commit comments

Comments
 (0)