diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc7d534776d7e..2715ad98202cb 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1091,6 +1091,7 @@ impl DefaultPhysicalPlanner { filter, join_type, null_equality, + null_aware, schema: join_schema, .. }) => { @@ -1487,6 +1488,8 @@ impl DefaultPhysicalPlanner { } else if session_state.config().target_partitions() > 1 && session_state.config().repartition_joins() && prefer_hash_join + && !*null_aware + // Null-aware joins must use CollectLeft { Arc::new(HashJoinExec::try_new( physical_left, @@ -1497,6 +1500,7 @@ impl DefaultPhysicalPlanner { None, PartitionMode::Auto, *null_equality, + *null_aware, )?) } else { Arc::new(HashJoinExec::try_new( @@ -1508,6 +1512,7 @@ impl DefaultPhysicalPlanner { None, PartitionMode::CollectLeft, *null_equality, + *null_aware, )?) }; diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs index 380a47505ac2d..b7c06e78045bd 100644 --- a/datafusion/core/tests/execution/coop.rs +++ b/datafusion/core/tests/execution/coop.rs @@ -606,6 +606,7 @@ async fn join_yields( None, PartitionMode::CollectLeft, NullEquality::NullEqualsNull, + false, )?); query_yields(join, session_ctx.task_ctx()).await @@ -655,6 +656,7 @@ async fn join_agg_yields( None, PartitionMode::CollectLeft, NullEquality::NullEqualsNull, + false, )?); // Project only one column (“value” from the left side) because we just want to sum that @@ -720,6 +722,7 @@ async fn hash_join_yields( None, PartitionMode::CollectLeft, NullEquality::NullEqualsNull, + false, )?); query_yields(join, session_ctx.task_ctx()).await @@ -751,9 +754,10 @@ async fn hash_join_without_repartition_and_no_agg( /* filter */ None, &JoinType::Inner, /* output64 */ None, - // Using CollectLeft is fine—just avoid RepartitionExec’s partitioned channels. + // Using CollectLeft is fine—just avoid RepartitionExec's partitioned channels. 
PartitionMode::CollectLeft, NullEquality::NullEqualsNull, + false, )?); query_yields(join, session_ctx.task_ctx()).await diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index ce422494db101..111a232020d5e 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -849,6 +849,7 @@ impl JoinFuzzTestCase { None, PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, ) .unwrap(), ) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d12739658c400..f707650b04730 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -233,6 +233,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -354,6 +355,7 @@ async fn test_static_filter_pushdown_through_hash_join() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -418,6 +420,7 @@ async fn test_static_filter_pushdown_through_hash_join() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -981,6 +984,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { None, PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ) as Arc; @@ -1170,6 +1174,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -1363,6 +1368,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { None, PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -1531,6 +1537,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -1550,6 +1557,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ) as Arc; @@ -1665,6 +1673,7 @@ async fn test_hashjoin_parent_filter_pushdown() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -2771,6 +2780,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -2899,6 +2909,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { None, PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -3049,6 +3060,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { None, PartitionMode::Partitioned, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -3199,6 +3211,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { None, PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -3333,6 +3346,7 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() { None, PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -3441,6 +3455,7 @@ 
async fn test_hashjoin_dynamic_filter_pushdown_is_used() { None, PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, + false, ) .unwrap(), ) as Arc; diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 37bcefd418bdb..9234a95591baa 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -222,6 +222,7 @@ async fn test_join_with_swap() { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -284,6 +285,7 @@ async fn test_left_join_no_swap() { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -333,6 +335,7 @@ async fn test_join_with_swap_semi() { None, PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, ) .unwrap(); @@ -388,6 +391,7 @@ async fn test_join_with_swap_mark() { None, PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, ) .unwrap(); @@ -461,6 +465,7 @@ async fn test_nested_join_swap() { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, ) .unwrap(); let child_schema = child_join.schema(); @@ -478,6 +483,7 @@ async fn test_nested_join_swap() { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, ) .unwrap(); @@ -518,6 +524,7 @@ async fn test_join_no_swap() { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -745,6 +752,7 @@ async fn test_hash_join_swap_on_joins_with_projections( Some(projection), PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, )?); let swapped = join @@ -906,6 +914,7 @@ fn check_join_partition_mode( None, PartitionMode::Auto, NullEquality::NullEqualsNothing, + false, ) .unwrap(), ); @@ -1554,6 +1563,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { None, t.initial_mode, NullEquality::NullEqualsNothing, + false, )?) 
as _; let optimized_join_plan = diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index ff87ad7212967..d9b36dc4b87ce 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -1284,6 +1284,7 @@ fn test_hash_join_after_projection() -> Result<()> { None, PartitionMode::Auto, NullEquality::NullEqualsNothing, + false, )?); let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new( vec![ diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index 84534b4fd833d..b717f546dc422 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1139,6 +1139,7 @@ fn hash_join_exec( None, PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, ) .unwrap(), ) diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 40beb12d48cdb..feac8190ffde4 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -247,6 +247,7 @@ pub fn hash_join_exec( None, PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, )?)) } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6f654428e41a1..edf989a6df596 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1011,6 +1011,25 @@ impl LogicalPlanBuilder { join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>), filter: Option<Expr>, null_equality: NullEquality, + ) -> Result<Self> { + self.join_detailed_with_options( + right, + join_type, + join_keys, + filter, + null_equality, + false, + ) + } + + pub fn join_detailed_with_options( + self, + right: LogicalPlan, + join_type: JoinType, + join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>), + filter: Option<Expr>, + null_equality: NullEquality, + null_aware: bool, ) -> Result<Self> { if join_keys.0.len() != join_keys.1.len() { return plan_err!("left_keys and right_keys were not the same length"); @@ -1128,6 +1147,7 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, schema: DFSchemaRef::new(join_schema), null_equality, + null_aware, }))) } @@ -1201,6 +1221,7 @@ impl LogicalPlanBuilder { join_type, JoinConstraint::Using, NullEquality::NullEqualsNothing, + false, // null_aware )?; Ok(Self::new(LogicalPlan::Join(join))) @@ -1217,6 +1238,7 @@ impl LogicalPlanBuilder { JoinType::Inner, JoinConstraint::On, NullEquality::NullEqualsNothing, + false, // null_aware )?; Ok(Self::new(LogicalPlan::Join(join))) @@ -1471,6 +1493,7 @@ impl LogicalPlanBuilder { join_type, JoinConstraint::On, NullEquality::NullEqualsNothing, + false, // null_aware )?; Ok(Self::new(LogicalPlan::Join(join))) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 4219c24bfc9c9..5b143ef135eda 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -661,6 +661,7 @@ impl LogicalPlan { on, schema: _, null_equality, + null_aware, }) => { let schema = build_join_schema(left.schema(), right.schema(), &join_type)?; @@ -682,6 +683,7 @@ impl LogicalPlan { filter, schema: DFSchemaRef::new(schema), null_equality, + null_aware, })) }
LogicalPlan::Subquery(_) => Ok(self), @@ -901,6 +903,7 @@ impl LogicalPlan { join_constraint, on, null_equality, + null_aware, .. }) => { let (left, right) = self.only_two_inputs(inputs)?; @@ -942,6 +945,7 @@ impl LogicalPlan { filter: filter_expr, schema: DFSchemaRef::new(schema), null_equality: *null_equality, + null_aware: *null_aware, })) } LogicalPlan::Subquery(Subquery { @@ -3781,6 +3785,14 @@ pub struct Join { pub schema: DFSchemaRef, /// Defines the null equality for the join. pub null_equality: NullEquality, + /// Whether this is a null-aware anti join (for NOT IN semantics). + /// + /// Only applies to LeftAnti joins. When true, implements SQL NOT IN semantics where: + /// - If the right side (subquery) contains any NULL in join keys, no rows are output + /// - Left side rows with NULL in join keys are not output + /// + /// This is required for correct NOT IN subquery behavior with three-valued logic. + pub null_aware: bool, } impl Join { @@ -3798,10 +3810,12 @@ impl Join { /// * `join_type` - Type of join (Inner, Left, Right, etc.) /// * `join_constraint` - Join constraint (On, Using) /// * `null_equality` - How to handle nulls in join comparisons + /// * `null_aware` - Whether this is a null-aware anti join (for NOT IN semantics) /// /// # Returns /// /// A new Join operator with the computed schema + #[expect(clippy::too_many_arguments)] pub fn try_new( left: Arc, right: Arc, @@ -3810,6 +3824,7 @@ impl Join { join_type: JoinType, join_constraint: JoinConstraint, null_equality: NullEquality, + null_aware: bool, ) -> Result { let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?; @@ -3822,6 +3837,7 @@ impl Join { join_constraint, schema: Arc::new(join_schema), null_equality, + null_aware, }) } @@ -3877,6 +3893,7 @@ impl Join { join_constraint: original_join.join_constraint, schema: Arc::new(join_schema), null_equality: original_join.null_equality, + null_aware: original_join.null_aware, }, requalified, )) @@ -5329,6 +5346,7 @@ mod tests { join_constraint: JoinConstraint::On, schema: Arc::new(left_schema.join(&right_schema)?), null_equality: NullEquality::NullEqualsNothing, + null_aware: false, })) } @@ -5440,6 +5458,7 @@ mod tests { join_type, JoinConstraint::On, NullEquality::NullEqualsNothing, + false, )?; match join_type { @@ -5585,6 +5604,7 @@ mod tests { JoinType::Inner, JoinConstraint::Using, NullEquality::NullEqualsNothing, + false, )?; let fields = join.schema.fields(); @@ -5636,6 +5656,7 @@ mod tests { JoinType::Inner, JoinConstraint::On, NullEquality::NullEqualsNothing, + false, )?; let fields = join.schema.fields(); @@ -5685,6 +5706,7 @@ mod tests { JoinType::Inner, JoinConstraint::On, NullEquality::NullEqualsNull, + false, )?; assert_eq!(join.null_equality, NullEquality::NullEqualsNull); @@ -5727,6 +5749,7 @@ mod tests { join_type, JoinConstraint::On, NullEquality::NullEqualsNothing, + false, )?; let fields = join.schema.fields(); @@ -5766,6 +5789,7 @@ mod tests { JoinType::Inner, JoinConstraint::Using, NullEquality::NullEqualsNothing, + false, )?; assert_eq!( diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 62a27b0a025ad..ac193e758cee3 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -133,6 +133,7 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equality, + null_aware, }) => (left, right).map_elements(f)?.update_data(|(left, right)| { LogicalPlan::Join(Join { left, @@ -143,6 +144,7 @@ impl 
TreeNode for LogicalPlan { join_constraint, schema, null_equality, + null_aware, }) }), LogicalPlan::Limit(Limit { skip, fetch, input }) => input @@ -564,6 +566,7 @@ impl LogicalPlan { join_constraint, schema, null_equality, + null_aware, }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| { LogicalPlan::Join(Join { left, @@ -574,6 +577,7 @@ impl LogicalPlan { join_constraint, schema, null_equality, + null_aware, }) }), LogicalPlan::Sort(Sort { expr, input, fetch }) => expr diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index c8acb044876c4..b2742719cb9e9 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -27,7 +27,10 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::alias::AliasGenerator; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{Column, Result, assert_or_internal_err, plan_err}; +use datafusion_common::{ + Column, DFSchemaRef, ExprSchema, NullEquality, Result, assert_or_internal_err, + plan_err, +}; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; @@ -310,6 +313,39 @@ fn mark_join( ) } +/// Check if join keys in the join filter may contain NULL values +/// +/// Returns true if any join key column is nullable on either side. +/// This is used to optimize null-aware anti joins: if all join keys are non-nullable, +/// we can use a regular anti join instead of the more expensive null-aware variant. +fn join_keys_may_be_null( + join_filter: &Expr, + left_schema: &DFSchemaRef, + right_schema: &DFSchemaRef, +) -> Result { + // Extract columns from the join filter + let mut columns = std::collections::HashSet::new(); + expr_to_columns(join_filter, &mut columns)?; + + // Check if any column is nullable + for col in columns { + // Check in left schema + if let Ok(field) = left_schema.field_from_column(&col) + && field.as_ref().is_nullable() + { + return Ok(true); + } + // Check in right schema + if let Ok(field) = right_schema.field_from_column(&col) + && field.as_ref().is_nullable() + { + return Ok(true); + } + } + + Ok(false) +} + fn build_join( left: &LogicalPlan, subquery: &LogicalPlan, @@ -403,6 +439,8 @@ fn build_join( // Degenerate case: no right columns referenced by the predicate(s) sub_query_alias.clone() }; + + // Mark joins don't use null-aware semantics (they use three-valued logic with mark column) let new_plan = LogicalPlanBuilder::from(left.clone()) .join_on(right_projected, join_type, Some(join_filter))? .build()?; @@ -415,10 +453,36 @@ fn build_join( return Ok(Some(new_plan)); } + // Determine if this should be a null-aware anti join + // Null-aware semantics are only needed for NOT IN subqueries, not NOT EXISTS: + // - NOT IN: Uses three-valued logic, requires null-aware handling + // - NOT EXISTS: Uses two-valued logic, regular anti join is correct + // We can distinguish them: NOT IN has in_predicate_opt, NOT EXISTS does not + // + // Additionally, if the join keys are non-nullable on both sides, we don't need + // null-aware semantics because NULLs cannot exist in the data. 
+ let null_aware = matches!(join_type, JoinType::LeftAnti) + && in_predicate_opt.is_some() + && join_keys_may_be_null(&join_filter, left.schema(), sub_query_alias.schema())?; + // join our sub query into the main plan - let new_plan = LogicalPlanBuilder::from(left.clone()) - .join_on(sub_query_alias, join_type, Some(join_filter))? - .build()?; + let new_plan = if null_aware { + // Use join_detailed_with_options to set null_aware flag + LogicalPlanBuilder::from(left.clone()) + .join_detailed_with_options( + sub_query_alias, + join_type, + (Vec::<Column>::new(), Vec::<Column>::new()), // No equijoin keys, filter-based join + Some(join_filter), + NullEquality::NullEqualsNothing, + true, // null_aware + )? + .build()? + } else { + LogicalPlanBuilder::from(left.clone()) + .join_on(sub_query_alias, join_type, Some(join_filter))? + .build()? + }; debug!( "predicate subquery optimized:\n{}", new_plan.display_indent() diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 770291566346c..c5a3a7d96ce89 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -341,6 +341,7 @@ fn find_inner_join( filter: None, schema: join_schema, null_equality, + null_aware: false, })); } } @@ -363,6 +364,7 @@ fn find_inner_join( join_type: JoinType::Inner, join_constraint: JoinConstraint::On, null_equality, + null_aware: false, })) } @@ -1367,6 +1369,7 @@ mod tests { filter: None, schema: join_schema, null_equality: NullEquality::NullEqualsNull, // Test preservation + null_aware: false, }); // Apply filter that can create join conditions diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 2c78051c14134..58abe38d04bc7 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -119,6 +119,7 @@ impl OptimizerRule for EliminateOuterJoin { filter: join.filter.clone(), schema: Arc::clone(&join.schema), null_equality: join.null_equality, + null_aware: join.null_aware, })); Filter::try_new(filter.predicate, new_join) .map(|f| Transformed::yes(LogicalPlan::Filter(f))) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index a623faf8a2ff0..0a50761e8a9f7 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -76,6 +76,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equality, + null_aware, }) => { let left_schema = left.schema(); let right_schema = right.schema(); @@ -117,6 +118,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { // According to `is not distinct from`'s semantics, it's // safe to override it null_equality: NullEquality::NullEqualsNull, + null_aware, }))); } } @@ -132,6 +134,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equality, + null_aware, }))) } else { Ok(Transformed::no(LogicalPlan::Join(Join { @@ -143,6 +146,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equality, + null_aware, }))) } } diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 6120e1f3b5826..f3ec083efb240 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -295,6 +295,7 @@ pub fn
adjust_input_keys_ordering( projection, mode, null_equality, + null_aware, .. }) = plan.as_any().downcast_ref::() { @@ -314,6 +315,7 @@ pub fn adjust_input_keys_ordering( projection.clone(), PartitionMode::Partitioned, *null_equality, + *null_aware, ) .map(|e| Arc::new(e) as _) }; @@ -618,6 +620,7 @@ pub fn reorder_join_keys_to_inputs( projection, mode, null_equality, + null_aware, .. }) = plan_any.downcast_ref::() { @@ -644,6 +647,7 @@ pub fn reorder_join_keys_to_inputs( projection.clone(), PartitionMode::Partitioned, *null_equality, + *null_aware, )?)); } } diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index f837c79a4e391..7412d0ba97812 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -184,7 +184,9 @@ pub(crate) fn try_collect_left( match (left_can_collect, right_can_collect) { (true, true) => { + // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() + && !hash_join.null_aware && should_swap_join_order(&**left, &**right)? { Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?)) @@ -198,6 +200,7 @@ pub(crate) fn try_collect_left( hash_join.projection.clone(), PartitionMode::CollectLeft, hash_join.null_equality(), + hash_join.null_aware, )?))) } } @@ -210,9 +213,11 @@ pub(crate) fn try_collect_left( hash_join.projection.clone(), PartitionMode::CollectLeft, hash_join.null_equality(), + hash_join.null_aware, )?))), (false, true) => { - if hash_join.join_type().supports_swap() { + // Don't swap null-aware anti joins as they have specific side requirements + if hash_join.join_type().supports_swap() && !hash_join.null_aware { hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some) } else { Ok(None) @@ -232,10 +237,23 @@ pub(crate) fn partitioned_hash_join( ) -> Result> { let left = hash_join.left(); let right = hash_join.right(); - if hash_join.join_type().supports_swap() && should_swap_join_order(&**left, &**right)? + // Don't swap null-aware anti joins as they have specific side requirements + if hash_join.join_type().supports_swap() + && !hash_join.null_aware + && should_swap_join_order(&**left, &**right)? { hash_join.swap_inputs(PartitionMode::Partitioned) } else { + // Null-aware anti joins must use CollectLeft mode because they track probe-side state + // (probe_side_non_empty, probe_side_has_null) per-partition, but need global knowledge + // for correct null handling. With partitioning, a partition might not see probe rows + // even if the probe side is globally non-empty, leading to incorrect NULL row handling. + let partition_mode = if hash_join.null_aware { + PartitionMode::CollectLeft + } else { + PartitionMode::Partitioned + }; + Ok(Arc::new(HashJoinExec::try_new( Arc::clone(left), Arc::clone(right), @@ -243,8 +261,9 @@ pub(crate) fn partitioned_hash_join( hash_join.filter().cloned(), hash_join.join_type(), hash_join.projection.clone(), - PartitionMode::Partitioned, + partition_mode, hash_join.null_equality(), + hash_join.null_aware, )?)) } } @@ -277,7 +296,9 @@ fn statistical_join_selection_subrule( PartitionMode::Partitioned => { let left = hash_join.left(); let right = hash_join.right(); + // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() + && !hash_join.null_aware && should_swap_join_order(&**left, &**right)? 
{ hash_join @@ -484,6 +505,7 @@ pub fn hash_join_swap_subrule( if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() && hash_join.left.boundedness().is_unbounded() && !hash_join.right.boundedness().is_unbounded() + && !hash_join.null_aware // Don't swap null-aware anti joins && matches!( *hash_join.join_type(), JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b290b8549c53b..4b7271d60780d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -17,7 +17,7 @@ use std::fmt; use std::mem::size_of; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock}; use std::{any::Any, vec}; @@ -206,6 +206,11 @@ pub(super) struct JoinLeftData { /// Membership testing strategy for filter pushdown /// Contains either InList values for small build sides or hash table reference for large build sides pub(super) membership: PushdownStrategy, + /// Shared atomic flag indicating if any probe partition saw data (for null-aware anti joins) + /// This is shared across all probe partitions to provide global knowledge + pub(super) probe_side_non_empty: AtomicBool, + /// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins) + pub(super) probe_side_has_null: AtomicBool, } impl JoinLeftData { @@ -466,6 +471,8 @@ pub struct HashJoinExec { column_indices: Vec<ColumnIndex>, /// The equality null-handling behavior of the join algorithm. pub null_equality: NullEquality, + /// Flag to indicate if this is a null-aware anti join + pub null_aware: bool, /// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties, /// Dynamic filter for pushing down to the probe side @@ -526,6 +533,7 @@ impl HashJoinExec { projection: Option<Vec<usize>>, partition_mode: PartitionMode, null_equality: NullEquality, + null_aware: bool, ) -> Result<Self> { let left_schema = left.schema(); let right_schema = right.schema(); @@ -535,6 +543,21 @@ impl HashJoinExec { check_join_is_valid(&left_schema, &right_schema, &on)?; + // Validate null_aware flag + if null_aware { + if !matches!(join_type, JoinType::LeftAnti) { + return plan_err!( + "null_aware can only be true for LeftAnti joins, got {join_type}" + ); + } + if on.len() != 1 { + return plan_err!( + "null_aware anti join only supports single column join key, got {} columns", + on.len() + ); + } + } + let (join_schema, column_indices) = build_join_schema(&left_schema, &right_schema, join_type); @@ -572,6 +595,7 @@ impl HashJoinExec { projection, column_indices, null_equality, + null_aware, cache, dynamic_filter: None, }) @@ -683,6 +707,7 @@ impl HashJoinExec { projection, self.mode, self.null_equality, + self.null_aware, ) } @@ -806,6 +831,7 @@ impl HashJoinExec { ), partition_mode, self.null_equality(), + self.null_aware, )?; // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again if matches!( @@ -988,6 +1014,7 @@ impl ExecutionPlan for HashJoinExec { projection: self.projection.clone(), column_indices: self.column_indices.clone(), null_equality: self.null_equality, + null_aware: self.null_aware, cache: Self::compute_properties( &children[0], &children[1], @@ -1018,6 +1045,7 @@ impl ExecutionPlan for HashJoinExec { projection: self.projection.clone(), column_indices: self.column_indices.clone(), null_equality: self.null_equality, + null_aware: self.null_aware, cache: self.cache.clone(), // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, @@ -1187,6 +1215,7 @@ impl ExecutionPlan for HashJoinExec { self.right.output_ordering().is_some(), build_accumulator, self.mode, + self.null_aware, ))) } @@ -1252,6 +1281,7 @@ impl ExecutionPlan for HashJoinExec { None, *self.partition_mode(), self.null_equality, + self.null_aware, )?))) } else { try_embed_projection(projection, self) @@ -1344,6 +1374,7 @@ impl ExecutionPlan for HashJoinExec { projection: self.projection.clone(), column_indices: self.column_indices.clone(), null_equality: self.null_equality, + null_aware: self.null_aware, cache: self.cache.clone(), dynamic_filter: Some(HashJoinExecDynamicFilter { filter: dynamic_filter, @@ -1708,6 +1739,8 @@ async fn collect_left_input( _reservation: reservation, bounds, membership, + probe_side_non_empty: AtomicBool::new(false), + probe_side_has_null: AtomicBool::new(false), }; Ok(data) @@ -1831,6 +1864,26 @@ mod tests { TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap() } + /// Build a table with two columns supporting nullable values + fn build_table_two_cols( + a: (&str, &Vec<Option<i32>>), + b: (&str, &Vec<Option<i32>>), + ) -> Arc<dyn ExecutionPlan> { + let schema = Arc::new(Schema::new(vec![ + Field::new(a.0, DataType::Int32, true), + Field::new(b.0, DataType::Int32, true), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(a.1.clone())), + Arc::new(Int32Array::from(b.1.clone())), + ], + ) + .unwrap(); + TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap() + } + fn join( left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>, @@ -1847,6 +1900,7 @@ mod tests { None, PartitionMode::CollectLeft, null_equality, + false, ) } @@ -1867,6 +1921,7
@@ mod tests { None, PartitionMode::CollectLeft, null_equality, + false, ) } @@ -1965,6 +2020,7 @@ mod tests { None, partition_mode, null_equality, + false, )?; let columns = columns(&join.schema()); @@ -4848,6 +4904,7 @@ mod tests { None, PartitionMode::Partitioned, NullEquality::NullEqualsNothing, + false, )?; let stream = join.execute(1, task_ctx)?; @@ -5038,6 +5095,7 @@ mod tests { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, )?; join.dynamic_filter = Some(HashJoinExecDynamicFilter { filter: dynamic_filter, @@ -5091,6 +5149,7 @@ mod tests { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, + false, )?; join.dynamic_filter = Some(HashJoinExecDynamicFilter { filter: dynamic_filter, @@ -5322,4 +5381,235 @@ mod tests { Ok(()) } + + /// Test null-aware anti join when probe side (right) contains NULL + /// Expected: no rows should be output (NULL in subquery means all results are unknown) + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_anti_join_probe_null(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, false); + + // Build left table (rows to potentially output) + let left = build_table_two_cols( + ("c1", &vec![Some(1), Some(2), Some(3), Some(4)]), + ("dummy", &vec![Some(10), Some(20), Some(30), Some(40)]), + ); + + // Build right table (subquery with NULL) + let right = build_table_two_cols( + ("c2", &vec![Some(1), Some(2), Some(3), None]), + ("dummy", &vec![Some(100), Some(200), Some(300), Some(400)]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("c1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("c2", &right.schema())?) as _, + )]; + + // Create null-aware anti join + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftAnti, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true + )?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + // Expected: empty result (probe side has NULL, so no rows should be output) + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + ++ + ++ + "); + } + Ok(()) + } + + /// Test null-aware anti join when build side (left) contains NULL keys + /// Expected: rows with NULL keys should not be output + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_anti_join_build_null(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, false); + + // Build left table with NULL key (this row should not be output) + let left = build_table_two_cols( + ("c1", &vec![Some(1), Some(4), None]), + ("dummy", &vec![Some(10), Some(40), Some(0)]), + ); + + // Build right table (no NULL, so probe-side check passes) + let right = build_table_two_cols( + ("c2", &vec![Some(1), Some(2), Some(3)]), + ("dummy", &vec![Some(100), Some(200), Some(300)]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("c1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("c2", &right.schema())?) as _, + )]; + + // Create null-aware anti join + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftAnti, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true + )?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + // Expected: only c1=4 (not c1=1 which matches, not c1=NULL) + allow_duplicates! 
{ + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-------+ + | c1 | dummy | + +----+-------+ + | 4 | 40 | + +----+-------+ + "); + } + Ok(()) + } + + /// Test null-aware anti join with no NULLs (should work like regular anti join) + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_anti_join_no_nulls(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, false); + + // Build left table (no NULLs) + let left = build_table_two_cols( + ("c1", &vec![Some(1), Some(2), Some(4), Some(5)]), + ("dummy", &vec![Some(10), Some(20), Some(40), Some(50)]), + ); + + // Build right table (no NULLs) + let right = build_table_two_cols( + ("c2", &vec![Some(1), Some(2), Some(3)]), + ("dummy", &vec![Some(100), Some(200), Some(300)]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("c1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("c2", &right.schema())?) as _, + )]; + + // Create null-aware anti join + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftAnti, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true + )?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + // Expected: c1=4 and c1=5 (they don't match anything in right) + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-------+ + | c1 | dummy | + +----+-------+ + | 4 | 40 | + | 5 | 50 | + +----+-------+ + "); + } + Ok(()) + } + + /// Test that null_aware validation rejects non-LeftAnti join types + #[tokio::test] + async fn test_null_aware_validation_wrong_join_type() { + let left = + build_table_two_cols(("c1", &vec![Some(1)]), ("dummy", &vec![Some(10)])); + let right = + build_table_two_cols(("c2", &vec![Some(1)]), ("dummy", &vec![Some(100)])); + + let on = vec![( + Arc::new(Column::new_with_schema("c1", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("c2", &right.schema()).unwrap()) as _, + )]; + + // Try to create null-aware Inner join (should fail) + let result = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true (invalid for Inner join) + ); + + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("null_aware can only be true for LeftAnti joins") + ); + } + + /// Test that null_aware validation rejects multi-column joins + #[tokio::test] + async fn test_null_aware_validation_multi_column() { + let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3])); + let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3])); + + // Try multi-column join + let on = vec![ + ( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _, + ), + ( + Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _, + ), + ]; + + // Try to create null-aware anti join with 2 columns (should fail) + let result = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftAnti, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware = true (invalid for multi-column) + ); + + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("null_aware anti join only supports 
single column join key") + ); + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index a08ab2eedab3b..54e620f99de7a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -21,6 +21,7 @@ //! [`super::HashJoinExec`]. See comments in [`HashJoinStream`] for more details. use std::sync::Arc; +use std::sync::atomic::Ordering; use std::task::Poll; use crate::joins::Map; @@ -224,6 +225,8 @@ pub(super) struct HashJoinStream { /// Uses `BatchCoalescer` from arrow to efficiently combine batches. /// When batches are already close to target size, they bypass coalescing. output_buffer: Box, + /// Whether this is a null-aware anti join + null_aware: bool, } impl RecordBatchStream for HashJoinStream { @@ -371,6 +374,7 @@ impl HashJoinStream { right_side_ordered: bool, build_accumulator: Option>, mode: PartitionMode, + null_aware: bool, ) -> Self { // Create output buffer with coalescing. // Use biggest_coalesce_batch_size to bypass coalescing for batches @@ -402,6 +406,7 @@ impl HashJoinStream { build_waiter: None, mode, output_buffer, + null_aware, } } @@ -484,6 +489,10 @@ impl HashJoinStream { )?; build_timer.done(); + // Note: For null-aware anti join, we need to check the probe side (right) for NULLs, + // not the build side (left). The probe-side NULL check happens during process_probe_batch. + // The probe_side_has_null flag will be set there if any probe batch contains NULL. + // Handle dynamic filter build-side information accumulation // // Dynamic filter coordination between partitions: @@ -595,6 +604,44 @@ impl HashJoinStream { let timer = self.join_metrics.join_time.timer(); + // Null-aware anti join semantics: + // For LeftAnti: output LEFT (build) rows where LEFT.key NOT IN RIGHT.key + // 1. If RIGHT (probe) contains NULL in any batch, no LEFT rows should be output + // 2. 
LEFT rows with NULL keys should not be output (handled in final stage) + if self.null_aware { + // Mark that we've seen a probe batch with actual rows (probe side is non-empty) + // Only set this if batch has rows - empty batches don't count + // Use shared atomic state so all partitions can see this global information + if state.batch.num_rows() > 0 { + build_side + .left_data + .probe_side_non_empty + .store(true, Ordering::Relaxed); + } + + // Check if probe side (RIGHT) contains NULL + // Since null_aware validation ensures single column join, we only check the first column + let probe_key_column = &state.values[0]; + if probe_key_column.null_count() > 0 { + // Found NULL in probe side - set shared flag to prevent any output + build_side + .left_data + .probe_side_has_null + .store(true, Ordering::Relaxed); + } + + // If probe side has NULL (detected in this or any other partition), return empty result + if build_side + .left_data + .probe_side_has_null + .load(Ordering::Relaxed) + { + timer.done(); + self.state = HashJoinStreamState::FetchProbeBatch; + return Ok(StatefulStreamResult::Continue); + } + } + // if the left side is empty, we can skip the (potentially expensive) join operation let is_empty = build_side.left_data.map().is_empty(); @@ -766,18 +813,66 @@ impl HashJoinStream { } let build_side = self.build_side.try_as_ready()?; + + // For null-aware anti join, if probe side had NULL, no rows should be output + // Check shared atomic state to get global knowledge across all partitions + if self.null_aware + && build_side + .left_data + .probe_side_has_null + .load(Ordering::Relaxed) + { + timer.done(); + self.state = HashJoinStreamState::Completed; + return Ok(StatefulStreamResult::Continue); + } if !build_side.left_data.report_probe_completed() { self.state = HashJoinStreamState::Completed; return Ok(StatefulStreamResult::Continue); } // use the global left bitmap to produce the left indices and right indices - let (left_side, right_side) = get_final_indices_from_shared_bitmap( + let (mut left_side, mut right_side) = get_final_indices_from_shared_bitmap( build_side.left_data.visited_indices_bitmap(), self.join_type, true, ); + // For null-aware anti join, filter out LEFT rows with NULL in join keys + // BUT only if the probe side (RIGHT) was non-empty. If probe side is empty, + // NULL NOT IN (empty) = TRUE, so NULL rows should be returned. 
+ // Use shared atomic state to get global knowledge across all partitions + if self.null_aware + && self.join_type == JoinType::LeftAnti + && build_side + .left_data + .probe_side_non_empty + .load(Ordering::Relaxed) + { + // Since null_aware validation ensures single column join, we only check the first column + let build_key_column = &build_side.left_data.values()[0]; + + // Filter out indices where the key is NULL + let filtered_indices: Vec = left_side + .iter() + .filter_map(|idx| { + let idx_usize = idx.unwrap() as usize; + if build_key_column.is_null(idx_usize) { + None // Skip rows with NULL keys + } else { + Some(idx.unwrap()) + } + }) + .collect(); + + left_side = UInt64Array::from(filtered_indices); + + // Update right_side to match the new length + let mut builder = arrow::array::UInt32Builder::with_capacity(left_side.len()); + builder.append_nulls(left_side.len()); + right_side = builder.finish(); + } + self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(left_side.len()); diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index 27284bf546bc1..0455fb2a1eb6e 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -152,6 +152,7 @@ pub async fn partitioned_hash_join_with_filter( None, PartitionMode::Partitioned, null_equality, + false, // null_aware )?); let mut batches = vec![]; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bd7dd3a6aff3c..5f590560c4675 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1111,6 +1111,7 @@ message HashJoinExecNode { datafusion_common.NullEquality null_equality = 7; JoinFilter filter = 8; repeated uint32 projection = 9; + bool null_aware = 10; } enum StreamPartitionMode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e269606d163a3..f6d364f269b48 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -8041,6 +8041,9 @@ impl serde::Serialize for HashJoinExecNode { if !self.projection.is_empty() { len += 1; } + if self.null_aware { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.HashJoinExecNode", len)?; if let Some(v) = self.left.as_ref() { struct_ser.serialize_field("left", v)?; @@ -8072,6 +8075,9 @@ impl serde::Serialize for HashJoinExecNode { if !self.projection.is_empty() { struct_ser.serialize_field("projection", &self.projection)?; } + if self.null_aware { + struct_ser.serialize_field("nullAware", &self.null_aware)?; + } struct_ser.end() } } @@ -8093,6 +8099,8 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { "nullEquality", "filter", "projection", + "null_aware", + "nullAware", ]; #[allow(clippy::enum_variant_names)] @@ -8105,6 +8113,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { NullEquality, Filter, Projection, + NullAware, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -8134,6 +8143,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { "nullEquality" | "null_equality" => Ok(GeneratedField::NullEquality), "filter" => Ok(GeneratedField::Filter), "projection" => Ok(GeneratedField::Projection), + "nullAware" | "null_aware" => Ok(GeneratedField::NullAware), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -8161,6 +8171,7 @@ impl<'de> 
serde::Deserialize<'de> for HashJoinExecNode { let mut null_equality__ = None; let mut filter__ = None; let mut projection__ = None; + let mut null_aware__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Left => { @@ -8214,6 +8225,12 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { .into_iter().map(|x| x.0).collect()) ; } + GeneratedField::NullAware => { + if null_aware__.is_some() { + return Err(serde::de::Error::duplicate_field("nullAware")); + } + null_aware__ = Some(map_.next_value()?); + } } } Ok(HashJoinExecNode { @@ -8225,6 +8242,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { null_equality: null_equality__.unwrap_or_default(), filter: filter__, projection: projection__.unwrap_or_default(), + null_aware: null_aware__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index cf343e0258d0b..c1afd73ec3c52 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1688,6 +1688,8 @@ pub struct HashJoinExecNode { pub filter: ::core::option::Option, #[prost(uint32, repeated, tag = "9")] pub projection: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "10")] + pub null_aware: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SymmetricHashJoinExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0666fc2979b38..45868df4ced6c 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1239,6 +1239,7 @@ impl protobuf::PhysicalPlanNode { projection, partition_mode, null_equality.into(), + hashjoin.null_aware, )?)) } @@ -2232,6 +2233,7 @@ impl protobuf::PhysicalPlanNode { projection: exec.projection.as_ref().map_or_else(Vec::new, |v| { v.iter().map(|x| *x as u32).collect::>() }), + null_aware: exec.null_aware, }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 57421fd1f25e6..31878e2e34b3d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -285,6 +285,7 @@ fn roundtrip_hash_join() -> Result<()> { None, *partition_mode, NullEquality::NullEqualsNothing, + false, )?))?; } } diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 38037ede21db2..b7dc215bff6c1 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3516,7 +3516,6 @@ AS VALUES query IT SELECT t1_id, t1_name FROM join_test_left WHERE t1_id NOT IN (SELECT t2_id FROM join_test_right) ORDER BY t1_id; ---- -NULL e #### # join_partitioned_test diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt new file mode 100644 index 0000000000000..5907a85a9b923 --- /dev/null +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -0,0 +1,453 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +############# +## Null-Aware Anti Join Tests +## Tests for automatic null-aware semantics in NOT IN subqueries +############# + +statement ok +CREATE TABLE outer_table(id INT, value TEXT) AS VALUES +(1, 'a'), +(2, 'b'), +(3, 'c'), +(4, 'd'), +(NULL, 'e'); + +statement ok +CREATE TABLE inner_table_no_null(id INT, value TEXT) AS VALUES +(2, 'x'), +(4, 'y'); + +statement ok +CREATE TABLE inner_table_with_null(id INT, value TEXT) AS VALUES +(2, 'x'), +(NULL, 'y'); + +############# +## Test 1: NOT IN with no NULLs - should behave like regular anti join +############# + +query IT rowsort +SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_no_null); +---- +1 a +3 c + +# Verify the plan uses LeftAnti join +query TT +EXPLAIN SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_no_null); +---- +logical_plan +01)LeftAnti Join: outer_table.id = __correlated_sq_1.id +02)--TableScan: outer_table projection=[id, value] +03)--SubqueryAlias: __correlated_sq_1 +04)----TableScan: inner_table_no_null projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +############# +## Test 2: NOT IN with NULL in subquery - should return 0 rows (null-aware semantics) +############# + +query IT rowsort +SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_with_null); +---- + +# Verify the result is empty even though there are rows in outer_table +# that don't match the non-NULL value (2) in the subquery. +# This is correct null-aware behavior: if subquery contains NULL, result is unknown. 
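+# As a concrete illustration of the three-valued logic here: for the row with id = 3, +# `3 NOT IN (2, NULL)` is NOT (3 = 2 OR 3 = NULL) = NOT (FALSE OR UNKNOWN) = NOT UNKNOWN +# = UNKNOWN, so the row is filtered out rather than returned.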
+ +############# +## Test 3: NOT IN with NULL in outer table but not in subquery +## NULL rows from outer should not appear in output +############# + +query IT rowsort +SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_no_null) AND id IS NOT NULL; +---- +1 a +3 c + +############# +## Test 4: Test with all NULL subquery +############# + +statement ok +CREATE TABLE all_null_table(id INT) AS VALUES (NULL), (NULL); + +query IT rowsort +SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM all_null_table); +---- + +############# +## Test 5: Test with empty subquery - should return all rows +############# + +statement ok +CREATE TABLE empty_table(id INT, value TEXT); + +query IT rowsort +SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM empty_table); +---- +1 a +2 b +3 c +4 d +NULL e + +############# +## Test 6: NOT IN with complex expression +############# + +query IT rowsort +SELECT * FROM outer_table WHERE id + 1 NOT IN (SELECT id FROM inner_table_no_null); +---- +2 b +4 d + +############# +## Test 7: NOT IN with complex expression and NULL in subquery +############# + +query IT rowsort +SELECT * FROM outer_table WHERE id + 1 NOT IN (SELECT id FROM inner_table_with_null); +---- + +############# +## Test 8: Multiple NOT IN conditions (AND) +############# + +statement ok +CREATE TABLE inner_table2(id INT) AS VALUES (1), (3); + +query IT rowsort +SELECT * FROM outer_table +WHERE id NOT IN (SELECT id FROM inner_table_no_null) + AND id NOT IN (SELECT id FROM inner_table2); +---- + +############# +## Test 9: Multiple NOT IN conditions (OR) +############# + +# KNOWN LIMITATION: Mark joins used for OR conditions don't support null-aware semantics. +# The NULL row is incorrectly returned here. According to SQL semantics: +# - NULL NOT IN (2, 4) = UNKNOWN +# - NULL NOT IN (1, 3) = UNKNOWN +# - UNKNOWN OR UNKNOWN = UNKNOWN (should be filtered out) +# But mark joins treat NULL keys as non-matching (FALSE), so: +# - NULL mark column = FALSE +# - NOT FALSE OR NOT FALSE = TRUE OR TRUE = TRUE (incorrectly included) +# TODO: Implement null-aware support for mark joins to fix this + +query IT rowsort +SELECT * FROM outer_table +WHERE id NOT IN (SELECT id FROM inner_table_no_null) + OR id NOT IN (SELECT id FROM inner_table2); +---- +1 a +2 b +3 c +4 d +NULL e + +############# +## Test 10: NOT IN with WHERE clause in subquery +############# + +query IT rowsort +SELECT * FROM outer_table +WHERE id NOT IN (SELECT id FROM inner_table_with_null WHERE value = 'x'); +---- +1 a +3 c +4 d + +# Note: The NULL row from inner_table_with_null is filtered out by WHERE clause, +# so this behaves like regular anti join (not null-aware) + +############# +## Test 11: Verify NULL-aware flag is set for LeftAnti joins +############# + +# Check that the physical plan shows null-aware anti join +# Note: The exact format may vary, but we should see LeftAnti join type +query TT +EXPLAIN SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_with_null); +---- +logical_plan +01)LeftAnti Join: outer_table.id = __correlated_sq_1.id +02)--TableScan: outer_table projection=[id, value] +03)--SubqueryAlias: __correlated_sq_1 +04)----TableScan: inner_table_with_null projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +############# +## Test 12: Correlated NOT IN subquery with NULL +############# + +statement ok +CREATE TABLE 
orders(order_id INT, customer_id INT) AS VALUES +(1, 100), +(2, 200), +(3, 300); + +statement ok +CREATE TABLE payments(payment_id INT, order_id INT) AS VALUES +(1, 1), +(2, NULL); + +# Find orders that don't have payments +# Should return empty because there's a NULL in payments.order_id +query I rowsort +SELECT order_id FROM orders +WHERE order_id NOT IN (SELECT order_id FROM payments); +---- + +############# +## Test 13: NOT IN with DISTINCT in subquery +############# + +statement ok +CREATE TABLE duplicates_with_null(id INT) AS VALUES +(2), +(2), +(NULL), +(NULL); + +query IT rowsort +SELECT * FROM outer_table +WHERE id NOT IN (SELECT DISTINCT id FROM duplicates_with_null); +---- + +############# +## Test 14: NOT EXISTS vs NOT IN - Demonstrating the difference +############# + +# NOT EXISTS should NOT use null-aware semantics +# It uses two-valued logic (TRUE/FALSE), not three-valued logic (TRUE/FALSE/UNKNOWN) + +# Setup tables for comparison +statement ok +CREATE TABLE customers(id INT, name TEXT) AS VALUES +(1, 'Alice'), +(2, 'Bob'), +(3, 'Charlie'), +(NULL, 'Dave'); + +statement ok +CREATE TABLE banned(id INT) AS VALUES +(2), +(NULL); + +# Test 14a: NOT IN with NULL in subquery - Returns EMPTY (null-aware) +query IT rowsort +SELECT * FROM customers WHERE id NOT IN (SELECT id FROM banned); +---- + +# Test 14b: NOT EXISTS with NULL in subquery - Returns rows (NOT null-aware) +# This should return (1, 'Alice'), (3, 'Charlie'), (NULL, 'Dave') +# Because NOT EXISTS uses two-valued logic: NULL = NULL is FALSE, so no match found +query IT rowsort +SELECT * FROM customers c +WHERE NOT EXISTS (SELECT 1 FROM banned b WHERE c.id = b.id); +---- +1 Alice +3 Charlie +NULL Dave + +# Test 14c: Verify with EXPLAIN that NOT EXISTS doesn't use null-aware +query TT +EXPLAIN SELECT * FROM customers c +WHERE NOT EXISTS (SELECT 1 FROM banned b WHERE c.id = b.id); +---- +logical_plan +01)LeftAnti Join: c.id = __correlated_sq_1.id +02)--SubqueryAlias: c +03)----TableScan: customers projection=[id, name] +04)--SubqueryAlias: __correlated_sq_1 +05)----SubqueryAlias: b +06)------TableScan: banned projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(id@0, id@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +############# +## Test 15: NOT EXISTS - No NULLs +############# + +statement ok +CREATE TABLE active_customers(id INT) AS VALUES (1), (3); + +# Should return only Bob (id=2) and Dave (id=NULL) +query IT rowsort +SELECT * FROM customers c +WHERE NOT EXISTS (SELECT 1 FROM active_customers a WHERE c.id = a.id); +---- +2 Bob +NULL Dave + +############# +## Test 16: NOT EXISTS - Correlated subquery +############# + +statement ok +CREATE TABLE orders_test(order_id INT, customer_id INT) AS VALUES +(1, 100), +(2, 200), +(3, NULL); + +statement ok +CREATE TABLE customers_test(customer_id INT, name TEXT) AS VALUES +(100, 'Alice'), +(200, 'Bob'), +(300, 'Charlie'), +(NULL, 'Unknown'); + +# Find customers with no orders +# Should return Charlie (300) and Unknown (NULL) +query IT rowsort +SELECT * FROM customers_test c +WHERE NOT EXISTS ( + SELECT 1 FROM orders_test o WHERE o.customer_id = c.customer_id +); +---- +300 Charlie +NULL Unknown + +############# +## Test 17: NOT EXISTS with all NULL subquery +############# + +statement ok +CREATE TABLE all_null_banned(id INT) AS VALUES (NULL), (NULL); + +# NOT EXISTS should return all rows because NULL = NULL is FALSE (no matches) +query IT rowsort +SELECT * FROM 
+
+#############
+## Test 15: NOT EXISTS - No NULLs
+#############
+
+statement ok
+CREATE TABLE active_customers(id INT) AS VALUES (1), (3);
+
+# Should return only Bob (id=2) and Dave (id=NULL)
+query IT rowsort
+SELECT * FROM customers c
+WHERE NOT EXISTS (SELECT 1 FROM active_customers a WHERE c.id = a.id);
+----
+2 Bob
+NULL Dave
+
+#############
+## Test 16: NOT EXISTS - Correlated subquery
+#############
+
+statement ok
+CREATE TABLE orders_test(order_id INT, customer_id INT) AS VALUES
+(1, 100),
+(2, 200),
+(3, NULL);
+
+statement ok
+CREATE TABLE customers_test(customer_id INT, name TEXT) AS VALUES
+(100, 'Alice'),
+(200, 'Bob'),
+(300, 'Charlie'),
+(NULL, 'Unknown');
+
+# Find customers with no orders
+# Should return Charlie (300) and Unknown (NULL)
+query IT rowsort
+SELECT * FROM customers_test c
+WHERE NOT EXISTS (
+ SELECT 1 FROM orders_test o WHERE o.customer_id = c.customer_id
+);
+----
+300 Charlie
+NULL Unknown
+
+#############
+## Test 17: NOT EXISTS with all NULL subquery
+#############
+
+statement ok
+CREATE TABLE all_null_banned(id INT) AS VALUES (NULL), (NULL);
+
+# NOT EXISTS should return all rows: c.id = b.id never evaluates to TRUE against a NULL key, so nothing matches
+query IT rowsort
+SELECT * FROM customers c
+WHERE NOT EXISTS (SELECT 1 FROM all_null_banned b WHERE c.id = b.id);
+----
+1 Alice
+2 Bob
+3 Charlie
+NULL Dave
+
+# Compare with NOT IN, which returns an empty result
+query IT rowsort
+SELECT * FROM customers WHERE id NOT IN (SELECT id FROM all_null_banned);
+----
+
+#############
+## Test 18: Nested NOT EXISTS and NOT IN
+#############
+
+# NOT EXISTS outside, NOT IN inside - should work correctly
+query IT rowsort
+SELECT * FROM customers c
+WHERE NOT EXISTS (
+ SELECT 1 FROM banned b
+ WHERE c.id = b.id
+ AND b.id NOT IN (SELECT id FROM active_customers)
+);
+----
+1 Alice
+3 Charlie
+NULL Dave
+
+#############
+## Test from GitHub issue #10583
+## Tests NOT IN with NULL in subquery result - should return empty result
+#############
+
+statement ok
+CREATE TABLE test_table(c1 INT, c2 INT) AS VALUES
+(1, 1),
+(2, 2),
+(3, 3),
+(4, NULL),
+(NULL, 0);
+
+# When the subquery contains NULL, NOT IN should return an empty result
+# because x NOT IN (values including NULL) is FALSE or UNKNOWN for every x, never TRUE
+query II rowsort
+SELECT * FROM test_table WHERE (c1 NOT IN (SELECT c2 FROM test_table)) = true;
+----
+
+# NOTE: The correlated subquery version from issue #10583:
+# SELECT * FROM test_table t1 WHERE c1 NOT IN (SELECT c2 FROM test_table t2 WHERE t1.c1 = t2.c1)
+# is not yet supported because it creates a multi-column join (correlation + NOT IN condition).
+# This is a known limitation: the current implementation supports only single-column null-aware anti joins.
+# Multi-column support will be addressed in a later phase; a hand-written equivalent is sketched below.
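+
+# For illustration, the correlated form above should be expressible as the following
+# hand-written multi-column join condition (comments only, not executed as a test,
+# and not yet planned as a null-aware anti join):
+#
+#   SELECT * FROM test_table t1
+#   WHERE NOT EXISTS (
+#     SELECT 1 FROM test_table t2
+#     WHERE t1.c1 = t2.c1                                      -- correlation column
+#       AND (t1.c1 = t2.c2 OR t1.c1 IS NULL OR t2.c2 IS NULL)  -- null-aware NOT IN column
+#   );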
+
+#############
+## Cleanup
+#############
+
+statement ok
+DROP TABLE test_table;
+
+statement ok
+DROP TABLE outer_table;
+
+statement ok
+DROP TABLE inner_table_no_null;
+
+statement ok
+DROP TABLE inner_table_with_null;
+
+statement ok
+DROP TABLE all_null_table;
+
+statement ok
+DROP TABLE empty_table;
+
+statement ok
+DROP TABLE inner_table2;
+
+statement ok
+DROP TABLE orders;
+
+statement ok
+DROP TABLE payments;
+
+statement ok
+DROP TABLE duplicates_with_null;
+
+statement ok
+DROP TABLE customers;
+
+statement ok
+DROP TABLE banned;
+
+statement ok
+DROP TABLE active_customers;
+
+statement ok
+DROP TABLE orders_test;
+
+statement ok
+DROP TABLE customers_test;
+
+statement ok
+DROP TABLE all_null_banned;
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
index 0ee60a1e8afb2..b01110b567ca8 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
@@ -71,17 +71,18 @@ physical_plan
 04)------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
 05)--------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2], 4), input_partitions=4
 06)----------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
-07)------------AggregateExec: mode=SinglePartitioned, gby=[p_brand@1 as p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]
-08)--------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(ps_suppkey@0, s_suppkey@0)]
-09)----------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4
-10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_partkey@0, p_partkey@0)], projection=[ps_suppkey@1, p_brand@3, p_type@4, p_size@5]
-11)--------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4
-12)----------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey], file_type=csv, has_header=false
-13)--------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
-14)----------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9])
-15)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-16)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
-17)----------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4
-18)------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
-19)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-20)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], file_type=csv, has_header=false
+07)------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
+08)--------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2, alias1@3], 4), input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[p_brand@1 as p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]
+10)------------------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(ps_suppkey@0, s_suppkey@0)]
+11)--------------------CoalescePartitionsExec
+12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_partkey@0, p_partkey@0)], projection=[ps_suppkey@1, p_brand@3, p_type@4, p_size@5]
+13)------------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4
+14)--------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey], file_type=csv, has_header=false
+15)------------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
+16)--------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9])
+17)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+18)------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
+19)--------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
+20)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+21)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], file_type=csv, has_header=false