diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc7d534776d7e..cdf2aee46da25 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2878,7 +2878,7 @@ mod tests { // verify that the plan correctly casts u8 to i64 // the cast from u8 to i64 for literal will be simplified, and get lit(int64(5)) // the cast here is implicit so has CastOptions with safe=true - let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64 } }, fail_on_overflow: false"#; + let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64 } }, fail_on_overflow: false, preselection_threshold: 0.2"#; assert_contains!(format!("{exec_plan:?}"), expected); Ok(()) @@ -3280,7 +3280,7 @@ mod tests { let execution_plan = plan(&logical_plan).await?; // verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated. 
- let expected = r#"expr: BinaryExpr { left: BinaryExpr { left: Column { name: "c1", index: 0 }, op: Eq, right: Literal { value: Utf8("a"), field: Field { name: "lit", data_type: Utf8 } }, fail_on_overflow: false }"#; + let expected = r#"expr: BinaryExpr { left: BinaryExpr { left: Column { name: "c1", index: 0 }, op: Eq, right: Literal { value: Utf8("a"), field: Field { name: "lit", data_type: Utf8 } }, fail_on_overflow: false, preselection_threshold: 0.2 }"#; assert_contains!(format!("{execution_plan:?}"), expected); diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 8df09c22bbd8d..a966333b7945f 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -48,31 +48,44 @@ use kernels::{ concat_elements_utf8view, regex_match_dyn, regex_match_dyn_scalar, }; +/// Default threshold for pre-selection optimization in AND operations. +/// When the ratio of true values in the left-hand side is at or below this threshold, +/// the RecordBatch will be filtered before evaluating the right-hand side. +pub const DEFAULT_PRESELECTION_THRESHOLD: f32 = 0.2; + /// Binary expression -#[derive(Debug, Clone, Eq)] +#[derive(Debug, Clone)] pub struct BinaryExpr { left: Arc, op: Operator, right: Arc, /// Specifies whether an error is returned on overflow or not fail_on_overflow: bool, + /// Threshold ratio (0.0 to 1.0) for pre-selection optimization in AND operations. + /// When the ratio of true values in the LHS is <= this threshold, pre-selection is applied. + /// Set to 0.0 to disable pre-selection, or 1.0 to always enable it for AND operations. 
+ preselection_threshold: f32, } -// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808 +// Manually derive PartialEq, Eq and Hash to work around https://github.com/rust-lang/rust/issues/78808 +// and because f32 doesn't implement Eq (the threshold is compared via to_bits so Eq stays reflexive and agrees with Hash) impl PartialEq for BinaryExpr { fn eq(&self, other: &Self) -> bool { self.left.eq(&other.left) && self.op.eq(&other.op) && self.right.eq(&other.right) && self.fail_on_overflow.eq(&other.fail_on_overflow) + && self.preselection_threshold.to_bits() == other.preselection_threshold.to_bits() } } +impl Eq for BinaryExpr {} impl Hash for BinaryExpr { fn hash(&self, state: &mut H) { self.left.hash(state); self.op.hash(state); self.right.hash(state); self.fail_on_overflow.hash(state); + self.preselection_threshold.to_bits().hash(state); } } @@ -88,16 +101,32 @@ impl BinaryExpr { op, right, fail_on_overflow: false, + preselection_threshold: DEFAULT_PRESELECTION_THRESHOLD, } } /// Create new binary expression with explicit fail_on_overflow value pub fn with_fail_on_overflow(self, fail_on_overflow: bool) -> Self { Self { - left: self.left, - op: self.op, - right: self.right, fail_on_overflow, + ..self + } + } + + /// Set the pre-selection threshold for AND operations. + /// + /// When evaluating `lhs AND rhs`, if the ratio of true values in `lhs` + /// is less than or equal to this threshold, the RecordBatch will be + /// filtered before evaluating `rhs`, which can improve performance + /// when `rhs` is expensive to evaluate. 
+ /// + /// - Set to `0.0` to disable pre-selection optimization + /// - Set to `1.0` to always apply pre-selection for AND operations + /// - Default is [`DEFAULT_PRESELECTION_THRESHOLD`] (0.2) + pub fn with_preselection_threshold(self, threshold: f32) -> Self { + Self { + preselection_threshold: threshold, + ..self } } @@ -115,6 +144,11 @@ impl BinaryExpr { pub fn op(&self) -> &Operator { &self.op } + + /// Get the pre-selection threshold for AND operations + pub fn preselection_threshold(&self) -> f32 { + self.preselection_threshold + } } impl std::fmt::Display for BinaryExpr { @@ -188,7 +222,7 @@ impl PhysicalExpr for BinaryExpr { let lhs = self.left.evaluate(batch)?; // Check if we can apply short-circuit evaluation. - match check_short_circuit(&lhs, &self.op) { + match check_short_circuit(&lhs, &self.op, self.preselection_threshold) { ShortCircuitStrategy::None => {} ShortCircuitStrategy::ReturnLeft => return Ok(lhs), ShortCircuitStrategy::ReturnRight => { @@ -638,25 +672,20 @@ enum ShortCircuitStrategy<'a> { PreSelection(&'a BooleanArray), } -/// Based on the results calculated from the left side of the short-circuit operation, -/// if the proportion of `true` is less than 0.2 and the current operation is an `and`, -/// the `RecordBatch` will be filtered in advance. -const PRE_SELECTION_THRESHOLD: f32 = 0.2; - /// Checks if a logical operator (`AND`/`OR`) can short-circuit evaluation based on the left-hand side (lhs) result. 
/// /// Short-circuiting occurs under these circumstances: /// - For `AND`: /// - if LHS is all false => short-circuit → return LHS /// - if LHS is all true => short-circuit → return RHS -/// - if LHS is mixed and true_count/sum_count <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection +/// - if LHS is mixed and true_count/sum_count <= `preselection_threshold` -> pre-selection /// - For `OR`: /// - if LHS is all true => short-circuit → return LHS /// - if LHS is all false => short-circuit → return RHS /// # Arguments /// * `lhs` - The left-hand side (lhs) columnar value (array or scalar) -/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar) /// * `op` - The logical operator (`AND` or `OR`) +/// * `preselection_threshold` - Threshold ratio for pre-selection optimization in AND operations /// /// # Implementation Notes /// 1. Only works with Boolean-typed arguments (other types automatically return `false`) @@ -665,6 +694,7 @@ const PRE_SELECTION_THRESHOLD: f32 = 0.2; fn check_short_circuit<'a>( lhs: &'a ColumnarValue, op: &Operator, + preselection_threshold: f32, ) -> ShortCircuitStrategy<'a> { // Quick reject for non-logical operators,and quick judgment when op is and let is_and = match op { @@ -708,7 +738,7 @@ fn check_short_circuit<'a>( } // determine if we can pre-selection - if true_count as f32 / len as f32 <= PRE_SELECTION_THRESHOLD { + if true_count as f32 / len as f32 <= preselection_threshold { return ShortCircuitStrategy::PreSelection(bool_array); } } else { @@ -4899,7 +4929,11 @@ mod tests { let left_expr = logical2physical(&logical_col("a").eq(expr_lit(2)), &schema); let left_value = left_expr.evaluate(&batch).unwrap(); assert!(matches!( - check_short_circuit(&left_value, &Operator::And), + check_short_circuit( + &left_value, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::ReturnLeft )); @@ -4909,9 +4943,11 @@ mod tests { let ColumnarValue::Array(array) = &left_value else { panic!("Expected 
ColumnarValue::Array"); }; - let ShortCircuitStrategy::PreSelection(value) = - check_short_circuit(&left_value, &Operator::And) - else { + let ShortCircuitStrategy::PreSelection(value) = check_short_circuit( + &left_value, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD, + ) else { panic!("Expected ShortCircuitStrategy::PreSelection"); }; let expected_boolean_arr: Vec<_> = @@ -4923,7 +4959,11 @@ mod tests { let left_expr = logical2physical(&logical_col("a").gt(expr_lit(0)), &schema); let left_value = left_expr.evaluate(&batch).unwrap(); assert!(matches!( - check_short_circuit(&left_value, &Operator::Or), + check_short_circuit( + &left_value, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::ReturnLeft )); @@ -4932,7 +4972,11 @@ mod tests { logical2physical(&logical_col("a").gt(expr_lit(2)), &schema); let left_value = left_expr.evaluate(&batch).unwrap(); assert!(matches!( - check_short_circuit(&left_value, &Operator::Or), + check_short_circuit( + &left_value, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::None )); @@ -4968,13 +5012,21 @@ mod tests { let mixed_nulls = logical2physical(&logical_col("c"), &schema_nullable); let mixed_nulls_value = mixed_nulls.evaluate(&batch_nullable).unwrap(); assert!(matches!( - check_short_circuit(&mixed_nulls_value, &Operator::And), + check_short_circuit( + &mixed_nulls_value, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::None )); // Case: Mixed values with nulls - shouldn't short-circuit for OR assert!(matches!( - check_short_circuit(&mixed_nulls_value, &Operator::Or), + check_short_circuit( + &mixed_nulls_value, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::None )); @@ -4991,11 +5043,19 @@ mod tests { // All nulls shouldn't short-circuit for AND or OR assert!(matches!( - check_short_circuit(&null_value, &Operator::And), + check_short_circuit( + &null_value, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD + 
), ShortCircuitStrategy::None )); assert!(matches!( - check_short_circuit(&null_value, &Operator::Or), + check_short_circuit( + &null_value, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::None )); @@ -5003,35 +5063,130 @@ mod tests { // Scalar true let scalar_true = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))); assert!(matches!( - check_short_circuit(&scalar_true, &Operator::Or), + check_short_circuit( + &scalar_true, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::ReturnLeft )); // Should short-circuit OR assert!(matches!( - check_short_circuit(&scalar_true, &Operator::And), + check_short_circuit( + &scalar_true, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::ReturnRight )); // Should return the RHS for AND // Scalar false let scalar_false = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))); assert!(matches!( - check_short_circuit(&scalar_false, &Operator::And), + check_short_circuit( + &scalar_false, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::ReturnLeft )); // Should short-circuit AND assert!(matches!( - check_short_circuit(&scalar_false, &Operator::Or), + check_short_circuit( + &scalar_false, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), ShortCircuitStrategy::ReturnRight )); // Should return the RHS for OR // Scalar null let scalar_null = ColumnarValue::Scalar(ScalarValue::Boolean(None)); assert!(matches!( - check_short_circuit(&scalar_null, &Operator::And), + check_short_circuit( + &scalar_null, + &Operator::And, + DEFAULT_PRESELECTION_THRESHOLD + ), + ShortCircuitStrategy::None + )); + assert!(matches!( + check_short_circuit( + &scalar_null, + &Operator::Or, + DEFAULT_PRESELECTION_THRESHOLD + ), + ShortCircuitStrategy::None + )); + } + + #[test] + fn test_preselection_threshold() { + // Test default threshold + let expr = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::And, + 
Arc::new(Column::new("b", 1)), + ); + assert_eq!( + expr.preselection_threshold(), + DEFAULT_PRESELECTION_THRESHOLD + ); + assert_eq!(expr.preselection_threshold(), 0.2); + + // Test custom threshold via builder + let expr_custom = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::And, + Arc::new(Column::new("b", 1)), + ) + .with_preselection_threshold(0.5); + assert_eq!(expr_custom.preselection_threshold(), 0.5); + + // Test threshold of 0.0 (disable pre-selection) + let expr_disabled = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::And, + Arc::new(Column::new("b", 1)), + ) + .with_preselection_threshold(0.0); + assert_eq!(expr_disabled.preselection_threshold(), 0.0); + + // Test threshold of 1.0 (always pre-select for AND) + let expr_always = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::And, + Arc::new(Column::new("b", 1)), + ) + .with_preselection_threshold(1.0); + assert_eq!(expr_always.preselection_threshold(), 1.0); + + // Test that threshold affects check_short_circuit behavior + // Create a boolean array with 20% true values (exactly at threshold) + let bool_array = BooleanArray::from(vec![true, false, false, false, false]); + let value = ColumnarValue::Array(Arc::new(bool_array)); + + // With default threshold (0.2), 20% true should trigger PreSelection + assert!(matches!( + check_short_circuit(&value, &Operator::And, 0.2), + ShortCircuitStrategy::PreSelection(_) + )); + + // With threshold 0.1, 20% true should NOT trigger PreSelection + assert!(matches!( + check_short_circuit(&value, &Operator::And, 0.1), ShortCircuitStrategy::None )); + + // With threshold 0.0, should never trigger PreSelection assert!(matches!( - check_short_circuit(&scalar_null, &Operator::Or), + check_short_circuit(&value, &Operator::And, 0.0), ShortCircuitStrategy::None )); + + // With threshold 1.0, should always trigger PreSelection (for mixed values) + assert!(matches!( + check_short_circuit(&value, &Operator::And, 1.0), + 
ShortCircuitStrategy::PreSelection(_) + )); } /// Test for [pre_selection_scatter] diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index fd8b2667259f5..101d27823ebf0 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -486,14 +486,14 @@ mod test { ) .unwrap(); let snap = dynamic_filter_1.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#); + insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false, preselection_threshold: 0.2 }"#); let dynamic_filter_2 = reassign_expr_columns( Arc::clone(&dynamic_filter) as Arc, &filter_schema_2, ) .unwrap(); let snap = dynamic_filter_2.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#); + insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false, preselection_threshold: 0.2 }"#); // Both filters allow evaluating the same expression let batch_1 = RecordBatch::try_new( Arc::clone(&filter_schema_1), diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..5fc9e2a81cba1 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ 
b/datafusion/physical-expr/src/expressions/mod.rs @@ -39,7 +39,7 @@ pub use crate::PhysicalSortExpr; /// Module with some convenient methods used in expression building pub use crate::aggregate::stats::StatsType; -pub use binary::{BinaryExpr, binary, similar_to}; +pub use binary::{BinaryExpr, DEFAULT_PRESELECTION_THRESHOLD, binary, similar_to}; pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use cast_column::CastColumnExpr;