Skip to content

Commit 91cfb69

Browse files
adriangbclaude
andauthored
feat(proto): Add protobuf serialization for HashExpr (#19379)
## Summary This PR adds protobuf serialization/deserialization support for `HashExpr`, enabling distributed query execution to serialize hash expressions used in hash joins and repartitioning. This is a followup to #18393 which introduced `HashExpr` but did not add serialization support. This causes errors when serialization is triggered on a query that pushes down dynamic filters from a `HashJoinExec`. As of #18393 `HashJoinExec` produces filters of the form: ```sql CASE (hash_repartition % 2) WHEN 0 THEN a >= ab AND a <= ab AND b >= bb AND b <= bb AND hash_lookup(a,b) WHEN 1 THEN a >= aa AND a <= aa AND b >= ba AND b <= ba AND hash_lookup(a,b) ELSE FALSE END ``` Where `hash_lookup` is an expression that holds a reference to a given partitions hash join hash table and will check for membership. Since we created these new expressions but didn't make any of them serializable any attempt to do a distributed query or similar would run into errors. In #19300 we fixed `hash_lookup` by replacing it with `true` since it can't be serialized across the wire (we'd have to send the entire hash table). The logic was that this preserves the bounds checks, which as still valuable. This PR handles `hash_repartition` which determines which partition (and hence which branch of the `CASE` expression) the row belongs to. For this expression we *can* serialize it, so that's what I'm doing in this PR. ### Key Changes - **SeededRandomState wrapper**: Added a `SeededRandomState` struct that wraps `ahash::RandomState` while preserving the seeds used to create it. This is necessary because `RandomState` doesn't expose seeds after creation, but we need them for serialization. - **Updated seed constants**: Changed `HASH_JOIN_SEED` and `REPARTITION_RANDOM_STATE` constants to use `SeededRandomState` instead of raw `RandomState`. - **HashExpr enhancements**: - Changed `HashExpr` to use `SeededRandomState` - Added getter methods: `on_columns()`, `seeds()`, `description()` - Exported `HashExpr` and `SeededRandomState` from the joins module - **Protobuf support**: - Added `PhysicalHashExprNode` message to `datafusion.proto` with fields for `on_columns`, seeds (4 `u64` values), and `description` - Implemented serialization in `to_proto.rs` - Implemented deserialization in `from_proto.rs` ## Test plan - [x] Added roundtrip test in `roundtrip_physical_plan.rs` that creates a `HashExpr`, serializes it, deserializes it, and verifies the result - [x] All existing hash join tests pass (583 tests) - [x] All proto roundtrip tests pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 887aa9f commit 91cfb69

File tree

13 files changed

+670
-29
lines changed

13 files changed

+670
-29
lines changed

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,11 @@ use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8686
use futures::TryStreamExt;
8787
use parking_lot::Mutex;
8888

89+
use super::partitioned_hash_eval::SeededRandomState;
90+
8991
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
90-
pub(crate) const HASH_JOIN_SEED: RandomState =
91-
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
92+
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
93+
SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
9294

9395
/// HashTable and input data for the left (build side) of a join
9496
pub(super) struct JoinLeftData {
@@ -334,8 +336,8 @@ pub struct HashJoinExec {
334336
/// Each output stream waits on the `OnceAsync` to signal the completion of
335337
/// the hash table creation.
336338
left_fut: Arc<OnceAsync<JoinLeftData>>,
337-
/// Shared the `RandomState` for the hashing algorithm
338-
random_state: RandomState,
339+
/// Shared the `SeededRandomState` for the hashing algorithm (seeds preserved for serialization)
340+
random_state: SeededRandomState,
339341
/// Partitioning mode to use
340342
pub mode: PartitionMode,
341343
/// Execution metrics
@@ -930,7 +932,7 @@ impl ExecutionPlan for HashJoinExec {
930932
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
931933

932934
Ok(collect_left_input(
933-
self.random_state.clone(),
935+
self.random_state.random_state().clone(),
934936
left_stream,
935937
on_left.clone(),
936938
join_metrics.clone(),
@@ -958,7 +960,7 @@ impl ExecutionPlan for HashJoinExec {
958960
.register(context.memory_pool());
959961

960962
OnceFut::new(collect_left_input(
961-
self.random_state.clone(),
963+
self.random_state.random_state().clone(),
962964
left_stream,
963965
on_left.clone(),
964966
join_metrics.clone(),
@@ -1041,7 +1043,7 @@ impl ExecutionPlan for HashJoinExec {
10411043
self.filter.clone(),
10421044
self.join_type,
10431045
right_stream,
1044-
self.random_state.clone(),
1046+
self.random_state.random_state().clone(),
10451047
join_metrics,
10461048
column_indices_after_projection,
10471049
self.null_equality,

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! [`HashJoinExec`] Partitioned Hash Join Operator
1919
2020
pub use exec::HashJoinExec;
21-
pub use partitioned_hash_eval::HashTableLookupExpr;
21+
pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState};
2222

2323
mod exec;
2424
mod inlist_builder;

0 commit comments

Comments
 (0)