Skip to content

Commit 07bb819

Browse files
committed
JoinHashMapType::set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) -> contain_hashes(&self, hash_values: &[u64]) -> BooleanArray
1 parent e49ed78 commit 07bb819

File tree

3 files changed

+31
-45
lines changed

3 files changed

+31
-45
lines changed

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,17 @@ use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
2121

2222
use ahash::RandomState;
2323
use arrow::{
24-
array::{BooleanArray, UInt64Array},
25-
buffer::MutableBuffer,
24+
array::UInt64Array,
2625
datatypes::{DataType, Schema},
27-
util::bit_util,
2826
};
27+
use datafusion_common::hash_utils::create_hashes;
2928
use datafusion_common::{Result, internal_datafusion_err, internal_err};
3029
use datafusion_expr::ColumnarValue;
3130
use datafusion_physical_expr_common::physical_expr::{
3231
DynHash, PhysicalExpr, PhysicalExprRef,
3332
};
3433

35-
use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
34+
use crate::joins::utils::JoinHashMapType;
3635

3736
/// RandomState wrapper that preserves the seeds used to create it.
3837
///
@@ -342,13 +341,9 @@ impl PhysicalExpr for HashTableLookupExpr {
342341
)?;
343342

344343
// Check each hash against the hash table
345-
let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
346-
self.hash_map
347-
.set_bits_if_exists(hash_array.values(), buf.as_slice_mut());
344+
let array = self.hash_map.contain_hashes(hash_array.values());
348345

349-
Ok(ColumnarValue::Array(Arc::new(
350-
BooleanArray::new_from_packed(buf, 0, num_rows),
351-
)))
346+
Ok(ColumnarValue::Array(Arc::new(array)))
352347
}
353348

354349
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {

datafusion/physical-plan/src/joins/join_hash_map.rs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@
2222
use std::fmt::{self, Debug};
2323
use std::ops::Sub;
2424

25+
use arrow::array::BooleanArray;
26+
use arrow::buffer::BooleanBuffer;
2527
use arrow::datatypes::ArrowNativeType;
26-
use arrow::util::bit_util;
2728
use hashbrown::HashTable;
2829
use hashbrown::hash_table::Entry::{Occupied, Vacant};
2930

@@ -125,8 +126,8 @@ pub trait JoinHashMapType: Send + Sync {
125126
match_indices: &mut Vec<u64>,
126127
) -> Option<JoinHashMapOffset>;
127128

128-
/// Sets bits in the provided buffer if the corresponding hash exists in the map.
129-
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]);
129+
/// Returns a BooleanArray indicating which of the provided hashes exist in the map.
130+
fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray;
130131

131132
/// Returns `true` if the join hash map contains no entries.
132133
fn is_empty(&self) -> bool;
@@ -200,8 +201,8 @@ impl JoinHashMapType for JoinHashMapU32 {
200201
)
201202
}
202203

203-
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) {
204-
set_bits_if_exists::<u32>(&self.map, hash_values, buffer);
204+
fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
205+
contain_hashes(&self.map, hash_values)
205206
}
206207

207208
fn is_empty(&self) -> bool {
@@ -278,8 +279,8 @@ impl JoinHashMapType for JoinHashMapU64 {
278279
)
279280
}
280281

281-
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) {
282-
set_bits_if_exists::<u64>(&self.map, hash_values, buffer);
282+
fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
283+
contain_hashes(&self.map, hash_values)
283284
}
284285

285286
fn is_empty(&self) -> bool {
@@ -509,44 +510,33 @@ where
509510
None
510511
}
511512

512-
pub fn set_bits_if_exists<T>(
513-
map: &HashTable<(u64, T)>,
514-
hash_values: &[u64],
515-
buffer: &mut [u8],
516-
) {
517-
for (i, &hash) in hash_values.iter().enumerate() {
518-
if map.find(hash, |(h, _)| hash == *h).is_some() {
519-
bit_util::set_bit(buffer, i);
520-
}
521-
}
513+
pub fn contain_hashes<T>(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> BooleanArray {
514+
let buffer = BooleanBuffer::collect_bool(hash_values.len(), |i| {
515+
let hash = hash_values[i];
516+
map.find(hash, |(h, _)| hash == *h).is_some()
517+
});
518+
BooleanArray::new(buffer, None)
522519
}
523520

524521
#[cfg(test)]
525522
mod tests {
526523
use super::*;
527524

528525
#[test]
529-
fn test_set_bits_if_exists() {
526+
fn test_contain_hashes() {
530527
let mut hash_map = JoinHashMapU32::with_capacity(10);
531-
// Build side: insert 10, 20, 30
532528
hash_map.update_from_iter(Box::new([10u64, 20u64, 30u64].iter().enumerate()), 0);
533529

534-
// Probe side: test both existing and non-existing hashes
535530
let probe_hashes = vec![10, 11, 20, 21, 30, 31];
536-
let mut buffer = vec![0u8; 1];
537-
hash_map.set_bits_if_exists(&probe_hashes, &mut buffer);
531+
let array = hash_map.contain_hashes(&probe_hashes);
532+
533+
assert_eq!(array.len(), probe_hashes.len());
538534

539535
for (i, &hash) in probe_hashes.iter().enumerate() {
540536
if matches!(hash, 10 | 20 | 30) {
541-
assert!(
542-
bit_util::get_bit(&buffer, i),
543-
"Hash {hash} should exist in the map"
544-
);
537+
assert!(array.value(i), "Hash {hash} should exist in the map");
545538
} else {
546-
assert!(
547-
!bit_util::get_bit(&buffer, i),
548-
"Hash {hash} should NOT exist in the map"
549-
);
539+
assert!(!array.value(i), "Hash {hash} should NOT exist in the map");
550540
}
551541
}
552542
}

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ use std::mem::size_of;
2323
use std::sync::Arc;
2424

2525
use crate::joins::join_hash_map::{
26-
JoinHashMapOffset, get_matched_indices, get_matched_indices_with_limit_offset,
27-
set_bits_if_exists, update_from_iter,
26+
JoinHashMapOffset, contain_hashes, get_matched_indices,
27+
get_matched_indices_with_limit_offset, update_from_iter,
2828
};
2929
use crate::joins::utils::{JoinFilter, JoinHashMapType};
3030
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
3131
use crate::{ExecutionPlan, metrics};
3232

3333
use arrow::array::{
34-
ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch,
34+
ArrowPrimitiveType, BooleanArray, BooleanBufferBuilder, NativeAdapter,
35+
PrimitiveArray, RecordBatch,
3536
};
3637
use arrow::compute::concat_batches;
3738
use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
@@ -94,8 +95,8 @@ impl JoinHashMapType for PruningJoinHashMap {
9495
)
9596
}
9697

97-
fn set_bits_if_exists(&self, hash_values: &[u64], buffer: &mut [u8]) {
98-
set_bits_if_exists::<u64>(&self.map, hash_values, buffer);
98+
fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray {
99+
contain_hashes(&self.map, hash_values)
99100
}
100101

101102
fn is_empty(&self) -> bool {

0 commit comments

Comments
 (0)