Commit cbd1b15

viirya and claude committed
fix: Force CollectLeft partition mode for null-aware anti joins
Null-aware anti joins must use PartitionMode::CollectLeft instead of PartitionMode::Partitioned because they track probe-side state (probe_side_non_empty, probe_side_has_null) per-partition, but require global knowledge for correct NULL handling.

The problem with partitioned mode:
- Hash joins partition rows by hash(join_key)
- Row with NULL key goes to partition X (hash(NULL))
- Row with value 2 goes to partition Y (hash(2))
- Partition X doesn't see any probe rows, even though probe side is globally non-empty
- This causes partition X to incorrectly return NULL rows

Example that failed in CI:

SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner WHERE value = 'x');

- Subquery returns [2]
- Row (NULL, 'e') from outer_table hashes to different partition than 2
- That partition sees no probe rows and incorrectly returns (NULL, 'e')

The fix:
- Force PartitionMode::CollectLeft for null-aware anti joins
- This collects the left side (outer table) into a single partition
- All partitions see the same complete probe side
- Correct global state tracking for null handling

Trade-off: Null-aware anti joins lose parallelism on the build side, but gain correctness. This is acceptable since null-aware anti joins are typically used for NOT IN subqueries which are less common and often involve smaller datasets.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
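The failure mode described above can be reproduced with a small standalone model. This is only an illustrative sketch, not DataFusion code: the functions `not_in`, `partition_of`, and `not_in_partitioned` are hypothetical, keys are plain `Option<i32>` values with `None` standing in for SQL NULL, and the toy partitioner (NULL to partition 0, otherwise `value mod n`) is an assumption standing in for the real hash partitioning. Only the flag names `probe_side_non_empty` and `probe_side_has_null` are taken from the commit message.

```rust
/// Evaluate `key NOT IN (subquery)` for every outer key with SQL NULL semantics.
/// Hypothetical model: `None` stands for SQL NULL.
fn not_in(outer: &[Option<i32>], subquery: &[Option<i32>]) -> Vec<Option<i32>> {
    // The two pieces of probe-side state named in the commit message.
    let probe_side_non_empty = !subquery.is_empty();
    let probe_side_has_null = subquery.iter().any(|v| v.is_none());

    outer
        .iter()
        .copied()
        .filter(|key| {
            if !probe_side_non_empty {
                return true; // empty subquery: NOT IN holds for every row, even NULL
            }
            if probe_side_has_null {
                return false; // a NULL in the subquery means NOT IN is never true
            }
            match key {
                None => false, // NULL NOT IN (non-empty set) is NULL, so the row is dropped
                Some(_) => !subquery.contains(key),
            }
        })
        .collect()
}

/// Toy partitioner (an assumption): NULL goes to partition 0, values to `v mod n`.
fn partition_of(key: Option<i32>, n: usize) -> usize {
    match key {
        None => 0,
        Some(v) => v.rem_euclid(n as i32) as usize,
    }
}

/// Apply `not_in` independently per partition, so each partition only sees
/// its own slice of the subquery. This models the buggy per-partition state.
fn not_in_partitioned(
    outer: &[Option<i32>],
    subquery: &[Option<i32>],
    n: usize,
) -> Vec<Option<i32>> {
    let mut result = Vec::new();
    for p in 0..n {
        let outer_p: Vec<Option<i32>> = outer
            .iter()
            .copied()
            .filter(|k| partition_of(*k, n) == p)
            .collect();
        let subquery_p: Vec<Option<i32>> = subquery
            .iter()
            .copied()
            .filter(|k| partition_of(*k, n) == p)
            .collect();
        result.extend(not_in(&outer_p, &subquery_p));
    }
    result
}
```

With outer keys `[Some(1), Some(2), None]`, a subquery result of `[Some(2)]`, and three partitions, the NULL row lands in a partition whose slice of the subquery is empty, so the partitioned evaluation keeps it while the global evaluation does not.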
1 parent 2eb483f commit cbd1b15

1 file changed: +11 -1 lines changed

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 11 additions & 1 deletion
@@ -244,14 +244,24 @@ pub(crate) fn partitioned_hash_join(
     {
         hash_join.swap_inputs(PartitionMode::Partitioned)
     } else {
+        // Null-aware anti joins must use CollectLeft mode because they track probe-side state
+        // (probe_side_non_empty, probe_side_has_null) per-partition, but need global knowledge
+        // for correct null handling. With partitioning, a partition might not see probe rows
+        // even if the probe side is globally non-empty, leading to incorrect NULL row handling.
+        let partition_mode = if hash_join.null_aware {
+            PartitionMode::CollectLeft
+        } else {
+            PartitionMode::Partitioned
+        };
+
         Ok(Arc::new(HashJoinExec::try_new(
             Arc::clone(left),
             Arc::clone(right),
             hash_join.on().to_vec(),
             hash_join.filter().cloned(),
             hash_join.join_type(),
             hash_join.projection.clone(),
-            PartitionMode::Partitioned,
+            partition_mode,
             hash_join.null_equality(),
             hash_join.null_aware,
         )?))