Commit 2e3d650

Authored by: LiaCastaneda, adriangb, davidhewitt, alamb, EmilyMatt
Bring IN LIST Dynamic Filtering work (#63)
* Refactor `InListExpr` to support structs by re-using existing hashing infrastructure (apache#18449)

  This PR is part of an EPIC to push down hash table references from `HashJoinExec` into scans. The EPIC is tracked in apache#17171, and a "target state" is tracked in apache#18393. A series of PRs gets us to that target state in smaller, more reviewable changes that are still valuable on their own:

  - apache#18448
  - (This PR): apache#18449 (depends on apache#18448)
  - apache#18451

  Changes:

  - Enhance `InListExpr` to efficiently store homogeneous lists as arrays, avoiding a conversion to `Vec<PhysicalExpr>`, by adding an internal `InListStorage` enum with `Array` and `Exprs` variants
  - Re-use existing hashing and comparison utilities to support Struct arrays and other complex types
  - Add a public function `in_list_from_array(expr, list_array, negated)` for creating an `InList` from an array

  Although the diff looks large, most of it is tests and docs. The actual code change is a net-negative LOC change, or at least negative complexity (it eliminates a trait, a macro, and matching on data types).

  Co-authored-by: David Hewitt <mail@davidhewitt.dev>
  Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  (cherry picked from commit 486c5d8)

* feat: Add `evaluate_to_arrays` function (apache#18446)

  Closes apache#18330. Reduces code duplication: a util function now replaces many call sites that repeated the same code. No logic changes whatsoever, so each area that now uses this function keeps its own tests and benchmarks unmodified. User-facing change: one new pub function; no other API changes.

  Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
  (cherry picked from commit 76b4156)

* Refactor state management in `HashJoinExec` and use CASE expressions for more precise filters (apache#18451)

  Refactors state management in `HashJoinExec` to make filter pushdown more efficient and to prepare for pushing down membership tests:

  - Refactor internal data structures to clean up state management and make usage more idiomatic (use `Option` instead of comparing integers, etc.)
  - Use CASE expressions to evaluate pushed-down filters selectively by partition. Example: `CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END`

  Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
  (cherry picked from commit 5b0aa37)

* Push down InList or hash table references from `HashJoinExec` depending on the size of the build side (apache#18393)

  The "target state" PR of the EPIC above. As the smaller PRs (apache#18448, apache#18449, apache#18451) are merged, this PR is rebased to track the remaining work, and can be used to explore big-picture ideas or benchmarks of the final state.

  (cherry picked from commit c0e8bb5)

* fmt

* Replace `HashTableLookupExpr` with `lit(true)` in proto serialization (apache#19300)

  `HashTableLookupExpr` errors when serializing, which would break any users combining joins with protobuf.

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Co-authored-by: David Hewitt <mail@davidhewitt.dev>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Emily Matheys <55631053+EmilyMatt@users.noreply.github.com>
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
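The partition-selective CASE filter described in apache#18451 can be illustrated with a small, self-contained sketch. The `hash_repartition` function and the routing below are illustrative stand-ins, not DataFusion's actual implementation: the point is that a row only has to pass the membership test of the one partition its key hashes to, so each partition pushes down a filter that is trivially false for everyone else's rows.

```rust
use std::collections::HashSet;

/// Illustrative stand-in for the repartition hash (not DataFusion's real hash).
fn hash_repartition(key: i64) -> u64 {
    (key as u64).wrapping_mul(0x0100_0000_01b3) ^ 0xcbf2_9ce4_8422_2325
}

/// Models `CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END`:
/// route the row to its partition, then apply only that partition's membership test.
fn case_filter(key: i64, n_partitions: u64, build_sides: &[HashSet<i64>]) -> bool {
    let partition_id = (hash_repartition(key) % n_partitions) as usize;
    // `condition` here is the pushed-down membership test for that one partition
    build_sides[partition_id].contains(&key)
}

fn main() {
    let n: u64 = 2;
    // Each partition's build side holds only the keys routed to it
    let mut build_sides = vec![HashSet::new(), HashSet::new()];
    for key in [1i64, 2, 3] {
        let pid = (hash_repartition(key) % n) as usize;
        build_sides[pid].insert(key);
    }
    // A build-side key passes; an absent key is filtered out
    assert!(case_filter(2, n, &build_sides));
    assert!(!case_filter(42, n, &build_sides));
}
```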
1 parent 61aa275 commit 2e3d650

File tree: 36 files changed, +4103 −662 lines

Cargo.lock

Lines changed: 1 addition & 0 deletions

datafusion/common/src/config.rs

Lines changed: 30 additions & 0 deletions
```diff
@@ -971,6 +971,36 @@ config_namespace! {
         /// will be collected into a single partition
         pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

+        /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
+        /// Build sides larger than this will use hash table lookups instead.
+        /// Set to 0 to always use hash table lookups.
+        ///
+        /// InList pushdown can be more efficient for small build sides because it can result in better
+        /// statistics pruning as well as use any bloom filters present on the scan side.
+        /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
+        /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
+        ///
+        /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
+        ///
+        /// The default is 128kB per partition.
+        /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
+        /// but avoids excessive memory usage or overhead for larger joins.
+        pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
+
+        /// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
+        /// Build sides with more rows than this will use hash table lookups instead.
+        /// Set to 0 to always use hash table lookups.
+        ///
+        /// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
+        /// very large IN lists that might not provide much benefit over hash table lookups.
+        ///
+        /// This uses the deduplicated row count once the build side has been evaluated.
+        ///
+        /// The default is 150 values per partition.
+        /// This is inspired by Trino's `max-filter-keys-per-column` setting.
+        /// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
+        pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
+
         /// The default filter selectivity used by Filter Statistics
         /// when an exact selectivity cannot be determined. Valid values are
         /// between 0 (no selectivity) and 100 (all rows are selected).
```
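The two new settings act as a two-level gate: a build side qualifies for InList pushdown only if it fits under both the byte limit and the distinct-value limit, and a limit of 0 disables InList pushdown entirely. A minimal, std-only sketch of that decision logic (the `PushdownStrategy` enum and `choose_strategy` function are hypothetical names for illustration, not DataFusion's actual code):

```rust
/// Hypothetical strategy choice for illustration; not DataFusion's API.
#[derive(Debug, PartialEq)]
enum PushdownStrategy {
    InList,
    HashTableLookup,
}

/// Mirrors the documented rules for the two config options:
/// `max_size` ~ hash_join_inlist_pushdown_max_size (default 128 * 1024 bytes),
/// `max_distinct` ~ hash_join_inlist_pushdown_max_distinct_values (default 150).
fn choose_strategy(
    build_side_bytes: usize,
    distinct_values: usize,
    max_size: usize,
    max_distinct: usize,
) -> PushdownStrategy {
    // Setting either limit to 0 always uses hash table lookups
    if max_size == 0 || max_distinct == 0 {
        return PushdownStrategy::HashTableLookup;
    }
    if build_side_bytes <= max_size && distinct_values <= max_distinct {
        PushdownStrategy::InList
    } else {
        PushdownStrategy::HashTableLookup
    }
}

fn main() {
    // A small point-lookup join: 100 distinct keys, ~800 bytes -> InList
    assert_eq!(
        choose_strategy(800, 100, 128 * 1024, 150),
        PushdownStrategy::InList
    );
    // Too many distinct values, even though it fits in bytes
    assert_eq!(
        choose_strategy(4096, 10_000, 128 * 1024, 150),
        PushdownStrategy::HashTableLookup
    );
    // A size limit of 0 disables InList pushdown
    assert_eq!(
        choose_strategy(8, 1, 0, 150),
        PushdownStrategy::HashTableLookup
    );
}
```

Because the limits are applied per partition, the worst-case extra memory is roughly `max_size * target_partitions`, as the config docs note.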

datafusion/common/src/hash_utils.rs

Lines changed: 173 additions & 5 deletions
```diff
@@ -31,8 +31,8 @@ use crate::cast::{
     as_string_array, as_string_view_array, as_struct_array,
 };
 use crate::error::Result;
-#[cfg(not(feature = "force_hash_collisions"))]
-use crate::error::_internal_err;
+use crate::error::{_internal_datafusion_err, _internal_err};
+use std::cell::RefCell;

 // Combines two hashes into one hash
 #[inline]
```
```diff
@@ -41,6 +41,94 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
     hash.wrapping_mul(37).wrapping_add(r)
 }

+/// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements).
+/// The goal of this is to avoid unbounded memory growth that would appear as a memory leak.
+/// We allow temporary allocations beyond this size, but after use the buffer is truncated
+/// to this size.
+const MAX_BUFFER_SIZE: usize = 524_288;
+
+thread_local! {
+    /// Thread-local buffer for hash computations to avoid repeated allocations.
+    /// The buffer is reused across calls and truncated if it exceeds MAX_BUFFER_SIZE.
+    /// Defaults to a capacity of 8192 u64 elements which is the default batch size.
+    /// This corresponds to 64KB of memory.
+    static HASH_BUFFER: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
+}
+
+/// Creates hashes for the given arrays using a thread-local buffer, then calls the provided callback
+/// with an immutable reference to the computed hashes.
+///
+/// This function manages a thread-local buffer to avoid repeated allocations. The buffer is automatically
+/// truncated if it exceeds `MAX_BUFFER_SIZE` after use.
+///
+/// # Arguments
+/// * `arrays` - The arrays to hash (must contain at least one array)
+/// * `random_state` - The random state for hashing
+/// * `callback` - A function that receives an immutable reference to the hash slice and returns a result
+///
+/// # Errors
+/// Returns an error if:
+/// - No arrays are provided
+/// - The function is called reentrantly (i.e., the callback invokes `with_hashes` again on the same thread)
+/// - The function is called during or after thread destruction
+///
+/// # Example
+/// ```ignore
+/// use datafusion_common::hash_utils::{with_hashes, RandomState};
+/// use arrow::array::{Int32Array, ArrayRef};
+/// use std::sync::Arc;
+///
+/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+/// let random_state = RandomState::new();
+///
+/// let result = with_hashes([&array], &random_state, |hashes| {
+///     // Use the hashes here
+///     Ok(hashes.len())
+/// })?;
+/// ```
+pub fn with_hashes<I, T, F, R>(
+    arrays: I,
+    random_state: &RandomState,
+    callback: F,
+) -> Result<R>
+where
+    I: IntoIterator<Item = T>,
+    T: AsDynArray,
+    F: FnOnce(&[u64]) -> Result<R>,
+{
+    // Peek at the first array to determine buffer size without fully collecting
+    let mut iter = arrays.into_iter().peekable();
+
+    // Get the required size from the first array
+    let required_size = match iter.peek() {
+        Some(arr) => arr.as_dyn_array().len(),
+        None => return _internal_err!("with_hashes requires at least one array"),
+    };
+
+    HASH_BUFFER.try_with(|cell| {
+        let mut buffer = cell.try_borrow_mut()
+            .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?;
+
+        // Ensure buffer has sufficient length, clearing old values
+        buffer.clear();
+        buffer.resize(required_size, 0);
+
+        // Create hashes in the buffer - this consumes the iterator
+        create_hashes(iter, random_state, &mut buffer[..required_size])?;
+
+        // Execute the callback with an immutable slice
+        let result = callback(&buffer[..required_size])?;
+
+        // Cleanup: truncate if buffer grew too large
+        if buffer.capacity() > MAX_BUFFER_SIZE {
+            buffer.truncate(MAX_BUFFER_SIZE);
+            buffer.shrink_to_fit();
+        }
+
+        Ok(result)
+    }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))?
+}
+
 #[cfg(not(feature = "force_hash_collisions"))]
 fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
     if mul_col {
```
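The buffer-reuse pattern behind `with_hashes` can be demonstrated with a std-only sketch: a thread-local `RefCell<Vec<u64>>` whose `try_borrow_mut` doubles as a reentrancy guard, shrunk after use so one oversized batch does not pin memory forever. The names here (`with_scratch`, `MAX_SCRATCH`) are made up for this illustration; this is not the DataFusion code itself.

```rust
use std::cell::RefCell;

const MAX_SCRATCH: usize = 1024; // cap on retained capacity (u64 elements)

thread_local! {
    // Reused across calls on the same thread to avoid repeated allocations
    static SCRATCH: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
}

/// Hands the callback a zeroed scratch slice of length `n`.
/// Returns an error string instead of panicking on reentrant use.
fn with_scratch<R>(n: usize, f: impl FnOnce(&mut [u64]) -> R) -> Result<R, String> {
    SCRATCH.with(|cell| {
        let mut buf = cell
            .try_borrow_mut() // fails if the callback re-enters with_scratch
            .map_err(|_| "with_scratch called reentrantly".to_string())?;
        buf.clear();
        buf.resize(n, 0);
        let result = f(&mut buf[..n]);
        // Allow temporary growth, but don't retain huge buffers indefinitely
        if buf.capacity() > MAX_SCRATCH {
            buf.truncate(MAX_SCRATCH);
            buf.shrink_to_fit();
        }
        Ok(result)
    })
}

fn main() {
    // Normal use: the slice is zeroed and has the requested length
    let sum = with_scratch(4, |s| {
        for (i, v) in s.iter_mut().enumerate() {
            *v = i as u64;
        }
        s.iter().sum::<u64>()
    })
    .unwrap();
    assert_eq!(sum, 6);

    // Reentrant use is rejected rather than panicking
    let nested = with_scratch(1, |_| with_scratch(1, |_| ()));
    assert!(nested.unwrap().is_err());
}
```

The real function adds a second guard, `LocalKey::try_with`, which turns access during thread teardown into an error as well instead of a panic.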
```diff
@@ -478,8 +566,8 @@ impl AsDynArray for &ArrayRef {
 pub fn create_hashes<'a, I, T>(
     arrays: I,
     random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>>
+    hashes_buffer: &'a mut [u64],
+) -> Result<&'a mut [u64]>
 where
     I: IntoIterator<Item = T>,
     T: AsDynArray,
```
```diff
@@ -522,7 +610,7 @@ mod tests {
     fn create_hashes_for_empty_fixed_size_lit() -> Result<()> {
         let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
-        let hashes_buff = &mut vec![0; 0];
+        let hashes_buff = &mut [0; 0];
         let hashes = create_hashes(
             &[Arc::new(empty_array) as ArrayRef],
             &random_state,
```
```diff
@@ -1000,4 +1088,84 @@ mod tests {

         assert_eq!(hashes1, hashes2);
     }
+
+    #[test]
+    fn test_with_hashes() {
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+        // Test that with_hashes produces the same results as create_hashes
+        let mut expected_hashes = vec![0; array.len()];
+        create_hashes([&array], &random_state, &mut expected_hashes).unwrap();
+
+        let result = with_hashes([&array], &random_state, |hashes| {
+            assert_eq!(hashes.len(), 4);
+            // Verify hashes match expected values
+            assert_eq!(hashes, &expected_hashes[..]);
+            // Return a copy of the hashes
+            Ok(hashes.to_vec())
+        })
+        .unwrap();
+
+        // Verify callback result is returned correctly
+        assert_eq!(result, expected_hashes);
+    }
+
+    #[test]
+    fn test_with_hashes_multi_column() {
+        let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+        // Test multi-column hashing
+        let mut expected_hashes = vec![0; int_array.len()];
+        create_hashes(
+            [&int_array, &str_array],
+            &random_state,
+            &mut expected_hashes,
+        )
+        .unwrap();
+
+        with_hashes([&int_array, &str_array], &random_state, |hashes| {
+            assert_eq!(hashes.len(), 3);
+            assert_eq!(hashes, &expected_hashes[..]);
+            Ok(())
+        })
+        .unwrap();
+    }
+
+    #[test]
+    fn test_with_hashes_empty_arrays() {
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+        // Test that passing no arrays returns an error
+        let empty: [&ArrayRef; 0] = [];
+        let result = with_hashes(empty, &random_state, |_hashes| Ok(()));
+
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("requires at least one array"));
+    }
+
+    #[test]
+    fn test_with_hashes_reentrancy() {
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+        // Test that reentrant calls return an error instead of panicking
+        let result = with_hashes([&array], &random_state, |_hashes| {
+            // Try to call with_hashes again inside the callback
+            with_hashes([&array2], &random_state, |_inner_hashes| Ok(()))
+        });
+
+        assert!(result.is_err());
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("reentrantly") || err_msg.contains("cannot be called"),
+            "Error message should mention reentrancy: {err_msg}",
+        );
+    }
 }
```
