Skip to content

Commit 4c67d02

Browse files
viirya and claude authored
feat: Add null-aware anti join support (#19635)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Closes #10583.

## Rationale for this change

<!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. -->

## What changes are included in this PR?

<!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. -->

This patch implements null-aware anti join support for HashJoin LeftAnti operations, enabling correct SQL NOT IN subquery semantics with NULL values.

## Are these changes tested?

<!-- We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

## Are there any user-facing changes?

<!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. -->

<!-- If there are any breaking changes to public APIs, please add the `api change` label. -->

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 4e1bc79 commit 4c67d02

File tree

28 files changed

+1075
-25
lines changed

28 files changed

+1075
-25
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ impl DefaultPhysicalPlanner {
10911091
filter,
10921092
join_type,
10931093
null_equality,
1094+
null_aware,
10941095
schema: join_schema,
10951096
..
10961097
}) => {
@@ -1487,6 +1488,8 @@ impl DefaultPhysicalPlanner {
14871488
} else if session_state.config().target_partitions() > 1
14881489
&& session_state.config().repartition_joins()
14891490
&& prefer_hash_join
1491+
&& !*null_aware
1492+
// Null-aware joins must use CollectLeft
14901493
{
14911494
Arc::new(HashJoinExec::try_new(
14921495
physical_left,
@@ -1497,6 +1500,7 @@ impl DefaultPhysicalPlanner {
14971500
None,
14981501
PartitionMode::Auto,
14991502
*null_equality,
1503+
*null_aware,
15001504
)?)
15011505
} else {
15021506
Arc::new(HashJoinExec::try_new(
@@ -1508,6 +1512,7 @@ impl DefaultPhysicalPlanner {
15081512
None,
15091513
PartitionMode::CollectLeft,
15101514
*null_equality,
1515+
*null_aware,
15111516
)?)
15121517
};
15131518

datafusion/core/tests/execution/coop.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@ async fn join_yields(
606606
None,
607607
PartitionMode::CollectLeft,
608608
NullEquality::NullEqualsNull,
609+
false,
609610
)?);
610611

611612
query_yields(join, session_ctx.task_ctx()).await
@@ -655,6 +656,7 @@ async fn join_agg_yields(
655656
None,
656657
PartitionMode::CollectLeft,
657658
NullEquality::NullEqualsNull,
659+
false,
658660
)?);
659661

660662
// Project only one column (“value” from the left side) because we just want to sum that
@@ -720,6 +722,7 @@ async fn hash_join_yields(
720722
None,
721723
PartitionMode::CollectLeft,
722724
NullEquality::NullEqualsNull,
725+
false,
723726
)?);
724727

725728
query_yields(join, session_ctx.task_ctx()).await
@@ -751,9 +754,10 @@ async fn hash_join_without_repartition_and_no_agg(
751754
/* filter */ None,
752755
&JoinType::Inner,
753756
/* output64 */ None,
754-
// Using CollectLeft is fine—just avoid RepartitionExecs partitioned channels.
757+
// Using CollectLeft is fine—just avoid RepartitionExec's partitioned channels.
755758
PartitionMode::CollectLeft,
756759
NullEquality::NullEqualsNull,
760+
false,
757761
)?);
758762

759763
query_yields(join, session_ctx.task_ctx()).await

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,7 @@ impl JoinFuzzTestCase {
849849
None,
850850
PartitionMode::Partitioned,
851851
NullEquality::NullEqualsNothing,
852+
false,
852853
)
853854
.unwrap(),
854855
)

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
233233
None,
234234
PartitionMode::Partitioned,
235235
datafusion_common::NullEquality::NullEqualsNothing,
236+
false,
236237
)
237238
.unwrap(),
238239
);
@@ -354,6 +355,7 @@ async fn test_static_filter_pushdown_through_hash_join() {
354355
None,
355356
PartitionMode::Partitioned,
356357
datafusion_common::NullEquality::NullEqualsNothing,
358+
false,
357359
)
358360
.unwrap(),
359361
);
@@ -418,6 +420,7 @@ async fn test_static_filter_pushdown_through_hash_join() {
418420
None,
419421
PartitionMode::Partitioned,
420422
datafusion_common::NullEquality::NullEqualsNothing,
423+
false,
421424
)
422425
.unwrap(),
423426
);
@@ -981,6 +984,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
981984
None,
982985
PartitionMode::CollectLeft,
983986
datafusion_common::NullEquality::NullEqualsNothing,
987+
false,
984988
)
985989
.unwrap(),
986990
) as Arc<dyn ExecutionPlan>;
@@ -1170,6 +1174,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
11701174
None,
11711175
PartitionMode::Partitioned,
11721176
datafusion_common::NullEquality::NullEqualsNothing,
1177+
false,
11731178
)
11741179
.unwrap(),
11751180
);
@@ -1363,6 +1368,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
13631368
None,
13641369
PartitionMode::CollectLeft,
13651370
datafusion_common::NullEquality::NullEqualsNothing,
1371+
false,
13661372
)
13671373
.unwrap(),
13681374
);
@@ -1531,6 +1537,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
15311537
None,
15321538
PartitionMode::Partitioned,
15331539
datafusion_common::NullEquality::NullEqualsNothing,
1540+
false,
15341541
)
15351542
.unwrap(),
15361543
);
@@ -1550,6 +1557,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
15501557
None,
15511558
PartitionMode::Partitioned,
15521559
datafusion_common::NullEquality::NullEqualsNothing,
1560+
false,
15531561
)
15541562
.unwrap(),
15551563
) as Arc<dyn ExecutionPlan>;
@@ -1665,6 +1673,7 @@ async fn test_hashjoin_parent_filter_pushdown() {
16651673
None,
16661674
PartitionMode::Partitioned,
16671675
datafusion_common::NullEquality::NullEqualsNothing,
1676+
false,
16681677
)
16691678
.unwrap(),
16701679
);
@@ -2773,6 +2782,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
27732782
None,
27742783
PartitionMode::Partitioned,
27752784
datafusion_common::NullEquality::NullEqualsNothing,
2785+
false,
27762786
)
27772787
.unwrap(),
27782788
);
@@ -2901,6 +2911,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
29012911
None,
29022912
PartitionMode::CollectLeft,
29032913
datafusion_common::NullEquality::NullEqualsNothing,
2914+
false,
29042915
)
29052916
.unwrap(),
29062917
);
@@ -3051,6 +3062,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {
30513062
None,
30523063
PartitionMode::Partitioned,
30533064
datafusion_common::NullEquality::NullEqualsNothing,
3065+
false,
30543066
)
30553067
.unwrap(),
30563068
);
@@ -3201,6 +3213,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {
32013213
None,
32023214
PartitionMode::CollectLeft,
32033215
datafusion_common::NullEquality::NullEqualsNothing,
3216+
false,
32043217
)
32053218
.unwrap(),
32063219
);
@@ -3335,6 +3348,7 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
33353348
None,
33363349
PartitionMode::CollectLeft,
33373350
datafusion_common::NullEquality::NullEqualsNothing,
3351+
false,
33383352
)
33393353
.unwrap(),
33403354
);
@@ -3443,6 +3457,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
34433457
None,
34443458
PartitionMode::CollectLeft,
34453459
datafusion_common::NullEquality::NullEqualsNothing,
3460+
false,
34463461
)
34473462
.unwrap(),
34483463
) as Arc<dyn ExecutionPlan>;

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ async fn test_join_with_swap() {
222222
None,
223223
PartitionMode::CollectLeft,
224224
NullEquality::NullEqualsNothing,
225+
false,
225226
)
226227
.unwrap(),
227228
);
@@ -284,6 +285,7 @@ async fn test_left_join_no_swap() {
284285
None,
285286
PartitionMode::CollectLeft,
286287
NullEquality::NullEqualsNothing,
288+
false,
287289
)
288290
.unwrap(),
289291
);
@@ -333,6 +335,7 @@ async fn test_join_with_swap_semi() {
333335
None,
334336
PartitionMode::Partitioned,
335337
NullEquality::NullEqualsNothing,
338+
false,
336339
)
337340
.unwrap();
338341

@@ -388,6 +391,7 @@ async fn test_join_with_swap_mark() {
388391
None,
389392
PartitionMode::Partitioned,
390393
NullEquality::NullEqualsNothing,
394+
false,
391395
)
392396
.unwrap();
393397

@@ -461,6 +465,7 @@ async fn test_nested_join_swap() {
461465
None,
462466
PartitionMode::CollectLeft,
463467
NullEquality::NullEqualsNothing,
468+
false,
464469
)
465470
.unwrap();
466471
let child_schema = child_join.schema();
@@ -478,6 +483,7 @@ async fn test_nested_join_swap() {
478483
None,
479484
PartitionMode::CollectLeft,
480485
NullEquality::NullEqualsNothing,
486+
false,
481487
)
482488
.unwrap();
483489

@@ -518,6 +524,7 @@ async fn test_join_no_swap() {
518524
None,
519525
PartitionMode::CollectLeft,
520526
NullEquality::NullEqualsNothing,
527+
false,
521528
)
522529
.unwrap(),
523530
);
@@ -745,6 +752,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
745752
Some(projection),
746753
PartitionMode::Partitioned,
747754
NullEquality::NullEqualsNothing,
755+
false,
748756
)?);
749757

750758
let swapped = join
@@ -906,6 +914,7 @@ fn check_join_partition_mode(
906914
None,
907915
PartitionMode::Auto,
908916
NullEquality::NullEqualsNothing,
917+
false,
909918
)
910919
.unwrap(),
911920
);
@@ -1554,6 +1563,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
15541563
None,
15551564
t.initial_mode,
15561565
NullEquality::NullEqualsNothing,
1566+
false,
15571567
)?) as _;
15581568

15591569
let optimized_join_plan =

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,7 @@ fn test_hash_join_after_projection() -> Result<()> {
12841284
None,
12851285
PartitionMode::Auto,
12861286
NullEquality::NullEqualsNothing,
1287+
false,
12871288
)?);
12881289
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
12891290
vec![

datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,7 @@ fn hash_join_exec(
11391139
None,
11401140
PartitionMode::Partitioned,
11411141
NullEquality::NullEqualsNothing,
1142+
false,
11421143
)
11431144
.unwrap(),
11441145
)

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ pub fn hash_join_exec(
247247
None,
248248
PartitionMode::Partitioned,
249249
NullEquality::NullEqualsNothing,
250+
false,
250251
)?))
251252
}
252253

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,25 @@ impl LogicalPlanBuilder {
10111011
join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
10121012
filter: Option<Expr>,
10131013
null_equality: NullEquality,
1014+
) -> Result<Self> {
1015+
self.join_detailed_with_options(
1016+
right,
1017+
join_type,
1018+
join_keys,
1019+
filter,
1020+
null_equality,
1021+
false,
1022+
)
1023+
}
1024+
1025+
pub fn join_detailed_with_options(
1026+
self,
1027+
right: LogicalPlan,
1028+
join_type: JoinType,
1029+
join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1030+
filter: Option<Expr>,
1031+
null_equality: NullEquality,
1032+
null_aware: bool,
10141033
) -> Result<Self> {
10151034
if join_keys.0.len() != join_keys.1.len() {
10161035
return plan_err!("left_keys and right_keys were not the same length");
@@ -1128,6 +1147,7 @@ impl LogicalPlanBuilder {
11281147
join_constraint: JoinConstraint::On,
11291148
schema: DFSchemaRef::new(join_schema),
11301149
null_equality,
1150+
null_aware,
11311151
})))
11321152
}
11331153

@@ -1201,6 +1221,7 @@ impl LogicalPlanBuilder {
12011221
join_type,
12021222
JoinConstraint::Using,
12031223
NullEquality::NullEqualsNothing,
1224+
false, // null_aware
12041225
)?;
12051226

12061227
Ok(Self::new(LogicalPlan::Join(join)))
@@ -1217,6 +1238,7 @@ impl LogicalPlanBuilder {
12171238
JoinType::Inner,
12181239
JoinConstraint::On,
12191240
NullEquality::NullEqualsNothing,
1241+
false, // null_aware
12201242
)?;
12211243

12221244
Ok(Self::new(LogicalPlan::Join(join)))
@@ -1471,6 +1493,7 @@ impl LogicalPlanBuilder {
14711493
join_type,
14721494
JoinConstraint::On,
14731495
NullEquality::NullEqualsNothing,
1496+
false, // null_aware
14741497
)?;
14751498

14761499
Ok(Self::new(LogicalPlan::Join(join)))

0 commit comments

Comments
 (0)