Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,26 @@ impl SessionContext {
} else {
let mut state = self.state.write();
state.config_mut().options_mut().set(&variable, &value)?;

// Re-initialize any UDFs that depend on configuration
// This allows both built-in and custom functions to respond to configuration changes
let config_options = state.config().options();

// Collect updated UDFs in a separate vector
let udfs_to_update: Vec<_> = state
.scalar_functions()
.values()
.filter_map(|udf| {
udf.inner()
.with_updated_config(config_options)
.map(Arc::new)
})
.collect();

for udf in udfs_to_update {
state.register_udf(udf)?;
}

drop(state);
}

Expand Down
29 changes: 24 additions & 5 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,12 +1419,31 @@ impl SessionStateBuilder {
}

if let Some(scalar_functions) = scalar_functions {
scalar_functions.into_iter().for_each(|udf| {
let existing_udf = state.register_udf(udf);
if let Ok(Some(existing_udf)) = existing_udf {
debug!("Overwrote an existing UDF: {}", existing_udf.name());
for udf in scalar_functions {
let config_options = state.config().options();
match udf.inner().with_updated_config(config_options) {
Some(new_udf) => {
if let Err(err) = state.register_udf(Arc::new(new_udf)) {
debug!(
"Failed to re-register updated UDF '{}': {}",
udf.name(),
err
);
}
}
None => match state.register_udf(Arc::clone(&udf)) {
Ok(Some(existing)) => {
debug!("Overwrote existing UDF '{}'", existing.name());
}
Ok(None) => {
debug!("Registered UDF '{}'", udf.name());
}
Err(err) => {
debug!("Failed to register UDF '{}': {}", udf.name(), err);
}
},
}
});
}
}

if let Some(aggregate_functions) = aggregate_functions {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/expr_api/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,7 @@ fn multiple_now() -> Result<()> {
// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
\n TableScan: test",
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\n TableScan: test",
time.timestamp_nanos_opt().unwrap(),
time.timestamp_nanos_opt().unwrap()
);
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let statement = &ast[0];

// create a logical query plan
let config = ConfigOptions::default();
let context_provider = MyContextProvider::default()
.with_udf(datetime::now())
.with_udf(datetime::now(&config))
.with_udf(datafusion_functions::core::arrow_cast())
.with_udf(datafusion_functions::string::concat())
.with_udf(datafusion_functions::string::concat_ws());
Expand Down
31 changes: 31 additions & 0 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,33 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
/// [`DataFusionError::Internal`]: datafusion_common::DataFusionError::Internal
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;

/// Create a new instance of this function with updated configuration.
///
/// This method is called when configuration options change at runtime
/// (e.g., via `SET` statements) to allow functions that depend on
/// configuration to update themselves accordingly.
///
/// Note the current [`ConfigOptions`] are also passed to [`Self::invoke_with_args`] so
/// this API is not needed for functions where the values may
/// depend on the current options.
///
/// This API is useful for functions where the return
/// **type** depends on the configuration options, such as the `now()` function
/// which depends on the current timezone.
///
/// # Arguments
///
/// * `config` - The updated configuration options
///
/// # Returns
///
/// * `Some(ScalarUDF)` - A new instance of this function configured with the new settings
/// * `None` - If this function does not change with new configuration settings (the default)
///
fn with_updated_config(&self, _config: &ConfigOptions) -> Option<ScalarUDF> {
None
}

/// What type will be returned by this function, given the arguments?
///
/// By default, this function calls [`Self::return_type`] with the
Expand Down Expand Up @@ -833,6 +860,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.invoke_with_args(args)
}

fn with_updated_config(&self, _config: &ConfigOptions) -> Option<ScalarUDF> {
None
}

fn aliases(&self) -> &[String] {
&self.aliases
}
Expand Down
8 changes: 6 additions & 2 deletions datafusion/functions/src/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ make_udf_function!(date_part::DatePartFunc, date_part);
make_udf_function!(date_trunc::DateTruncFunc, date_trunc);
make_udf_function!(make_date::MakeDateFunc, make_date);
make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime);
make_udf_function!(now::NowFunc, now);
make_udf_function!(to_char::ToCharFunc, to_char);
make_udf_function!(to_date::ToDateFunc, to_date);
make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time);
Expand All @@ -56,6 +55,9 @@ make_udf_function!(to_timestamp::ToTimestampMillisFunc, to_timestamp_millis);
make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros);
make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos);

// create UDF with config
make_udf_function_with_config!(now::NowFunc, now);

// we cannot currently use the export_functions macro since it doesn't handle
// functions with varargs currently

Expand Down Expand Up @@ -91,6 +93,7 @@ pub mod expr_fn {
),(
now,
"returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement",
@config
),
(
to_local_time,
Expand Down Expand Up @@ -255,6 +258,7 @@ pub mod expr_fn {

/// Returns all DataFusion functions defined in this package
pub fn functions() -> Vec<Arc<ScalarUDF>> {
use datafusion_common::config::ConfigOptions;
vec![
current_date(),
current_time(),
Expand All @@ -263,7 +267,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
date_trunc(),
from_unixtime(),
make_date(),
now(),
now(&ConfigOptions::default()),
to_char(),
to_date(),
to_local_time(),
Expand Down
28 changes: 23 additions & 5 deletions datafusion/functions/src/datetime/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use arrow::datatypes::DataType::Timestamp;
use arrow::datatypes::TimeUnit::Nanosecond;
use arrow::datatypes::{DataType, Field, FieldRef};
use std::any::Any;
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature,
Volatility,
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl,
Signature, Volatility,
};
use datafusion_macros::user_doc;

Expand All @@ -41,19 +43,30 @@ The `now()` return value is determined at query time and will return the same ti
pub struct NowFunc {
signature: Signature,
aliases: Vec<String>,
timezone: Option<Arc<str>>,
}

impl Default for NowFunc {
fn default() -> Self {
Self::new()
Self::new_with_config(&ConfigOptions::default())
}
}

impl NowFunc {
#[deprecated(since = "50.2.0", note = "use `new_with_config` instead")]
pub fn new() -> Self {
Self {
signature: Signature::nullary(Volatility::Stable),
aliases: vec!["current_timestamp".to_string()],
timezone: Some(Arc::from("+00")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NowFunc::new() (and therefore Default) now hardcodes the timezone as "+00".
Everywhere else in the codebase — including the default ConfigOptions and existing sqllogictests — expects the canonical "+00:00" form.
Won't this mean that any downstream caller that still constructs the UDF via NowFunc::new()/Default (for example, when registering their own function registry) now get a different, non-canonical offset that will no longer compare equal to the rest of the system?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could remove new(), because now relies on config for the initialization

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can someone explain the implications of this change on downstream crates? Does it mean that that now() will always uses the current timezone from the ConfigOptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: Yes, this is the implication

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb With this implementation, yes. To me the logical solution is to allow the time_zone config option to be an Option (and default to None) and fix any existing usage to handle that ... or to detect when the time_zone config option is '' and handle that in the same manner ('' -> None tz)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree

}
}

pub fn new_with_config(config: &ConfigOptions) -> Self {
Self {
signature: Signature::nullary(Volatility::Stable),
aliases: vec!["current_timestamp".to_string()],
timezone: Some(Arc::from(config.execution.time_zone.as_str())),
}
}
}
Expand All @@ -77,10 +90,14 @@ impl ScalarUDFImpl for NowFunc {
&self.signature
}

fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> {
Some(Self::new_with_config(config).into())
}

fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
Ok(Field::new(
self.name(),
Timestamp(Nanosecond, Some("+00:00".into())),
Timestamp(Nanosecond, self.timezone.clone()),
false,
)
.into())
Expand All @@ -106,8 +123,9 @@ impl ScalarUDFImpl for NowFunc {
.execution_props()
.query_execution_start_time
.timestamp_nanos_opt();

Ok(ExprSimplifyResult::Simplified(Expr::Literal(
ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())),
ScalarValue::TimestampNanosecond(now_ts, self.timezone.clone()),
None,
)))
}
Expand Down
26 changes: 26 additions & 0 deletions datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
/// Exported functions accept:
/// - `Vec<Expr>` argument (single argument followed by a comma)
/// - Variable number of `Expr` arguments (zero or more arguments, must be without commas)
/// - Functions that require config (marked with `@config` prefix)
#[macro_export]
macro_rules! export_functions {
($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),*) => {
Expand All @@ -49,6 +50,15 @@ macro_rules! export_functions {
)*
};

// function that requires config (marked with @config)
(single $FUNC:ident, $DOC:expr, @config) => {
#[doc = $DOC]
pub fn $FUNC() -> datafusion_expr::Expr {
use datafusion_common::config::ConfigOptions;
super::$FUNC(&ConfigOptions::default()).call(vec![])
}
};

// single vector argument (a single argument followed by a comma)
(single $FUNC:ident, $DOC:expr, $arg:ident,) => {
#[doc = $DOC]
Expand Down Expand Up @@ -89,6 +99,22 @@ macro_rules! make_udf_function {
};
}

/// Creates a singleton `ScalarUDF` of the `$UDF` function and a function
/// named `$NAME` which returns that singleton. The function takes a
/// configuration argument of type `$CONFIG_TYPE` to create the UDF.
#[macro_export]
macro_rules! make_udf_function_with_config {
($UDF:ty, $NAME:ident) => {
#[allow(rustdoc::redundant_explicit_links)]
#[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))]
pub fn $NAME(config: &datafusion_common::config::ConfigOptions) -> std::sync::Arc<datafusion_expr::ScalarUDF> {
std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
<$UDF>::new_with_config(&config),
))
}
};
}

/// Macro creates a sub module if the feature is not enabled
///
/// The rationale for providing stub functions is to help users to configure datafusion
Expand Down
15 changes: 15 additions & 0 deletions datafusion/sqllogictest/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ true
##########
## Current time Tests
##########
statement ok
SET TIME ZONE = '+08'

query T
select arrow_typeof(now());
----
Timestamp(Nanosecond, Some("+08"))

query I
SELECT count(1) result FROM (SELECT now() as n) a WHERE n > '2000-01-01'::date;
----
1

statement ok
SET TIME ZONE = '+00'

query B
select cast(now() as time) = current_time();
Expand Down