From faa5c6188f609f541286c92d644776da68742288 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Sun, 27 Jul 2025 16:51:09 +0800 Subject: [PATCH 1/5] fix error result in execute&pre_selection --- .../physical-expr-common/src/physical_expr.rs | 7 ++ .../physical-expr/src/expressions/binary.rs | 81 +++++++++++-------- 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index b4cb08715f53..52f8f1cfcfa8 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -106,6 +106,13 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { Ok(tmp_result) } else if let ColumnarValue::Array(a) = tmp_result { scatter(selection, a.as_ref()).map(ColumnarValue::Array) + } else if let ColumnarValue::Scalar(scalar) = &tmp_result { + if let ScalarValue::Boolean(v) = scalar { + let a = BooleanArray::from(vec![*v; tmp_batch.num_rows()]); + scatter(selection, &a).map(ColumnarValue::Array) + } else { + Ok(tmp_result) + } } else { Ok(tmp_result) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index eff948c6a0f4..94f3dc9dc35c 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -375,7 +375,19 @@ impl PhysicalExpr for BinaryExpr { // as it takes into account cases where the selection contains null values. 
let batch = filter_record_batch(batch, selection)?; let right_ret = self.right.evaluate(&batch)?; - return pre_selection_scatter(selection, right_ret); + match &right_ret { + ColumnarValue::Array(array) => { + return pre_selection_scatter(selection, array.as_boolean()); + } + ColumnarValue::Scalar(scalar) => { + if let ScalarValue::Boolean(v) = scalar { + let array = BooleanArray::from(vec![*v; batch.num_rows()]); + return pre_selection_scatter(selection, &array); + } else { + return Ok(right_ret); + } + } + } } } @@ -974,13 +986,8 @@ fn check_short_circuit<'a>( /// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`]. fn pre_selection_scatter( left_result: &BooleanArray, - right_result: ColumnarValue, + right_result: &BooleanArray, ) -> Result { - let right_boolean_array = match &right_result { - ColumnarValue::Array(array) => array.as_boolean(), - ColumnarValue::Scalar(_) => return Ok(right_result), - }; - let result_len = left_result.len(); let mut result_array_builder = BooleanArray::builder(result_len); @@ -998,7 +1005,7 @@ fn pre_selection_scatter( // copy values from right array for this slice let len = end - start; - right_boolean_array + right_result .slice(right_array_pos, len) .iter() .for_each(|v| result_array_builder.append_option(v)); @@ -5211,7 +5218,6 @@ mod tests { /// 4. Test single true at first position /// 5. Test single true at last position /// 6. Test nulls in right array - /// 7. 
Test scalar right handling #[test] fn test_pre_selection_scatter() { fn create_bool_array(bools: Vec) -> BooleanArray { @@ -5222,11 +5228,9 @@ mod tests { // Left: [T, F, T, F, T] // Right: [F, T, F] (values for 3 true positions) let left = create_bool_array(vec![true, false, true, false, true]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![ - false, true, false, - ]))); + let right = create_bool_array(vec![false, true, false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, &right).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, true, false, false]); @@ -5238,11 +5242,9 @@ mod tests { // Right: [T, F, F, T, F] let left = create_bool_array(vec![false, true, true, false, true, true, true]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![ - true, false, false, true, false, - ]))); + let right = create_bool_array(vec![true, false, false, true, false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, &right).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = @@ -5254,9 +5256,9 @@ mod tests { // Left: [T, F, F] // Right: [F] let left = create_bool_array(vec![true, false, false]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false]))); + let right = create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, &right).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5267,9 +5269,9 @@ mod tests { // Left: [F, F, T] // Right: [F] let left = create_bool_array(vec![false, false, true]); - let right = ColumnarValue::Array(Arc::new(create_bool_array(vec![false]))); + let right = create_bool_array(vec![false]); - let result = 
pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, &right).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5280,10 +5282,9 @@ mod tests { // Left: [F, T, F, T] // Right: [None, Some(false)] (with null at first position) let left = create_bool_array(vec![false, true, false, true]); - let right_arr = BooleanArray::from(vec![None, Some(false)]); - let right = ColumnarValue::Array(Arc::new(right_arr)); + let right = BooleanArray::from(vec![None, Some(false)]); - let result = pre_selection_scatter(&left, right).unwrap(); + let result = pre_selection_scatter(&left, &right).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = BooleanArray::from(vec![ @@ -5294,16 +5295,30 @@ mod tests { ]); assert_eq!(&expected, result_arr.as_boolean()); } - // Test scalar right handling - { - // Left: [T, F, T] - // Right: Scalar true - let left = create_bool_array(vec![true, false, true]); - let right = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))); + } - let result = pre_selection_scatter(&left, right).unwrap(); - assert!(matches!(result, ColumnarValue::Scalar(_))); - } + #[test] + fn test_and_true_preselection_returns_lhs() { + let schema = + Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)])); + let c_array = Arc::new(BooleanArray::from(vec![false, true, false, false, false])) + as ArrayRef; + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)]) + .unwrap(); + + let expr = logical2physical(&logical_col("c").and(expr_lit(true)), &schema); + + let result = expr.evaluate(&batch).unwrap(); + let ColumnarValue::Array(result_arr) = result else { + panic!("Expected ColumnarValue::Array"); + }; + + let expected: Vec<_> = c_array.as_boolean().iter().collect(); + let actual: Vec<_> = result_arr.as_boolean().iter().collect(); + assert_eq!( + expected, actual, + "AND with TRUE must equal 
LHS even with PreSelection" + ); } #[test] From bb6e39702b612799d42f12964e13b453cebf3031 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Sun, 27 Jul 2025 17:15:07 +0800 Subject: [PATCH 2/5] fix clippy --- datafusion/physical-expr-common/src/physical_expr.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 52f8f1cfcfa8..16affd91079f 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -106,13 +106,9 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { Ok(tmp_result) } else if let ColumnarValue::Array(a) = tmp_result { scatter(selection, a.as_ref()).map(ColumnarValue::Array) - } else if let ColumnarValue::Scalar(scalar) = &tmp_result { - if let ScalarValue::Boolean(v) = scalar { - let a = BooleanArray::from(vec![*v; tmp_batch.num_rows()]); - scatter(selection, &a).map(ColumnarValue::Array) - } else { - Ok(tmp_result) - } + } else if let ColumnarValue::Scalar(ScalarValue::Boolean(v)) = &tmp_result { + let a = BooleanArray::from(vec![*v; tmp_batch.num_rows()]); + scatter(selection, &a).map(ColumnarValue::Array) } else { Ok(tmp_result) } From 9f24c054ec564040dc61edc12375cef3729d4e89 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Sat, 2 Aug 2025 18:02:48 +0800 Subject: [PATCH 3/5] Optimize implementation --- .../physical-expr/src/expressions/binary.rs | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 94f3dc9dc35c..bfc5b4420d87 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -375,16 +375,43 @@ impl PhysicalExpr for BinaryExpr { // as it takes into account cases where the selection contains null values. 
let batch = filter_record_batch(batch, selection)?; let right_ret = self.right.evaluate(&batch)?; + match &right_ret { ColumnarValue::Array(array) => { - return pre_selection_scatter(selection, array.as_boolean()); + // When the array on the right is all true or all false, skip the scatter process + let boolean_array = array.as_boolean(); + let true_count = boolean_array.true_count(); + let length = boolean_array.len(); + if true_count == length { + return Ok(lhs); + } else if true_count == 0 && boolean_array.null_count() == 0 { + // If the right-hand array is returned at this point, the lengths will be inconsistent; + // returning a scalar can avoid this issue + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean( + Some(false), + ))); + } + + return pre_selection_scatter(selection, boolean_array); } ColumnarValue::Scalar(scalar) => { if let ScalarValue::Boolean(v) = scalar { - let array = BooleanArray::from(vec![*v; batch.num_rows()]); - return pre_selection_scatter(selection, &array); + // When the scalar is true or false, skip the scatter process + if let Some(v) = v { + if *v { + return Ok(lhs); + } else { + return Ok(right_ret); + } + } else { + let array = + BooleanArray::from(vec![*v; batch.num_rows()]); + return pre_selection_scatter(selection, &array); + } } else { - return Ok(right_ret); + return internal_err!( + "Expected boolean scalar value, found: {right_ret:?}" + ); } } } From bea2ee77fb74b14222841647ca2ebe1e9e085110 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Sun, 3 Aug 2025 20:55:31 +0800 Subject: [PATCH 4/5] more efficiency impl --- .../physical-expr-common/src/physical_expr.rs | 19 ++++-- .../physical-expr/src/expressions/binary.rs | 61 ++++++++++++------- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 16affd91079f..84db63ca3b98 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++
b/datafusion/physical-expr-common/src/physical_expr.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::utils::scatter; -use arrow::array::BooleanArray; +use arrow::array::{ArrayRef, BooleanArray}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; @@ -106,9 +106,20 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { Ok(tmp_result) } else if let ColumnarValue::Array(a) = tmp_result { scatter(selection, a.as_ref()).map(ColumnarValue::Array) - } else if let ColumnarValue::Scalar(ScalarValue::Boolean(v)) = &tmp_result { - let a = BooleanArray::from(vec![*v; tmp_batch.num_rows()]); - scatter(selection, &a).map(ColumnarValue::Array) + } else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result { + // When the scalar is true or false, skip the scatter process + if let Some(v) = value { + if *v { + return Ok(ColumnarValue::from( + Arc::new(selection.clone()) as ArrayRef + )); + } else { + return Ok(tmp_result); + } + } else { + let array = BooleanArray::from(vec![None; batch.num_rows()]); + return scatter(selection, &array).map(ColumnarValue::Array); + } } else { Ok(tmp_result) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index bfc5b4420d87..abc963355bda 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -392,7 +392,7 @@ impl PhysicalExpr for BinaryExpr { ))); } - return pre_selection_scatter(selection, boolean_array); + return pre_selection_scatter(selection, Some(boolean_array)); } ColumnarValue::Scalar(scalar) => { if let ScalarValue::Boolean(v) = scalar { @@ -404,9 +404,7 @@ impl PhysicalExpr for BinaryExpr { return Ok(right_ret); } } else { - let array = - BooleanArray::from(vec![*v; batch.num_rows()]); - return pre_selection_scatter(selection, &array); + return pre_selection_scatter(selection, 
None); } } else { return internal_err!( @@ -1013,7 +1011,7 @@ fn check_short_circuit<'a>( /// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`]. fn pre_selection_scatter( left_result: &BooleanArray, - right_result: &BooleanArray, + right_result: Option<&BooleanArray>, ) -> Result { let result_len = left_result.len(); @@ -1024,22 +1022,39 @@ fn pre_selection_scatter( // keep track of how much is filled let mut last_end = 0; - SlicesIterator::new(left_result).for_each(|(start, end)| { - // the gap needs to be filled with false - if start > last_end { - result_array_builder.append_n(start - last_end, false); + // reduce if condition in for_each + match right_result { + Some(right_result) => { + SlicesIterator::new(left_result).for_each(|(start, end)| { + // the gap needs to be filled with false + if start > last_end { + result_array_builder.append_n(start - last_end, false); + } + + // copy values from right array for this slice + let len = end - start; + right_result + .slice(right_array_pos, len) + .iter() + .for_each(|v| result_array_builder.append_option(v)); + + right_array_pos += len; + last_end = end; + }); } + None => SlicesIterator::new(left_result).for_each(|(start, end)| { + // the gap needs to be filled with false + if start > last_end { + result_array_builder.append_n(start - last_end, false); + } - // copy values from right array for this slice - let len = end - start; - right_result - .slice(right_array_pos, len) - .iter() - .for_each(|v| result_array_builder.append_option(v)); + // append nulls for this slice directly + let len = end - start; + result_array_builder.append_nulls(len); - right_array_pos += len; - last_end = end; - }); + last_end = end; + }), + } // Fill any remaining positions with false if last_end < result_len { @@ -5257,7 +5272,7 @@ mod tests { let left = create_bool_array(vec![true, false, true, false, true]); let right = create_bool_array(vec![false, true, false]); - let result = 
pre_selection_scatter(&left, &right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, true, false, false]); @@ -5271,7 +5286,7 @@ mod tests { create_bool_array(vec![false, true, true, false, true, true, true]); let right = create_bool_array(vec![true, false, false, true, false]); - let result = pre_selection_scatter(&left, &right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = @@ -5285,7 +5300,7 @@ mod tests { let left = create_bool_array(vec![true, false, false]); let right = create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, &right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5298,7 +5313,7 @@ mod tests { let left = create_bool_array(vec![false, false, true]); let right = create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, &right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5311,7 +5326,7 @@ mod tests { let left = create_bool_array(vec![false, true, false, true]); let right = BooleanArray::from(vec![None, Some(false)]); - let result = pre_selection_scatter(&left, &right).unwrap(); + let result = pre_selection_scatter(&left, Some(&right)).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = BooleanArray::from(vec![ From c198d4ecbed30a4a41c2b61275c9a51afb370c72 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Sun, 3 Aug 2025 22:20:14 +0800 Subject: [PATCH 5/5] fix CI --- docs/source/user-guide/sql/window_functions.md | 2 ++ 1 file changed, 2 
insertions(+) diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md index 73e9731cdbc0..dc06f3d051bb 100644 --- a/docs/source/user-guide/sql/window_functions.md +++ b/docs/source/user-guide/sql/window_functions.md @@ -331,6 +331,8 @@ FROM employees; +-------------+--------+---------+ ``` +# + ## Analytical Functions - [first_value](#first_value)