Skip to content

Commit eed97cf

Browse files
kosiewOmega359
authored andcommitted
Implement timezone-aware handling for to_timestamp functions.
Refactor shared helpers to ensure naïve strings are interpreted using the configured execution zone. Extend unit tests to cover naïve and formatted inputs respecting non-UTC execution timezones.
1 parent 0490aec commit eed97cf

File tree

12 files changed

+1221
-171
lines changed

12 files changed

+1221
-171
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/functions/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ base64 = { version = "0.22", optional = true }
7171
blake2 = { version = "^0.10.2", optional = true }
7272
blake3 = { version = "1.8", optional = true }
7373
chrono = { workspace = true }
74+
chrono-tz = "0.10.4"
7475
datafusion-common = { workspace = true }
7576
datafusion-doc = { workspace = true }
7677
datafusion-execution = { workspace = true }

datafusion/functions/benches/to_timestamp.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,17 @@ fn criterion_benchmark(c: &mut Criterion) {
115115
let arg_field = Field::new("a", DataType::Utf8, false).into();
116116
let arg_fields = vec![arg_field];
117117
let config_options = Arc::new(ConfigOptions::default());
118+
let to_timestamp_udf = to_timestamp(config_options.as_ref());
118119

119120
c.bench_function("to_timestamp_no_formats_utf8", |b| {
121+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
120122
let arr_data = data();
121123
let batch_len = arr_data.len();
122124
let string_array = ColumnarValue::Array(Arc::new(arr_data) as ArrayRef);
123125

124126
b.iter(|| {
125127
black_box(
126-
to_timestamp()
128+
to_timestamp_udf
127129
.invoke_with_args(ScalarFunctionArgs {
128130
args: vec![string_array.clone()],
129131
arg_fields: arg_fields.clone(),
@@ -137,13 +139,14 @@ fn criterion_benchmark(c: &mut Criterion) {
137139
});
138140

139141
c.bench_function("to_timestamp_no_formats_largeutf8", |b| {
142+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
140143
let data = cast(&data(), &DataType::LargeUtf8).unwrap();
141144
let batch_len = data.len();
142145
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);
143146

144147
b.iter(|| {
145148
black_box(
146-
to_timestamp()
149+
to_timestamp_udf
147150
.invoke_with_args(ScalarFunctionArgs {
148151
args: vec![string_array.clone()],
149152
arg_fields: arg_fields.clone(),
@@ -157,13 +160,14 @@ fn criterion_benchmark(c: &mut Criterion) {
157160
});
158161

159162
c.bench_function("to_timestamp_no_formats_utf8view", |b| {
163+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
160164
let data = cast(&data(), &DataType::Utf8View).unwrap();
161165
let batch_len = data.len();
162166
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);
163167

164168
b.iter(|| {
165169
black_box(
166-
to_timestamp()
170+
to_timestamp_udf
167171
.invoke_with_args(ScalarFunctionArgs {
168172
args: vec![string_array.clone()],
169173
arg_fields: arg_fields.clone(),
@@ -177,6 +181,7 @@ fn criterion_benchmark(c: &mut Criterion) {
177181
});
178182

179183
c.bench_function("to_timestamp_with_formats_utf8", |b| {
184+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
180185
let (inputs, format1, format2, format3) = data_with_formats();
181186
let batch_len = inputs.len();
182187

@@ -196,7 +201,7 @@ fn criterion_benchmark(c: &mut Criterion) {
196201

197202
b.iter(|| {
198203
black_box(
199-
to_timestamp()
204+
to_timestamp_udf
200205
.invoke_with_args(ScalarFunctionArgs {
201206
args: args.clone(),
202207
arg_fields: arg_fields.clone(),
@@ -210,6 +215,7 @@ fn criterion_benchmark(c: &mut Criterion) {
210215
});
211216

212217
c.bench_function("to_timestamp_with_formats_largeutf8", |b| {
218+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
213219
let (inputs, format1, format2, format3) = data_with_formats();
214220
let batch_len = inputs.len();
215221

@@ -237,7 +243,7 @@ fn criterion_benchmark(c: &mut Criterion) {
237243

238244
b.iter(|| {
239245
black_box(
240-
to_timestamp()
246+
to_timestamp_udf
241247
.invoke_with_args(ScalarFunctionArgs {
242248
args: args.clone(),
243249
arg_fields: arg_fields.clone(),
@@ -251,6 +257,7 @@ fn criterion_benchmark(c: &mut Criterion) {
251257
});
252258

253259
c.bench_function("to_timestamp_with_formats_utf8view", |b| {
260+
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
254261
let (inputs, format1, format2, format3) = data_with_formats();
255262

256263
let batch_len = inputs.len();
@@ -279,7 +286,7 @@ fn criterion_benchmark(c: &mut Criterion) {
279286

280287
b.iter(|| {
281288
black_box(
282-
to_timestamp()
289+
to_timestamp_udf
283290
.invoke_with_args(ScalarFunctionArgs {
284291
args: args.clone(),
285292
arg_fields: arg_fields.clone(),

datafusion/functions/src/datetime/common.rs

Lines changed: 137 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,52 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::marker::PhantomData;
1819
use std::sync::Arc;
1920

21+
use arrow::array::timezone::Tz;
2022
use arrow::array::{
2123
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
2224
StringArrayType, StringViewArray,
2325
};
24-
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
25-
use arrow::datatypes::DataType;
26+
use arrow::compute::kernels::cast_utils::{
27+
string_to_datetime, string_to_timestamp_nanos,
28+
};
29+
use arrow::datatypes::{DataType, TimeUnit};
30+
use arrow_buffer::ArrowNativeType;
2631
use chrono::format::{parse, Parsed, StrftimeItems};
2732
use chrono::LocalResult::Single;
2833
use chrono::{DateTime, TimeZone, Utc};
29-
3034
use datafusion_common::cast::as_generic_string_array;
3135
use datafusion_common::{
32-
exec_datafusion_err, exec_err, unwrap_or_internal_err, DataFusionError, Result,
33-
ScalarType, ScalarValue,
36+
exec_datafusion_err, exec_err, internal_datafusion_err, unwrap_or_internal_err,
37+
DataFusionError, Result, ScalarValue,
3438
};
3539
use datafusion_expr::ColumnarValue;
40+
use num_traits::{PrimInt, ToPrimitive};
3641

3742
/// Error message if nanosecond conversion request beyond supported interval
3843
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
3944

45+
#[expect(unused)]
4046
/// Calls string_to_timestamp_nanos and converts the error type
4147
pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
4248
string_to_timestamp_nanos(s).map_err(|e| e.into())
4349
}
4450

51+
pub(crate) fn string_to_timestamp_nanos_with_timezone(
52+
timezone: &Option<Tz>,
53+
s: &str,
54+
) -> Result<i64> {
55+
let tz = timezone.unwrap_or("UTC".parse()?);
56+
let dt = string_to_datetime(&tz, s)?;
57+
let parsed = dt
58+
.timestamp_nanos_opt()
59+
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
60+
61+
Ok(parsed)
62+
}
63+
4564
/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
4665
///
4766
/// [Utf8]: DataType::Utf8
@@ -69,13 +88,12 @@ pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<
6988
/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
7089
/// relative to the provided `timezone`
7190
///
72-
/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled
73-
///
74-
/// * `2023-01-01 040506 America/Los_Angeles`
75-
///
7691
/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
7792
/// will be returned
7893
///
94+
/// Note that parsing [IANA timezones] is not supported yet in chrono - <https://github.com/chronotope/chrono/issues/38>
95+
/// and this implementation only supports named timezones at the end of the string preceded by a space.
96+
///
7997
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
8098
/// [IANA timezones]: https://www.iana.org/time-zones
8199
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
@@ -89,11 +107,52 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
89107
)
90108
};
91109

110+
let mut datetime_str = s;
111+
let mut format = format;
112+
113+
// we manually handle the most common case of a named timezone at the end of the timestamp
114+
// not that %+ handles 'Z' at the end of the string without a space. This code doesn't
115+
// handle named timezones with no preceding space since that would require writing a
116+
// custom parser (or switching to Jiff)
117+
let tz: Option<chrono_tz::Tz> = if format.ends_with(" %Z") {
118+
// grab the string after the last space as the named timezone
119+
let parts: Vec<&str> = datetime_str.rsplitn(2, ' ').collect();
120+
let timezone_name = parts[0];
121+
datetime_str = parts[1];
122+
123+
// attempt to parse the timezone name
124+
let result: Result<chrono_tz::Tz, chrono_tz::ParseError> = timezone_name.parse();
125+
let Ok(tz) = result else {
126+
return Err(err(&result.unwrap_err().to_string()));
127+
};
128+
129+
// successfully parsed the timezone name, remove the ' %Z' from the format
130+
format = format.trim_end_matches(" %Z");
131+
132+
Some(tz)
133+
} else if format.contains("%Z") {
134+
return Err(err(
135+
"'%Z' is only supported at the end of the format string preceded by a space",
136+
));
137+
} else {
138+
None
139+
};
140+
92141
let mut parsed = Parsed::new();
93-
parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
142+
parse(&mut parsed, datetime_str, StrftimeItems::new(format))
143+
.map_err(|e| err(&e.to_string()))?;
94144

95-
// attempt to parse the string assuming it has a timezone
96-
let dt = parsed.to_datetime();
145+
let dt = match tz {
146+
Some(tz) => {
147+
// A timezone was manually parsed out, convert it to a fixed offset
148+
match parsed.to_datetime_with_timezone(&tz) {
149+
Ok(dt) => Ok(dt.fixed_offset()),
150+
Err(e) => Err(e),
151+
}
152+
}
153+
// default to parse the string assuming it has a timezone
154+
None => parsed.to_datetime(),
155+
};
97156

98157
if let Err(e) = &dt {
99158
// no timezone or other failure, try without a timezone
@@ -141,6 +200,7 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
141200
///
142201
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
143202
#[inline]
203+
#[expect(unused)]
144204
pub(crate) fn string_to_timestamp_nanos_formatted(
145205
s: &str,
146206
format: &str,
@@ -152,6 +212,20 @@ pub(crate) fn string_to_timestamp_nanos_formatted(
152212
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
153213
}
154214

215+
pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
216+
timezone: &Option<Tz>,
217+
s: &str,
218+
format: &str,
219+
) -> Result<i64, DataFusionError> {
220+
let dt =
221+
string_to_datetime_formatted(&timezone.unwrap_or("UTC".parse()?), s, format)?;
222+
let parsed = dt
223+
.timestamp_nanos_opt()
224+
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
225+
226+
Ok(parsed)
227+
}
228+
155229
/// Accepts a string with a `chrono` format and converts it to a
156230
/// millisecond precision timestamp.
157231
///
@@ -176,14 +250,50 @@ pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Res
176250
.timestamp_millis())
177251
}
178252

179-
pub(crate) fn handle<O, F, S>(
253+
pub(crate) struct ScalarDataType<T: PrimInt> {
254+
data_type: DataType,
255+
_marker: PhantomData<T>,
256+
}
257+
258+
impl<T: PrimInt> ScalarDataType<T> {
259+
pub(crate) fn new(dt: DataType) -> Self {
260+
Self {
261+
data_type: dt,
262+
_marker: PhantomData,
263+
}
264+
}
265+
266+
fn scalar(&self, r: Option<i64>) -> Result<ScalarValue> {
267+
match &self.data_type {
268+
DataType::Date32 => Ok(ScalarValue::Date32(r.and_then(|v| v.to_i32()))),
269+
DataType::Timestamp(u, tz) => match u {
270+
TimeUnit::Second => Ok(ScalarValue::TimestampSecond(r, tz.clone())),
271+
TimeUnit::Millisecond => {
272+
Ok(ScalarValue::TimestampMillisecond(r, tz.clone()))
273+
}
274+
TimeUnit::Microsecond => {
275+
Ok(ScalarValue::TimestampMicrosecond(r, tz.clone()))
276+
}
277+
TimeUnit::Nanosecond => {
278+
Ok(ScalarValue::TimestampNanosecond(r, tz.clone()))
279+
}
280+
},
281+
t => Err(internal_datafusion_err!(
282+
"Unsupported data type for ScalarDataType<T>: {t:?}"
283+
)),
284+
}
285+
}
286+
}
287+
288+
pub(crate) fn handle<O, F, T>(
180289
args: &[ColumnarValue],
181290
op: F,
182291
name: &str,
292+
sdt: &ScalarDataType<T>,
183293
) -> Result<ColumnarValue>
184294
where
185295
O: ArrowPrimitiveType,
186-
S: ScalarType<O::Native>,
296+
T: PrimInt,
187297
F: Fn(&str) -> Result<O::Native>,
188298
{
189299
match &args[0] {
@@ -210,8 +320,13 @@ where
210320
},
211321
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
212322
Some(a) => {
213-
let result = a.as_ref().map(|x| op(x)).transpose()?;
214-
Ok(ColumnarValue::Scalar(S::scalar(result)))
323+
let result = a
324+
.as_ref()
325+
.map(|x| op(x))
326+
.transpose()?
327+
.and_then(|v| v.to_i64());
328+
let s = sdt.scalar(result)?;
329+
Ok(ColumnarValue::Scalar(s))
215330
}
216331
_ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
217332
},
@@ -221,17 +336,18 @@ where
221336
// Given a function that maps a `&str`, `&str` to an arrow native type,
222337
// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
223338
// depending on the `args`'s variant.
224-
pub(crate) fn handle_multiple<O, F, S, M>(
339+
pub(crate) fn handle_multiple<O, F, M, T>(
225340
args: &[ColumnarValue],
226341
op: F,
227342
op2: M,
228343
name: &str,
344+
sdt: &ScalarDataType<T>,
229345
) -> Result<ColumnarValue>
230346
where
231347
O: ArrowPrimitiveType,
232-
S: ScalarType<O::Native>,
233348
F: Fn(&str, &str) -> Result<O::Native>,
234349
M: Fn(O::Native) -> O::Native,
350+
T: PrimInt,
235351
{
236352
match &args[0] {
237353
ColumnarValue::Array(a) => match a.data_type() {
@@ -286,9 +402,9 @@ where
286402
if let Some(s) = x {
287403
match op(a, s.as_str()) {
288404
Ok(r) => {
289-
ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
290-
op2(r),
291-
)))));
405+
let result = op2(r).to_i64();
406+
let s = sdt.scalar(result)?;
407+
ret = Some(Ok(ColumnarValue::Scalar(s)));
292408
break;
293409
}
294410
Err(e) => ret = Some(Err(e)),

0 commit comments

Comments
 (0)