Skip to content

Commit e49840f

Browse files
adriangbLiaCastaneda
authored andcommitted
Push down InList or hash table references from HashJoinExec depending on the size of the build side (apache#18393)
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393 (*this PR*). There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own: - apache#18448 - apache#18449 (depends on apache#18448) - apache#18451 As those are merged I will rebase this PR to keep track of the "remaining work", and we can use this PR to explore big picture ideas or benchmarks of the final state. (cherry picked from commit c0e8bb5)
1 parent ee7d2b5 commit e49840f

File tree

17 files changed

+1333
-258
lines changed

17 files changed

+1333
-258
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,36 @@ config_namespace! {
971971
/// will be collected into a single partition
972972
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
973973

974+
/// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
975+
/// Build sides larger than this will use hash table lookups instead.
976+
/// Set to 0 to always use hash table lookups.
977+
///
978+
/// InList pushdown can be more efficient for small build sides because it can result in better
979+
/// statistics pruning as well as use any bloom filters present on the scan side.
980+
/// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
981+
/// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
982+
///
983+
/// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
984+
///
985+
/// The default is 128kB per partition.
986+
/// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
987+
/// but avoids excessive memory usage or overhead for larger joins.
988+
pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
989+
990+
/// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
991+
/// Build sides with more rows than this will use hash table lookups instead.
992+
/// Set to 0 to always use hash table lookups.
993+
///
994+
/// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
995+
/// very large IN lists that might not provide much benefit over hash table lookups.
996+
///
997+
/// This uses the deduplicated row count once the build side has been evaluated.
998+
///
999+
/// The default is 150 values per partition.
1000+
/// This is inspired by Trino's `max-filter-keys-per-column` setting.
1001+
/// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
1002+
pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
1003+
9741004
/// The default filter selectivity used by Filter Statistics
9751005
/// when an exact selectivity cannot be determined. Valid values are
9761006
/// between 0 (no selectivity) and 100 (all rows are selected).

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 448 additions & 8 deletions
Large diffs are not rendered by default.

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,14 @@ impl InListExpr {
320320
&self.list
321321
}
322322

323+
pub fn is_empty(&self) -> bool {
324+
self.list.is_empty()
325+
}
326+
327+
pub fn len(&self) -> usize {
328+
self.list.len()
329+
}
330+
323331
/// Is this negated e.g. NOT IN LIST
324332
pub fn negated(&self) -> bool {
325333
self.negated

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ datafusion-common = { workspace = true }
5656
datafusion-common-runtime = { workspace = true, default-features = true }
5757
datafusion-execution = { workspace = true }
5858
datafusion-expr = { workspace = true }
59+
datafusion-functions = { workspace = true }
5960
datafusion-functions-aggregate-common = { workspace = true }
6061
datafusion-functions-window-common = { workspace = true }
6162
datafusion-physical-expr = { workspace = true, default-features = true }

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use crate::filter_pushdown::{
2626
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
2727
FilterPushdownPropagation,
2828
};
29+
use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
2930
use crate::joins::hash_join::shared_bounds::{
30-
ColumnBounds, PartitionBounds, SharedBuildAccumulator,
31+
ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
3132
};
3233
use crate::joins::hash_join::stream::{
3334
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
@@ -85,7 +86,7 @@ use futures::TryStreamExt;
8586
use parking_lot::Mutex;
8687

8788
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
88-
const HASH_JOIN_SEED: RandomState =
89+
pub(crate) const HASH_JOIN_SEED: RandomState =
8990
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
9091

9192
/// HashTable and input data for the left (build side) of a join
@@ -111,6 +112,9 @@ pub(super) struct JoinLeftData {
111112
/// If the partition is empty (no rows) this will be None.
112113
/// If the partition has some rows this will be Some with the bounds for each join key column.
113114
pub(super) bounds: Option<PartitionBounds>,
115+
/// Membership testing strategy for filter pushdown
116+
/// Contains either InList values for small build sides or hash table reference for large build sides
117+
pub(super) membership: PushdownStrategy,
114118
}
115119

116120
impl JoinLeftData {
@@ -134,6 +138,11 @@ impl JoinLeftData {
134138
&self.visited_indices_bitmap
135139
}
136140

141+
/// returns a reference to the InList values for filter pushdown
142+
pub(super) fn membership(&self) -> &PushdownStrategy {
143+
&self.membership
144+
}
145+
137146
/// Decrements the counter of running threads, and returns `true`
138147
/// if caller is the last running thread
139148
pub(super) fn report_probe_completed(&self) -> bool {
@@ -931,6 +940,16 @@ impl ExecutionPlan for HashJoinExec {
931940
need_produce_result_in_final(self.join_type),
932941
self.right().output_partitioning().partition_count(),
933942
enable_dynamic_filter_pushdown,
943+
context
944+
.session_config()
945+
.options()
946+
.optimizer
947+
.hash_join_inlist_pushdown_max_size,
948+
context
949+
.session_config()
950+
.options()
951+
.optimizer
952+
.hash_join_inlist_pushdown_max_distinct_values,
934953
))
935954
})?,
936955
PartitionMode::Partitioned => {
@@ -949,6 +968,16 @@ impl ExecutionPlan for HashJoinExec {
949968
need_produce_result_in_final(self.join_type),
950969
1,
951970
enable_dynamic_filter_pushdown,
971+
context
972+
.session_config()
973+
.options()
974+
.optimizer
975+
.hash_join_inlist_pushdown_max_size,
976+
context
977+
.session_config()
978+
.options()
979+
.optimizer
980+
.hash_join_inlist_pushdown_max_distinct_values,
952981
))
953982
}
954983
PartitionMode::Auto => {
@@ -1349,6 +1378,8 @@ async fn collect_left_input(
13491378
with_visited_indices_bitmap: bool,
13501379
probe_threads_count: usize,
13511380
should_compute_dynamic_filters: bool,
1381+
max_inlist_size: usize,
1382+
max_inlist_distinct_values: usize,
13521383
) -> Result<JoinLeftData> {
13531384
let schema = left_stream.schema();
13541385

@@ -1472,6 +1503,29 @@ async fn collect_left_input(
14721503
// Convert Box to Arc for sharing with SharedBuildAccumulator
14731504
let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
14741505

1506+
let membership = if num_rows == 0 {
1507+
PushdownStrategy::Empty
1508+
} else {
1509+
// If the build side is small enough we can use IN list pushdown.
1510+
// If it's too big we fall back to pushing down a reference to the hash table.
1511+
// See `PushdownStrategy` for more details.
1512+
let estimated_size = left_values
1513+
.iter()
1514+
.map(|arr| arr.get_array_memory_size())
1515+
.sum::<usize>();
1516+
if left_values.is_empty()
1517+
|| left_values[0].is_empty()
1518+
|| estimated_size > max_inlist_size
1519+
|| hash_map.len() > max_inlist_distinct_values
1520+
{
1521+
PushdownStrategy::HashTable(Arc::clone(&hash_map))
1522+
} else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
1523+
PushdownStrategy::InList(in_list_values)
1524+
} else {
1525+
PushdownStrategy::HashTable(Arc::clone(&hash_map))
1526+
}
1527+
};
1528+
14751529
let data = JoinLeftData {
14761530
hash_map,
14771531
batch,
@@ -1480,6 +1534,7 @@ async fn collect_left_input(
14801534
probe_threads_counter: AtomicUsize::new(probe_threads_count),
14811535
_reservation: reservation,
14821536
bounds,
1537+
membership,
14831538
};
14841539

14851540
Ok(data)
@@ -4525,7 +4580,7 @@ mod tests {
45254580
)?;
45264581
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
45274582
filter: dynamic_filter,
4528-
bounds_accumulator: OnceLock::new(),
4583+
build_accumulator: OnceLock::new(),
45294584
});
45304585

45314586
// Execute the join
@@ -4573,7 +4628,7 @@ mod tests {
45734628
)?;
45744629
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
45754630
filter: dynamic_filter,
4576-
bounds_accumulator: OnceLock::new(),
4631+
build_accumulator: OnceLock::new(),
45774632
});
45784633

45794634
// Execute the join
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Utilities for building InList expressions from hash join build side data
19+
20+
use std::sync::Arc;
21+
22+
use arrow::array::{ArrayRef, StructArray};
23+
use arrow::datatypes::{Field, FieldRef, Fields};
24+
use arrow::downcast_dictionary_array;
25+
use arrow_schema::DataType;
26+
use datafusion_common::Result;
27+
28+
pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
29+
data_types
30+
.iter()
31+
.enumerate()
32+
.map(|(i, dt)| Ok(Field::new(format!("c{i}"), dt.clone(), true)))
33+
.collect()
34+
}
35+
36+
/// Flattens dictionary-encoded arrays to their underlying value arrays.
37+
/// Non-dictionary arrays are returned as-is.
38+
fn flatten_dictionary_array(array: &ArrayRef) -> ArrayRef {
39+
downcast_dictionary_array! {
40+
array => {
41+
// Recursively flatten in case of nested dictionaries
42+
flatten_dictionary_array(array.values())
43+
}
44+
_ => Arc::clone(array)
45+
}
46+
}
47+
48+
/// Builds InList values from join key column arrays.
49+
///
50+
/// If `join_key_arrays` is:
51+
/// 1. A single array, let's say Int32, this will produce a flat
52+
/// InList expression where the lookup is expected to be scalar Int32 values,
53+
/// that is: this will produce `IN LIST (1, 2, 3)` expected to be used as `2 IN LIST (1, 2, 3)`.
54+
/// 2. An Int32 array and a Utf8 array, this will produce a Struct InList expression
55+
/// where the lookup is expected to be Struct values with two fields (Int32, Utf8),
56+
/// that is: this will produce `IN LIST ((1, "a"), (2, "b"))` expected to be used as `(2, "b") IN LIST ((1, "a"), (2, "b"))`.
57+
/// The field names of the struct are auto-generated as "c0", "c1", ... and should match the struct expression used in the join keys.
58+
///
59+
/// Note that this function does not deduplicate values - deduplication will happen later
60+
/// when building an InList expression from this array via `InListExpr::try_new_from_array`.
61+
///
62+
/// Returns `None` if the estimated size exceeds `max_size_bytes` or if the number of rows
63+
/// exceeds `max_distinct_values`.
64+
pub(super) fn build_struct_inlist_values(
65+
join_key_arrays: &[ArrayRef],
66+
) -> Result<Option<ArrayRef>> {
67+
// Flatten any dictionary-encoded arrays
68+
let flattened_arrays: Vec<ArrayRef> = join_key_arrays
69+
.iter()
70+
.map(flatten_dictionary_array)
71+
.collect();
72+
73+
// Build the source array/struct
74+
let source_array: ArrayRef = if flattened_arrays.len() == 1 {
75+
// Single column: use directly
76+
Arc::clone(&flattened_arrays[0])
77+
} else {
78+
// Multi-column: build StructArray once from all columns
79+
let fields = build_struct_fields(
80+
&flattened_arrays
81+
.iter()
82+
.map(|arr| arr.data_type().clone())
83+
.collect::<Vec<_>>(),
84+
)?;
85+
86+
// Build field references with proper Arc wrapping
87+
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
88+
.iter()
89+
.cloned()
90+
.zip(flattened_arrays.iter().cloned())
91+
.collect();
92+
93+
Arc::new(StructArray::from(arrays_with_fields))
94+
};
95+
96+
Ok(Some(source_array))
97+
}
98+
99+
#[cfg(test)]
100+
mod tests {
101+
use super::*;
102+
use arrow::array::{Int32Array, StringArray};
103+
use arrow_schema::DataType;
104+
use std::sync::Arc;
105+
106+
#[test]
107+
fn test_build_single_column_inlist_array() {
108+
let array = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef;
109+
let result = build_struct_inlist_values(std::slice::from_ref(&array))
110+
.unwrap()
111+
.unwrap();
112+
113+
assert!(array.eq(&result));
114+
}
115+
116+
#[test]
117+
fn test_build_multi_column_inlist() {
118+
let array1 = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef;
119+
let array2 =
120+
Arc::new(StringArray::from(vec!["a", "b", "c", "b", "a"])) as ArrayRef;
121+
122+
let result = build_struct_inlist_values(&[array1, array2])
123+
.unwrap()
124+
.unwrap();
125+
126+
assert_eq!(
127+
*result.data_type(),
128+
DataType::Struct(
129+
build_struct_fields(&[DataType::Int32, DataType::Utf8]).unwrap()
130+
)
131+
);
132+
}
133+
}

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
pub use exec::HashJoinExec;
2121

2222
mod exec;
23+
mod inlist_builder;
2324
mod partitioned_hash_eval;
2425
mod shared_bounds;
2526
mod stream;

0 commit comments

Comments
 (0)