Skip to content

Commit dadc47c

Browse files
viirya and claude committed
Fix null-aware anti join with shared atomic state for partitioned execution
The previous implementation used per-partition flags to track probe-side state, which caused incorrect results when hash partitioning distributed rows across multiple partitions. With CollectLeft mode, each output partition only had local knowledge of its own probe data, not global state.

This commit fixes the issue by:

1. Adding shared AtomicBool flags to JoinLeftData (probe_side_non_empty, probe_side_has_null)
2. Having all partitions write to and read from these shared atomic flags
3. Ensuring global knowledge of probe-side state across all partitions

Example of the bug:

- With 16 partitions, NULL rows hash to partition 5, value 2 hashes to partition 12
- Partition 5 sees no probe data (local view: empty)
- Partition 12 sees probe data (local view: non-empty)
- If partition 5 outputs final results, it incorrectly returns NULL rows

With shared atomic state, partition 5 now sees the global truth and correctly filters NULL rows when the probe side is non-empty.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 537fc50 commit dadc47c

File tree

2 files changed

+22
-21
lines changed

2 files changed

+22
-21
lines changed

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use std::fmt;
1919
use std::mem::size_of;
20-
use std::sync::atomic::{AtomicUsize, Ordering};
20+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2121
use std::sync::{Arc, OnceLock};
2222
use std::{any::Any, vec};
2323

@@ -118,6 +118,11 @@ pub(super) struct JoinLeftData {
118118
/// Membership testing strategy for filter pushdown
119119
/// Contains either InList values for small build sides or hash table reference for large build sides
120120
pub(super) membership: PushdownStrategy,
121+
/// Shared atomic flag indicating if any probe partition saw data (for null-aware anti joins)
122+
/// This is shared across all probe partitions to provide global knowledge
123+
pub(super) probe_side_non_empty: AtomicBool,
124+
/// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins)
125+
pub(super) probe_side_has_null: AtomicBool,
121126
}
122127

123128
impl JoinLeftData {
@@ -1585,6 +1590,8 @@ async fn collect_left_input(
15851590
_reservation: reservation,
15861591
bounds,
15871592
membership,
1593+
probe_side_non_empty: AtomicBool::new(false),
1594+
probe_side_has_null: AtomicBool::new(false),
15881595
};
15891596

15901597
Ok(data)

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
//! [`super::HashJoinExec`]. See comments in [`HashJoinStream`] for more details.
2222
2323
use std::sync::Arc;
24+
use std::sync::atomic::Ordering;
2425
use std::task::Poll;
2526

2627
use crate::joins::PartitionMode;
@@ -225,14 +226,6 @@ pub(super) struct HashJoinStream {
225226
output_buffer: Box<BatchCoalescer>,
226227
/// Whether this is a null-aware anti join
227228
null_aware: bool,
228-
/// Whether the probe side (RIGHT) contains any NULL values in join keys
229-
/// Only relevant when null_aware is true.
230-
/// For LeftAnti with null-aware semantics, if probe side has NULL, no rows should be output.
231-
probe_side_has_null: bool,
232-
/// Whether any probe batches were processed (i.e., probe side was non-empty)
233-
/// Only relevant when null_aware is true.
234-
/// Used to distinguish between empty probe side (should return NULL rows) vs non-empty (should filter NULL rows).
235-
probe_side_non_empty: bool,
236229
}
237230

238231
impl RecordBatchStream for HashJoinStream {
@@ -413,8 +406,6 @@ impl HashJoinStream {
413406
mode,
414407
output_buffer,
415408
null_aware,
416-
probe_side_has_null: false,
417-
probe_side_non_empty: false,
418409
}
419410
}
420411

@@ -613,20 +604,21 @@ impl HashJoinStream {
613604
if self.null_aware {
614605
// Mark that we've seen a probe batch with actual rows (probe side is non-empty)
615606
// Only set this if batch has rows - empty batches don't count
607+
// Use shared atomic state so all partitions can see this global information
616608
if state.batch.num_rows() > 0 {
617-
self.probe_side_non_empty = true;
609+
build_side.left_data.probe_side_non_empty.store(true, Ordering::Relaxed);
618610
}
619611

620612
// Check if probe side (RIGHT) contains NULL
621613
// Since null_aware validation ensures single column join, we only check the first column
622614
let probe_key_column = &state.values[0];
623-
if !self.probe_side_has_null && probe_key_column.null_count() > 0 {
624-
// Found NULL in probe side - set flag to prevent any output
625-
self.probe_side_has_null = true;
615+
if probe_key_column.null_count() > 0 {
616+
// Found NULL in probe side - set shared flag to prevent any output
617+
build_side.left_data.probe_side_has_null.store(true, Ordering::Relaxed);
626618
}
627619

628-
// If probe side has NULL (detected in this or previous batch), return empty result
629-
if self.probe_side_has_null {
620+
// If probe side has NULL (detected in this or any other partition), return empty result
621+
if build_side.left_data.probe_side_has_null.load(Ordering::Relaxed) {
630622
timer.done();
631623
self.state = HashJoinStreamState::FetchProbeBatch;
632624
return Ok(StatefulStreamResult::Continue);
@@ -784,14 +776,15 @@ impl HashJoinStream {
784776
return Ok(StatefulStreamResult::Continue);
785777
}
786778

779+
let build_side = self.build_side.try_as_ready()?;
780+
787781
// For null-aware anti join, if probe side had NULL, no rows should be output
788-
if self.null_aware && self.probe_side_has_null {
782+
// Check shared atomic state to get global knowledge across all partitions
783+
if self.null_aware && build_side.left_data.probe_side_has_null.load(Ordering::Relaxed) {
789784
timer.done();
790785
self.state = HashJoinStreamState::Completed;
791786
return Ok(StatefulStreamResult::Continue);
792787
}
793-
794-
let build_side = self.build_side.try_as_ready()?;
795788
if !build_side.left_data.report_probe_completed() {
796789
self.state = HashJoinStreamState::Completed;
797790
return Ok(StatefulStreamResult::Continue);
@@ -807,9 +800,10 @@ impl HashJoinStream {
807800
// For null-aware anti join, filter out LEFT rows with NULL in join keys
808801
// BUT only if the probe side (RIGHT) was non-empty. If probe side is empty,
809802
// NULL NOT IN (empty) = TRUE, so NULL rows should be returned.
803+
// Use shared atomic state to get global knowledge across all partitions
810804
if self.null_aware
811805
&& self.join_type == JoinType::LeftAnti
812-
&& self.probe_side_non_empty
806+
&& build_side.left_data.probe_side_non_empty.load(Ordering::Relaxed)
813807
{
814808
// Since null_aware validation ensures single column join, we only check the first column
815809
let build_key_column = &build_side.left_data.values()[0];

0 commit comments

Comments
 (0)