diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index b4cb08715f53..84db63ca3b98 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::utils::scatter; -use arrow::array::BooleanArray; +use arrow::array::{ArrayRef, BooleanArray}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; @@ -106,6 +106,20 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { Ok(tmp_result) } else if let ColumnarValue::Array(a) = tmp_result { scatter(selection, a.as_ref()).map(ColumnarValue::Array) + } else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result { + // When the scalar is true or false, skip the scatter process + if let Some(v) = value { + if *v { + return Ok(ColumnarValue::from( + Arc::new(selection.clone()) as ArrayRef + )); + } else { + return Ok(tmp_result); + } + } else { + let array = BooleanArray::from(vec![None; batch.num_rows()]); + return scatter(selection, &array).map(ColumnarValue::Array); + } } else { Ok(tmp_result) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index eff948c6a0f4..abc963355bda 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -375,7 +375,44 @@ impl PhysicalExpr for BinaryExpr { // as it takes into account cases where the selection contains null values. 
let batch = filter_record_batch(batch, selection)?; let right_ret = self.right.evaluate(&batch)?; - return pre_selection_scatter(selection, right_ret); + + match &right_ret { + ColumnarValue::Array(array) => { + // When the array on the right is all true or all false, skip the scatter process + let boolean_array = array.as_boolean(); + let true_count = boolean_array.true_count(); + let length = boolean_array.len(); + if true_count == length { + return Ok(lhs); + } else if true_count == 0 && boolean_array.null_count() == 0 { + // If the right-hand array is returned at this point, the lengths will be inconsistent; + // returning a scalar can avoid this issue + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean( + Some(false), + ))); + } + + return pre_selection_scatter(selection, Some(boolean_array)); + } + ColumnarValue::Scalar(scalar) => { + if let ScalarValue::Boolean(v) = scalar { + // When the scalar is true or false, skip the scatter process + if let Some(v) = v { + if *v { + return Ok(lhs); + } else { + return Ok(right_ret); + } + } else { + return pre_selection_scatter(selection, None); + } + } else { + return internal_err!( + "Expected boolean scalar value, found: {right_ret:?}" + ); + } + } + } } } @@ -974,13 +1011,8 @@ fn check_short_circuit<'a>( /// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`].
fn pre_selection_scatter( left_result: &BooleanArray, - right_result: ColumnarValue, + right_result: Option<&BooleanArray>, ) -> Result { - let right_boolean_array = match &right_result { - ColumnarValue::Array(array) => array.as_boolean(), - ColumnarValue::Scalar(_) => return Ok(right_result), - }; - let result_len = left_result.len(); let mut result_array_builder = BooleanArray::builder(result_len); @@ -990,22 +1022,39 @@ fn pre_selection_scatter( // keep track of how much is filled let mut last_end = 0; - SlicesIterator::new(left_result).for_each(|(start, end)| { - // the gap needs to be filled with false - if start > last_end { - result_array_builder.append_n(start - last_end, false); + // reduce if condition in for_each + match right_result { + Some(right_result) => { + SlicesIterator::new(left_result).for_each(|(start, end)| { + // the gap needs to be filled with false + if start > last_end { + result_array_builder.append_n(start - last_end, false); + } + + // copy values from right array for this slice + let len = end - start; + right_result + .slice(right_array_pos, len) + .iter() + .for_each(|v| result_array_builder.append_option(v)); + + right_array_pos += len; + last_end = end; + }); } + None => SlicesIterator::new(left_result).for_each(|(start, end)| { + // the gap needs to be filled with false + if start > last_end { + result_array_builder.append_n(start - last_end, false); + } - // copy values from right array for this slice - let len = end - start; - right_boolean_array - .slice(right_array_pos, len) - .iter() - .for_each(|v| result_array_builder.append_option(v)); + // append nulls for this slice directly + let len = end - start; + result_array_builder.append_nulls(len); - right_array_pos += len; - last_end = end; - }); + last_end = end; + }), + } // Fill any remaining positions with false if last_end < result_len { @@ -5211,7 +5260,6 @@ mod tests { /// 4. Test single true at first position /// 5. Test single true at last position /// 6.
Test nulls in right array - /// 7. Test scalar right handling #[test] fn test_pre_selection_scatter() { fn create_bool_array(bools: Vec) -> BooleanArray { @@ -5222,11 +5270,9 @@ mod tests { // Left: [T, F, T, F, T] // Right: [F, T, F] (values for 3 true positions) let left = create_bool_array(vec![true, false, true, false, true]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![ - false, true, false, - ]))); + let right = create_bool_array(vec![false, true, false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, true, false, false]); @@ -5238,11 +5284,9 @@ mod tests { // Right: [T, F, F, T, F] let left = create_bool_array(vec![false, true, true, false, true, true, true]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![ - true, false, false, true, false, - ]))); + let right = create_bool_array(vec![true, false, false, true, false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = @@ -5254,9 +5298,9 @@ mod tests { // Left: [T, F, F] // Right: [F] let left = create_bool_array(vec![true, false, false]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false]))); + let right = create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5267,9 +5311,9 @@ mod tests { // Left: [F, F, T] // Right: [F] let left = create_bool_array(vec![false, false, true]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false]))); + let right = 
create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5280,10 +5324,9 @@ mod tests { // Left: [F, T, F, T] // Right: [None, Some(false)] (with null at first position) let left = create_bool_array(vec![false, true, false, true]); - let right_arr = BooleanArray::from(vec![None, Some(false)]); - let right = ColumnarValue::Array(Arc::new(right_arr)); + let right = BooleanArray::from(vec![None, Some(false)]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = BooleanArray::from(vec![ @@ -5294,16 +5337,30 @@ mod tests { ]); assert_eq!(&expected, result_arr.as_boolean()); } - // Test scalar right handling - { - // Left: [T, F, T] - // Right: Scalar true - let left = create_bool_array(vec![true, false, true]); - let right = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))); + } - let result = pre_selection_scatter(&left, right).unwrap(); - assert!(matches!(result, ColumnarValue::Scalar(_))); - } + #[test] + fn test_and_true_preselection_returns_lhs() { + let schema = + Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)])); + let c_array = Arc::new(BooleanArray::from(vec![false, true, false, false, false])) + as ArrayRef; + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)]) + .unwrap(); + + let expr = logical2physical(&logical_col("c").and(expr_lit(true)), &schema); + + let result = expr.evaluate(&batch).unwrap(); + let ColumnarValue::Array(result_arr) = result else { + panic!("Expected ColumnarValue::Array"); + }; + + let expected: Vec<_> = c_array.as_boolean().iter().collect(); + let actual: Vec<_> = result_arr.as_boolean().iter().collect(); + 
assert_eq!( + expected, actual, + "AND with TRUE must equal LHS even with PreSelection" + ); } #[test] diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md index 73e9731cdbc0..dc06f3d051bb 100644 --- a/docs/source/user-guide/sql/window_functions.md +++ b/docs/source/user-guide/sql/window_functions.md @@ -331,6 +331,8 @@ FROM employees; +-------------+--------+---------+ ``` +# + ## Analytical Functions - [first_value](#first_value)