Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ use datafusion_expr::{
planner::ExprPlanner,
Expr, UserDefinedLogicalNode, WindowUDF,
};
use datafusion_functions::datetime::now::NowFunc;
use datafusion_functions::{init_udf_with_config, make_udf_function_with_config};
use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion_optimizer::Analyzer;
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
Expand Down Expand Up @@ -1072,6 +1074,16 @@ impl SessionContext {
} else {
let mut state = self.state.write();
state.config_mut().options_mut().set(&variable, &value)?;

// Register UDFs that return values based on session configuration
// e.g. now() which depends on the time_zone configuration option
if variable == "datafusion.execution.time_zone" {
let config_options = state.config().options();
let udf = state.udf("now")?;
let now_udf = init_udf_with_config!(udf.clone(), config_options, NowFunc);
state.register_udf(now_udf)?;
}

drop(state);
}

Expand Down
22 changes: 19 additions & 3 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ use datafusion_expr::TableSource;
use datafusion_expr::{
AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF,
};
use datafusion_functions::datetime::now::NowFunc;
use datafusion_functions::init_udf_with_config;
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
Expand Down Expand Up @@ -1420,9 +1422,23 @@ impl SessionStateBuilder {

if let Some(scalar_functions) = scalar_functions {
scalar_functions.into_iter().for_each(|udf| {
let existing_udf = state.register_udf(udf);
if let Ok(Some(existing_udf)) = existing_udf {
debug!("Overwrote an existing UDF: {}", existing_udf.name());
if udf.need_config() {
// Register now() with the configured timezone
// The now() function depends on the timezone configuration, so we need to
// register it after the config is set to ensure it uses the correct timezone
let new_udf = init_udf_with_config!(
udf.clone(),
state.config.options(),
NowFunc
);
if let Err(e) = state.register_udf(new_udf) {
info!("Unable to register UDF: {e}")
};
} else {
let existing_udf = state.register_udf(udf);
if let Ok(Some(existing_udf)) = existing_udf {
debug!("Overwrote an existing UDF: {}", existing_udf.name());
}
}
});
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/expr_api/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,7 @@ fn multiple_now() -> Result<()> {
// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
\n TableScan: test",
"Projection: TimestampNanosecond({}, Some(\"+00\")) AS now(), TimestampNanosecond({}, Some(\"+00\")) AS t2\n TableScan: test",
time.timestamp_nanos_opt().unwrap(),
time.timestamp_nanos_opt().unwrap()
);
Expand Down
9 changes: 9 additions & 0 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ impl ScalarUDF {
self.inner.return_type(arg_types)
}

/// Returns whether the wrapped function implementation reports that it
/// needs configuration.
///
/// This simply forwards to the inner implementation's `need_config`.
pub fn need_config(&self) -> bool {
self.inner.need_config()
}

/// Return the datatype this function returns given the input argument types.
///
/// See [`ScalarUDFImpl::return_field_from_args`] for more details.
Expand Down Expand Up @@ -532,6 +536,11 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
/// [`DataFusionError::Internal`]: datafusion_common::DataFusionError::Internal
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;


/// Returns `true` if this function needs configuration to be constructed.
///
/// The default implementation returns `false`; implementations that
/// depend on configuration override this to return `true`.
fn need_config(&self) -> bool {
false
}

/// What type will be returned by this function, given the arguments?
///
/// By default, this function calls [`Self::return_type`] with the
Expand Down
8 changes: 6 additions & 2 deletions datafusion/functions/src/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ make_udf_function!(date_part::DatePartFunc, date_part);
make_udf_function!(date_trunc::DateTruncFunc, date_trunc);
make_udf_function!(make_date::MakeDateFunc, make_date);
make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime);
make_udf_function!(now::NowFunc, now);
make_udf_function!(to_char::ToCharFunc, to_char);
make_udf_function!(to_date::ToDateFunc, to_date);
make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time);
Expand All @@ -56,6 +55,9 @@ make_udf_function!(to_timestamp::ToTimestampMillisFunc, to_timestamp_millis);
make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros);
make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos);

// create UDF with config
make_udf_function_with_config!(now::NowFunc, now);

// we cannot currently use the export_functions macro since it doesn't handle
// functions with varargs currently

Expand Down Expand Up @@ -91,6 +93,7 @@ pub mod expr_fn {
),(
now,
"returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement",
@config
),
(
to_local_time,
Expand Down Expand Up @@ -255,6 +258,7 @@ pub mod expr_fn {

/// Returns all DataFusion functions defined in this package
pub fn functions() -> Vec<Arc<ScalarUDF>> {
use datafusion_common::config::ConfigOptions;
vec![
current_date(),
current_time(),
Expand All @@ -263,7 +267,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
date_trunc(),
from_unixtime(),
make_date(),
now(),
now(&ConfigOptions::default()),
to_char(),
to_date(),
to_local_time(),
Expand Down
21 changes: 19 additions & 2 deletions datafusion/functions/src/datetime/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use arrow::datatypes::DataType::Timestamp;
use arrow::datatypes::TimeUnit::Nanosecond;
use arrow::datatypes::{DataType, Field, FieldRef};
use std::any::Any;
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
Expand All @@ -41,6 +43,7 @@ The `now()` return value is determined at query time and will return the same ti
pub struct NowFunc {
/// Function signature; constructed as nullary with stable volatility.
signature: Signature,
/// Alternative names for the function (`current_timestamp`).
aliases: Vec<String>,
/// Time zone attached to the timestamp type and literal produced by
/// `now()`; populated from `execution.time_zone` by `new_with_config`.
timezone: Option<Arc<str>>,
}

impl Default for NowFunc {
Expand All @@ -54,6 +57,15 @@ impl NowFunc {
Self {
signature: Signature::nullary(Volatility::Stable),
aliases: vec!["current_timestamp".to_string()],
timezone: Some(Arc::from("+00")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NowFunc::new() (and therefore Default) now hardcodes the timezone as "+00".
Everywhere else in the codebase — including the default ConfigOptions and existing sqllogictests — expects the canonical "+00:00" form.
Won't this mean that any downstream caller that still constructs the UDF via NowFunc::new()/Default (for example, when registering their own function registry) now gets a different, non-canonical offset that will no longer compare equal to the rest of the system?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could remove new(), because now relies on config for the initialization

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can someone explain the implications of this change on downstream crates? Does it mean that now() will always use the current timezone from the ConfigOptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: Yes, this is the implication

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb With this implementation, yes. To me the logical solution is to allow the time_zone config option to be an Option (and default to None) and fix any existing usage to handle that ... or to detect when the time_zone config option is '' and handle that in the same manner ('' -> None tz)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree

}
}

/// Builds a `NowFunc` whose time zone comes from the given configuration.
///
/// The `execution.time_zone` value of `config` is copied into the function,
/// which is otherwise set up with a nullary, stable signature and the
/// `current_timestamp` alias.
pub fn new_with_config(config: &ConfigOptions) -> Self {
    let session_tz: Arc<str> = Arc::from(config.execution.time_zone.as_str());
    let alias = String::from("current_timestamp");

    Self {
        timezone: Some(session_tz),
        aliases: vec![alias],
        signature: Signature::nullary(Volatility::Stable),
    }
}
}
Expand All @@ -77,10 +89,14 @@ impl ScalarUDFImpl for NowFunc {
&self.signature
}

/// Always `true`: `NowFunc` carries a time zone that `new_with_config`
/// reads from the configuration, so it must be built from config.
fn need_config(&self) -> bool {
true
}

fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
Ok(Field::new(
self.name(),
Timestamp(Nanosecond, Some("+00:00".into())),
Timestamp(Nanosecond, self.timezone.clone()),
false,
)
.into())
Expand All @@ -106,8 +122,9 @@ impl ScalarUDFImpl for NowFunc {
.execution_props()
.query_execution_start_time
.timestamp_nanos_opt();

Ok(ExprSimplifyResult::Simplified(Expr::Literal(
ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())),
ScalarValue::TimestampNanosecond(now_ts, self.timezone.clone()),
None,
)))
}
Expand Down
39 changes: 39 additions & 0 deletions datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
/// Exported functions accept:
/// - `Vec<Expr>` argument (single argument followed by a comma)
/// - Variable number of `Expr` arguments (zero or more arguments, must be without commas)
/// - Functions that require config (marked with `@config` prefix)
#[macro_export]
macro_rules! export_functions {
($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),*) => {
Expand All @@ -49,6 +50,15 @@ macro_rules! export_functions {
)*
};

// function that requires config (marked with @config)
(single $FUNC:ident, $DOC:expr, @config) => {
#[doc = $DOC]
pub fn $FUNC() -> datafusion_expr::Expr {
use datafusion_common::config::ConfigOptions;
super::$FUNC(&ConfigOptions::default()).call(vec![])
}
};

// single vector argument (a single argument followed by a comma)
(single $FUNC:ident, $DOC:expr, $arg:ident,) => {
#[doc = $DOC]
Expand Down Expand Up @@ -89,6 +99,22 @@ macro_rules! make_udf_function {
};
}

/// Defines a function named `$NAME` that builds a `ScalarUDF` for the `$UDF`
/// implementation from the supplied configuration.
///
/// The generated function takes a `&datafusion_common::config::ConfigOptions`,
/// passes it to `<$UDF>::new_with_config`, and wraps the result with
/// `ScalarUDF::new_from_impl` inside a new `Arc`.
///
/// Note that nothing is cached: every call constructs a new `ScalarUDF`, so
/// the returned value always reflects the configuration passed to that call.
#[macro_export]
macro_rules! make_udf_function_with_config {
    ($UDF:ty, $NAME:ident) => {
        #[allow(rustdoc::redundant_explicit_links)]
        #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))]
        pub fn $NAME(config: &datafusion_common::config::ConfigOptions) -> std::sync::Arc<datafusion_expr::ScalarUDF> {
            // `config` is already a reference; forward it without re-borrowing.
            std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
                <$UDF>::new_with_config(config),
            ))
        }
    };
}

/// Macro creates a sub module if the feature is not enabled
///
/// The rationale for providing stub functions is to help users to configure datafusion
Expand Down Expand Up @@ -378,3 +404,16 @@ macro_rules! make_math_binary_udf {
}
};
}

/// Produces an `Arc` holding a copy of `$udf`, rebuilt from `$config` when
/// its implementation is a `$func_type`.
///
/// * `$udf` - a dereferenceable handle (for example an `Arc`) to a clonable
///   UDF that exposes `inner().as_any()`
/// * `$config` - forwarded unchanged to `<$func_type>::new_with_config`
/// * `$func_type` - the implementation type to look for
///
/// When the UDF's inner implementation is a `$func_type`, the result wraps
/// `<$func_type>::new_with_config($config).into()`; otherwise it wraps a
/// plain clone of the original UDF.
///
/// `Arc` is referenced by its absolute path so the expansion compiles even
/// when the calling module has not imported `std::sync::Arc`.
#[macro_export]
macro_rules! init_udf_with_config {
    ($udf:expr, $config:expr, $func_type:ty) => {{
        let mut udf = (*$udf).clone();
        if udf.inner().as_any().is::<$func_type>() {
            udf = <$func_type>::new_with_config($config).into();
        }
        ::std::sync::Arc::new(udf)
    }};
}

2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/arrow_typeof.slt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Timestamp(Nanosecond, None)
query T
SELECT arrow_typeof(now())
----
Timestamp(Nanosecond, Some("+00:00"))
Timestamp(Nanosecond, Some("+00"))

# arrow_typeof_timestamp_date32(
query T
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/dates.slt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ g
h

## Plan error when compare Utf8 and timestamp in where clause
statement error DataFusion error: type_coercion\ncaused by\nError during planning: Cannot coerce arithmetic expression Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 to valid types
statement error DataFusion error: type_coercion\ncaused by\nError during planning: Cannot coerce arithmetic expression Timestamp\(Nanosecond, Some\("\+00"\)\) \+ Utf8 to valid types
select i_item_desc from test
where d3_date > now() + '5 days';

Expand Down
10 changes: 10 additions & 0 deletions datafusion/sqllogictest/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ true
##########
## Current time Tests
##########
statement ok
SET TIME ZONE = '+08'

query T
select arrow_typeof(now());
----
Timestamp(Nanosecond, Some("+08"))

statement ok
SET TIME ZONE = '+00'

query B
select cast(now() as time) = current_time();
Expand Down
Loading