Skip to content

Commit 28e042d

Browse files
authored
feat(spark): implement spark datetime function date_add/date_sub (#17024)
* feat: spark date_add/date_sub * fix
1 parent ceab1e1 commit 28e042d

File tree

6 files changed

+392
-7
lines changed

6 files changed

+392
-7
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::ArrayRef;
22+
use arrow::compute;
23+
use arrow::datatypes::{DataType, Date32Type};
24+
use datafusion_common::cast::{
25+
as_date32_array, as_int16_array, as_int32_array, as_int8_array,
26+
};
27+
use datafusion_common::{internal_err, Result};
28+
use datafusion_expr::{
29+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
30+
Volatility,
31+
};
32+
use datafusion_functions::utils::make_scalar_function;
33+
34+
/// Spark-compatible `date_add` scalar UDF.
///
/// Adds a whole number of days to a `Date32` value, mirroring Spark's
/// `date_add(start_date, num_days)` function.
#[derive(Debug)]
pub struct SparkDateAdd {
    // Accepted argument combinations: (Date32, Int8 | Int16 | Int32).
    signature: Signature,
    // Alternate registered names (`dateadd`).
    aliases: Vec<String>,
}
39+
40+
impl Default for SparkDateAdd {
    /// Equivalent to [`SparkDateAdd::new`].
    fn default() -> Self {
        Self::new()
    }
}
45+
46+
impl SparkDateAdd {
47+
pub fn new() -> Self {
48+
Self {
49+
signature: Signature::one_of(
50+
vec![
51+
TypeSignature::Exact(vec![DataType::Date32, DataType::Int8]),
52+
TypeSignature::Exact(vec![DataType::Date32, DataType::Int16]),
53+
TypeSignature::Exact(vec![DataType::Date32, DataType::Int32]),
54+
],
55+
Volatility::Immutable,
56+
),
57+
aliases: vec!["dateadd".to_string()],
58+
}
59+
}
60+
}
61+
62+
impl ScalarUDFImpl for SparkDateAdd {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Primary registered name of the function.
    fn name(&self) -> &str {
        "date_add"
    }

    /// Alternate names (`dateadd`) this function answers to.
    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Result is always `Date32`, regardless of the integer width of the
    /// day-count argument.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Date32)
    }

    /// Delegate to the array kernel; `make_scalar_function` normalizes any
    /// scalar arguments into arrays before the kernel runs.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_date_add, vec![])(&args.args)
    }
}
87+
88+
fn spark_date_add(args: &[ArrayRef]) -> Result<ArrayRef> {
89+
let [date_arg, days_arg] = args else {
90+
return internal_err!(
91+
"Spark `date_add` function requires 2 arguments, got {}",
92+
args.len()
93+
);
94+
};
95+
let date_array = as_date32_array(date_arg)?;
96+
let result = match days_arg.data_type() {
97+
DataType::Int8 => {
98+
let days_array = as_int8_array(days_arg)?;
99+
compute::binary::<_, _, _, Date32Type>(
100+
date_array,
101+
days_array,
102+
|date, days| date + days as i32,
103+
)?
104+
}
105+
DataType::Int16 => {
106+
let days_array = as_int16_array(days_arg)?;
107+
compute::binary::<_, _, _, Date32Type>(
108+
date_array,
109+
days_array,
110+
|date, days| date + days as i32,
111+
)?
112+
}
113+
DataType::Int32 => {
114+
let days_array = as_int32_array(days_arg)?;
115+
compute::binary::<_, _, _, Date32Type>(
116+
date_array,
117+
days_array,
118+
|date, days| date + days,
119+
)?
120+
}
121+
_ => {
122+
return internal_err!(
123+
"Spark `date_add` function: argument must be int8, int16, int32, got {:?}",
124+
days_arg.data_type()
125+
);
126+
}
127+
};
128+
Ok(Arc::new(result))
129+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::ArrayRef;
22+
use arrow::compute;
23+
use arrow::datatypes::{DataType, Date32Type};
24+
use datafusion_common::cast::{
25+
as_date32_array, as_int16_array, as_int32_array, as_int8_array,
26+
};
27+
use datafusion_common::{internal_err, Result};
28+
use datafusion_expr::{
29+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
30+
Volatility,
31+
};
32+
use datafusion_functions::utils::make_scalar_function;
33+
34+
/// Spark-compatible `date_sub` scalar UDF.
///
/// Subtracts a whole number of days from a `Date32` value, mirroring
/// Spark's `date_sub(start_date, num_days)` function.
#[derive(Debug)]
pub struct SparkDateSub {
    // Accepted argument combinations: (Date32, Int8 | Int16 | Int32).
    signature: Signature,
}
38+
39+
impl Default for SparkDateSub {
    /// Equivalent to [`SparkDateSub::new`].
    fn default() -> Self {
        Self::new()
    }
}
44+
45+
impl SparkDateSub {
46+
pub fn new() -> Self {
47+
Self {
48+
signature: Signature::one_of(
49+
vec![
50+
TypeSignature::Exact(vec![DataType::Date32, DataType::Int8]),
51+
TypeSignature::Exact(vec![DataType::Date32, DataType::Int16]),
52+
TypeSignature::Exact(vec![DataType::Date32, DataType::Int32]),
53+
],
54+
Volatility::Immutable,
55+
),
56+
}
57+
}
58+
}
59+
60+
impl ScalarUDFImpl for SparkDateSub {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Primary registered name of the function.
    fn name(&self) -> &str {
        "date_sub"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Result is always `Date32`, regardless of the integer width of the
    /// day-count argument.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Date32)
    }

    /// Delegate to the array kernel; `make_scalar_function` normalizes any
    /// scalar arguments into arrays before the kernel runs.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_date_sub, vec![])(&args.args)
    }
}
81+
82+
fn spark_date_sub(args: &[ArrayRef]) -> Result<ArrayRef> {
83+
let [date_arg, days_arg] = args else {
84+
return internal_err!(
85+
"Spark `date_sub` function requires 2 arguments, got {}",
86+
args.len()
87+
);
88+
};
89+
let date_array = as_date32_array(date_arg)?;
90+
let result = match days_arg.data_type() {
91+
DataType::Int8 => {
92+
let days_array = as_int8_array(days_arg)?;
93+
compute::binary::<_, _, _, Date32Type>(
94+
date_array,
95+
days_array,
96+
|date, days| date - days as i32,
97+
)?
98+
}
99+
DataType::Int16 => {
100+
let days_array = as_int16_array(days_arg)?;
101+
compute::binary::<_, _, _, Date32Type>(
102+
date_array,
103+
days_array,
104+
|date, days| date - days as i32,
105+
)?
106+
}
107+
DataType::Int32 => {
108+
let days_array = as_int32_array(days_arg)?;
109+
compute::binary::<_, _, _, Date32Type>(
110+
date_array,
111+
days_array,
112+
|date, days| date - days,
113+
)?
114+
}
115+
_ => {
116+
return internal_err!(
117+
"Spark `date_add` function: argument must be int8, int16, int32, got {:?}",
118+
days_arg.data_type()
119+
);
120+
}
121+
};
122+
Ok(Arc::new(result))
123+
}

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,33 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod date_add;
19+
pub mod date_sub;
1820
pub mod last_day;
1921
pub mod next_day;
2022

2123
use datafusion_expr::ScalarUDF;
2224
use datafusion_functions::make_udf_function;
2325
use std::sync::Arc;
2426

27+
make_udf_function!(date_add::SparkDateAdd, date_add);
28+
make_udf_function!(date_sub::SparkDateSub, date_sub);
2529
make_udf_function!(last_day::SparkLastDay, last_day);
2630
make_udf_function!(next_day::SparkNextDay, next_day);
2731

2832
pub mod expr_fn {
2933
use datafusion_functions::export_functions;
3034

35+
export_functions!((
36+
date_add,
37+
"Returns the date that is days days after start. The function returns NULL if at least one of the input parameters is NULL.",
38+
arg1 arg2
39+
));
40+
export_functions!((
41+
date_sub,
42+
"Returns the date that is days days before start. The function returns NULL if at least one of the input parameters is NULL.",
43+
arg1 arg2
44+
));
3145
export_functions!((
3246
last_day,
3347
"Returns the last day of the month which the date belongs to.",
@@ -43,5 +57,5 @@ pub mod expr_fn {
4357
}
4458

4559
/// All Spark datetime `ScalarUDF`s exported by this module.
pub fn functions() -> Vec<Arc<ScalarUDF>> {
    vec![date_add(), date_sub(), last_day(), next_day()]
}

datafusion/sqllogictest/test_files/spark/datetime/date_add.slt

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,46 @@
2323

2424
## Original Query: SELECT date_add('2016-07-30', 1);
2525
## PySpark 3.5.5 Result: {'date_add(2016-07-30, 1)': datetime.date(2016, 7, 31), 'typeof(date_add(2016-07-30, 1))': 'date', 'typeof(2016-07-30)': 'string', 'typeof(1)': 'int'}
26-
#query
27-
#SELECT date_add('2016-07-30'::string, 1::int);
26+
27+
# Basic date_add tests
28+
query D
29+
SELECT date_add('2016-07-30'::date, 1::int);
30+
----
31+
2016-07-31
32+
33+
query D
34+
SELECT date_add('2016-07-30'::date, arrow_cast(1, 'Int8'));
35+
----
36+
2016-07-31
37+
38+
query D
39+
SELECT date_add('2016-07-30'::date, arrow_cast(1, 'Int16'));
40+
----
41+
2016-07-31
42+
43+
query D
44+
SELECT date_add('2016-07-30'::date, 0::int);
45+
----
46+
2016-07-30
47+
48+
# Test with negative day values (should subtract days)
49+
query D
50+
SELECT date_add('2016-07-30'::date, -5::int);
51+
----
52+
2016-07-25
53+
54+
# Test with NULL values
55+
query D
56+
SELECT date_add(NULL::date, 1::int);
57+
----
58+
NULL
59+
60+
query D
61+
SELECT date_add('2016-07-30'::date, NULL::int);
62+
----
63+
NULL
64+
65+
query D
66+
SELECT date_add(NULL::date, NULL::int);
67+
----
68+
NULL

datafusion/sqllogictest/test_files/spark/datetime/date_sub.slt

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,51 @@
2323

2424
## Original Query: SELECT date_sub('2016-07-30', 1);
2525
## PySpark 3.5.5 Result: {'date_sub(2016-07-30, 1)': datetime.date(2016, 7, 29), 'typeof(date_sub(2016-07-30, 1))': 'date', 'typeof(2016-07-30)': 'string', 'typeof(1)': 'int'}
26-
#query
27-
#SELECT date_sub('2016-07-30'::string, 1::int);
26+
27+
# Basic date_sub tests
28+
query D
29+
SELECT date_sub('2016-07-30'::date, 1::int);
30+
----
31+
2016-07-29
32+
33+
query D
34+
SELECT date_sub('2016-07-30'::date, arrow_cast(1, 'Int8'));
35+
----
36+
2016-07-29
37+
38+
query D
39+
SELECT date_sub('2016-07-30'::date, arrow_cast(1, 'Int16'));
40+
----
41+
2016-07-29
42+
43+
query D
44+
SELECT date_sub('2016-07-30'::date, 0::int);
45+
----
46+
2016-07-30
47+
48+
# Test with negative day values (should add days)
49+
query D
50+
SELECT date_sub('2016-07-30'::date, -1::int);
51+
----
52+
2016-07-31
53+
54+
query D
55+
SELECT date_sub('2016-07-30'::date, -5::int);
56+
----
57+
2016-08-04
58+
59+
# Test with NULL values
60+
query D
61+
SELECT date_sub(NULL::date, 1::int);
62+
----
63+
NULL
64+
65+
query D
66+
SELECT date_sub('2016-07-30'::date, NULL::int);
67+
----
68+
NULL
69+
70+
query D
71+
SELECT date_sub(NULL::date, NULL::int);
72+
----
73+
NULL

0 commit comments

Comments
 (0)