
Commit 90f5bfe

andygrove and Jefffrey authored
feat: Implement Spark functions hour, minute, second (#19512)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - N/A ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Add new functions: hour, minute, and second. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Implementation - Unit Tests - slt tests ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, tests added as part of this PR. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> No, these are new functions. <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Jefffrey <[email protected]>
1 parent 0db668b commit 90f5bfe

File tree

5 files changed (+349, -6 lines)

datafusion/spark/src/function/datetime/extract.rs

Lines changed: 268 additions & 0 deletions
@@ -0,0 +1,268 @@
```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;

use arrow::array::ArrayRef;
use arrow::compute::{DatePart, date_part};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::utils::take_function_args;
use datafusion_expr::{
    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
    TypeSignatureClass, Volatility,
};
use datafusion_functions::utils::make_scalar_function;

/// Creates a signature for datetime extraction functions that accept timestamp types.
fn extract_signature() -> Signature {
    Signature::coercible(
        vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
        Volatility::Immutable,
    )
}

// -----------------------------------------------------------------------------
// SparkHour
// -----------------------------------------------------------------------------

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkHour {
    signature: Signature,
}

impl Default for SparkHour {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkHour {
    pub fn new() -> Self {
        Self {
            signature: extract_signature(),
        }
    }
}

impl ScalarUDFImpl for SparkHour {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hour"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_hour, vec![])(&args.args)
    }
}

fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [ts_arg] = take_function_args("hour", args)?;
    let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
    Ok(result)
}

// -----------------------------------------------------------------------------
// SparkMinute
// -----------------------------------------------------------------------------

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkMinute {
    signature: Signature,
}

impl Default for SparkMinute {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkMinute {
    pub fn new() -> Self {
        Self {
            signature: extract_signature(),
        }
    }
}

impl ScalarUDFImpl for SparkMinute {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "minute"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_minute, vec![])(&args.args)
    }
}

fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [ts_arg] = take_function_args("minute", args)?;
    let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
    Ok(result)
}

// -----------------------------------------------------------------------------
// SparkSecond
// -----------------------------------------------------------------------------

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkSecond {
    signature: Signature,
}

impl Default for SparkSecond {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkSecond {
    pub fn new() -> Self {
        Self {
            signature: extract_signature(),
        }
    }
}

impl ScalarUDFImpl for SparkSecond {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "second"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_second, vec![])(&args.args)
    }
}

fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [ts_arg] = take_function_args("second", args)?;
    let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
    Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
    use arrow::datatypes::TimeUnit;
    use std::sync::Arc;

    #[test]
    fn test_spark_hour() {
        // Create a timestamp array: 2024-01-15 14:30:45 UTC (in microseconds)
        // 14:30:45 -> hour = 14
        let ts_micros = 1_705_329_045_000_000_i64; // 2024-01-15 14:30:45 UTC
        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let ts_array = Arc::new(ts_array) as ArrayRef;

        let result = spark_hour(&[ts_array]).unwrap();
        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(result.value(0), 14);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_spark_minute() {
        // 14:30:45 -> minute = 30
        let ts_micros = 1_705_329_045_000_000_i64;
        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let ts_array = Arc::new(ts_array) as ArrayRef;

        let result = spark_minute(&[ts_array]).unwrap();
        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(result.value(0), 30);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_spark_second() {
        // 14:30:45 -> second = 45
        let ts_micros = 1_705_329_045_000_000_i64;
        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let ts_array = Arc::new(ts_array) as ArrayRef;

        let result = spark_second(&[ts_array]).unwrap();
        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(result.value(0), 45);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_hour_return_type() {
        let func = SparkHour::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }

    #[test]
    fn test_minute_return_type() {
        let func = SparkMinute::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }

    #[test]
    fn test_second_return_type() {
        let func = SparkSecond::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }
}
```
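Each of the three UDFs is a thin wrapper over arrow's `date_part` kernel, differing only in the `DatePart` variant and the function name passed to `take_function_args`. The following standalone sketch illustrates the kernel behavior the unit tests above rely on (Int32 output, null propagation); it is an illustration, not code from this PR.

```rust
// Standalone illustration of the arrow kernel the three UDFs delegate to.
// Assumes, as the unit tests above do, that DatePart::Hour yields Int32.
use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
use arrow::compute::{DatePart, date_part};

fn main() {
    // 2024-01-15 14:30:45 UTC, in microseconds since the Unix epoch
    let ts =
        TimestampMicrosecondArray::from(vec![Some(1_705_329_045_000_000_i64), None]);
    let hours = date_part(&ts, DatePart::Hour).unwrap();
    let hours = hours.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(hours.value(0), 14); // hour component of 14:30:45
    assert!(hours.is_null(1)); // nulls propagate through the kernel
}
```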

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 18 additions & 0 deletions
```diff
@@ -17,6 +17,7 @@
 
 pub mod date_add;
 pub mod date_sub;
+pub mod extract;
 pub mod last_day;
 pub mod make_dt_interval;
 pub mod make_interval;
@@ -28,6 +29,9 @@ use std::sync::Arc;
 
 make_udf_function!(date_add::SparkDateAdd, date_add);
 make_udf_function!(date_sub::SparkDateSub, date_sub);
+make_udf_function!(extract::SparkHour, hour);
+make_udf_function!(extract::SparkMinute, minute);
+make_udf_function!(extract::SparkSecond, second);
 make_udf_function!(last_day::SparkLastDay, last_day);
 make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
 make_udf_function!(make_interval::SparkMakeInterval, make_interval);
@@ -46,6 +50,17 @@ pub mod expr_fn {
         "Returns the date that is days days before start. The function returns NULL if at least one of the input parameters is NULL.",
         arg1 arg2
     ));
+    export_functions!((hour, "Extracts the hour component of a timestamp.", arg1));
+    export_functions!((
+        minute,
+        "Extracts the minute component of a timestamp.",
+        arg1
+    ));
+    export_functions!((
+        second,
+        "Extracts the second component of a timestamp.",
+        arg1
+    ));
     export_functions!((
         last_day,
         "Returns the last day of the month which the date belongs to.",
@@ -74,6 +89,9 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
     vec![
         date_add(),
         date_sub(),
+        hour(),
+        minute(),
+        second(),
         last_day(),
         make_dt_interval(),
         make_interval(),
```
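The `export_functions!` additions also make the new functions available as expression builders. A hypothetical DataFrame-API usage follows; the column name `ts` and the surrounding setup are assumptions for illustration only.

```rust
// Hedged sketch of using the new expr_fn helpers from the DataFrame API.
use datafusion::error::Result;
use datafusion::prelude::{DataFrame, col};
use datafusion_spark::function::datetime::expr_fn::{hour, minute, second};

fn extract_time_parts(df: DataFrame) -> Result<DataFrame> {
    // Each helper wraps its argument in a call to the matching UDF,
    // producing one Int32 column per component.
    df.select(vec![hour(col("ts")), minute(col("ts")), second(col("ts"))])
}
```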

datafusion/sqllogictest/test_files/spark/datetime/hour.slt

Lines changed: 21 additions & 2 deletions
```diff
@@ -23,5 +23,24 @@
 
 ## Original Query: SELECT hour('2009-07-30 12:58:59');
 ## PySpark 3.5.5 Result: {'hour(2009-07-30 12:58:59)': 12, 'typeof(hour(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'}
-#query
-#SELECT hour('2009-07-30 12:58:59'::string);
+query I
+SELECT hour('2009-07-30 12:58:59'::timestamp);
+----
+12
+
+# Test with different hours
+query I
+SELECT hour('2009-07-30 00:00:00'::timestamp);
+----
+0
+
+query I
+SELECT hour('2009-07-30 23:59:59'::timestamp);
+----
+23
+
+# Test with NULL
+query I
+SELECT hour(NULL::timestamp);
+----
+NULL
```

datafusion/sqllogictest/test_files/spark/datetime/minute.slt

Lines changed: 21 additions & 2 deletions
```diff
@@ -23,5 +23,24 @@
 
 ## Original Query: SELECT minute('2009-07-30 12:58:59');
 ## PySpark 3.5.5 Result: {'minute(2009-07-30 12:58:59)': 58, 'typeof(minute(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'}
-#query
-#SELECT minute('2009-07-30 12:58:59'::string);
+query I
+SELECT minute('2009-07-30 12:58:59'::timestamp);
+----
+58
+
+# Test with different minutes
+query I
+SELECT minute('2009-07-30 12:00:00'::timestamp);
+----
+0
+
+query I
+SELECT minute('2009-07-30 12:59:59'::timestamp);
+----
+59
+
+# Test with NULL
+query I
+SELECT minute(NULL::timestamp);
+----
+NULL
```

datafusion/sqllogictest/test_files/spark/datetime/second.slt

Lines changed: 21 additions & 2 deletions
```diff
@@ -23,5 +23,24 @@
 
 ## Original Query: SELECT second('2009-07-30 12:58:59');
 ## PySpark 3.5.5 Result: {'second(2009-07-30 12:58:59)': 59, 'typeof(second(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'}
-#query
-#SELECT second('2009-07-30 12:58:59'::string);
+query I
+SELECT second('2009-07-30 12:58:59'::timestamp);
+----
+59
+
+# Test with different seconds
+query I
+SELECT second('2009-07-30 12:58:00'::timestamp);
+----
+0
+
+query I
+SELECT second('2009-07-30 12:58:30'::timestamp);
+----
+30
+
+# Test with NULL
+query I
+SELECT second(NULL::timestamp);
+----
+NULL
```
