From 1e684a1e2ab97129905b2176cfa677d8a3ef828c Mon Sep 17 00:00:00 2001 From: lvlongxiang Date: Fri, 2 Jan 2026 07:19:08 +0000 Subject: [PATCH] perf: optimize HashTableLookupExpr::evaluate by adding set_bits_if_exists --- .../joins/hash_join/partitioned_hash_eval.rs | 12 +--- .../physical-plan/src/joins/join_hash_map.rs | 58 +++++++++++++++++++ .../src/joins/stream_join_utils.rs | 6 +- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 4c437e813139d..aafc2acdd1c74 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -343,16 +343,8 @@ impl PhysicalExpr for HashTableLookupExpr { // Check each hash against the hash table let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8)); - for (idx, hash_value) in hash_array.values().iter().enumerate() { - // Use get_matched_indices to check - if it returns any indices, the hash exists - let (matched_indices, _) = self - .hash_map - .get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None); - - if !matched_indices.is_empty() { - bit_util::set_bit(buf.as_slice_mut(), idx); - } - } + self.hash_map + .set_bits_if_exists(hash_array.values(), buf.as_slice_mut()); Ok(ColumnarValue::Array(Arc::new( BooleanArray::new_from_packed(buf, 0, num_rows), diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index b0ed6dcc7c255..43cd3c052ee12 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -23,6 +23,7 @@ use std::fmt::{self, Debug}; use std::ops::Sub; use arrow::datatypes::ArrowNativeType; +use arrow::util::bit_util; use hashbrown::HashTable; use hashbrown::hash_table::Entry::{Occupied, Vacant}; @@ -124,6 +125,9 @@ pub trait JoinHashMapType: Send + Sync { match_indices: &mut Vec, ) -> Option; + /// Sets bits in the provided buffer if the corresponding hash exists in the map. + fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]); + /// Returns `true` if the join hash map contains no entries. fn is_empty(&self) -> bool; @@ -196,6 +200,10 @@ impl JoinHashMapType for JoinHashMapU32 { ) } + fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) { + set_bits_if_exists::(&self.map, hash_values, buffer); + } + fn is_empty(&self) -> bool { self.map.is_empty() } @@ -270,6 +278,10 @@ impl JoinHashMapType for JoinHashMapU64 { ) } + fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) { + set_bits_if_exists::(&self.map, hash_values, buffer); + } + fn is_empty(&self) -> bool { self.map.is_empty() } @@ -496,3 +508,49 @@ where } None } + +pub fn set_bits_if_exists( + map: &HashTable<(u64, T)>, + hash_values: &[u64], + buffer: &mut [u8], +) { + for (i, &hash) in hash_values.iter().enumerate() { + if map.find(hash, |(h, _)| hash == *h).is_some() { + bit_util::set_bit(buffer, i); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_set_bits_if_exists() { + let mut hash_map = JoinHashMapU32::with_capacity(10); + // Build side: insert 10, 20, 30 + hash_map.update_from_iter( + Box::new([10u64, 20u64, 30u64].iter().enumerate()), + 0, + ); + + // Probe side: test both existing and non-existing hashes + let probe_hashes = vec![10, 11, 20, 21, 30, 31]; + let mut buffer = vec![0u8; 1]; + hash_map.set_bits_if_exists(&probe_hashes, &mut buffer); + + for (i, &hash) in probe_hashes.iter().enumerate() { + if matches!(hash, 10 | 20 | 30) { + assert!( + bit_util::get_bit(&buffer, i), + "Hash {hash} should exist in the map" + ); + } else { + assert!( + !bit_util::get_bit(&buffer, i), + "Hash {hash} should NOT exist in the map" + ); + } + } + } +} diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 22cc82a22db5f..39f5f4c0b19f8 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use crate::joins::join_hash_map::{ JoinHashMapOffset, get_matched_indices, get_matched_indices_with_limit_offset, - update_from_iter, + set_bits_if_exists, update_from_iter, }; use crate::joins::utils::{JoinFilter, JoinHashMapType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}; @@ -94,6 +94,10 @@ impl JoinHashMapType for PruningJoinHashMap { ) } + fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) { + set_bits_if_exists::(&self.map, hash_values, buffer); + } + fn is_empty(&self) -> bool { self.map.is_empty() }