Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
Ok(tmp_result)
} else if let ColumnarValue::Array(a) = tmp_result {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
} else if let ColumnarValue::Scalar(ScalarValue::Boolean(v)) = &tmp_result {
let a = BooleanArray::from(vec![*v; tmp_batch.num_rows()]);
scatter(selection, &a).map(ColumnarValue::Array)
} else {
Ok(tmp_result)
}
Expand Down
81 changes: 48 additions & 33 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,19 @@ impl PhysicalExpr for BinaryExpr {
// as it takes into account cases where the selection contains null values.
let batch = filter_record_batch(batch, selection)?;
let right_ret = self.right.evaluate(&batch)?;
return pre_selection_scatter(selection, right_ret);
match &right_ret {
ColumnarValue::Array(array) => {
return pre_selection_scatter(selection, array.as_boolean());
}
ColumnarValue::Scalar(scalar) => {
if let ScalarValue::Boolean(v) = scalar {
let array = BooleanArray::from(vec![*v; batch.num_rows()]);
return pre_selection_scatter(selection, &array);
} else {
return Ok(right_ret);
}
}
}
}
}

Expand Down Expand Up @@ -974,13 +986,8 @@ fn check_short_circuit<'a>(
/// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`].
fn pre_selection_scatter(
left_result: &BooleanArray,
right_result: ColumnarValue,
right_result: &BooleanArray,
) -> Result<ColumnarValue> {
let right_boolean_array = match &right_result {
ColumnarValue::Array(array) => array.as_boolean(),
ColumnarValue::Scalar(_) => return Ok(right_result),
};

let result_len = left_result.len();

let mut result_array_builder = BooleanArray::builder(result_len);
Expand All @@ -998,7 +1005,7 @@ fn pre_selection_scatter(

// copy values from right array for this slice
let len = end - start;
right_boolean_array
right_result
.slice(right_array_pos, len)
.iter()
.for_each(|v| result_array_builder.append_option(v));
Expand Down Expand Up @@ -5211,7 +5218,6 @@ mod tests {
/// 4. Test single true at first position
/// 5. Test single true at last position
/// 6. Test nulls in right array
/// 7. Test scalar right handling
#[test]
fn test_pre_selection_scatter() {
fn create_bool_array(bools: Vec<bool>) -> BooleanArray {
Expand All @@ -5222,11 +5228,9 @@ mod tests {
// Left: [T, F, T, F, T]
// Right: [F, T, F] (values for 3 true positions)
let left = create_bool_array(vec![true, false, true, false, true]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![
false, true, false,
])));
let right = create_bool_array(vec![false, true, false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, &right).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = create_bool_array(vec![false, false, true, false, false]);
Expand All @@ -5238,11 +5242,9 @@ mod tests {
// Right: [T, F, F, T, F]
let left =
create_bool_array(vec![false, true, true, false, true, true, true]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![
true, false, false, true, false,
])));
let right = create_bool_array(vec![true, false, false, true, false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, &right).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected =
Expand All @@ -5254,9 +5256,9 @@ mod tests {
// Left: [T, F, F]
// Right: [F]
let left = create_bool_array(vec![true, false, false]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false])));
let right = create_bool_array(vec![false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, &right).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = create_bool_array(vec![false, false, false]);
Expand All @@ -5267,9 +5269,9 @@ mod tests {
// Left: [F, F, T]
// Right: [F]
let left = create_bool_array(vec![false, false, true]);
let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false])));
let right = create_bool_array(vec![false]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, &right).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = create_bool_array(vec![false, false, false]);
Expand All @@ -5280,10 +5282,9 @@ mod tests {
// Left: [F, T, F, T]
// Right: [None, Some(false)] (with null at first position)
let left = create_bool_array(vec![false, true, false, true]);
let right_arr = BooleanArray::from(vec![None, Some(false)]);
let right = ColumnarValue::Array(Arc::new(right_arr));
let right = BooleanArray::from(vec![None, Some(false)]);

let result = pre_selection_scatter(&left, right).unwrap();
let result = pre_selection_scatter(&left, &right).unwrap();
let result_arr = result.into_array(left.len()).unwrap();

let expected = BooleanArray::from(vec![
Expand All @@ -5294,16 +5295,30 @@ mod tests {
]);
assert_eq!(&expected, result_arr.as_boolean());
}
// Test scalar right handling
{
// Left: [T, F, T]
// Right: Scalar true
let left = create_bool_array(vec![true, false, true]);
let right = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)));
}

let result = pre_selection_scatter(&left, right).unwrap();
assert!(matches!(result, ColumnarValue::Scalar(_)));
}
#[test]
fn test_and_true_preselection_returns_lhs() {
let schema =
Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)]));
let c_array = Arc::new(BooleanArray::from(vec![false, true, false, false, false]))
as ArrayRef;
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)])
.unwrap();

let expr = logical2physical(&logical_col("c").and(expr_lit(true)), &schema);

let result = expr.evaluate(&batch).unwrap();
let ColumnarValue::Array(result_arr) = result else {
panic!("Expected ColumnarValue::Array");
};

let expected: Vec<_> = c_array.as_boolean().iter().collect();
let actual: Vec<_> = result_arr.as_boolean().iter().collect();
assert_eq!(
expected, actual,
"AND with TRUE must equal LHS even with PreSelection"
);
}

#[test]
Expand Down
Loading