From 0e75f6aff61213dc41227db459eccfdd7cf04b40 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 11 Oct 2025 12:46:08 +0300 Subject: [PATCH 01/18] fix: Use dynamic timezone in now() function for accurate timestamp retrieval --- datafusion/functions/src/datetime/now.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 65dadb42a89e..040a98fe60f8 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::timezone; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Nanosecond; use arrow::datatypes::{DataType, Field, FieldRef}; @@ -106,8 +107,16 @@ impl ScalarUDFImpl for NowFunc { .execution_props() .query_execution_start_time .timestamp_nanos_opt(); + + let timezone = info + .execution_props() + .config_options + .as_ref() + .map(|opts| opts.execution.time_zone.as_str()) + .unwrap_or("+00:00"); + Ok(ExprSimplifyResult::Simplified(Expr::Literal( - ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())), + ScalarValue::TimestampNanosecond(now_ts, Some(timezone.into())), None, ))) } From 33732c5c855c1435931d75d133bec34ef1cad413 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 11 Oct 2025 15:18:42 +0300 Subject: [PATCH 02/18] fix: tests --- datafusion/functions/src/datetime/now.rs | 5 ++--- datafusion/sqllogictest/test_files/timestamps.slt | 10 ++++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 040a98fe60f8..2e265e45ad4e 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::timezone; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Nanosecond; use arrow::datatypes::{DataType, Field, FieldRef}; @@ -81,7 +80,7 @@ impl ScalarUDFImpl for NowFunc { fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { Ok(Field::new( self.name(), - Timestamp(Nanosecond, Some("+00:00".into())), + Timestamp(Nanosecond, Some("+00".into())), false, ) .into()) @@ -113,7 +112,7 @@ impl ScalarUDFImpl for NowFunc { .config_options .as_ref() .map(|opts| opts.execution.time_zone.as_str()) - .unwrap_or("+00:00"); + .unwrap_or("+00"); Ok(ExprSimplifyResult::Simplified(Expr::Literal( ScalarValue::TimestampNanosecond(now_ts, Some(timezone.into())), diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 1a7ff41d64a6..fc6fcf4e54d2 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -73,6 +73,16 @@ true ########## ## Current time Tests ########## +statement ok +SET TIME ZONE = '+08' + +query T +select arrow_typeof(now()); +---- +Timestamp(Nanosecond, Some("+08")) + +statement ok +SET TIME ZONE = '+00' query B select cast(now() as time) = current_time(); From b81ebf654b83f22faa7036e2b7d8049693bbaee7 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 11 Oct 2025 15:49:26 +0300 Subject: [PATCH 03/18] fix: tests --- datafusion/sqllogictest/test_files/dates.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index 2e91a0363db0..fb4198477525 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -85,7 +85,7 @@ g h ## Plan error when compare Utf8 and timestamp in where clause -statement error DataFusion error: type_coercion\ncaused by\nError during planning: Cannot coerce arithmetic expression Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 to valid types +statement error DataFusion error: type_coercion\ncaused by\nError during planning: Cannot coerce arithmetic expression Timestamp\(Nanosecond, Some\("\+00"\)\) \+ Utf8 to valid types select i_item_desc from test where d3_date > now() + '5 days'; From c396d636a9ff4922b22ad4d68fbfdff974abc518 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 13 Oct 2025 21:51:54 +0300 Subject: [PATCH 04/18] feat: add configurable UDF for now() function with timezone support --- datafusion/core/src/execution/context/mod.rs | 15 +++++++++++++++ datafusion/functions/src/datetime/now.rs | 14 +++++++++++++- datafusion/functions/src/macros.rs | 16 ++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a8148b80495e..40986f33a53c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -81,6 +81,8 @@ use datafusion_expr::{ planner::ExprPlanner, Expr, UserDefinedLogicalNode, WindowUDF, }; +use datafusion_functions::datetime::now::NowFunc; +use datafusion_functions::make_udf_function_with_config; use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; use datafusion_optimizer::Analyzer; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; @@ -1073,6 +1075,19 @@ impl SessionContext { let mut state = self.state.write(); state.config_mut().options_mut().set(&variable, &value)?; drop(state); + + // Register UDFs that return values based on session configuration + // e.g. now() which depends on the time_zone configuration option + if variable == "datafusion.execution.time_zone" { + let state = self.state.read(); + let config_options = state.config().options().clone(); + drop(state); + let now_udf = { + make_udf_function_with_config!(NowFunc, now, &ConfigOptions); + now(&config_options) + }; + self.state.write().register_udf(now_udf)?; + } } self.return_empty_dataframe() diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 2e265e45ad4e..9826d1fd1b87 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -19,7 +19,9 @@ use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Nanosecond; use arrow::datatypes::{DataType, Field, FieldRef}; use std::any::Any; +use std::sync::Arc; +use datafusion_common::config::ConfigOptions; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ @@ -41,6 +43,7 @@ The `now()` return value is determined at query time and will return the same ti pub struct NowFunc { signature: Signature, aliases: Vec, + timezone: Option>, } impl Default for NowFunc { @@ -54,6 +57,15 @@ impl NowFunc { Self { signature: Signature::nullary(Volatility::Stable), aliases: vec!["current_timestamp".to_string()], + timezone: Some(Arc::from("+00")), + } + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { + Self { + signature: Signature::nullary(Volatility::Stable), + aliases: vec!["current_timestamp".to_string()], + timezone: Some(Arc::from(config.execution.time_zone.as_str())), } } } @@ -80,7 +92,7 @@ impl ScalarUDFImpl for NowFunc { fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { Ok(Field::new( self.name(), - Timestamp(Nanosecond, Some("+00".into())), + Timestamp(Nanosecond, self.timezone.clone()), false, ) .into()) diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 228d704e29cb..d98cd2879538 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -89,6 +89,22 @@ macro_rules! make_udf_function { }; } +/// Creates a singleton `ScalarUDF` of the `$UDF` function and a function +/// named `$NAME` which returns that singleton. The function takes a +/// configuration argument of type `$CONFIG_TYPE` to create the UDF. +#[macro_export] +macro_rules! make_udf_function_with_config { + ($UDF:ty, $NAME:ident, $CONFIG_TYPE:ty) => { + #[allow(rustdoc::redundant_explicit_links)] + #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))] + pub fn $NAME(config: $CONFIG_TYPE) -> std::sync::Arc { + std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl( + <$UDF>::new_with_config(&config), + )) + } + }; +} + /// Macro creates a sub module if the feature is not enabled /// /// The rationale for providing stub functions is to help users to configure datafusion From 9f1b55f64407bb58843335790661ccba8e56cc2f Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 13 Oct 2025 21:55:01 +0300 Subject: [PATCH 05/18] chore --- datafusion/core/src/execution/context/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 40986f33a53c..07b00e02c374 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1079,10 +1079,9 @@ impl SessionContext { // Register UDFs that return values based on session configuration // e.g. now() which depends on the time_zone configuration option if variable == "datafusion.execution.time_zone" { - let state = self.state.read(); - let config_options = state.config().options().clone(); - drop(state); + let config_options = self.state.read().config().options().clone(); let now_udf = { + // recreate the function so it captures the new time zone make_udf_function_with_config!(NowFunc, now, &ConfigOptions); now(&config_options) }; From 048a6f13e9440ef3bd4657d00da75577525de94c Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 13 Oct 2025 22:23:07 +0300 Subject: [PATCH 06/18] chore: clippy --- datafusion/core/src/execution/context/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 07b00e02c374..da56fc8c4f02 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1079,11 +1079,12 @@ impl SessionContext { // Register UDFs that return values based on session configuration // e.g. now() which depends on the time_zone configuration option if variable == "datafusion.execution.time_zone" { - let config_options = self.state.read().config().options().clone(); + let state = self.state.read(); + let config_options = state.config().options(); let now_udf = { // recreate the function so it captures the new time zone make_udf_function_with_config!(NowFunc, now, &ConfigOptions); - now(&config_options) + now(config_options) }; self.state.write().register_udf(now_udf)?; } From e7a6ab969bb43f695ae15656807a6e45fe7d0e29 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 13 Oct 2025 22:52:06 +0300 Subject: [PATCH 07/18] chore --- datafusion/core/src/execution/context/mod.rs | 6 +++--- datafusion/functions/src/datetime/now.rs | 9 +-------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index da56fc8c4f02..819bd5d012d7 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1074,20 +1074,20 @@ impl SessionContext { } else { let mut state = self.state.write(); state.config_mut().options_mut().set(&variable, &value)?; - drop(state); // Register UDFs that return values based on session configuration // e.g. now() which depends on the time_zone configuration option if variable == "datafusion.execution.time_zone" { - let state = self.state.read(); let config_options = state.config().options(); let now_udf = { // recreate the function so it captures the new time zone make_udf_function_with_config!(NowFunc, now, &ConfigOptions); now(config_options) }; - self.state.write().register_udf(now_udf)?; + state.register_udf(now_udf)?; } + + drop(state); } self.return_empty_dataframe() diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 9826d1fd1b87..aa96c4b13d91 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -119,15 +119,8 @@ impl ScalarUDFImpl for NowFunc { .query_execution_start_time .timestamp_nanos_opt(); - let timezone = info - .execution_props() - .config_options - .as_ref() - .map(|opts| opts.execution.time_zone.as_str()) - .unwrap_or("+00"); - Ok(ExprSimplifyResult::Simplified(Expr::Literal( - ScalarValue::TimestampNanosecond(now_ts, Some(timezone.into())), + ScalarValue::TimestampNanosecond(now_ts, self.timezone.clone()), None, ))) } From ed73c86c4a2ef012a4e2f1e9f86df6391cac6d70 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 13 Oct 2025 23:34:17 +0300 Subject: [PATCH 08/18] chore: fix test --- datafusion/core/tests/expr_api/simplification.rs | 3 +-- datafusion/sqllogictest/test_files/arrow_typeof.slt | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index 89651726a69a..9523ddf483c8 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -514,8 +514,7 @@ fn multiple_now() -> Result<()> { // expect the same timestamp appears in both exprs let actual = get_optimized_plan_formatted(plan, &time); let expected = format!( - "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\ - \n TableScan: test", + "Projection: TimestampNanosecond({}, Some(\"+00\")) AS now(), TimestampNanosecond({}, Some(\"+00\")) AS t2\n TableScan: test", time.timestamp_nanos_opt().unwrap(), time.timestamp_nanos_opt().unwrap() ); diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 654218531f1d..c4c3b1367948 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -67,7 +67,7 @@ Timestamp(Nanosecond, None) query T SELECT arrow_typeof(now()) ---- -Timestamp(Nanosecond, Some("+00:00")) +Timestamp(Nanosecond, Some("+00")) # arrow_typeof_timestamp_date32( query T From 391ecf776655db82fbafb01d165d8eba592a7fab Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 14 Oct 2025 15:34:53 +0300 Subject: [PATCH 09/18] fix: initialize the function when building based on the config option --- datafusion/core/src/execution/context/mod.rs | 9 +++---- .../core/src/execution/session_state.rs | 22 ++++++++++++--- datafusion/expr/src/udf.rs | 9 +++++++ datafusion/functions/src/datetime/mod.rs | 8 ++++-- datafusion/functions/src/datetime/now.rs | 4 +++ datafusion/functions/src/macros.rs | 27 +++++++++++++++++-- 6 files changed, 66 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 819bd5d012d7..327ba8ef8f61 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -82,7 +82,7 @@ use datafusion_expr::{ Expr, UserDefinedLogicalNode, WindowUDF, }; use datafusion_functions::datetime::now::NowFunc; -use datafusion_functions::make_udf_function_with_config; +use datafusion_functions::{init_udf_with_config, make_udf_function_with_config}; use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; use datafusion_optimizer::Analyzer; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; @@ -1079,11 +1079,8 @@ impl SessionContext { // e.g. now() which depends on the time_zone configuration option if variable == "datafusion.execution.time_zone" { let config_options = state.config().options(); - let now_udf = { - // recreate the function so it captures the new time zone - make_udf_function_with_config!(NowFunc, now, &ConfigOptions); - now(config_options) - }; + let udf = state.udf("now")?; + let now_udf = init_udf_with_config!(udf.clone(), config_options, NowFunc); state.register_udf(now_udf)?; } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index b04004dd495c..0122558cb03d 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -60,6 +60,8 @@ use datafusion_expr::TableSource; use datafusion_expr::{ AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF, }; +use datafusion_functions::datetime::now::NowFunc; +use datafusion_functions::init_udf_with_config; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datafusion_optimizer::{ Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule, @@ -1420,9 +1422,23 @@ impl SessionStateBuilder { if let Some(scalar_functions) = scalar_functions { scalar_functions.into_iter().for_each(|udf| { - let existing_udf = state.register_udf(udf); - if let Ok(Some(existing_udf)) = existing_udf { - debug!("Overwrote an existing UDF: {}", existing_udf.name()); + if udf.need_config() { + // Register now() with the configured timezone + // The now() function depends on the timezone configuration, so we need to + // register it after the config is set to ensure it uses the correct timezone + let new_udf = init_udf_with_config!( + udf.clone(), + state.config.options(), + NowFunc + ); + if let Err(e) = state.register_udf(new_udf) { + info!("Unable to register UDF: {e}") + }; + } else { + let existing_udf = state.register_udf(udf); + if let Ok(Some(existing_udf)) = existing_udf { + debug!("Overwrote an existing UDF: {}", existing_udf.name()); + } } }); } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index d522158f7b6b..23fe6e1aecb3 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -205,6 +205,10 @@ impl ScalarUDF { self.inner.return_type(arg_types) } + pub fn need_config(&self) -> bool { + self.inner.need_config() + } + /// Return the datatype this function returns given the input argument types. /// /// See [`ScalarUDFImpl::return_field_from_args`] for more details. @@ -532,6 +536,11 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// [`DataFusionError::Internal`]: datafusion_common::DataFusionError::Internal fn return_type(&self, arg_types: &[DataType]) -> Result; + + fn need_config(&self) -> bool { + false + } + /// What type will be returned by this function, given the arguments? /// /// By default, this function calls [`Self::return_type`] with the diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 5729b1edae95..d80f14facf82 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -45,7 +45,6 @@ make_udf_function!(date_part::DatePartFunc, date_part); make_udf_function!(date_trunc::DateTruncFunc, date_trunc); make_udf_function!(make_date::MakeDateFunc, make_date); make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime); -make_udf_function!(now::NowFunc, now); make_udf_function!(to_char::ToCharFunc, to_char); make_udf_function!(to_date::ToDateFunc, to_date); make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time); @@ -56,6 +55,9 @@ make_udf_function!(to_timestamp::ToTimestampMillisFunc, to_timestamp_millis); make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros); make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos); +// create UDF with config +make_udf_function_with_config!(now::NowFunc, now); + // we cannot currently use the export_functions macro since it doesn't handle // functions with varargs currently @@ -91,6 +93,7 @@ pub mod expr_fn { ),( now, "returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement", + @config ), ( to_local_time, @@ -255,6 +258,7 @@ pub mod expr_fn { /// Returns all DataFusion functions defined in this package pub fn functions() -> Vec> { + use datafusion_common::config::ConfigOptions; vec![ current_date(), current_time(), @@ -263,7 +267,7 @@ pub fn functions() -> Vec> { date_trunc(), from_unixtime(), make_date(), - now(), + now(&ConfigOptions::default()), to_char(), to_date(), to_local_time(), diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index aa96c4b13d91..fd07dd019455 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -89,6 +89,10 @@ impl ScalarUDFImpl for NowFunc { &self.signature } + fn need_config(&self) -> bool { + true + } + fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { Ok(Field::new( self.name(), diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index d98cd2879538..c1e280ed3005 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -40,6 +40,7 @@ /// Exported functions accept: /// - `Vec` argument (single argument followed by a comma) /// - Variable number of `Expr` arguments (zero or more arguments, must be without commas) +/// - Functions that require config (marked with `@config` prefix) #[macro_export] macro_rules! export_functions { ($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),*) => { @@ -49,6 +50,15 @@ macro_rules! export_functions { )* }; + // function that requires config (marked with @config) + (single $FUNC:ident, $DOC:expr, @config) => { + #[doc = $DOC] + pub fn $FUNC() -> datafusion_expr::Expr { + use datafusion_common::config::ConfigOptions; + super::$FUNC(&ConfigOptions::default()).call(vec![]) + } + }; + // single vector argument (a single argument followed by a comma) (single $FUNC:ident, $DOC:expr, $arg:ident,) => { #[doc = $DOC] @@ -94,10 +104,10 @@ macro_rules! make_udf_function { /// configuration argument of type `$CONFIG_TYPE` to create the UDF. #[macro_export] macro_rules! make_udf_function_with_config { - ($UDF:ty, $NAME:ident, $CONFIG_TYPE:ty) => { + ($UDF:ty, $NAME:ident) => { #[allow(rustdoc::redundant_explicit_links)] #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))] - pub fn $NAME(config: $CONFIG_TYPE) -> std::sync::Arc { + pub fn $NAME(config: &datafusion_common::config::ConfigOptions) -> std::sync::Arc { std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl( <$UDF>::new_with_config(&config), )) @@ -394,3 +404,16 @@ macro_rules! make_math_binary_udf { } }; } + +#[macro_export] +macro_rules! init_udf_with_config { + ($udf:expr, $config:expr, $func_type:ty) => {{ + let mut udf = (*$udf).clone(); + if let Some(_) = udf.inner().as_any().downcast_ref::<$func_type>() { + udf = <$func_type>::new_with_config($config).into(); + } + let udf = Arc::new(udf); + Arc::clone(&udf) + }}; +} + From 589cddd26b214f8859f6c667b951227fb2011700 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 14 Oct 2025 22:54:54 +0300 Subject: [PATCH 10/18] refactor: update UDF handling to support runtime configuration changes --- datafusion/core/src/execution/context/mod.rs | 28 ++++++++---- .../core/src/execution/session_state.rs | 43 ++++++++++--------- datafusion/core/tests/optimizer/mod.rs | 3 +- datafusion/expr/src/udf.rs | 31 ++++++++++--- datafusion/functions/src/datetime/now.rs | 8 ++-- datafusion/functions/src/macros.rs | 13 ------ 6 files changed, 72 insertions(+), 54 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 327ba8ef8f61..66ee1ff036cd 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -81,8 +81,6 @@ use datafusion_expr::{ planner::ExprPlanner, Expr, UserDefinedLogicalNode, WindowUDF, }; -use datafusion_functions::datetime::now::NowFunc; -use datafusion_functions::{init_udf_with_config, make_udf_function_with_config}; use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; use datafusion_optimizer::Analyzer; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; @@ -1075,13 +1073,25 @@ impl SessionContext { let mut state = self.state.write(); state.config_mut().options_mut().set(&variable, &value)?; - // Register UDFs that return values based on session configuration - // e.g. now() which depends on the time_zone configuration option - if variable == "datafusion.execution.time_zone" { - let config_options = state.config().options(); - let udf = state.udf("now")?; - let now_udf = init_udf_with_config!(udf.clone(), config_options, NowFunc); - state.register_udf(now_udf)?; + // Re-initialize any UDFs that depend on configuration + // This allows both built-in and custom functions to respond to configuration changes + let config_options = state.config().options(); + let udf_names: Vec<_> = state.scalar_functions().keys().cloned().collect(); + + // Collect updated UDFs in a separate vector + let udfs_to_update: Vec<_> = udf_names + .into_iter() + .filter_map(|name| { + state.udf(&name).ok().and_then(|udf| { + udf.inner() + .with_updated_config(&config_options) + .map(Arc::new) + }) + }) + .collect(); + + for udf in udfs_to_update { + state.register_udf(udf)?; } drop(state); diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 0122558cb03d..0624ff9b852d 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -60,8 +60,6 @@ use datafusion_expr::TableSource; use datafusion_expr::{ AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF, }; -use datafusion_functions::datetime::now::NowFunc; -use datafusion_functions::init_udf_with_config; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datafusion_optimizer::{ Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule, @@ -1421,26 +1419,31 @@ impl SessionStateBuilder { } if let Some(scalar_functions) = scalar_functions { - scalar_functions.into_iter().for_each(|udf| { - if udf.need_config() { - // Register now() with the configured timezone - // The now() function depends on the timezone configuration, so we need to - // register it after the config is set to ensure it uses the correct timezone - let new_udf = init_udf_with_config!( - udf.clone(), - state.config.options(), - NowFunc - ); - if let Err(e) = state.register_udf(new_udf) { - info!("Unable to register UDF: {e}") - }; - } else { - let existing_udf = state.register_udf(udf); - if let Ok(Some(existing_udf)) = existing_udf { - debug!("Overwrote an existing UDF: {}", existing_udf.name()); + for udf in scalar_functions { + let config_options = state.config().options(); + match udf.inner().with_updated_config(config_options) { + Some(new_udf) => { + if let Err(err) = state.register_udf(Arc::new(new_udf)) { + debug!( + "Failed to re-register updated UDF '{}': {}", + udf.name(), + err + ); + } } + None => match state.register_udf(udf.clone()) { + Ok(Some(existing)) => { + debug!("Overwrote existing UDF '{}'", existing.name()); + } + Ok(None) => { + debug!("Registered UDF '{}'", udf.name()); + } + Err(err) => { + debug!("Failed to register UDF '{}': {}", udf.name(), err); + } + }, } - }); + } } if let Some(aggregate_functions) = aggregate_functions { diff --git a/datafusion/core/tests/optimizer/mod.rs b/datafusion/core/tests/optimizer/mod.rs index 9899a0158fb8..aec32d05624c 100644 --- a/datafusion/core/tests/optimizer/mod.rs +++ b/datafusion/core/tests/optimizer/mod.rs @@ -144,8 +144,9 @@ fn test_sql(sql: &str) -> Result { let statement = &ast[0]; // create a logical query plan + let config = ConfigOptions::default(); let context_provider = MyContextProvider::default() - .with_udf(datetime::now()) + .with_udf(datetime::now(&config)) .with_udf(datafusion_functions::core::arrow_cast()) .with_udf(datafusion_functions::string::concat()) .with_udf(datafusion_functions::string::concat_ws()); diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 23fe6e1aecb3..45b33a94e05f 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -205,10 +205,6 @@ impl ScalarUDF { self.inner.return_type(arg_types) } - pub fn need_config(&self) -> bool { - self.inner.need_config() - } - /// Return the datatype this function returns given the input argument types. /// /// See [`ScalarUDFImpl::return_field_from_args`] for more details. @@ -536,9 +532,30 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// [`DataFusionError::Internal`]: datafusion_common::DataFusionError::Internal fn return_type(&self, arg_types: &[DataType]) -> Result; - - fn need_config(&self) -> bool { - false + /// Create a new instance of this function with updated configuration. + /// + /// This method is called when configuration options change at runtime + /// (e.g., via `SET` statements) to allow functions that depend on + /// configuration to update themselves accordingly. + /// + /// # Arguments + /// + /// * `config` - The updated configuration options + /// + /// # Returns + /// + /// * `Some(ScalarUDF)` - A new instance of this function configured with the new settings + /// * `None` - If this function does not support runtime reconfiguration + /// + /// # Example + /// + /// ```ignore + /// fn with_updated_config(&self, config: &ConfigOptions) -> Option { + /// Some(MyConfigDependentFunc::new_with_config(config).into()) + /// } + /// ``` + fn with_updated_config(&self, _config: &ConfigOptions) -> Option { + None } /// What type will be returned by this function, given the arguments? diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index fd07dd019455..20a23252f637 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -25,8 +25,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ - ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature, - Volatility, + ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl, + Signature, Volatility, }; use datafusion_macros::user_doc; @@ -89,8 +89,8 @@ impl ScalarUDFImpl for NowFunc { &self.signature } - fn need_config(&self) -> bool { - true + fn with_updated_config(&self, config: &ConfigOptions) -> Option { + Some(Self::new_with_config(config).into()) } fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index c1e280ed3005..9e195f2d5291 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -404,16 +404,3 @@ macro_rules! make_math_binary_udf { } }; } - -#[macro_export] -macro_rules! init_udf_with_config { - ($udf:expr, $config:expr, $func_type:ty) => {{ - let mut udf = (*$udf).clone(); - if let Some(_) = udf.inner().as_any().downcast_ref::<$func_type>() { - udf = <$func_type>::new_with_config($config).into(); - } - let udf = Arc::new(udf); - Arc::clone(&udf) - }}; -} - From cab15c091eb3c44e8e542d3b6400f6d03b6d2c6b Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 14 Oct 2025 22:58:34 +0300 Subject: [PATCH 11/18] chore --- datafusion/expr/src/udf.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 45b33a94e05f..b334be0c289d 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -547,13 +547,6 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// * `Some(ScalarUDF)` - A new instance of this function configured with the new settings /// * `None` - If this function does not support runtime reconfiguration /// - /// # Example - /// - /// ```ignore - /// fn with_updated_config(&self, config: &ConfigOptions) -> Option { - /// Some(MyConfigDependentFunc::new_with_config(config).into()) - /// } - /// ``` fn with_updated_config(&self, _config: &ConfigOptions) -> Option { None } From 179607ada7d6fe7606e3a6c480bbd6a700f4fd1e Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 14 Oct 2025 23:02:16 +0300 Subject: [PATCH 12/18] chore: Update tests --- datafusion/core/tests/expr_api/simplification.rs | 2 +- datafusion/sqllogictest/test_files/arrow_typeof.slt | 2 +- datafusion/sqllogictest/test_files/dates.slt | 2 +- datafusion/sqllogictest/test_files/timestamps.slt | 5 +++++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index 9523ddf483c8..572a7e2b335c 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -514,7 +514,7 @@ fn multiple_now() -> Result<()> { // expect the same timestamp appears in both exprs let actual = get_optimized_plan_formatted(plan, &time); let expected = format!( - "Projection: TimestampNanosecond({}, Some(\"+00\")) AS now(), TimestampNanosecond({}, Some(\"+00\")) AS t2\n TableScan: test", + "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\n TableScan: test", time.timestamp_nanos_opt().unwrap(), time.timestamp_nanos_opt().unwrap() ); diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index c4c3b1367948..654218531f1d 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -67,7 +67,7 @@ Timestamp(Nanosecond, None) query T SELECT arrow_typeof(now()) ---- -Timestamp(Nanosecond, Some("+00")) +Timestamp(Nanosecond, Some("+00:00")) # arrow_typeof_timestamp_date32( query T diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index fb4198477525..2e91a0363db0 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -85,7 +85,7 @@ g h ## Plan error when compare Utf8 and timestamp in where clause -statement error DataFusion error: type_coercion\ncaused by\nError during planning: Cannot coerce arithmetic expression Timestamp\(Nanosecond, Some\("\+00"\)\) \+ Utf8 to valid types +statement error DataFusion error: type_coercion\ncaused by\nError during planning: Cannot coerce arithmetic expression Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 to valid types select i_item_desc from test where d3_date > now() + '5 days'; diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index fc6fcf4e54d2..e2569c55c47e 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -81,6 +81,11 @@ select arrow_typeof(now()); ---- Timestamp(Nanosecond, Some("+08")) +query I +SELECT count(1) result FROM (SELECT now() as n) a WHERE n > '2000-01-01'::date; +---- +1 + statement ok SET TIME ZONE = '+00' From 82a643ecefb36f5d098bb5e7349230a9966163e2 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 14 Oct 2025 23:54:52 +0300 Subject: [PATCH 13/18] chore --- datafusion/expr/src/udf.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index b334be0c289d..5eff7f9b1265 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -852,6 +852,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.invoke_with_args(args) } + fn with_updated_config(&self, _config: &ConfigOptions) -> Option { + None + } + fn aliases(&self) -> &[String] { &self.aliases } From d5a9f9517298d3d4b9a8ac48a706205a7fd392ec Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 15 Oct 2025 00:37:00 +0300 Subject: [PATCH 14/18] chore: clippy --- datafusion/core/src/execution/context/mod.rs | 2 +- datafusion/core/src/execution/session_state.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 66ee1ff036cd..cf086f479dad 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1084,7 +1084,7 @@ impl SessionContext { .filter_map(|name| { state.udf(&name).ok().and_then(|udf| { udf.inner() - .with_updated_config(&config_options) + .with_updated_config(config_options) .map(Arc::new) }) }) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 0624ff9b852d..8ad0682b055d 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1431,7 +1431,7 @@ impl SessionStateBuilder { ); } } - None => match state.register_udf(udf.clone()) { + None => match state.register_udf(Arc::clone(&udf)) { Ok(Some(existing)) => { debug!("Overwrote existing UDF '{}'", existing.name()); } From d8e4905c91836385795f96b1b8bf8b951ffc19ce Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 15 Oct 2025 11:19:57 +0300 Subject: [PATCH 15/18] refactor: update NowFunc default implementation to use new_with_config --- datafusion/functions/src/datetime/now.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 20a23252f637..96a35c241ff0 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -48,11 +48,12 @@ pub struct NowFunc { impl Default for NowFunc { fn default() -> Self { - Self::new() + Self::new_with_config(&ConfigOptions::default()) } } impl NowFunc { + #[deprecated(since = "50.2.0", note = "use `new_with_config` instead")] pub fn new() -> Self { Self { signature: Signature::nullary(Volatility::Stable), From 77ca3b6313a476d1ebae4b6f4e28d04c1b56b155 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 15 Oct 2025 16:37:56 +0300 Subject: [PATCH 16/18] refactor: simplify logic and clarify documentation --- datafusion/core/src/execution/context/mod.rs | 16 +++++++--------- datafusion/expr/src/udf.rs | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index cf086f479dad..f62eb815c9c7 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1076,17 +1076,15 @@ impl SessionContext { // Re-initialize any UDFs that depend on configuration // This allows both built-in and custom functions to respond to configuration changes let config_options = state.config().options(); - let udf_names: Vec<_> = state.scalar_functions().keys().cloned().collect(); // Collect updated UDFs in a separate vector - let udfs_to_update: Vec<_> = udf_names - .into_iter() - .filter_map(|name| { - state.udf(&name).ok().and_then(|udf| { - udf.inner() - .with_updated_config(config_options) - .map(Arc::new) - }) + let udfs_to_update: Vec<_> = state + .scalar_functions() + .values() + .filter_map(|udf| { + udf.inner() + .with_updated_config(config_options) + .map(Arc::new) }) .collect(); diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 5eff7f9b1265..c63994ca9b0a 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -545,7 +545,7 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// # Returns /// /// * `Some(ScalarUDF)` - A new instance of this function configured with the new settings - /// * `None` - If this function does not support runtime reconfiguration + /// * `None` - If this function does not change with new configuration settings (the default) /// fn with_updated_config(&self, _config: &ConfigOptions) -> Option { None From c2b8b5d2b7230cc90913006ab4b139a2ba2a2879 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Wed, 15 Oct 2025 20:58:27 +0300 Subject: [PATCH 17/18] Update datafusion/expr/src/udf.rs Co-authored-by: Andrew Lamb --- datafusion/expr/src/udf.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index c63994ca9b0a..3e28700c2138 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -536,7 +536,15 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// /// This method is called when configuration options change at runtime /// (e.g., via `SET` statements) to allow functions that depend on - /// configuration to update themselves accordingly. + /// configuration to update themselves accordingly. + /// + /// Note the current [`ConfigOptions`] are also passed to [`Self::invoke_with_args`] so + /// this API is not needed for functions where the values may + /// depend on the current options. + /// + /// This API is useful for functions where the return + /// **type** depends on the configuration options, such as the `now()` function + /// which depends on the current timezone. /// /// # Arguments /// From f9fbf67be0ca00e133d0c4e8f14ed3e3c75f2ff3 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 15 Oct 2025 21:03:33 +0300 Subject: [PATCH 18/18] chore: fmt --- datafusion/expr/src/udf.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 3e28700c2138..b3b25a1f3784 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -536,15 +536,15 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// /// This method is called when configuration options change at runtime /// (e.g., via `SET` statements) to allow functions that depend on - /// configuration to update themselves accordingly. + /// configuration to update themselves accordingly. /// /// Note the current [`ConfigOptions`] are also passed to [`Self::invoke_with_args`] so /// this API is not needed for functions where the values may - /// depend on the current options. + /// depend on the current options. /// /// This API is useful for functions where the return /// **type** depends on the configuration options, such as the `now()` function - /// which depends on the current timezone. + /// which depends on the current timezone. /// /// # Arguments ///