diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a8148b80495e..f62eb815c9c7 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1072,6 +1072,26 @@ impl SessionContext { } else { let mut state = self.state.write(); state.config_mut().options_mut().set(&variable, &value)?; + + // Re-initialize any UDFs that depend on configuration + // This allows both built-in and custom functions to respond to configuration changes + let config_options = state.config().options(); + + // Collect updated UDFs in a separate vector + let udfs_to_update: Vec<_> = state + .scalar_functions() + .values() + .filter_map(|udf| { + udf.inner() + .with_updated_config(config_options) + .map(Arc::new) + }) + .collect(); + + for udf in udfs_to_update { + state.register_udf(udf)?; + } + drop(state); } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index b04004dd495c..8ad0682b055d 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1419,12 +1419,31 @@ impl SessionStateBuilder { } if let Some(scalar_functions) = scalar_functions { - scalar_functions.into_iter().for_each(|udf| { - let existing_udf = state.register_udf(udf); - if let Ok(Some(existing_udf)) = existing_udf { - debug!("Overwrote an existing UDF: {}", existing_udf.name()); + for udf in scalar_functions { + let config_options = state.config().options(); + match udf.inner().with_updated_config(config_options) { + Some(new_udf) => { + if let Err(err) = state.register_udf(Arc::new(new_udf)) { + debug!( + "Failed to re-register updated UDF '{}': {}", + udf.name(), + err + ); + } + } + None => match state.register_udf(Arc::clone(&udf)) { + Ok(Some(existing)) => { + debug!("Overwrote existing UDF '{}'", existing.name()); + } + Ok(None) => { + debug!("Registered UDF '{}'", udf.name()); + } + Err(err) => { + debug!("Failed to register UDF '{}': {}", udf.name(), err); + } + }, } - }); + } } if let Some(aggregate_functions) = aggregate_functions { diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index 89651726a69a..572a7e2b335c 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -514,8 +514,7 @@ fn multiple_now() -> Result<()> { // expect the same timestamp appears in both exprs let actual = get_optimized_plan_formatted(plan, &time); let expected = format!( - "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\ - \n TableScan: test", + "Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\n TableScan: test", time.timestamp_nanos_opt().unwrap(), time.timestamp_nanos_opt().unwrap() ); diff --git a/datafusion/core/tests/optimizer/mod.rs b/datafusion/core/tests/optimizer/mod.rs index 9899a0158fb8..aec32d05624c 100644 --- a/datafusion/core/tests/optimizer/mod.rs +++ b/datafusion/core/tests/optimizer/mod.rs @@ -144,8 +144,9 @@ fn test_sql(sql: &str) -> Result { let statement = &ast[0]; // create a logical query plan + let config = ConfigOptions::default(); let context_provider = MyContextProvider::default() - .with_udf(datetime::now()) + .with_udf(datetime::now(&config)) .with_udf(datafusion_functions::core::arrow_cast()) .with_udf(datafusion_functions::string::concat()) .with_udf(datafusion_functions::string::concat_ws()); diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index d522158f7b6b..b3b25a1f3784 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -532,6 +532,33 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// [`DataFusionError::Internal`]: datafusion_common::DataFusionError::Internal fn return_type(&self, arg_types: &[DataType]) -> Result; + /// Create a new instance of this function with updated configuration. + /// + /// This method is called when configuration options change at runtime + /// (e.g., via `SET` statements) to allow functions that depend on + /// configuration to update themselves accordingly. + /// + /// Note the current [`ConfigOptions`] are also passed to [`Self::invoke_with_args`] so + /// this API is not needed for functions where the values may + /// depend on the current options. + /// + /// This API is useful for functions where the return + /// **type** depends on the configuration options, such as the `now()` function + /// which depends on the current timezone. + /// + /// # Arguments + /// + /// * `config` - The updated configuration options + /// + /// # Returns + /// + /// * `Some(ScalarUDF)` - A new instance of this function configured with the new settings + /// * `None` - If this function does not change with new configuration settings (the default) + /// + fn with_updated_config(&self, _config: &ConfigOptions) -> Option { + None + } + /// What type will be returned by this function, given the arguments? /// /// By default, this function calls [`Self::return_type`] with the @@ -833,6 +860,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.invoke_with_args(args) } + fn with_updated_config(&self, _config: &ConfigOptions) -> Option { + None + } + fn aliases(&self) -> &[String] { &self.aliases } diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 5729b1edae95..d80f14facf82 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -45,7 +45,6 @@ make_udf_function!(date_part::DatePartFunc, date_part); make_udf_function!(date_trunc::DateTruncFunc, date_trunc); make_udf_function!(make_date::MakeDateFunc, make_date); make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime); -make_udf_function!(now::NowFunc, now); make_udf_function!(to_char::ToCharFunc, to_char); make_udf_function!(to_date::ToDateFunc, to_date); make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time); @@ -56,6 +55,9 @@ make_udf_function!(to_timestamp::ToTimestampMillisFunc, to_timestamp_millis); make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros); make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos); +// create UDF with config +make_udf_function_with_config!(now::NowFunc, now); + // we cannot currently use the export_functions macro since it doesn't handle // functions with varargs currently @@ -91,6 +93,7 @@ pub mod expr_fn { ),( now, "returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement", + @config ), ( to_local_time, @@ -255,6 +258,7 @@ pub mod expr_fn { /// Returns all DataFusion functions defined in this package pub fn functions() -> Vec> { + use datafusion_common::config::ConfigOptions; vec![ current_date(), current_time(), @@ -263,7 +267,7 @@ pub fn functions() -> Vec> { date_trunc(), from_unixtime(), make_date(), - now(), + now(&ConfigOptions::default()), to_char(), to_date(), to_local_time(), diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index 65dadb42a89e..96a35c241ff0 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -19,12 +19,14 @@ use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Nanosecond; use arrow::datatypes::{DataType, Field, FieldRef}; use std::any::Any; +use std::sync::Arc; +use datafusion_common::config::ConfigOptions; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ - ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature, - Volatility, + ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl, + Signature, Volatility, }; use datafusion_macros::user_doc; @@ -41,19 +43,30 @@ The `now()` return value is determined at query time and will return the same ti pub struct NowFunc { signature: Signature, aliases: Vec, + timezone: Option>, } impl Default for NowFunc { fn default() -> Self { - Self::new() + Self::new_with_config(&ConfigOptions::default()) } } impl NowFunc { + #[deprecated(since = "50.2.0", note = "use `new_with_config` instead")] pub fn new() -> Self { Self { signature: Signature::nullary(Volatility::Stable), aliases: vec!["current_timestamp".to_string()], + timezone: Some(Arc::from("+00")), + } + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { + Self { + signature: Signature::nullary(Volatility::Stable), + aliases: vec!["current_timestamp".to_string()], + timezone: Some(Arc::from(config.execution.time_zone.as_str())), } } } @@ -77,10 +90,14 @@ impl ScalarUDFImpl for NowFunc { &self.signature } + fn with_updated_config(&self, config: &ConfigOptions) -> Option { + Some(Self::new_with_config(config).into()) + } + fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { Ok(Field::new( self.name(), - Timestamp(Nanosecond, Some("+00:00".into())), + Timestamp(Nanosecond, self.timezone.clone()), false, ) .into()) @@ -106,8 +123,9 @@ impl ScalarUDFImpl for NowFunc { .execution_props() .query_execution_start_time .timestamp_nanos_opt(); + Ok(ExprSimplifyResult::Simplified(Expr::Literal( - ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())), + ScalarValue::TimestampNanosecond(now_ts, self.timezone.clone()), None, ))) } diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 228d704e29cb..9e195f2d5291 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -40,6 +40,7 @@ /// Exported functions accept: /// - `Vec` argument (single argument followed by a comma) /// - Variable number of `Expr` arguments (zero or more arguments, must be without commas) +/// - Functions that require config (marked with `@config` prefix) #[macro_export] macro_rules! export_functions { ($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),*) => { @@ -49,6 +50,15 @@ macro_rules! export_functions { )* }; + // function that requires config (marked with @config) + (single $FUNC:ident, $DOC:expr, @config) => { + #[doc = $DOC] + pub fn $FUNC() -> datafusion_expr::Expr { + use datafusion_common::config::ConfigOptions; + super::$FUNC(&ConfigOptions::default()).call(vec![]) + } + }; + // single vector argument (a single argument followed by a comma) (single $FUNC:ident, $DOC:expr, $arg:ident,) => { #[doc = $DOC] @@ -89,6 +99,22 @@ macro_rules! make_udf_function { }; } +/// Creates a singleton `ScalarUDF` of the `$UDF` function and a function +/// named `$NAME` which returns that singleton. The function takes a +/// configuration argument of type `$CONFIG_TYPE` to create the UDF. +#[macro_export] +macro_rules! make_udf_function_with_config { + ($UDF:ty, $NAME:ident) => { + #[allow(rustdoc::redundant_explicit_links)] + #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))] + pub fn $NAME(config: &datafusion_common::config::ConfigOptions) -> std::sync::Arc { + std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl( + <$UDF>::new_with_config(&config), + )) + } + }; +} + /// Macro creates a sub module if the feature is not enabled /// /// The rationale for providing stub functions is to help users to configure datafusion diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 1a7ff41d64a6..e2569c55c47e 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -73,6 +73,21 @@ true ########## ## Current time Tests ########## +statement ok +SET TIME ZONE = '+08' + +query T +select arrow_typeof(now()); +---- +Timestamp(Nanosecond, Some("+08")) + +query I +SELECT count(1) result FROM (SELECT now() as n) a WHERE n > '2000-01-01'::date; +---- +1 + +statement ok +SET TIME ZONE = '+00' query B select cast(now() as time) = current_time();