From 60ae760b13e0bfdc85c54900078a17e4b34ff92b Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Thu, 9 Oct 2025 11:51:12 +0200 Subject: [PATCH 1/2] #17982 Make `nvl` a thin wrapper for `coalesce` --- datafusion/expr/src/udf.rs | 56 ++++- datafusion/functions/src/core/coalesce.rs | 11 +- datafusion/functions/src/core/nvl.rs | 238 +++--------------- .../optimizer/src/common_subexpr_eliminate.rs | 6 +- .../test_files/string/string_view.slt | 2 +- 5 files changed, 105 insertions(+), 208 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index bc9e62fe6211..c49fc62df103 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -252,7 +252,28 @@ impl ScalarUDF { Ok(result) } - /// Get the circuits of inner implementation + /// Determines which of the arguments passed to this function are evaluated eagerly + /// and which may be evaluated lazily. + /// + /// If all arguments are evaluated eagerly this function may either return `None` or `Some((args, vec![]))`. + /// + /// When `Some` is returned the first value of the tuple contains the eagerly evaluated arguments, + /// and the second value of the tuple contains the arguments that may be evaluated lazily. + /// The two sets of arguments are guaranteed to be disjoint, and each argument from `args` will + /// only be present in one of the two `Vec`s. + /// + /// See [ScalarUDFImpl::conditional_arguments] for more information. + pub fn conditional_arguments<'a>( + &self, + args: &'a [Expr], + ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> { + self.inner.conditional_arguments(args) + } + + /// Returns true if some of this function's argument expressions may not be evaluated + /// and thus any side effects (like divide by zero) may not be encountered. + /// + /// See [ScalarUDFImpl::short_circuits] for more information. 
pub fn short_circuits(&self) -> bool { self.inner.short_circuits() } @@ -648,10 +669,36 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// /// Setting this to true prevents certain optimizations such as common /// subexpression elimination + /// + /// When overriding this function to return `true`, [conditional_arguments] can also be + /// overridden to report more accurately which arguments are eagerly evaluated and which ones + /// lazily. fn short_circuits(&self) -> bool { false } + /// Determines which of the arguments passed to this function are evaluated eagerly + /// and which may be evaluated lazily. + /// + /// If all arguments are eagerly evaluated this function is allowed, but is not required to, + /// return `None`. Returning `None` is a micro optimization that saves a needless `Vec` + /// allocation. + /// + /// When `Some` is returned, implementations must ensure that the two returned `Vec`s are + /// disjunct, and that each argument from `args` is present in one the two `Vec`s. + /// + /// When overriding this function, [short_circuits] should also be overridden to return `true`. + fn conditional_arguments<'a>( + &self, + args: &'a [Expr], + ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> { + if self.short_circuits() { + Some((vec![], args.iter().collect())) + } else { + None + } + } + /// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input /// intervals. 
/// @@ -837,6 +884,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.simplify(args, info) } + fn conditional_arguments<'a>( + &self, + args: &'a [Expr], + ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> { + self.inner.conditional_arguments(args) + } + fn short_circuits(&self) -> bool { self.inner.short_circuits() } diff --git a/datafusion/functions/src/core/coalesce.rs b/datafusion/functions/src/core/coalesce.rs index 3fba539dd04b..aab1f445d559 100644 --- a/datafusion/functions/src/core/coalesce.rs +++ b/datafusion/functions/src/core/coalesce.rs @@ -47,7 +47,7 @@ use std::any::Any; )] #[derive(Debug, PartialEq, Eq, Hash)] pub struct CoalesceFunc { - signature: Signature, + pub(super) signature: Signature, } impl Default for CoalesceFunc { @@ -126,6 +126,15 @@ impl ScalarUDFImpl for CoalesceFunc { internal_err!("coalesce should have been simplified to case") } + fn conditional_arguments<'a>( + &self, + args: &'a [Expr], + ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> { + let eager = vec![&args[0]]; + let lazy = args[1..].iter().collect(); + Some((eager, lazy)) + } + fn short_circuits(&self) -> bool { true } diff --git a/datafusion/functions/src/core/nvl.rs b/datafusion/functions/src/core/nvl.rs index c8b34c4b1780..6be6a2428d0e 100644 --- a/datafusion/functions/src/core/nvl.rs +++ b/datafusion/functions/src/core/nvl.rs @@ -15,17 +15,15 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::array::Array; -use arrow::compute::is_not_null; -use arrow::compute::kernels::zip::zip; -use arrow::datatypes::DataType; -use datafusion_common::{utils::take_function_args, Result}; +use crate::core::coalesce::CoalesceFunc; +use arrow::datatypes::{DataType, FieldRef}; +use datafusion_common::Result; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, - Volatility, + ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, + ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; -use std::sync::Arc; #[user_doc( doc_section(label = "Conditional Functions"), @@ -57,7 +55,7 @@ use std::sync::Arc; )] #[derive(Debug, PartialEq, Eq, Hash)] pub struct NVLFunc { - signature: Signature, + coalesce: CoalesceFunc, aliases: Vec, } @@ -90,11 +88,13 @@ impl Default for NVLFunc { impl NVLFunc { pub fn new() -> Self { Self { - signature: Signature::uniform( - 2, - SUPPORTED_NVL_TYPES.to_vec(), - Volatility::Immutable, - ), + coalesce: CoalesceFunc { + signature: Signature::uniform( + 2, + SUPPORTED_NVL_TYPES.to_vec(), + Volatility::Immutable, + ), + }, aliases: vec![String::from("ifnull")], } } @@ -110,209 +110,45 @@ impl ScalarUDFImpl for NVLFunc { } fn signature(&self) -> &Signature { - &self.signature + &self.coalesce.signature } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[0].clone()) + self.coalesce.return_type(arg_types) } - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - nvl_func(&args.args) - } - - fn aliases(&self) -> &[String] { - &self.aliases - } - - fn documentation(&self) -> Option<&Documentation> { - self.doc() + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + self.coalesce.return_field_from_args(args) } -} - -fn nvl_func(args: &[ColumnarValue]) -> Result { - let [lhs, rhs] = take_function_args("nvl/ifnull", args)?; - let (lhs_array, 
rhs_array) = match (lhs, rhs) { - (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => { - (Arc::clone(lhs), rhs.to_array_of_size(lhs.len())?) - } - (ColumnarValue::Array(lhs), ColumnarValue::Array(rhs)) => { - (Arc::clone(lhs), Arc::clone(rhs)) - } - (ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => { - (lhs.to_array_of_size(rhs.len())?, Arc::clone(rhs)) - } - (ColumnarValue::Scalar(lhs), ColumnarValue::Scalar(rhs)) => { - let mut current_value = lhs; - if lhs.is_null() { - current_value = rhs; - } - return Ok(ColumnarValue::Scalar(current_value.clone())); - } - }; - let to_apply = is_not_null(&lhs_array)?; - let value = zip(&to_apply, &lhs_array, &rhs_array)?; - Ok(ColumnarValue::Array(value)) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::array::*; - use super::*; - use datafusion_common::ScalarValue; - - #[test] - fn nvl_int32() -> Result<()> { - let a = Int32Array::from(vec![ - Some(1), - Some(2), - None, - None, - Some(3), - None, - None, - Some(4), - Some(5), - ]); - let a = ColumnarValue::Array(Arc::new(a)); - - let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(6i32))); - - let result = nvl_func(&[a, lit_array])?; - let result = result.into_array(0).expect("Failed to convert to array"); - - let expected = Arc::new(Int32Array::from(vec![ - Some(1), - Some(2), - Some(6), - Some(6), - Some(3), - Some(6), - Some(6), - Some(4), - Some(5), - ])) as ArrayRef; - assert_eq!(expected.as_ref(), result.as_ref()); - Ok(()) + fn simplify( + &self, + args: Vec, + info: &dyn SimplifyInfo, + ) -> Result { + self.coalesce.simplify(args, info) } - #[test] - // Ensure that arrays with no nulls can also invoke nvl() correctly - fn nvl_int32_non_nulls() -> Result<()> { - let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]); - let a = ColumnarValue::Array(Arc::new(a)); - - let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(20i32))); - - let result = nvl_func(&[a, lit_array])?; - let result = 
result.into_array(0).expect("Failed to convert to array"); - - let expected = Arc::new(Int32Array::from(vec![ - Some(1), - Some(3), - Some(10), - Some(7), - Some(8), - Some(1), - Some(2), - Some(4), - Some(5), - ])) as ArrayRef; - assert_eq!(expected.as_ref(), result.as_ref()); - Ok(()) + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + self.coalesce.invoke_with_args(args) } - #[test] - fn nvl_boolean() -> Result<()> { - let a = BooleanArray::from(vec![Some(true), Some(false), None]); - let a = ColumnarValue::Array(Arc::new(a)); - - let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))); - - let result = nvl_func(&[a, lit_array])?; - let result = result.into_array(0).expect("Failed to convert to array"); - - let expected = Arc::new(BooleanArray::from(vec![ - Some(true), - Some(false), - Some(false), - ])) as ArrayRef; - - assert_eq!(expected.as_ref(), result.as_ref()); - Ok(()) + fn conditional_arguments<'a>( + &self, + args: &'a [Expr], + ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> { + self.coalesce.conditional_arguments(args) } - #[test] - fn nvl_string() -> Result<()> { - let a = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); - let a = ColumnarValue::Array(Arc::new(a)); - - let lit_array = ColumnarValue::Scalar(ScalarValue::from("bax")); - - let result = nvl_func(&[a, lit_array])?; - let result = result.into_array(0).expect("Failed to convert to array"); - - let expected = Arc::new(StringArray::from(vec![ - Some("foo"), - Some("bar"), - Some("bax"), - Some("baz"), - ])) as ArrayRef; - - assert_eq!(expected.as_ref(), result.as_ref()); - Ok(()) + fn short_circuits(&self) -> bool { + self.coalesce.short_circuits() } - #[test] - fn nvl_literal_first() -> Result<()> { - let a = Int32Array::from(vec![Some(1), Some(2), None, None, Some(3), Some(4)]); - let a = ColumnarValue::Array(Arc::new(a)); - - let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); - - let result = nvl_func(&[lit_array, 
a])?; - let result = result.into_array(0).expect("Failed to convert to array"); - - let expected = Arc::new(Int32Array::from(vec![ - Some(2), - Some(2), - Some(2), - Some(2), - Some(2), - Some(2), - ])) as ArrayRef; - assert_eq!(expected.as_ref(), result.as_ref()); - Ok(()) + fn aliases(&self) -> &[String] { + &self.aliases } - #[test] - fn nvl_scalar() -> Result<()> { - let a_null = ColumnarValue::Scalar(ScalarValue::Int32(None)); - let b_null = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); - - let result_null = nvl_func(&[a_null, b_null])?; - let result_null = result_null - .into_array(1) - .expect("Failed to convert to array"); - - let expected_null = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef; - - assert_eq!(expected_null.as_ref(), result_null.as_ref()); - - let a_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); - let b_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32))); - - let result_nnull = nvl_func(&[a_nnull, b_nnull])?; - let result_nnull = result_nnull - .into_array(1) - .expect("Failed to convert to array"); - - let expected_nnull = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef; - assert_eq!(expected_nnull.as_ref(), result_nnull.as_ref()); - - Ok(()) + fn documentation(&self) -> Option<&Documentation> { + self.doc() } } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index ec1f8f991a8e..251006849459 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -652,10 +652,8 @@ impl CSEController for ExprCSEController<'_> { // In case of `ScalarFunction`s we don't know which children are surely // executed so start visiting all children conditionally and stop the // recursion with `TreeNodeRecursion::Jump`. 
- Expr::ScalarFunction(ScalarFunction { func, args }) - if func.short_circuits() => - { - Some((vec![], args.iter().collect())) + Expr::ScalarFunction(ScalarFunction { func, args }) => { + func.conditional_arguments(args) } // In case of `And` and `Or` the first child is surely executed, but we diff --git a/datafusion/sqllogictest/test_files/string/string_view.slt b/datafusion/sqllogictest/test_files/string/string_view.slt index fb67daa0b840..4d30f572ad6f 100644 --- a/datafusion/sqllogictest/test_files/string/string_view.slt +++ b/datafusion/sqllogictest/test_files/string/string_view.slt @@ -988,7 +988,7 @@ query TT EXPLAIN SELECT NVL(column1_utf8view, 'a') as c2 FROM test; ---- logical_plan -01)Projection: nvl(test.column1_utf8view, Utf8View("a")) AS c2 +01)Projection: CASE WHEN test.column1_utf8view IS NOT NULL THEN test.column1_utf8view ELSE Utf8View("a") END AS c2 02)--TableScan: test projection=[column1_utf8view] ## Ensure no casts for nullif From d8c9848ca86df36cabe4c809fba8fa898128c143 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Thu, 9 Oct 2025 15:53:30 +0200 Subject: [PATCH 2/2] #17982 Documentation fix --- datafusion/expr/src/udf.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index c49fc62df103..4701202cefc7 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -670,7 +670,7 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// Setting this to true prevents certain optimizations such as common /// subexpression elimination /// - /// When overriding this function to return `true`, [conditional_arguments] can also be + /// When overriding this function to return `true`, [ScalarUDFImpl::conditional_arguments] can also be /// overridden to report more accurately which arguments are eagerly evaluated and which ones /// lazily. 
fn short_circuits(&self) -> bool { @@ -687,7 +687,7 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// When `Some` is returned, implementations must ensure that the two returned `Vec`s are /// disjunct, and that each argument from `args` is present in one the two `Vec`s. /// - /// When overriding this function, [short_circuits] should also be overridden to return `true`. + /// When overriding this function, [ScalarUDFImpl::short_circuits] should also be overridden to return `true`. fn conditional_arguments<'a>( &self, args: &'a [Expr],