diff --git a/datafusion/common/src/cse.rs b/datafusion/common/src/cse.rs index 674d3386171f..0d7e72b38b18 100644 --- a/datafusion/common/src/cse.rs +++ b/datafusion/common/src/cse.rs @@ -176,7 +176,9 @@ pub trait CSEController { /// Splits the children to normal and conditionally evaluated ones or returns `None` /// if all are always evaluated. - fn conditional_children(node: &Self::Node) -> Option>; + fn conditional_children( + node: &Self::Node, + ) -> Result>>; // Returns true if a node is valid. If a node is invalid then it can't be eliminated. // Validity is propagated up which means no subtree can be eliminated that contains @@ -362,7 +364,7 @@ where } else { // If we are already in a node that can short-circuit then start new // traversals on its normal conditional children. - match C::conditional_children(node) { + match C::conditional_children(node)? { Some((normal, conditional)) => { normal .into_iter() @@ -713,8 +715,8 @@ mod test { fn conditional_children( _: &Self::Node, - ) -> Option<(Vec<&Self::Node>, Vec<&Self::Node>)> { - None + ) -> Result, Vec<&Self::Node>)>> { + Ok(None) } fn is_valid(_node: &Self::Node) -> bool { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 1c9734a89bd3..f482a4d43850 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -116,7 +116,11 @@ pub use udaf::{ udaf_default_window_function_schema_name, AggregateUDF, AggregateUDFImpl, ReversedUDAF, SetMonotonicity, StatisticsArgs, }; -pub use udf::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl}; +pub use udf::{ + ArgumentEvaluation, DeferredScalarFunctionArg, DeferredScalarFunctionArgs, + DeferredScalarFunctionResult, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, + ScalarUDFImpl, +}; pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index bc9e62fe6211..a71fd6665a35 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,11 +23,15 @@ use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; use crate::sort_properties::{ExprProperties, SortProperties}; use crate::udf_eq::UdfEq; use crate::{ColumnarValue, Documentation, Expr, Signature}; +use arrow::array::{BooleanArray, RecordBatch}; +use arrow::compute::filter; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue}; +use datafusion_common::{not_impl_err, DataFusionError, ExprSchema, Result, ScalarValue}; use datafusion_expr_common::dyn_eq::{DynEq, DynHash}; use datafusion_expr_common::interval_arithmetic::Interval; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::utils::scatter; use std::any::Any; use std::cmp::Ordering; use std::fmt::Debug; @@ -235,7 +239,9 @@ impl ScalarUDF { pub fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { #[cfg(debug_assertions)] let return_field = Arc::clone(&args.return_field); + let result = self.inner.invoke_with_args(args)?; + // Maybe this could be enabled always? // This doesn't use debug_assert!, but it's meant to run anywhere except on production. It's same in spirit, thus conditioning on debug_assertions. #[cfg(debug_assertions)] @@ -249,10 +255,59 @@ impl ScalarUDF { } // TODO verify return data is non-null when it was promised to be? } + Ok(result) } - /// Get the circuits of inner implementation + /// Invoke the function on `args`, returning the appropriate result. + /// + /// See [`ScalarUDFImpl::invoke_with_deferred_args`] for details. + pub fn invoke_with_deferred_args( + &self, + args: DeferredScalarFunctionArgs, + ) -> Result { + #[cfg(debug_assertions)] + let return_field = Arc::clone(&args.return_field); + + let result = self.inner.invoke_with_deferred_args(args)?; + + // Maybe this could be enabled always? + // This doesn't use debug_assert!, but it's meant to run anywhere except on production. It's same in spirit, thus conditioning on debug_assertions. + #[cfg(debug_assertions)] + { + if &result.value.data_type() != return_field.data_type() { + return datafusion_common::internal_err!("Function '{}' returned value of type '{:?}' while the following type was promised at planning time and expected: '{:?}'", + self.name(), + result.value.data_type(), + return_field.data_type() + ); + } + // TODO verify return data is non-null when it was promised to be? + } + + Ok(result) + } + + /// Determines which of the arguments passed to this function are evaluated eagerly + /// and which may be evaluated lazily. + /// + /// If all arguments are evaluated eagerly this function is allowed to return + /// `None`. + /// + /// When `Some` is returned, the length of the returned `Vec` will match the length of `args`. + /// + /// See [ScalarUDFImpl::argument_evaluation] for more information. + pub fn argument_evaluation( + &self, + args: &[Expr], + ) -> Result>> { + self.inner.argument_evaluation(args) + } + + /// Returns true if some of this `exprs` subexpressions may not be evaluated + /// and thus any side effects (like divide by zero) may not be encountered. + /// + /// See [ScalarUDFImpl::short_circuits] for more information. pub fn short_circuits(&self) -> bool { self.inner.short_circuits() } @@ -340,6 +395,119 @@ where } } +#[derive(Debug, Clone)] +pub enum ArgumentEvaluation { + Eager, + Lazy, +} + +#[derive(Debug, Clone)] +enum DeferredScalarFunctionArgInner<'a> { + Eager(ColumnarValue), + Lazy(&'a Arc, &'a RecordBatch), +} + +#[derive(Debug, Clone)] +pub struct DeferredScalarFunctionArg<'a> { + inner: DeferredScalarFunctionArgInner<'a>, +} + +impl DeferredScalarFunctionArg<'_> { + /// Creates a lazily evaluated function argument + pub fn lazy<'a>( + expr: &'a Arc, + record_batch: &'a RecordBatch, + ) -> DeferredScalarFunctionArg<'a> { + DeferredScalarFunctionArg { + inner: DeferredScalarFunctionArgInner::Lazy(expr, record_batch), + } + } + + /// Creates an eagerly evaluated function argument + pub fn eager(value: ColumnarValue) -> Self { + Self { + inner: DeferredScalarFunctionArgInner::Eager(value), + } + } + + /// Returns the value of this argument, evaluating if necessary + pub fn value(&self) -> Result { + match &self.inner { + DeferredScalarFunctionArgInner::Eager(v) => Ok(v.clone()), + DeferredScalarFunctionArgInner::Lazy(e, rb) => e.evaluate(rb), + } + } + + pub fn value_for_selection(&self, selection: &BooleanArray) -> Result { + match &self.inner { + DeferredScalarFunctionArgInner::Eager(v) => match v { + ColumnarValue::Array(array) => { + let filtered = filter(array.as_ref(), selection)?; + let scattered = scatter(selection, filtered.as_ref())?; + Ok(ColumnarValue::Array(scattered)) + } + s @ ColumnarValue::Scalar(_) => Ok(s.clone()), + }, + DeferredScalarFunctionArgInner::Lazy(e, rb) => { + e.evaluate_selection(rb, selection) + } + } + } +} + +/// Arguments passed to [`ScalarUDFImpl::invoke_with_deferred_args`] when invoking a +/// scalar function. +#[derive(Debug, Clone)] +pub struct DeferredScalarFunctionArgs<'a> { + /// The evaluated arguments to the function + pub args: Vec>, + /// Field associated with each arg, if it exists + pub arg_fields: Vec, + /// The number of rows in record batch being evaluated + pub number_rows: usize, + /// The return field of the scalar function returned (from `return_type` + /// or `return_field_from_args`) when creating the physical expression + /// from the logical expression + pub return_field: FieldRef, + /// The config options at execution time + pub config_options: Arc, +} + +impl TryFrom> for ScalarFunctionArgs { + type Error = DataFusionError; + + fn try_from( + value: DeferredScalarFunctionArgs<'_>, + ) -> std::result::Result { + let DeferredScalarFunctionArgs { + args, + arg_fields, + number_rows, + return_field, + config_options, + } = value; + + let args = args + .into_iter() + .map(|e| e.value()) + .collect::>>()?; + + Ok(ScalarFunctionArgs { + args, + arg_fields, + number_rows, + return_field, + config_options, + }) + } +} + +#[derive(Debug, Clone)] +pub struct DeferredScalarFunctionResult { + pub value: ColumnarValue, + pub all_args_were_scalar: bool, +} + /// Arguments passed to [`ScalarUDFImpl::invoke_with_args`] when invoking a /// scalar function. #[derive(Debug, Clone)] @@ -364,6 +532,12 @@ impl ScalarFunctionArgs { pub fn return_type(&self) -> &DataType { self.return_field.data_type() } + + pub fn all_args_are_scalar(&self) -> bool { + self.args + .iter() + .all(|arg| matches!(arg, ColumnarValue::Scalar(_))) + } } /// Information about arguments passed to the function @@ -604,6 +778,26 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { true } + /// Invoke the function with deferred argument evaluation returning the appropriate result. + /// + /// This function is similar to [ScalarUDF::invoke_with_args], but rather than evaluating all + /// arguments up front, the choice of when to evaluate arguments is left up to the + /// implementation of this function. + /// + /// See [ScalarUDFImpl::invoke_with_deferred_args] for details. + fn invoke_with_deferred_args( + &self, + args: DeferredScalarFunctionArgs, + ) -> Result { + let scalar_args: ScalarFunctionArgs = args.try_into()?; + let all_args_were_scalar = scalar_args.all_args_are_scalar(); + let result = self.invoke_with_args(scalar_args)?; + Ok(DeferredScalarFunctionResult { + value: result, + all_args_were_scalar, + }) + } + /// Invoke the function returning the appropriate result. /// /// # Performance @@ -648,10 +842,39 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// /// Setting this to true prevents certain optimizations such as common /// subexpression elimination + /// + /// When overriding this function to return `true`, [conditional_arguments] can also be + /// overridden to report more accurately which arguments are eagerly evaluated and which ones + /// lazily. fn short_circuits(&self) -> bool { false } + /// Determines which of the arguments passed to this function are evaluated eagerly + /// and which may be evaluated lazily. + /// + /// If all arguments are eagerly evaluated this function is allowed, but is not required to, + /// return `None`. Returning `None` is a micro optimization that saves a needless `Vec` + /// allocation. + /// + /// When `Some` is returned, implementations must ensure that the returned `Vec` had the same + /// length as `args`. + /// + /// When overriding this function, [ScalarUDFImpl::short_circuits] should also be overridden to return `true`. + /// + /// The default implementation returns `None` if [ScalarUDFImpl::short_circuits] returns `false`, + /// or a `Vec` with all values set to [ArgumentEvaluation::Lazy] otherwise. + fn argument_evaluation( + &self, + args: &[Expr], + ) -> Result>> { + if self.short_circuits() { + Ok(Some(vec![ArgumentEvaluation::Lazy; args.len()])) + } else { + Ok(None) + } + } + /// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input /// intervals. /// @@ -825,6 +1048,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.invoke_with_args(args) } + fn invoke_with_deferred_args( + &self, + args: DeferredScalarFunctionArgs, + ) -> Result { + self.inner.invoke_with_deferred_args(args) + } + fn aliases(&self) -> &[String] { &self.aliases } @@ -837,6 +1067,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.simplify(args, info) } + fn argument_evaluation( + &self, + args: &[Expr], + ) -> Result>> { + self.inner.argument_evaluation(args) + } + fn short_circuits(&self) -> bool { self.inner.short_circuits() } diff --git a/datafusion/functions/src/core/nvl.rs b/datafusion/functions/src/core/nvl.rs index c8b34c4b1780..72b4d76105af 100644 --- a/datafusion/functions/src/core/nvl.rs +++ b/datafusion/functions/src/core/nvl.rs @@ -16,13 +16,14 @@ // under the License. use arrow::array::Array; -use arrow::compute::is_not_null; use arrow::compute::kernels::zip::zip; +use arrow::compute::{is_not_null, not, prep_null_mask_filter}; use arrow::datatypes::DataType; -use datafusion_common::{utils::take_function_args, Result}; +use datafusion_common::{plan_err, utils::take_function_args, Result}; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, - Volatility, + ArgumentEvaluation, ColumnarValue, DeferredScalarFunctionArgs, + DeferredScalarFunctionResult, Documentation, Expr, ScalarFunctionArgs, ScalarUDFImpl, + Signature, Volatility, }; use datafusion_macros::user_doc; use std::sync::Arc; @@ -118,7 +119,7 @@ impl ScalarUDFImpl for NVLFunc { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - nvl_func(&args.args) + nvl_func_eager(&args.args) } fn aliases(&self) -> &[String] { @@ -128,9 +129,33 @@ impl ScalarUDFImpl for NVLFunc { fn documentation(&self) -> Option<&Documentation> { self.doc() } + + fn short_circuits(&self) -> bool { + true + } + + fn argument_evaluation( + &self, + args: &[Expr], + ) -> Result>> { + if args.len() != 2 { + return plan_err!("nvl/ifnull requires exactly 2 arguments"); + } + Ok(Some(vec![ + ArgumentEvaluation::Eager, + ArgumentEvaluation::Lazy, + ])) + } + + fn invoke_with_deferred_args( + &self, + args: DeferredScalarFunctionArgs, + ) -> Result { + nvl_func_lazy(&args) + } } -fn nvl_func(args: &[ColumnarValue]) -> Result { +fn nvl_func_eager(args: &[ColumnarValue]) -> Result { let [lhs, rhs] = take_function_args("nvl/ifnull", args)?; let (lhs_array, rhs_array) = match (lhs, rhs) { (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => { @@ -155,6 +180,54 @@ fn nvl_func(args: &[ColumnarValue]) -> Result { Ok(ColumnarValue::Array(value)) } +fn nvl_func_lazy( + args: &DeferredScalarFunctionArgs, +) -> Result { + let [lhs, rhs] = take_function_args("nvl/ifnull", &args.args)?; + + let lhs_value = lhs.value()?; + let lhs_array = match lhs_value { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(s) => { + return Ok(DeferredScalarFunctionResult { + value: if s.is_null() { + rhs.value()? + } else { + ColumnarValue::Array(s.to_array_of_size(args.number_rows)?) + }, + all_args_were_scalar: false, + }); + } + }; + + let not_null = is_not_null(lhs_array.as_ref())?; + if not_null.true_count() == lhs_array.len() { + return Ok(DeferredScalarFunctionResult { + value: ColumnarValue::Array(lhs_array), + all_args_were_scalar: false, + }); + } + + let selection = if not_null.nulls().is_some() { + let mask = prep_null_mask_filter(¬_null); + not(&mask)? + } else { + not(¬_null)? + }; + + let rhs_value = rhs.value_for_selection(&selection)?; + + let rhs_array = match rhs_value { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(s) => s.to_array_of_size(lhs_array.len())?, + }; + let value = zip(¬_null, &lhs_array, &rhs_array)?; + Ok(DeferredScalarFunctionResult { + value: ColumnarValue::Array(value), + all_args_were_scalar: false, + }) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -181,7 +254,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(6i32))); - let result = nvl_func(&[a, lit_array])?; + let result = nvl_func_eager(&[a, lit_array])?; let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(Int32Array::from(vec![ @@ -207,7 +280,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(20i32))); - let result = nvl_func(&[a, lit_array])?; + let result = nvl_func_eager(&[a, lit_array])?; let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(Int32Array::from(vec![ @@ -232,7 +305,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))); - let result = nvl_func(&[a, lit_array])?; + let result = nvl_func_eager(&[a, lit_array])?; let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(BooleanArray::from(vec![ @@ -252,7 +325,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::from("bax")); - let result = nvl_func(&[a, lit_array])?; + let result = nvl_func_eager(&[a, lit_array])?; let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(StringArray::from(vec![ @@ -273,7 +346,7 @@ mod tests { let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); - let result = nvl_func(&[lit_array, a])?; + let result = nvl_func_eager(&[lit_array, a])?; let result = result.into_array(0).expect("Failed to convert to array"); let expected = Arc::new(Int32Array::from(vec![ @@ -293,7 +366,7 @@ mod tests { let a_null = ColumnarValue::Scalar(ScalarValue::Int32(None)); let b_null = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); - let result_null = nvl_func(&[a_null, b_null])?; + let result_null = nvl_func_eager(&[a_null, b_null])?; let result_null = result_null .into_array(1) .expect("Failed to convert to array"); @@ -305,7 +378,7 @@ mod tests { let a_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32))); let b_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32))); - let result_nnull = nvl_func(&[a_nnull, b_nnull])?; + let result_nnull = nvl_func_eager(&[a_nnull, b_nnull])?; let result_nnull = result_nnull .into_array(1) .expect("Failed to convert to array"); diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index ec1f8f991a8e..3b09469201f9 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -34,7 +34,9 @@ use datafusion_expr::expr::{Alias, ScalarFunction}; use datafusion_expr::logical_plan::{ Aggregate, Filter, LogicalPlan, Projection, Sort, Window, }; -use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator, SortExpr}; +use datafusion_expr::{ + col, ArgumentEvaluation, BinaryExpr, Case, Expr, Operator, SortExpr, +}; const CSE_PREFIX: &str = "__common_expr"; @@ -647,15 +649,22 @@ impl<'a> ExprCSEController<'a> { impl CSEController for ExprCSEController<'_> { type Node = Expr; - fn conditional_children(node: &Expr) -> Option<(Vec<&Expr>, Vec<&Expr>)> { + fn conditional_children(node: &Expr) -> Result, Vec<&Expr>)>> { match node { - // In case of `ScalarFunction`s we don't know which children are surely - // executed so start visiting all children conditionally and stop the - // recursion with `TreeNodeRecursion::Jump`. - Expr::ScalarFunction(ScalarFunction { func, args }) - if func.short_circuits() => - { - Some((vec![], args.iter().collect())) + // In case of `ScalarFunction`s ask the function which arguments are evaluated + // eagerly and which are evaluated lazily. + Expr::ScalarFunction(ScalarFunction { func, args }) => { + Ok(func.argument_evaluation(args)?.map(|evaluation_types| { + let mut eager = vec![]; + let mut lazy = vec![]; + for (expr, evaluation) in args.iter().zip(evaluation_types) { + match evaluation { + ArgumentEvaluation::Eager => eager.push(expr), + ArgumentEvaluation::Lazy => lazy.push(expr), + }; + } + (eager, lazy) + })) } // In case of `And` and `Or` the first child is surely executed, but we @@ -664,7 +673,7 @@ impl CSEController for ExprCSEController<'_> { left, op: Operator::And | Operator::Or, right, - }) => Some((vec![left.as_ref()], vec![right.as_ref()])), + }) => Ok(Some((vec![left.as_ref()], vec![right.as_ref()]))), // In case of `Case` the optional base expression and the first when // expressions are surely executed, but we account subexpressions as @@ -673,7 +682,7 @@ impl CSEController for ExprCSEController<'_> { expr, when_then_expr, else_expr, - }) => Some(( + }) => Ok(Some(( expr.iter() .map(|e| e.as_ref()) .chain(when_then_expr.iter().take(1).map(|(when, _)| when.as_ref())) @@ -690,8 +699,8 @@ impl CSEController for ExprCSEController<'_> { ) .chain(else_expr.iter().map(|e| e.as_ref())) .collect(), - )), - _ => None, + ))), + _ => Ok(None), } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 743d5b99cde9..394b2043df30 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -45,8 +45,8 @@ use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; use datafusion_expr::{ - expr_vec_fmt, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, - Volatility, + expr_vec_fmt, ColumnarValue, DeferredScalarFunctionArg, DeferredScalarFunctionArgs, + ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, Volatility, }; /// Physical expression of a scalar function @@ -257,38 +257,55 @@ impl PhysicalExpr for ScalarFunctionExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let args = self - .args - .iter() - .map(|e| e.evaluate(batch)) - .collect::>>()?; - let arg_fields = self .args .iter() .map(|e| e.return_field(batch.schema_ref())) .collect::>>()?; - let input_empty = args.is_empty(); - let input_all_scalar = args - .iter() - .all(|arg| matches!(arg, ColumnarValue::Scalar(_))); - - // evaluate the function - let output = self.fun.invoke_with_args(ScalarFunctionArgs { - args, - arg_fields, - number_rows: batch.num_rows(), - return_field: Arc::clone(&self.return_field), - config_options: Arc::clone(&self.config_options), - })?; + let (output, all_args_were_scalar) = if self.fun.short_circuits() { + // Prepare for lazy argument evaluation + let args = self + .args + .iter() + .map(|e| DeferredScalarFunctionArg::lazy(e, batch)) + .collect::>(); + + let result = + self.fun + .invoke_with_deferred_args(DeferredScalarFunctionArgs { + args, + arg_fields, + number_rows: batch.num_rows(), + return_field: Arc::clone(&self.return_field), + config_options: Arc::clone(&self.config_options), + })?; + (result.value, result.all_args_were_scalar) + } else { + // Eagerly evaluate all arguments + let args = self + .args + .iter() + .map(|e| e.evaluate(batch)) + .collect::>>()?; + + let args = ScalarFunctionArgs { + args, + arg_fields, + number_rows: batch.num_rows(), + return_field: Arc::clone(&self.return_field), + config_options: Arc::clone(&self.config_options), + }; + let all_args_were_scalar = args.all_args_are_scalar(); + (self.fun.invoke_with_args(args)?, all_args_were_scalar) + }; if let ColumnarValue::Array(array) = &output { if array.len() != batch.num_rows() { // If the arguments are a non-empty slice of scalar values, we can assume that // returning a one-element array is equivalent to returning a scalar. let preserve_scalar = - array.len() == 1 && !input_empty && input_all_scalar; + array.len() == 1 && !self.args.is_empty() && all_args_were_scalar; return if preserve_scalar { ScalarValue::try_from_array(array, 0).map(ColumnarValue::Scalar) } else {