Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions datafusion/common/src/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ pub trait CSEController {

/// Splits the children to normal and conditionally evaluated ones or returns `None`
/// if all are always evaluated.
fn conditional_children(node: &Self::Node) -> Option<ChildrenList<&Self::Node>>;
fn conditional_children(
node: &Self::Node,
) -> Result<Option<ChildrenList<&Self::Node>>>;

// Returns true if a node is valid. If a node is invalid then it can't be eliminated.
// Validity is propagated up which means no subtree can be eliminated that contains
Expand Down Expand Up @@ -362,7 +364,7 @@ where
} else {
// If we are already in a node that can short-circuit then start new
// traversals on its normal conditional children.
match C::conditional_children(node) {
match C::conditional_children(node)? {
Some((normal, conditional)) => {
normal
.into_iter()
Expand Down Expand Up @@ -713,8 +715,8 @@ mod test {

fn conditional_children(
_: &Self::Node,
) -> Option<(Vec<&Self::Node>, Vec<&Self::Node>)> {
None
) -> Result<Option<(Vec<&Self::Node>, Vec<&Self::Node>)>> {
Ok(None)
}

fn is_valid(_node: &Self::Node) -> bool {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ pub use udaf::{
udaf_default_window_function_schema_name, AggregateUDF, AggregateUDFImpl,
ReversedUDAF, SetMonotonicity, StatisticsArgs,
};
pub use udf::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl};
pub use udf::{
ArgumentEvaluation, DeferredScalarFunctionArg, DeferredScalarFunctionArgs,
DeferredScalarFunctionResult, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
ScalarUDFImpl,
};
pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};

Expand Down
241 changes: 239 additions & 2 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ use crate::simplify::{ExprSimplifyResult, SimplifyInfo};
use crate::sort_properties::{ExprProperties, SortProperties};
use crate::udf_eq::UdfEq;
use crate::{ColumnarValue, Documentation, Expr, Signature};
use arrow::array::{BooleanArray, RecordBatch};
use arrow::compute::filter;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue};
use datafusion_common::{not_impl_err, DataFusionError, ExprSchema, Result, ScalarValue};
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::utils::scatter;
use std::any::Any;
use std::cmp::Ordering;
use std::fmt::Debug;
Expand Down Expand Up @@ -235,7 +239,9 @@ impl ScalarUDF {
pub fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
#[cfg(debug_assertions)]
let return_field = Arc::clone(&args.return_field);

let result = self.inner.invoke_with_args(args)?;

// Maybe this could be enabled always?
// This doesn't use debug_assert!, but it's meant to run anywhere except on production. It's same in spirit, thus conditioning on debug_assertions.
#[cfg(debug_assertions)]
Expand All @@ -249,10 +255,59 @@ impl ScalarUDF {
}
// TODO verify return data is non-null when it was promised to be?
}

Ok(result)
}

/// Get the circuits of inner implementation
/// Invoke the function on `args`, returning the appropriate result.
///
/// See [`ScalarUDFImpl::invoke_with_deferred_args`] for details.
pub fn invoke_with_deferred_args(
&self,
args: DeferredScalarFunctionArgs,
) -> Result<DeferredScalarFunctionResult> {
#[cfg(debug_assertions)]
let return_field = Arc::clone(&args.return_field);

let result = self.inner.invoke_with_deferred_args(args)?;

// Maybe this could be enabled always?
// This doesn't use debug_assert!, but it's meant to run anywhere except on production. It's same in spirit, thus conditioning on debug_assertions.
#[cfg(debug_assertions)]
{
if &result.value.data_type() != return_field.data_type() {
return datafusion_common::internal_err!("Function '{}' returned value of type '{:?}' while the following type was promised at planning time and expected: '{:?}'",
self.name(),
result.value.data_type(),
return_field.data_type()
);
}
// TODO verify return data is non-null when it was promised to be?
}

Ok(result)
}

/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// If all arguments are evaluated eagerly this function is allowed to return
/// `None`.
///
/// When `Some` is returned, the length of the returned `Vec` will match the length of `args`.
///
/// See [ScalarUDFImpl::argument_evaluation] for more information.
pub fn argument_evaluation(
&self,
args: &[Expr],
) -> Result<Option<Vec<ArgumentEvaluation>>> {
self.inner.argument_evaluation(args)
}

/// Returns true if some of this `exprs` subexpressions may not be evaluated
/// and thus any side effects (like divide by zero) may not be encountered.
///
/// See [ScalarUDFImpl::short_circuits] for more information.
pub fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
Expand Down Expand Up @@ -340,6 +395,119 @@ where
}
}

#[derive(Debug, Clone)]
pub enum ArgumentEvaluation {
Eager,
Lazy,
}

#[derive(Debug, Clone)]
enum DeferredScalarFunctionArgInner<'a> {
Eager(ColumnarValue),
Lazy(&'a Arc<dyn PhysicalExpr>, &'a RecordBatch),
}

#[derive(Debug, Clone)]
pub struct DeferredScalarFunctionArg<'a> {
inner: DeferredScalarFunctionArgInner<'a>,
}

impl DeferredScalarFunctionArg<'_> {
/// Creates a lazily evaluated function argument
pub fn lazy<'a>(
expr: &'a Arc<dyn PhysicalExpr>,
record_batch: &'a RecordBatch,
) -> DeferredScalarFunctionArg<'a> {
DeferredScalarFunctionArg {
inner: DeferredScalarFunctionArgInner::Lazy(expr, record_batch),
}
}

/// Creates an eagerly evaluated function argument
pub fn eager(value: ColumnarValue) -> Self {
Self {
inner: DeferredScalarFunctionArgInner::Eager(value),
}
}

/// Returns the value of this argument, evaluating if necessary
pub fn value(&self) -> Result<ColumnarValue> {
match &self.inner {
DeferredScalarFunctionArgInner::Eager(v) => Ok(v.clone()),
DeferredScalarFunctionArgInner::Lazy(e, rb) => e.evaluate(rb),
}
}

pub fn value_for_selection(&self, selection: &BooleanArray) -> Result<ColumnarValue> {
match &self.inner {
DeferredScalarFunctionArgInner::Eager(v) => match v {
ColumnarValue::Array(array) => {
let filtered = filter(array.as_ref(), selection)?;
let scattered = scatter(selection, filtered.as_ref())?;
Ok(ColumnarValue::Array(scattered))
}
s @ ColumnarValue::Scalar(_) => Ok(s.clone()),
},
DeferredScalarFunctionArgInner::Lazy(e, rb) => {
e.evaluate_selection(rb, selection)
}
}
}
}

/// Arguments passed to [`ScalarUDFImpl::invoke_with_deferred_args`] when invoking a
/// scalar function.
#[derive(Debug, Clone)]
pub struct DeferredScalarFunctionArgs<'a> {
/// The evaluated arguments to the function
pub args: Vec<DeferredScalarFunctionArg<'a>>,
/// Field associated with each arg, if it exists
pub arg_fields: Vec<FieldRef>,
/// The number of rows in record batch being evaluated
pub number_rows: usize,
/// The return field of the scalar function returned (from `return_type`
/// or `return_field_from_args`) when creating the physical expression
/// from the logical expression
pub return_field: FieldRef,
/// The config options at execution time
pub config_options: Arc<ConfigOptions>,
}

impl TryFrom<DeferredScalarFunctionArgs<'_>> for ScalarFunctionArgs {
type Error = DataFusionError;

fn try_from(
value: DeferredScalarFunctionArgs<'_>,
) -> std::result::Result<Self, Self::Error> {
let DeferredScalarFunctionArgs {
args,
arg_fields,
number_rows,
return_field,
config_options,
} = value;

let args = args
.into_iter()
.map(|e| e.value())
.collect::<Result<Vec<_>>>()?;

Ok(ScalarFunctionArgs {
args,
arg_fields,
number_rows,
return_field,
config_options,
})
}
}

#[derive(Debug, Clone)]
pub struct DeferredScalarFunctionResult {
pub value: ColumnarValue,
pub all_args_were_scalar: bool,
}

/// Arguments passed to [`ScalarUDFImpl::invoke_with_args`] when invoking a
/// scalar function.
#[derive(Debug, Clone)]
Expand All @@ -364,6 +532,12 @@ impl ScalarFunctionArgs {
pub fn return_type(&self) -> &DataType {
self.return_field.data_type()
}

pub fn all_args_are_scalar(&self) -> bool {
self.args
.iter()
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
}
}

/// Information about arguments passed to the function
Expand Down Expand Up @@ -604,6 +778,26 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
true
}

/// Invoke the function with deferred argument evaluation returning the appropriate result.
///
/// This function is similar to [ScalarUDF::invoke_with_args], but rather than evaluating all
/// arguments up front, the choice of when to evaluate arguments is left up to the
/// implementation of this function.
///
/// See [ScalarUDFImpl::invoke_with_deferred_args] for details.
fn invoke_with_deferred_args(
&self,
args: DeferredScalarFunctionArgs,
) -> Result<DeferredScalarFunctionResult> {
let scalar_args: ScalarFunctionArgs = args.try_into()?;
let all_args_were_scalar = scalar_args.all_args_are_scalar();
let result = self.invoke_with_args(scalar_args)?;
Ok(DeferredScalarFunctionResult {
value: result,
all_args_were_scalar,
})
}

/// Invoke the function returning the appropriate result.
///
/// # Performance
Expand Down Expand Up @@ -648,10 +842,39 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
///
/// Setting this to true prevents certain optimizations such as common
/// subexpression elimination
///
/// When overriding this function to return `true`, [conditional_arguments] can also be
/// overridden to report more accurately which arguments are eagerly evaluated and which ones
/// lazily.
fn short_circuits(&self) -> bool {
false
}

/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// If all arguments are eagerly evaluated this function is allowed, but is not required to,
/// return `None`. Returning `None` is a micro optimization that saves a needless `Vec`
/// allocation.
///
/// When `Some` is returned, implementations must ensure that the returned `Vec` had the same
/// length as `args`.
///
/// When overriding this function, [ScalarUDFImpl::short_circuits] should also be overridden to return `true`.
///
/// The default implementation returns `None` if [ScalarUDFImpl::short_circuits] returns `false`,
/// or a `Vec` with all values set to [ArgumentEvaluation::Lazy] otherwise.
fn argument_evaluation(
&self,
args: &[Expr],
) -> Result<Option<Vec<ArgumentEvaluation>>> {
if self.short_circuits() {
Ok(Some(vec![ArgumentEvaluation::Lazy; args.len()]))
} else {
Ok(None)
}
}

/// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input
/// intervals.
///
Expand Down Expand Up @@ -825,6 +1048,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.invoke_with_args(args)
}

fn invoke_with_deferred_args(
&self,
args: DeferredScalarFunctionArgs,
) -> Result<DeferredScalarFunctionResult> {
self.inner.invoke_with_deferred_args(args)
}

fn aliases(&self) -> &[String] {
&self.aliases
}
Expand All @@ -837,6 +1067,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.simplify(args, info)
}

fn argument_evaluation(
&self,
args: &[Expr],
) -> Result<Option<Vec<ArgumentEvaluation>>> {
self.inner.argument_evaluation(args)
}

fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
Expand Down
Loading
Loading