Skip to content

Commit e49ed78

Browse files
committed
perf: optimize HashTableLookupExpr::evaluate by adding set_bits_if_exists
1 parent 9a9ff8d commit e49ed78

File tree

3 files changed, with 62 additions and 11 deletions.

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -343,16 +343,8 @@ impl PhysicalExpr for HashTableLookupExpr {
343343

344344
// Check each hash against the hash table
345345
let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
346-
for (idx, hash_value) in hash_array.values().iter().enumerate() {
347-
// Use get_matched_indices to check - if it returns any indices, the hash exists
348-
let (matched_indices, _) = self
349-
.hash_map
350-
.get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None);
351-
352-
if !matched_indices.is_empty() {
353-
bit_util::set_bit(buf.as_slice_mut(), idx);
354-
}
355-
}
346+
self.hash_map
347+
.set_bits_if_exists(hash_array.values(), buf.as_slice_mut());
356348

357349
Ok(ColumnarValue::Array(Arc::new(
358350
BooleanArray::new_from_packed(buf, 0, num_rows),

datafusion/physical-plan/src/joins/join_hash_map.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::fmt::{self, Debug};
2323
use std::ops::Sub;
2424

2525
use arrow::datatypes::ArrowNativeType;
26+
use arrow::util::bit_util;
2627
use hashbrown::HashTable;
2728
use hashbrown::hash_table::Entry::{Occupied, Vacant};
2829

@@ -124,6 +125,9 @@ pub trait JoinHashMapType: Send + Sync {
124125
match_indices: &mut Vec<u64>,
125126
) -> Option<JoinHashMapOffset>;
126127

128+
/// Sets bit `i` of `buffer` for every `hash_values[i]` that exists in the map; bits for absent hashes are left unchanged.
129+
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]);
130+
127131
/// Returns `true` if the join hash map contains no entries.
128132
fn is_empty(&self) -> bool;
129133

@@ -196,6 +200,10 @@ impl JoinHashMapType for JoinHashMapU32 {
196200
)
197201
}
198202

203+
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) {
204+
set_bits_if_exists::<u32>(&self.map, hash_values, buffer);
205+
}
206+
199207
fn is_empty(&self) -> bool {
200208
self.map.is_empty()
201209
}
@@ -270,6 +278,10 @@ impl JoinHashMapType for JoinHashMapU64 {
270278
)
271279
}
272280

281+
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) {
282+
set_bits_if_exists::<u64>(&self.map, hash_values, buffer);
283+
}
284+
273285
fn is_empty(&self) -> bool {
274286
self.map.is_empty()
275287
}
@@ -496,3 +508,46 @@ where
496508
}
497509
None
498510
}
511+
512+
pub fn set_bits_if_exists<T>(
513+
map: &HashTable<(u64, T)>,
514+
hash_values: &[u64],
515+
buffer: &mut [u8],
516+
) {
517+
for (i, &hash) in hash_values.iter().enumerate() {
518+
if map.find(hash, |(h, _)| hash == *h).is_some() {
519+
bit_util::set_bit(buffer, i);
520+
}
521+
}
522+
}
523+
524+
#[cfg(test)]
mod tests {
    use super::*;

    /// Probing a map built from hashes 10, 20 and 30 must set exactly the
    /// bits whose probe hash is one of those three values.
    #[test]
    fn test_set_bits_if_exists() {
        // Build side: insert 10, 20, 30
        let build_hashes = [10u64, 20u64, 30u64];
        let mut hash_map = JoinHashMapU32::with_capacity(10);
        hash_map.update_from_iter(Box::new(build_hashes.iter().enumerate()), 0);

        // Probe side: alternate between hashes that exist and ones that do not.
        let probe_hashes = vec![10, 11, 20, 21, 30, 31];
        // Six probes fit in a single zeroed byte.
        let mut buffer = vec![0u8; 1];
        hash_map.set_bits_if_exists(&probe_hashes, &mut buffer);

        for (i, &hash) in probe_hashes.iter().enumerate() {
            let is_set = bit_util::get_bit(&buffer, i);
            match hash {
                10 | 20 | 30 => {
                    assert!(is_set, "Hash {hash} should exist in the map")
                }
                _ => {
                    assert!(!is_set, "Hash {hash} should NOT exist in the map")
                }
            }
        }
    }
}

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424

2525
use crate::joins::join_hash_map::{
2626
JoinHashMapOffset, get_matched_indices, get_matched_indices_with_limit_offset,
27-
update_from_iter,
27+
set_bits_if_exists, update_from_iter,
2828
};
2929
use crate::joins::utils::{JoinFilter, JoinHashMapType};
3030
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
@@ -94,6 +94,10 @@ impl JoinHashMapType for PruningJoinHashMap {
9494
)
9595
}
9696

97+
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) {
98+
set_bits_if_exists::<u64>(&self.map, hash_values, buffer);
99+
}
100+
97101
fn is_empty(&self) -> bool {
98102
self.map.is_empty()
99103
}

0 commit comments

Comments
 (0)