
Commit 5c2ee36

perf: optimize HashTableLookupExpr::evaluate (#19602)
## Which issue does this PR close?

- Closes #.

## Rationale for this change

The previous implementation of `HashTableLookupExpr::evaluate` relied on per-row calls to `get_matched_indices`, which incurred unnecessary performance overhead:

1. **Memory overhead**: each per-row call triggered small `Vec` allocations and potential resizes, leading to pressure on the memory allocator.
2. **Redundant computation**: `get_matched_indices` traverses the entire hash chain to find all matches, which is unnecessary when we only need to verify the existence of a key.

### Performance Results (TPC-H)

The following TPC-H results were obtained with **`DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true`**:

```
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ baseline@9a9ff ┃ optimized  ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │      679.51 ms │  728.06 ms │  1.07x slower │
│ QQuery 2     │      388.33 ms │  384.11 ms │     no change │
│ QQuery 3     │      864.38 ms │  856.27 ms │     no change │
│ QQuery 4     │      458.46 ms │  468.26 ms │     no change │
│ QQuery 5     │     1614.26 ms │ 1525.65 ms │ +1.06x faster │
│ QQuery 6     │      611.20 ms │  610.06 ms │     no change │
│ QQuery 7     │      950.39 ms │  940.13 ms │     no change │
│ QQuery 8     │     1214.86 ms │ 1218.21 ms │     no change │
│ QQuery 9     │     2657.61 ms │ 2482.09 ms │ +1.07x faster │
│ QQuery 10    │     1050.70 ms │ 1001.96 ms │     no change │
│ QQuery 11    │      383.92 ms │  347.27 ms │ +1.11x faster │
│ QQuery 12    │      963.14 ms │  920.78 ms │     no change │
│ QQuery 13    │      473.68 ms │  480.97 ms │     no change │
│ QQuery 14    │      363.36 ms │  345.27 ms │     no change │
│ QQuery 15    │      960.56 ms │  955.05 ms │     no change │
│ QQuery 16    │      281.95 ms │  267.34 ms │ +1.05x faster │
│ QQuery 17    │     5306.43 ms │ 4983.21 ms │ +1.06x faster │
│ QQuery 18    │     3415.11 ms │ 3016.52 ms │ +1.13x faster │
│ QQuery 19    │      761.67 ms │  759.49 ms │     no change │
│ QQuery 20    │      650.20 ms │  642.40 ms │     no change │
│ QQuery 21    │     3111.85 ms │ 2833.05 ms │ +1.10x faster │
│ QQuery 22    │      141.75 ms │  143.06 ms │     no change │
└──────────────┴────────────────┴────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary             ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (baseline@9a9ff)   │ 27303.30ms │
│ Total Time (optimized)        │ 25909.21ms │
│ Average Time (baseline@9a9ff) │  1241.06ms │
│ Average Time (optimized)      │  1177.69ms │
│ Queries Faster                │          7 │
│ Queries Slower                │          1 │
│ Queries with No Change        │         14 │
│ Queries with Failure          │          0 │
└───────────────────────────────┴────────────┘
```

Note that Q1 does not involve `HashJoin`.

#### Note on Configuration

Benchmarks were conducted with `DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true` because `HashTableLookupExpr::evaluate` is **NOT** invoked under default settings.
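For reference, the same option can also be enabled programmatically. The snippet below is a minimal sketch (not part of this PR); it assumes the standard `SessionConfig`/`SessionContext` API and the `datafusion.execution.parquet.pushdown_filters` configuration key that the environment variable above corresponds to:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

/// Hypothetical helper, not from this PR: build a SessionContext with parquet
/// filter pushdown enabled, which is roughly what exporting
/// DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true achieves when the
/// configuration is loaded from the environment.
fn pushdown_enabled_context() -> SessionContext {
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    SessionContext::new_with_config(config)
}
```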
I manually added `dbg!(&num_rows)` at [L335 in `partitioned_hash_eval.rs`](https://github.com/apache/datafusion/blob/9a9ff8d6162b7391736b0b7c82c00cb35b0652a1/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs#L335) and confirmed that this code path is only triggered when the flag is enabled. Under default settings, `HashTableLookupExpr::evaluate` is not called at all; I am uncertain whether this behavior is intentional.

## What changes are included in this PR?

- Added `JoinHashMapType::contain_hashes`: a new trait method that processes a batch of hashes and updates a bitmask for existing keys (an illustrative sketch of the idea appears at the end of this description).
- Refactored `HashTableLookupExpr::evaluate`: switched from per-row lookups to the new batch API.

## Are these changes tested?

Yes

## Are there any user-facing changes?

No
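To make the batch lookup concrete, the sketch below illustrates the existence-check idea behind `contain_hashes`, using a plain `HashSet<u64>` in place of the real `JoinHashMapType` hash table. This is an illustration only, not the trait method added in this PR; the names `contain_hashes_sketch`, `build_side`, and `probe_hashes` are invented for the example.

```rust
use std::collections::HashSet;

/// Batch existence check: one pass over the probe hashes, one output flag per
/// row, and no per-row `Vec` allocations. The real `contain_hashes` walks the
/// join hash map's bucket/chain structure instead of a `HashSet`, and it only
/// needs to find the first match because existence is all that matters.
fn contain_hashes_sketch(build_side: &HashSet<u64>, probe_hashes: &[u64]) -> Vec<bool> {
    probe_hashes.iter().map(|h| build_side.contains(h)).collect()
}

fn main() {
    let build_side: HashSet<u64> = [10, 20, 30].into_iter().collect();
    let probe = [20u64, 99, 30];
    assert_eq!(
        contain_hashes_sketch(&build_side, &probe),
        vec![true, false, true]
    );
}
```

The key difference from the old path is that the whole probe batch is processed in a single pass, instead of issuing one `get_matched_indices` call (and its temporary `Vec` allocations) per row.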
1 parent adf00a6 commit 5c2ee36

File tree: 5 files changed (+128, -113 lines)

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 71 additions & 102 deletions
```diff
@@ -21,18 +21,18 @@ use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
 
 use ahash::RandomState;
 use arrow::{
-    array::{BooleanArray, UInt64Array},
-    buffer::MutableBuffer,
+    array::{ArrayRef, UInt64Array},
     datatypes::{DataType, Schema},
-    util::bit_util,
+    record_batch::RecordBatch,
 };
-use datafusion_common::{Result, internal_datafusion_err, internal_err};
+use datafusion_common::Result;
+use datafusion_common::hash_utils::{create_hashes, with_hashes};
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr_common::physical_expr::{
     DynHash, PhysicalExpr, PhysicalExprRef,
 };
 
-use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
+use crate::joins::utils::JoinHashMapType;
 
 /// RandomState wrapper that preserves the seeds used to create it.
 ///
@@ -181,18 +181,11 @@ impl PhysicalExpr for HashExpr {
         Ok(false)
     }
 
-    fn evaluate(
-        &self,
-        batch: &arrow::record_batch::RecordBatch,
-    ) -> Result<ColumnarValue> {
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let num_rows = batch.num_rows();
 
         // Evaluate columns
-        let keys_values = self
-            .on_columns
-            .iter()
-            .map(|c| c.evaluate(batch)?.into_array(num_rows))
-            .collect::<Result<Vec<_>>>()?;
+        let keys_values = evaluate_columns(&self.on_columns, batch)?;
 
         // Compute hashes
         let mut hashes_buffer = vec![0; num_rows];
@@ -217,8 +210,10 @@ impl PhysicalExpr for HashExpr {
 /// Takes a UInt64Array of hash values and checks membership in a hash table.
 /// Returns a BooleanArray indicating which hashes exist.
 pub struct HashTableLookupExpr {
-    /// Expression that computes hash values (should be a HashExpr)
-    hash_expr: PhysicalExprRef,
+    /// Columns to hash
+    on_columns: Vec<PhysicalExprRef>,
+    /// Random state for hashing (with seeds preserved for serialization)
+    random_state: SeededRandomState,
     /// Hash table to check against
     hash_map: Arc<dyn JoinHashMapType>,
     /// Description for display
@@ -229,20 +224,23 @@ impl HashTableLookupExpr {
     /// Create a new HashTableLookupExpr
     ///
     /// # Arguments
-    /// * `hash_expr` - Expression that computes hash values
+    /// * `on_columns` - Columns to hash
+    /// * `random_state` - SeededRandomState for hashing
     /// * `hash_map` - Hash table to check membership
     /// * `description` - Description for debugging
     ///
     /// # Note
     /// This is public for internal testing purposes only and is not
     /// guaranteed to be stable across versions.
     pub fn new(
-        hash_expr: PhysicalExprRef,
+        on_columns: Vec<PhysicalExprRef>,
+        random_state: SeededRandomState,
        hash_map: Arc<dyn JoinHashMapType>,
        description: String,
    ) -> Self {
        Self {
-            hash_expr,
+            on_columns,
+            random_state,
            hash_map,
            description,
        }
@@ -251,14 +249,22 @@ impl HashTableLookupExpr {
 
 impl std::fmt::Debug for HashTableLookupExpr {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}({:?})", self.description, self.hash_expr)
+        let cols = self
+            .on_columns
+            .iter()
+            .map(|e| e.to_string())
+            .collect::<Vec<_>>()
+            .join(", ");
+        let (s1, s2, s3, s4) = self.random_state.seeds();
+        write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
     }
 }
 
 impl Hash for HashTableLookupExpr {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.hash_expr.dyn_hash(state);
+        self.on_columns.dyn_hash(state);
         self.description.hash(state);
+        self.random_state.seeds().hash(state);
         // Note that we compare hash_map by pointer equality.
         // Actually comparing the contents of the hash maps would be expensive.
         // The way these hash maps are used in actuality is that HashJoinExec creates
@@ -279,8 +285,9 @@ impl PartialEq for HashTableLookupExpr {
         // hash maps to have the same content in practice.
         // Theoretically this is a public API and users could create identical hash maps,
         // but that seems unlikely and not worth paying the cost of deep comparison all the time.
-        self.hash_expr.as_ref() == other.hash_expr.as_ref()
+        self.on_columns == other.on_columns
             && self.description == other.description
+            && self.random_state.seeds() == other.random_state.seeds()
             && Arc::ptr_eq(&self.hash_map, &other.hash_map)
     }
 }
@@ -299,21 +306,16 @@ impl PhysicalExpr for HashTableLookupExpr {
     }
 
     fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
-        vec![&self.hash_expr]
+        self.on_columns.iter().collect()
     }
 
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        if children.len() != 1 {
-            return internal_err!(
-                "HashTableLookupExpr expects exactly 1 child, got {}",
-                children.len()
-            );
-        }
         Ok(Arc::new(HashTableLookupExpr::new(
-            Arc::clone(&children[0]),
+            children,
+            self.random_state.clone(),
             Arc::clone(&self.hash_map),
             self.description.clone(),
         )))
@@ -327,43 +329,32 @@ impl PhysicalExpr for HashTableLookupExpr {
         Ok(false)
     }
 
-    fn evaluate(
-        &self,
-        batch: &arrow::record_batch::RecordBatch,
-    ) -> Result<ColumnarValue> {
-        let num_rows = batch.num_rows();
-
-        // Evaluate hash expression to get hash values
-        let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?;
-        let hash_array = hash_array.as_any().downcast_ref::<UInt64Array>().ok_or(
-            internal_datafusion_err!(
-                "HashTableLookupExpr expects UInt64Array from hash expression"
-            ),
-        )?;
-
-        // Check each hash against the hash table
-        let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
-        for (idx, hash_value) in hash_array.values().iter().enumerate() {
-            // Use get_matched_indices to check - if it returns any indices, the hash exists
-            let (matched_indices, _) = self
-                .hash_map
-                .get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None);
-
-            if !matched_indices.is_empty() {
-                bit_util::set_bit(buf.as_slice_mut(), idx);
-            }
-        }
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        // Evaluate columns
+        let keys_values = evaluate_columns(&self.on_columns, batch)?;
 
-        Ok(ColumnarValue::Array(Arc::new(
-            BooleanArray::new_from_packed(buf, 0, num_rows),
-        )))
+        with_hashes(&keys_values, self.random_state.random_state(), |hashes| {
+            let array = self.hash_map.contain_hashes(hashes);
+            Ok(ColumnarValue::Array(Arc::new(array)))
+        })
     }
 
     fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{}", self.description)
     }
 }
 
+fn evaluate_columns(
+    columns: &[PhysicalExprRef],
+    batch: &RecordBatch,
+) -> Result<Vec<ArrayRef>> {
+    let num_rows = batch.num_rows();
+    columns
+        .iter()
+        .map(|c| c.evaluate(batch)?.into_array(num_rows))
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -482,22 +473,19 @@ mod tests {
     #[test]
     fn test_hash_table_lookup_expr_eq_same() {
         let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
-        let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
-            vec![Arc::clone(&col_a)],
-            SeededRandomState::with_seeds(1, 2, 3, 4),
-            "inner_hash".to_string(),
-        ));
         let hash_map: Arc<dyn JoinHashMapType> =
             Arc::new(JoinHashMapU32::with_capacity(10));
 
         let expr1 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup".to_string(),
         );
 
         let expr2 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup".to_string(),
         );
@@ -506,33 +494,23 @@ mod tests {
     }
 
     #[test]
-    fn test_hash_table_lookup_expr_eq_different_hash_expr() {
+    fn test_hash_table_lookup_expr_eq_different_columns() {
         let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
         let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1));
 
-        let hash_expr1: PhysicalExprRef = Arc::new(HashExpr::new(
-            vec![Arc::clone(&col_a)],
-            SeededRandomState::with_seeds(1, 2, 3, 4),
-            "inner_hash".to_string(),
-        ));
-
-        let hash_expr2: PhysicalExprRef = Arc::new(HashExpr::new(
-            vec![Arc::clone(&col_b)],
-            SeededRandomState::with_seeds(1, 2, 3, 4),
-            "inner_hash".to_string(),
-        ));
-
         let hash_map: Arc<dyn JoinHashMapType> =
             Arc::new(JoinHashMapU32::with_capacity(10));
 
         let expr1 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr1),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup".to_string(),
         );
 
         let expr2 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr2),
+            vec![Arc::clone(&col_b)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup".to_string(),
         );
@@ -543,22 +521,19 @@ mod tests {
     #[test]
     fn test_hash_table_lookup_expr_eq_different_description() {
         let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
-        let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
-            vec![Arc::clone(&col_a)],
-            SeededRandomState::with_seeds(1, 2, 3, 4),
-            "inner_hash".to_string(),
-        ));
         let hash_map: Arc<dyn JoinHashMapType> =
             Arc::new(JoinHashMapU32::with_capacity(10));
 
         let expr1 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup_one".to_string(),
        );
 
         let expr2 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup_two".to_string(),
         );
@@ -569,11 +544,6 @@ mod tests {
     #[test]
     fn test_hash_table_lookup_expr_eq_different_hash_map() {
         let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
-        let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
-            vec![Arc::clone(&col_a)],
-            SeededRandomState::with_seeds(1, 2, 3, 4),
-            "inner_hash".to_string(),
-        ));
 
         // Two different Arc pointers (even with same content) should not be equal
         let hash_map1: Arc<dyn JoinHashMapType> =
@@ -582,13 +552,15 @@ mod tests {
             Arc::new(JoinHashMapU32::with_capacity(10));
 
         let expr1 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             hash_map1,
             "lookup".to_string(),
         );
 
         let expr2 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             hash_map2,
             "lookup".to_string(),
         );
@@ -600,22 +572,19 @@ mod tests {
     #[test]
     fn test_hash_table_lookup_expr_hash_consistency() {
         let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
-        let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
-            vec![Arc::clone(&col_a)],
-            SeededRandomState::with_seeds(1, 2, 3, 4),
-            "inner_hash".to_string(),
-        ));
         let hash_map: Arc<dyn JoinHashMapType> =
             Arc::new(JoinHashMapU32::with_capacity(10));
 
         let expr1 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup".to_string(),
         );
 
         let expr2 = HashTableLookupExpr::new(
-            Arc::clone(&hash_expr),
+            vec![Arc::clone(&col_a)],
+            SeededRandomState::with_seeds(1, 2, 3, 4),
             Arc::clone(&hash_map),
             "lookup".to_string(),
         );
```

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 1 addition & 6 deletions
```diff
@@ -129,14 +129,9 @@ fn create_membership_predicate(
         }
         // Use hash table lookup for large build sides
         PushdownStrategy::HashTable(hash_map) => {
-            let lookup_hash_expr = Arc::new(HashExpr::new(
+            Ok(Some(Arc::new(HashTableLookupExpr::new(
                 on_right.to_vec(),
                 random_state.clone(),
-                "hash_join".to_string(),
-            )) as Arc<dyn PhysicalExpr>;
-
-            Ok(Some(Arc::new(HashTableLookupExpr::new(
-                lookup_hash_expr,
                 hash_map,
                 "hash_lookup".to_string(),
             )) as Arc<dyn PhysicalExpr>))
```
