Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion/functions/src/datetime/date_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ use datafusion_macros::user_doc;
argument(
name = "part",
description = r#"Part of the date to return. The following date parts are supported:

- year
- isoyear (ISO 8601 week-numbering year)
- quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
- month
- week (week of the year)
Expand Down Expand Up @@ -215,6 +216,7 @@ impl ScalarUDFImpl for DatePartFunc {
} else {
// special cases that can be extracted (in postgres) but are not interval units
match part_trim.to_lowercase().as_str() {
"isoyear" => date_part(array.as_ref(), DatePart::YearISO)?,
"qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
"doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
"dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
Expand Down
139 changes: 139 additions & 0 deletions datafusion/spark/src/function/datetime/date_part.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::types::logical_date;
use datafusion_common::{
Result, ScalarValue, internal_err, types::logical_string, utils::take_function_args,
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
use datafusion_expr::{
Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
Signature, TypeSignature, TypeSignatureClass, Volatility,
};
use std::{any::Any, sync::Arc};

/// Wrapper around datafusion date_part function to handle
/// Spark behavior returning day of the week 1-indexed instead of 0-indexed and different part aliases.
/// <https://spark.apache.org/docs/latest/api/sql/index.html#date_part>
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkDatePart {
    // Accepted argument shapes: (Utf8 part name, Timestamp) or (Utf8 part name, Date).
    signature: Signature,
    // Alternate names registered alongside "date_part" (currently just "datepart").
    aliases: Vec<String>,
}

impl Default for SparkDatePart {
fn default() -> Self {
Self::new()
}
}

impl SparkDatePart {
    /// Builds the UDF: the first argument is the textual part name, the second
    /// is either a timestamp or a date; also registers the `datepart` alias.
    pub fn new() -> Self {
        // First argument is always a native string naming the date part.
        let part_arg = || Coercion::new_exact(TypeSignatureClass::Native(logical_string()));

        let timestamp_variant = TypeSignature::Coercible(vec![
            part_arg(),
            Coercion::new_exact(TypeSignatureClass::Timestamp),
        ]);
        let date_variant = TypeSignature::Coercible(vec![
            part_arg(),
            Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
        ]);

        Self {
            signature: Signature::one_of(
                vec![timestamp_variant, date_variant],
                Volatility::Immutable,
            ),
            aliases: vec!["datepart".to_string()],
        }
    }
}

impl ScalarUDFImpl for SparkDatePart {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"date_part"
}

fn aliases(&self) -> &[String] {
&self.aliases
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
internal_err!("Use return_field_from_args in this case instead.")
}

fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
let nullable = args.arg_fields.iter().any(|f| f.is_nullable());

Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
}

fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
internal_err!("spark date_part should have been simplified to standard date_part")
}

fn simplify(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that we're using simplify here 👍

&self,
args: Vec<Expr>,
_info: &SimplifyContext,
) -> Result<ExprSimplifyResult> {
let [part_expr, date_expr] = take_function_args(self.name(), args)?;

let part = match part_expr.as_literal() {
Some(ScalarValue::Utf8(Some(v)))
| Some(ScalarValue::Utf8View(Some(v)))
| Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(),
_ => {
return internal_err!(
"First argument of `DATE_PART` must be non-null scalar Utf8"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as DF date_part, part is a literal

"First argument of `DATE_PART` must be non-null scalar Utf8"

);
}
};

// Map Spark-specific date part aliases to datafusion ones
let part = match part.as_str() {
"yearofweek" | "year_iso" => "isoyear",
"dayofweek" => "dow",
"dayofweek_iso" | "dow_iso" => "isodow",
other => other,
};

let part_expr = Expr::Literal(ScalarValue::new_utf8(part), None);

let date_part_expr = Expr::ScalarFunction(ScalarFunction::new_udf(
datafusion_functions::datetime::date_part(),
vec![part_expr, date_expr],
));
Comment on lines +126 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern is if the nullability of the output field will match here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah you're right, should we update

fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
to be nullable depending on the inputs ?


match part {
// Add 1 for day-of-week parts to convert 0-indexed to 1-indexed
"dow" | "isodow" => Ok(ExprSimplifyResult::Simplified(
date_part_expr + Expr::Literal(ScalarValue::Int32(Some(1)), None),
)),
_ => Ok(ExprSimplifyResult::Simplified(date_part_expr)),
}
}
}
8 changes: 8 additions & 0 deletions datafusion/spark/src/function/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

pub mod date_add;
pub mod date_part;
pub mod date_sub;
pub mod extract;
pub mod last_day;
Expand All @@ -36,6 +37,7 @@ make_udf_function!(last_day::SparkLastDay, last_day);
make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
make_udf_function!(make_interval::SparkMakeInterval, make_interval);
make_udf_function!(next_day::SparkNextDay, next_day);
make_udf_function!(date_part::SparkDatePart, date_part);

pub mod expr_fn {
use datafusion_functions::export_functions;
Expand Down Expand Up @@ -83,6 +85,11 @@ pub mod expr_fn {
"Returns the first date which is later than start_date and named as indicated. The function returns NULL if at least one of the input parameters is NULL.",
arg1 arg2
));
export_functions!((
date_part,
"Extracts a part of the date or time from a date, time, or timestamp expression.",
arg1 arg2
));
}

pub fn functions() -> Vec<Arc<ScalarUDF>> {
Expand All @@ -96,5 +103,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
make_dt_interval(),
make_interval(),
next_day(),
date_part(),
]
}
19 changes: 19 additions & 0 deletions datafusion/spark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,28 @@
//! let expr = sha2(col("my_data"), lit(256));
//! ```
//!
//! # Example: using the Spark expression planner
//!
//! The [`planner::SparkFunctionPlanner`] provides Spark-compatible expression
//! planning, such as mapping SQL `EXTRACT` expressions to Spark's `date_part`
//! function. To use it, register it with your session context:
//!
//! ```ignore
//! use std::sync::Arc;
//! use datafusion::prelude::SessionContext;
//! use datafusion_spark::planner::SparkFunctionPlanner;
//!
//! let mut ctx = SessionContext::new();
//! // Register the Spark expression planner
//! ctx.register_expr_planner(Arc::new(SparkFunctionPlanner))?;
//! // Now EXTRACT expressions will use Spark semantics
//! let df = ctx.sql("SELECT EXTRACT(YEAR FROM timestamp_col) FROM my_table").await?;
//! ```
//!
//![`Expr`]: datafusion_expr::Expr

pub mod function;
pub mod planner;

use datafusion_catalog::TableFunction;
use datafusion_common::Result;
Expand Down
34 changes: 34 additions & 0 deletions datafusion/spark/src/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::Expr;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::planner::{ExprPlanner, PlannerResult};

/// Expression planner that maps SQL constructs to their Spark-compatible
/// function equivalents; currently rewrites `EXTRACT(part FROM expr)` to the
/// Spark `date_part` UDF.
#[derive(Default, Debug)]
pub struct SparkFunctionPlanner;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're including this planner now, I feel we should update the lib docs with an example of using this

https://github.com/apache/datafusion/blob/main/datafusion/spark/src/lib.rs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I can do that. I also think it would be nice to provide a way to register the expr planner and the udfs at the same time with something like

pub fn with_default_features(mut self) -> Self {
.
we could add a `with_spark_features`? We could track that in a separate issue/PR.


impl ExprPlanner for SparkFunctionPlanner {
    /// Plans SQL `EXTRACT` as a call to the Spark `date_part` UDF so that
    /// Spark part aliases and day-of-week indexing semantics apply.
    fn plan_extract(
        &self,
        args: Vec<Expr>,
    ) -> datafusion_common::Result<PlannerResult<Vec<Expr>>> {
        let udf = crate::function::datetime::date_part();
        let call = ScalarFunction::new_udf(udf, args);
        Ok(PlannerResult::Planned(Expr::ScalarFunction(call)))
    }
}
15 changes: 11 additions & 4 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,18 @@ impl TestContext {
// hardcode target partitions so plans are deterministic
.with_target_partitions(4);
let runtime = Arc::new(RuntimeEnv::default());
let mut state = SessionStateBuilder::new()

let mut state_builder = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime)
.with_default_features()
.build();
.with_runtime_env(runtime);

if is_spark_path(relative_path) {
state_builder = state_builder.with_expr_planners(vec![Arc::new(
datafusion_spark::planner::SparkFunctionPlanner,
)]);
}

let mut state = state_builder.with_default_features().build();

if is_spark_path(relative_path) {
info!("Registering Spark functions");
Expand Down
51 changes: 51 additions & 0 deletions datafusion/sqllogictest/test_files/datetime/date_part.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ SELECT date_part('year', ts_nano_no_tz), date_part('year', ts_nano_utc), date_pa
2020 2020 2019 2020 2020 2019
2020 2020 2019 2020 2020 2019

# date_part (isoyear) with columns and explicit timestamp
query IIIIII
SELECT date_part('isoyear', ts_nano_no_tz), date_part('isoyear', ts_nano_utc), date_part('isoyear', ts_nano_eastern), date_part('isoyear', ts_milli_no_tz), date_part('isoyear', ts_milli_utc), date_part('isoyear', ts_milli_eastern) FROM source_ts;
----
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020
2020 2020 2020 2020 2020 2020


# date_part (month)
query IIIIII
SELECT date_part('month', ts_nano_no_tz), date_part('month', ts_nano_utc), date_part('month', ts_nano_eastern), date_part('month', ts_milli_no_tz), date_part('month', ts_milli_utc), date_part('month', ts_milli_eastern) FROM source_ts;
Expand Down Expand Up @@ -228,6 +245,26 @@ SELECT EXTRACT('year' FROM timestamp '2020-09-08T12:00:00+00:00')
----
2020

query I
SELECT date_part('ISOYEAR', CAST('2000-01-01' AS DATE))
----
1999

query I
SELECT EXTRACT(isoyear FROM timestamp '2020-09-08T12:00:00+00:00')
----
2020

query I
SELECT EXTRACT("isoyear" FROM timestamp '2020-09-08T12:00:00+00:00')
----
2020

query I
SELECT EXTRACT('isoyear' FROM timestamp '2020-09-08T12:00:00+00:00')
----
2020

query I
SELECT date_part('QUARTER', CAST('2000-01-01' AS DATE))
----
Expand Down Expand Up @@ -865,9 +902,15 @@ SELECT extract(month from arrow_cast('20 months', 'Interval(YearMonth)'))
----
8

query error DataFusion error: Arrow error: Compute error: YearISO does not support: Interval\(YearMonth\)
SELECT extract(isoyear from arrow_cast('10 years', 'Interval(YearMonth)'))

query error DataFusion error: Arrow error: Compute error: Year does not support: Interval\(DayTime\)
SELECT extract(year from arrow_cast('10 days', 'Interval(DayTime)'))

query error DataFusion error: Arrow error: Compute error: YearISO does not support: Interval\(DayTime\)
SELECT extract(isoyear from arrow_cast('10 days', 'Interval(DayTime)'))

query error DataFusion error: Arrow error: Compute error: Month does not support: Interval\(DayTime\)
SELECT extract(month from arrow_cast('10 days', 'Interval(DayTime)'))

Expand Down Expand Up @@ -1011,6 +1054,9 @@ SELECT extract(month from arrow_cast(864000, 'Duration(Second)'))
query error DataFusion error: Arrow error: Compute error: Year does not support: Duration\(s\)
SELECT extract(year from arrow_cast(864000, 'Duration(Second)'))

query error DataFusion error: Arrow error: Compute error: YearISO does not support: Duration\(s\)
SELECT extract(isoyear from arrow_cast(864000, 'Duration(Second)'))

query I
SELECT extract(day from arrow_cast(NULL, 'Duration(Second)'))
----
Expand All @@ -1023,6 +1069,11 @@ SELECT (date_part('year', now()) = EXTRACT(year FROM now()))
----
true

query B
SELECT (date_part('isoyear', now()) = EXTRACT(isoyear FROM now()))
----
true

query B
SELECT (date_part('quarter', now()) = EXTRACT(quarter FROM now()))
----
Expand Down
Loading
Loading