Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,28 @@ impl ScalarUDF {
Ok(result)
}

/// Get the circuits of inner implementation
/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// If all arguments are evaluated eagerly, this function may return either `None` or `Some((args, vec![]))`.
///
/// When `Some` is returned, the first value of the tuple contains the eagerly evaluated arguments,
/// and the second value contains the arguments that may be evaluated lazily.
/// The two sets of arguments are guaranteed to be disjoint, and each argument from `args` is
/// present in exactly one of the two `Vec`s.
///
/// See [ScalarUDFImpl::conditional_arguments] for more information.
pub fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
// Thin wrapper: the eager/lazy split is decided entirely by the wrapped
// implementation; `args` is forwarded untouched and the result is returned as-is.
self.inner.conditional_arguments(args)
}

/// Returns true if some of this `exprs` subexpressions may not be evaluated
/// and thus any side effects (like divide by zero) may not be encountered.
///
/// See [ScalarUDFImpl::short_circuits] for more information.
pub fn short_circuits(&self) -> bool {
// Thin wrapper: the answer comes straight from the wrapped implementation.
self.inner.short_circuits()
}
Expand Down Expand Up @@ -648,10 +669,36 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
///
/// Setting this to true prevents certain optimizations such as common
/// subexpression elimination
///
/// When overriding this function to return `true`, [ScalarUDFImpl::conditional_arguments] can also be
/// overridden to report more accurately which arguments are eagerly evaluated and which ones
/// lazily.
fn short_circuits(&self) -> bool {
// Default: an implementation is treated as non-short-circuiting unless it
// overrides this method.
false
}

/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// If all arguments are eagerly evaluated, this function is allowed, but not required, to
/// return `None`. Returning `None` is a micro-optimization that saves a needless `Vec`
/// allocation.
///
/// When `Some` is returned, implementations must ensure that the two returned `Vec`s are
/// disjoint, and that each argument from `args` is present in one of the two `Vec`s.
///
/// When overriding this function, [ScalarUDFImpl::short_circuits] should also be overridden to return `true`.
fn conditional_arguments<'a>(
    &self,
    args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
    // Functions that do not short-circuit have nothing to report.
    if !self.short_circuits() {
        return None;
    }

    // Without more specific knowledge from the implementation, be conservative:
    // no argument is reported as eager, every argument is reported as possibly lazy.
    let eager: Vec<&'a Expr> = Vec::new();
    let lazy: Vec<&'a Expr> = args.iter().collect();
    Some((eager, lazy))
}

/// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input
/// intervals.
///
Expand Down Expand Up @@ -837,6 +884,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.simplify(args, info)
}

fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
// Forward to `self.inner` so this wrapper reports the same eager/lazy split
// as the implementation it wraps.
self.inner.conditional_arguments(args)
}

fn short_circuits(&self) -> bool {
// Forward to `self.inner`; this wrapper does not change short-circuit behavior.
self.inner.short_circuits()
}
Expand Down
11 changes: 10 additions & 1 deletion datafusion/functions/src/core/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::any::Any;
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CoalesceFunc {
signature: Signature,
pub(super) signature: Signature,
}

impl Default for CoalesceFunc {
Expand Down Expand Up @@ -126,6 +126,15 @@ impl ScalarUDFImpl for CoalesceFunc {
internal_err!("coalesce should have been simplified to case")
}

/// Splits the arguments of `coalesce` into eagerly and lazily evaluated ones.
///
/// The first argument is reported as eagerly evaluated; every following
/// argument is reported as possibly lazily evaluated.
///
/// Returns `None` when `args` is empty: with no arguments there is nothing that
/// could be evaluated lazily, and indexing `args[0]` would otherwise panic.
fn conditional_arguments<'a>(
    &self,
    args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
    // `split_first` yields `None` for an empty slice, avoiding the panic path
    // of `args[0]` / `args[1..]`.
    let (first, rest) = args.split_first()?;
    Some((vec![first], rest.iter().collect()))
}

fn short_circuits(&self) -> bool {
// Always reported as short-circuiting; `conditional_arguments` describes
// which arguments are eager and which may be lazy.
true
}
Expand Down
238 changes: 37 additions & 201 deletions datafusion/functions/src/core/nvl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::Array;
use arrow::compute::is_not_null;
use arrow::compute::kernels::zip::zip;
use arrow::datatypes::DataType;
use datafusion_common::{utils::take_function_args, Result};
use crate::core::coalesce::CoalesceFunc;
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::Result;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
Volatility,
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
ScalarUDFImpl, Signature, Volatility,
};
use datafusion_macros::user_doc;
use std::sync::Arc;

#[user_doc(
doc_section(label = "Conditional Functions"),
Expand Down Expand Up @@ -57,7 +55,7 @@ use std::sync::Arc;
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct NVLFunc {
signature: Signature,
coalesce: CoalesceFunc,
aliases: Vec<String>,
}

Expand Down Expand Up @@ -90,11 +88,13 @@ impl Default for NVLFunc {
impl NVLFunc {
pub fn new() -> Self {
Self {
signature: Signature::uniform(
2,
SUPPORTED_NVL_TYPES.to_vec(),
Volatility::Immutable,
),
coalesce: CoalesceFunc {
signature: Signature::uniform(
2,
SUPPORTED_NVL_TYPES.to_vec(),
Volatility::Immutable,
),
},
aliases: vec![String::from("ifnull")],
}
}
Expand All @@ -110,209 +110,45 @@ impl ScalarUDFImpl for NVLFunc {
}

fn signature(&self) -> &Signature {
&self.signature
&self.coalesce.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
self.coalesce.return_type(arg_types)
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// Only the evaluated argument values (`args.args`) are passed on; the other
// fields of `ScalarFunctionArgs` are not used here.
nvl_func(&args.args)
}

fn aliases(&self) -> &[String] {
// Returns the alias list stored on the struct (populated in `new`).
&self.aliases
}

fn documentation(&self) -> Option<&Documentation> {
self.doc()
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
self.coalesce.return_field_from_args(args)
}
}

/// Evaluates `nvl`/`ifnull`: yields the first argument where it is not null and
/// the second argument otherwise.
///
/// Two scalar inputs produce a scalar; any other combination produces an array,
/// with a scalar input first expanded to the length of the array input.
fn nvl_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let [first, second] = take_function_args("nvl/ifnull", args)?;

    let (primary, fallback) = match (first, second) {
        // Both scalar: no array work needed, select the value directly.
        (ColumnarValue::Scalar(value), ColumnarValue::Scalar(default)) => {
            let chosen = if value.is_null() { default } else { value };
            return Ok(ColumnarValue::Scalar(chosen.clone()));
        }
        (ColumnarValue::Array(values), ColumnarValue::Array(defaults)) => {
            (Arc::clone(values), Arc::clone(defaults))
        }
        // Expand the scalar side so both inputs have the same length.
        (ColumnarValue::Array(values), ColumnarValue::Scalar(default)) => {
            (Arc::clone(values), default.to_array_of_size(values.len())?)
        }
        (ColumnarValue::Scalar(value), ColumnarValue::Array(defaults)) => {
            (value.to_array_of_size(defaults.len())?, Arc::clone(defaults))
        }
    };

    // The mask is true where the primary value is present; `zip` takes from
    // `primary` where the mask is true and from `fallback` elsewhere.
    let keep_primary = is_not_null(&primary)?;
    Ok(ColumnarValue::Array(zip(&keep_primary, &primary, &fallback)?))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow::array::*;

use super::*;
use datafusion_common::ScalarValue;

#[test]
fn nvl_int32() -> Result<()> {
let a = Int32Array::from(vec![
Some(1),
Some(2),
None,
None,
Some(3),
None,
None,
Some(4),
Some(5),
]);
let a = ColumnarValue::Array(Arc::new(a));

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(6i32)));

let result = nvl_func(&[a, lit_array])?;
let result = result.into_array(0).expect("Failed to convert to array");

let expected = Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
Some(6),
Some(6),
Some(3),
Some(6),
Some(6),
Some(4),
Some(5),
])) as ArrayRef;
assert_eq!(expected.as_ref(), result.as_ref());
Ok(())
fn simplify(
&self,
args: Vec<Expr>,
info: &dyn SimplifyInfo,
) -> Result<ExprSimplifyResult> {
// Reuse the simplification of the wrapped `CoalesceFunc`; `args` and `info`
// are forwarded unchanged.
self.coalesce.simplify(args, info)
}

#[test]
// Ensure that arrays with no nulls can also invoke nvl() correctly
fn nvl_int32_non_nulls() -> Result<()> {
let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]);
let a = ColumnarValue::Array(Arc::new(a));

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(20i32)));

let result = nvl_func(&[a, lit_array])?;
let result = result.into_array(0).expect("Failed to convert to array");

let expected = Arc::new(Int32Array::from(vec![
Some(1),
Some(3),
Some(10),
Some(7),
Some(8),
Some(1),
Some(2),
Some(4),
Some(5),
])) as ArrayRef;
assert_eq!(expected.as_ref(), result.as_ref());
Ok(())
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// Delegates to the wrapped `CoalesceFunc`.
// NOTE(review): presumably not reached at runtime if `simplify` rewrites the
// call first — confirm against `CoalesceFunc::invoke_with_args`.
self.coalesce.invoke_with_args(args)
}

#[test]
fn nvl_boolean() -> Result<()> {
let a = BooleanArray::from(vec![Some(true), Some(false), None]);
let a = ColumnarValue::Array(Arc::new(a));

let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));

let result = nvl_func(&[a, lit_array])?;
let result = result.into_array(0).expect("Failed to convert to array");

let expected = Arc::new(BooleanArray::from(vec![
Some(true),
Some(false),
Some(false),
])) as ArrayRef;

assert_eq!(expected.as_ref(), result.as_ref());
Ok(())
fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
// Same eager/lazy split as the wrapped `CoalesceFunc` reports for `args`.
self.coalesce.conditional_arguments(args)
}

#[test]
fn nvl_string() -> Result<()> {
let a = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
let a = ColumnarValue::Array(Arc::new(a));

let lit_array = ColumnarValue::Scalar(ScalarValue::from("bax"));

let result = nvl_func(&[a, lit_array])?;
let result = result.into_array(0).expect("Failed to convert to array");

let expected = Arc::new(StringArray::from(vec![
Some("foo"),
Some("bar"),
Some("bax"),
Some("baz"),
])) as ArrayRef;

assert_eq!(expected.as_ref(), result.as_ref());
Ok(())
fn short_circuits(&self) -> bool {
// Mirrors the wrapped `CoalesceFunc`.
self.coalesce.short_circuits()
}

#[test]
fn nvl_literal_first() -> Result<()> {
let a = Int32Array::from(vec![Some(1), Some(2), None, None, Some(3), Some(4)]);
let a = ColumnarValue::Array(Arc::new(a));

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));

let result = nvl_func(&[lit_array, a])?;
let result = result.into_array(0).expect("Failed to convert to array");

let expected = Arc::new(Int32Array::from(vec![
Some(2),
Some(2),
Some(2),
Some(2),
Some(2),
Some(2),
])) as ArrayRef;
assert_eq!(expected.as_ref(), result.as_ref());
Ok(())
fn aliases(&self) -> &[String] {
// Returns the alias list stored on the struct (populated in `new`).
&self.aliases
}

#[test]
fn nvl_scalar() -> Result<()> {
let a_null = ColumnarValue::Scalar(ScalarValue::Int32(None));
let b_null = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));

let result_null = nvl_func(&[a_null, b_null])?;
let result_null = result_null
.into_array(1)
.expect("Failed to convert to array");

let expected_null = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;

assert_eq!(expected_null.as_ref(), result_null.as_ref());

let a_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
let b_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));

let result_nnull = nvl_func(&[a_nnull, b_nnull])?;
let result_nnull = result_nnull
.into_array(1)
.expect("Failed to convert to array");

let expected_nnull = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
assert_eq!(expected_nnull.as_ref(), result_nnull.as_ref());

Ok(())
fn documentation(&self) -> Option<&Documentation> {
// NOTE(review): `doc()` is not defined in the visible code; presumably it is
// generated by the `#[user_doc]` attribute on the struct — confirm.
self.doc()
}
}
Loading