121121use std:: { any:: Any , fmt, hash:: Hash , sync:: Arc } ;
122122
123123use crate :: joins:: utils:: JoinHashMapType ;
124- use crate :: joins:: utils:: NoHashHasher ;
125- use crate :: joins:: utils:: NoHashSet ;
126124use crate :: joins:: PartitionMode ;
127125use crate :: ExecutionPlan ;
128126use crate :: ExecutionPlanProperties ;
@@ -134,10 +132,7 @@ use arrow::{
134132 datatypes:: { DataType , Schema } ,
135133 util:: bit_util,
136134} ;
137- use datafusion_common:: utils:: memory:: estimate_memory_size;
138- use datafusion_common:: HashSet ;
139135use datafusion_common:: { hash_utils:: create_hashes, Result , ScalarValue } ;
140- use datafusion_execution:: memory_pool:: MemoryReservation ;
141136use datafusion_expr:: { ColumnarValue , Operator } ;
142137use datafusion_physical_expr:: expressions:: { lit, BinaryExpr , DynamicFilterPhysicalExpr } ;
143138use datafusion_physical_expr:: PhysicalExpr ;
@@ -231,10 +226,15 @@ struct SharedBuildState {
231226 /// Bounds from completed partitions.
232227 /// Each element represents the column bounds computed by one partition.
233228 bounds : Vec < PartitionBounds > ,
234- /// Hashes from the left (build) side, if enabled
235- left_hashes : NoHashSet < u64 > ,
236- /// Memory reservation tracking the memory used by `left_hashes`
237- reservation : MemoryReservation ,
229+ /// Hash tables from the left (build) side, if enabled.
230+ /// - For CollectLeft mode: Vec with 1 element at index 0
231+ /// - For Partitioned mode: Vec with N elements, where hash_tables[i] is partition i's table
232+ /// Uses Option to handle partitions that haven't reported yet or have no hashes
233+ hash_tables : Vec < Option < Arc < dyn JoinHashMapType > > > ,
234+ /// Partition mode to determine how to create the filter (Single vs Partitioned)
235+ partition_mode : PartitionMode ,
236+ /// Number of partitions (for Partitioned mode hash routing)
237+ num_partitions : usize ,
238238}
239239
240240impl SharedBuildAccumulator {
@@ -270,7 +270,6 @@ impl SharedBuildAccumulator {
270270 dynamic_filter : Arc < DynamicFilterPhysicalExpr > ,
271271 on_right : Vec < Arc < dyn PhysicalExpr > > ,
272272 random_state : & ' static RandomState ,
273- reservation : MemoryReservation ,
274273 ) -> Self {
275274 // Troubleshooting: If partition counts are incorrect, verify this logic matches
276275 // the actual execution pattern in collect_build_side()
@@ -286,13 +285,21 @@ impl SharedBuildAccumulator {
286285 // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two)
287286 PartitionMode :: Auto => unreachable ! ( "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" ) ,
288287 } ;
288+ let num_partitions = match partition_mode {
289+ PartitionMode :: CollectLeft => 1 ,
290+ PartitionMode :: Partitioned => {
291+ left_child. output_partitioning ( ) . partition_count ( )
292+ }
293+ PartitionMode :: Auto => unreachable ! ( ) ,
294+ } ;
295+
289296 Self {
290297 inner : Mutex :: new ( SharedBuildState {
291298 bounds : Vec :: with_capacity ( expected_calls) ,
292- left_hashes : HashSet :: with_hasher ( core :: hash :: BuildHasherDefault :: <
293- NoHashHasher ,
294- > :: default ( ) ) ,
295- reservation ,
299+ // Pre-allocate Vec with None placeholders for each partition
300+ hash_tables : vec ! [ None ; num_partitions ] ,
301+ partition_mode ,
302+ num_partitions ,
296303 } ) ,
297304 barrier : Barrier :: new ( expected_calls) ,
298305 dynamic_filter,
@@ -423,31 +430,51 @@ impl SharedBuildAccumulator {
423430 if let Some ( left_hash_map) = left_hash_map {
424431 if left_hash_map. num_hashes ( ) > 0 {
425432 let mut inner = self . inner . lock ( ) ;
426-
427- let fixed_size = size_of :: < NoHashSet < u64 > > ( ) ;
428-
429- let estimated_additional_size =
430- estimate_memory_size :: < u64 > ( left_hash_map. num_hashes ( ) , fixed_size) ?;
431- inner. reservation . try_grow ( estimated_additional_size) ?;
432- inner. left_hashes . extend ( left_hash_map. hashes ( ) ) ;
433+ // Store hash table at the correct partition index
434+ // This ensures hash_tables[partition_id] corresponds to that partition's data
435+ inner. hash_tables [ left_side_partition_id] =
436+ Some ( Arc :: clone ( & left_hash_map) ) ;
433437 }
434438 }
435439
436440 if self . barrier . wait ( ) . await . is_leader ( ) {
437441 // All partitions have reported, so we can update the filter
438- let mut inner = self . inner . lock ( ) ;
442+ let inner = self . inner . lock ( ) ;
439443 let maybe_bounds_expr = if !inner. bounds . is_empty ( ) {
440444 Some ( self . create_filter_from_partition_bounds ( & inner. bounds ) ?)
441445 } else {
442446 None
443447 } ;
444448
445- let maybe_hash_eval_expr = if !inner. left_hashes . is_empty ( ) {
449+ // Check if we have any hash tables to use
450+ let has_hash_tables = inner. hash_tables . iter ( ) . any ( |opt| opt. is_some ( ) ) ;
451+
452+ let maybe_hash_eval_expr = if has_hash_tables {
453+ // Create the appropriate strategy based on partition mode
454+ let strategy = match inner. partition_mode {
455+ PartitionMode :: CollectLeft => {
456+ // Single hash table shared by all probe partitions
457+ // Unwrap is safe because we checked has_hash_tables above
458+ HashCheckStrategy :: Single ( Arc :: clone (
459+ inner. hash_tables [ 0 ] . as_ref ( ) . unwrap ( ) ,
460+ ) )
461+ }
462+ PartitionMode :: Partitioned => {
463+ // Multiple hash tables - need partition routing
464+ // Keep the Vec structure intact (with Options) so partition_id
465+ // correctly maps to hash_tables[partition_id]
466+ HashCheckStrategy :: Partitioned {
467+ hash_tables : inner. hash_tables . clone ( ) ,
468+ num_partitions : inner. num_partitions ,
469+ }
470+ }
471+ PartitionMode :: Auto => unreachable ! ( ) ,
472+ } ;
473+
446474 Some ( Arc :: new ( HashComparePhysicalExpr :: new (
447475 self . on_right . clone ( ) ,
448- Arc :: new ( std :: mem :: take ( & mut inner . left_hashes ) ) ,
476+ strategy ,
449477 self . random_state ,
450- inner. reservation . take ( ) ,
451478 ) ) as Arc < dyn PhysicalExpr > )
452479 } else {
453480 None
@@ -478,8 +505,28 @@ impl fmt::Debug for SharedBuildAccumulator {
478505 }
479506}
480507
508+ /// Strategy for checking if a hash exists in the build side.
509+ /// Determines whether to use a single hash table or route to multiple partitioned tables.
510+ enum HashCheckStrategy {
511+ /// Single hash table (CollectLeft mode)
512+ /// All probe rows check against one shared hash table
513+ Single ( Arc < dyn JoinHashMapType > ) ,
514+
515+ /// Multiple hash tables with partition routing (Partitioned mode)
516+ /// Probe rows are routed to the correct partition's hash table based on hash value.
517+ ///
518+ /// Uses `Option` because:
519+ /// 1. Empty partitions: A partition with no rows won't report a hash table
520+ /// 2. Zero hashes: A partition where `num_hashes() == 0` won't store a table (see line 437)
521+ /// In evaluate(), we treat `None` as "no matches in this partition" (skip the row).
522+ Partitioned {
523+ hash_tables : Vec < Option < Arc < dyn JoinHashMapType > > > ,
524+ num_partitions : usize ,
525+ } ,
526+ }
527+
481528/// A [`PhysicalExpr`] that evaluates to a boolean array indicating which rows in a batch
482- /// have hashes that exist in a given set of hashes .
529+ /// have hashes that exist in the build-side hash tables .
483530///
484531/// This is currently used to implement hash-based dynamic filters in hash joins. That is,
485532/// this expression can be pushed down to the probe side of a hash join to filter out rows
@@ -488,26 +535,22 @@ impl fmt::Debug for SharedBuildAccumulator {
488535struct HashComparePhysicalExpr {
489536 /// Expressions that will be evaluated to compute hashes for filtering
490537 exprs : Vec < Arc < dyn PhysicalExpr > > ,
491- /// Hashes to filter against
492- hashes : Arc < NoHashSet < u64 > > ,
538+ /// Strategy for checking hashes (single table or partitioned)
539+ strategy : HashCheckStrategy ,
493540 /// Random state for hash computation
494541 random_state : & ' static RandomState ,
495- /// Memory reservation used to track the memory used by `hashes`
496- reservation : MemoryReservation ,
497542}
498543
499544impl HashComparePhysicalExpr {
500545 pub fn new (
501546 exprs : Vec < Arc < dyn PhysicalExpr > > ,
502- hashes : Arc < NoHashSet < u64 > > ,
547+ strategy : HashCheckStrategy ,
503548 random_state : & ' static RandomState ,
504- reservation : MemoryReservation ,
505549 ) -> Self {
506550 Self {
507551 exprs,
508- hashes ,
552+ strategy ,
509553 random_state,
510- reservation,
511554 }
512555 }
513556}
@@ -561,11 +604,25 @@ impl PhysicalExpr for HashComparePhysicalExpr {
561604 self : Arc < Self > ,
562605 children : Vec < Arc < dyn PhysicalExpr > > ,
563606 ) -> Result < Arc < dyn PhysicalExpr > > {
607+ // Clone the strategy - for Single it clones the Arc (cheap),
608+ // for Partitioned it clones the Vec of Arcs (still cheap)
609+ let strategy = match & self . strategy {
610+ HashCheckStrategy :: Single ( table) => {
611+ HashCheckStrategy :: Single ( Arc :: clone ( table) )
612+ }
613+ HashCheckStrategy :: Partitioned {
614+ hash_tables,
615+ num_partitions,
616+ } => HashCheckStrategy :: Partitioned {
617+ hash_tables : hash_tables. clone ( ) ,
618+ num_partitions : * num_partitions,
619+ } ,
620+ } ;
621+
564622 Ok ( Arc :: new ( Self {
565623 exprs : children,
566- hashes : Arc :: clone ( & self . hashes ) ,
624+ strategy ,
567625 random_state : self . random_state ,
568- reservation : self . reservation . new_empty ( ) ,
569626 } ) )
570627 }
571628
@@ -589,15 +646,39 @@ impl PhysicalExpr for HashComparePhysicalExpr {
589646 . map ( |col| col. evaluate ( batch) ?. into_array ( num_rows) )
590647 . collect :: < Result < Vec < _ > > > ( ) ?;
591648
592- // Compute hashes for each row based on the evaluated expressions
649+ // Compute hashes for each probe row based on the evaluated expressions
593650 let mut hashes_buffer = vec ! [ 0 ; num_rows] ;
594651 create_hashes ( & expr_values, self . random_state , & mut hashes_buffer) ?;
595652
596- // Create a boolean array where each position indicates if the corresponding hash is in the set of known hashes
653+ // Create a boolean array based on the strategy
597654 let mut buf = MutableBuffer :: from_len_zeroed ( bit_util:: ceil ( num_rows, 8 ) ) ;
598- for ( idx, hash) in hashes_buffer. into_iter ( ) . enumerate ( ) {
599- if self . hashes . contains ( & hash) {
600- bit_util:: set_bit ( buf. as_slice_mut ( ) , idx) ;
655+
656+ match & self . strategy {
657+ HashCheckStrategy :: Single ( hash_table) => {
658+ // Simple case: check against one hash table
659+ for ( idx, hash) in hashes_buffer. into_iter ( ) . enumerate ( ) {
660+ if hash_table. contains_hash ( & hash) {
661+ bit_util:: set_bit ( buf. as_slice_mut ( ) , idx) ;
662+ }
663+ }
664+ }
665+ HashCheckStrategy :: Partitioned {
666+ hash_tables,
667+ num_partitions,
668+ } => {
669+ // Partitioned case: route to correct hash table based on hash value
670+ for ( idx, hash) in hashes_buffer. into_iter ( ) . enumerate ( ) {
671+ // Determine which partition this hash belongs to
672+ let partition_id = ( hash % ( * num_partitions as u64 ) ) as usize ;
673+
674+ // Check the hash table for that partition (if it exists)
675+ // None means the partition was empty or had no hashes - no match
676+ if let Some ( hash_table) = & hash_tables[ partition_id] {
677+ if hash_table. contains_hash ( & hash) {
678+ bit_util:: set_bit ( buf. as_slice_mut ( ) , idx) ;
679+ }
680+ }
681+ }
601682 }
602683 }
603684
0 commit comments