Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use crate::utils::scatter;

use arrow::array::BooleanArray;
use arrow::array::{ArrayRef, BooleanArray};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -106,6 +106,20 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
Ok(tmp_result)
} else if let ColumnarValue::Array(a) = tmp_result {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
} else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result {
// When the scalar is true or false, skip the scatter process
if let Some(v) = value {
if *v {
return Ok(ColumnarValue::from(
Arc::new(selection.clone()) as ArrayRef
));
} else {
return Ok(tmp_result);
}
} else {
let array = BooleanArray::from(vec![None; batch.num_rows()]);
return scatter(selection, &array).map(ColumnarValue::Array);
}
} else {
Ok(tmp_result)
}
Expand Down
147 changes: 102 additions & 45 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,44 @@ impl PhysicalExpr for BinaryExpr {
// as it takes into account cases where the selection contains null values.
let batch = filter_record_batch(batch, selection)?;
let right_ret = self.right.evaluate(&batch)?;
return pre_selection_scatter(selection, right_ret);

match &right_ret {
ColumnarValue::Array(array) => {
// When the array on the right is all true or all false, skip the scatter process
let boolean_array = array.as_boolean();
let true_count = boolean_array.true_count();
let length = boolean_array.len();
if true_count == length {
return Ok(lhs);
} else if true_count == 0 && boolean_array.null_count() == 0 {
// If the right-hand array is returned at this point, the lengths will be inconsistent;
// returning a scalar can avoid this issue
return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
Some(false),
)));
}

return pre_selection_scatter(selection, Some(boolean_array));
}
ColumnarValue::Scalar(scalar) => {
if let ScalarValue::Boolean(v) = scalar {
// When the scalar is true or false, skip the scatter process
if let Some(v) = v {
if *v {
return Ok(lhs);
} else {
return Ok(right_ret);
}
} else {
return pre_selection_scatter(selection, None);
}
} else {
return internal_err!(
"Expected boolean scalar value, found: {right_ret:?}"
);
}
}
}
}
}

Expand Down Expand Up @@ -974,13 +1011,8 @@ fn check_short_circuit<'a>(
/// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`].
fn pre_selection_scatter(
left_result: &BooleanArray,
right_result: ColumnarValue,
right_result: Option<&BooleanArray>,
) -> Result<ColumnarValue> {
let right_boolean_array = match &right_result {
ColumnarValue::Array(array) => array.as_boolean(),
ColumnarValue::Scalar(_) => return Ok(right_result),
};

let result_len = left_result.len();

let mut result_array_builder = BooleanArray::builder(result_len);
Expand All @@ -990,22 +1022,39 @@ fn pre_selection_scatter(

// keep track of how much is filled
let mut last_end = 0;
SlicesIterator::new(left_result).for_each(|(start, end)| {
// the gap needs to be filled with false
if start > last_end {
result_array_builder.append_n(start - last_end, false);
// Branch on `right_result` once here, instead of checking it inside every `for_each` iteration
match right_result {
Some(right_result) => {
SlicesIterator::new(left_result).for_each(|(start, end)| {
// the gap needs to be filled with false
if start > last_end {
result_array_builder.append_n(start - last_end, false);
}

// copy values from right array for this slice
let len = end - start;
right_result
.slice(right_array_pos, len)
.iter()
.for_each(|v| result_array_builder.append_option(v));

right_array_pos += len;
last_end = end;
});
}
None => SlicesIterator::new(left_result).for_each(|(start, end)| {
// the gap needs to be filled with false
if start > last_end {
result_array_builder.append_n(start - last_end, false);
}

// copy values from right array for this slice
let len = end - start;
right_boolean_array
.slice(right_array_pos, len)
.iter()
.for_each(|v| result_array_builder.append_option(v));
// append nulls for this slice directly
let len = end - start;
result_array_builder.append_nulls(len);

right_array_pos += len;
last_end = end;
});
last_end = end;
}),
}

// Fill any remaining positions with false
if last_end < result_len {
Expand Down Expand Up @@ -5211,7 +5260,6 @@ mod tests {
/// 4. Test single true at first position
/// 5. Test single true at last position
/// 6. Test nulls in right array
/// 7. Test scalar right handling
#[test]
fn test_pre_selection_scatter() {
fn create_bool_array(bools: Vec<bool>) -> BooleanArray {
Expand All @@ -5222,11 +5270,9 @@ mod tests {
// Left: [T, F, T, F, T]
// Right: [F, T, F] (values for 3 true positions)
let left = create_bool_array(vec![true, false, true, false, true]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![
false, true, false,
])));
let right = create_bool_array(vec![false, true, false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, Some(&right)).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = create_bool_array(vec![false, false, true, false, false]);
Expand All @@ -5238,11 +5284,9 @@ mod tests {
// Right: [T, F, F, T, F]
let left =
create_bool_array(vec![false, true, true, false, true, true, true]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![
true, false, false, true, false,
])));
let right = create_bool_array(vec![true, false, false, true, false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, Some(&right)).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected =
Expand All @@ -5254,9 +5298,9 @@ mod tests {
// Left: [T, F, F]
// Right: [F]
let left = create_bool_array(vec![true, false, false]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false])));
let right = create_bool_array(vec![false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, Some(&right)).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = create_bool_array(vec![false, false, false]);
Expand All @@ -5267,9 +5311,9 @@ mod tests {
// Left: [F, F, T]
// Right: [F]
let left = create_bool_array(vec![false, false, true]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false])));
let right = create_bool_array(vec![false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, Some(&right)).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = create_bool_array(vec![false, false, false]);
Expand All @@ -5280,10 +5324,9 @@ mod tests {
// Left: [F, T, F, T]
// Right: [None, Some(false)] (with null at first position)
let left = create_bool_array(vec![false, true, false, true]);
let right_arr = BooleanArray::from(vec![None, Some(false)]);
let right = ColumnarValue::Array(Arc::new(right_arr));
let right = BooleanArray::from(vec![None, Some(false)]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, Some(&right)).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = BooleanArray::from(vec![
Expand All @@ -5294,16 +5337,30 @@ mod tests {
]);
assert_eq!(&expected, result_arr.as_boolean());
}
// Test scalar right handling
{
// Left: [T, F, T]
// Right: Scalar true
let left = create_bool_array(vec![true, false, true]);
let right = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)));
}

let result = pre_selection_scatter(&left, right).unwrap();
assert!(matches!(result, ColumnarValue::Scalar(_)));
}
/// Evaluating `c AND true` must produce exactly the values of column `c`.
///
/// The input column holds a single `true` among five rows; the evaluated
/// expression is required to come back as an array whose elements equal
/// the left-hand column element for element.
#[test]
fn test_and_true_preselection_returns_lhs() {
    // Single non-nullable boolean column `c` = [F, T, F, F, F].
    let fields = vec![Field::new("c", DataType::Boolean, false)];
    let schema = Arc::new(Schema::new(fields));
    let lhs_values = BooleanArray::from(vec![false, true, false, false, false]);
    let lhs_column: ArrayRef = Arc::new(lhs_values);
    let columns = vec![Arc::clone(&lhs_column)];
    let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();

    // Physical plan for `c AND true`.
    let logical_expr = logical_col("c").and(expr_lit(true));
    let physical_expr = logical2physical(&logical_expr, &schema);

    // The result has to be an array; a scalar result fails the test.
    let evaluated = match physical_expr.evaluate(&batch).unwrap() {
        ColumnarValue::Array(values) => values,
        ColumnarValue::Scalar(_) => panic!("Expected ColumnarValue::Array"),
    };

    let expected = lhs_column.as_boolean().iter().collect::<Vec<_>>();
    let actual = evaluated.as_boolean().iter().collect::<Vec<_>>();
    assert_eq!(
        expected, actual,
        "AND with TRUE must equal LHS even with PreSelection"
    );
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions docs/source/user-guide/sql/window_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ FROM employees;
+-------------+--------+---------+
```

#

## Analytical Functions

- [first_value](#first_value)
Expand Down