Skip to content

Commit 6d62650

Browse files
feat: add LazyPartitioned mode for hash join to reduce RepartitionExec overhead
This commit adds a new PartitionMode::LazyPartitioned that avoids the full build-side RepartitionExec when executing partitioned hash joins. Instead of pre-repartitioning all columns of the build table, rows are filtered lazily during hash table construction using hash(join_keys) % partition_count.

Key changes:
- Add LazyPartitioned variant to PartitionMode enum
- Build side requests UnspecifiedDistribution (merged, no repartition)
- Probe side still requests HashPartitioned distribution
- Add filter_batch_by_partition() to filter build rows per partition
- Update collect_left_input to accept optional partition filter
- Add protobuf serialization support for new mode
- Update optimizer to handle LazyPartitioned in key reordering

This optimization is beneficial for wide build tables where copying all columns in RepartitionExec is expensive.

Closes #19789
1 parent 472a729 commit 6d62650

File tree

10 files changed

+299
-45
lines changed

10 files changed

+299
-45
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,34 @@ pub fn adjust_input_keys_ordering(
327327
)
328328
.map(Transformed::yes);
329329
}
330+
PartitionMode::LazyPartitioned => {
331+
// LazyPartitioned mode uses the same key reordering as Partitioned,
332+
// but with LazyPartitioned mode preserved
333+
let join_constructor = |new_conditions: (
334+
Vec<(PhysicalExprRef, PhysicalExprRef)>,
335+
Vec<SortOptions>,
336+
)| {
337+
HashJoinExec::try_new(
338+
Arc::clone(left),
339+
Arc::clone(right),
340+
new_conditions.0,
341+
filter.clone(),
342+
join_type,
343+
projection.clone(),
344+
PartitionMode::LazyPartitioned,
345+
*null_equality,
346+
*null_aware,
347+
)
348+
.map(|e| Arc::new(e) as _)
349+
};
350+
return reorder_partitioned_join_keys(
351+
requirements,
352+
on,
353+
&[],
354+
&join_constructor,
355+
)
356+
.map(Transformed::yes);
357+
}
330358
PartitionMode::CollectLeft => {
331359
// Push down requirements to the right side
332360
requirements.children[1].data = match join_type {
@@ -624,7 +652,10 @@ pub fn reorder_join_keys_to_inputs(
624652
..
625653
}) = plan_any.downcast_ref::<HashJoinExec>()
626654
{
627-
if matches!(mode, PartitionMode::Partitioned) {
655+
if matches!(
656+
mode,
657+
PartitionMode::Partitioned | PartitionMode::LazyPartitioned
658+
) {
628659
let (join_keys, positions) = reorder_current_join_keys(
629660
extract_join_keys(on),
630661
Some(left.output_partitioning()),
@@ -645,7 +676,7 @@ pub fn reorder_join_keys_to_inputs(
645676
filter.clone(),
646677
join_type,
647678
projection.clone(),
648-
PartitionMode::Partitioned,
679+
*mode,
649680
*null_equality,
650681
*null_aware,
651682
)?));
@@ -1257,6 +1288,10 @@ pub fn ensure_distribution(
12571288
//
12581289
// CollectLeft/CollectRight modes are safe because one side is collected
12591290
// to a single partition which eliminates partition-to-partition mapping.
1291+
//
1292+
// LazyPartitioned mode is also safe from this issue because the build side
1293+
// is not pre-partitioned; instead, rows are filtered locally during hash
1294+
// table construction. Only the probe side is hash-partitioned.
12601295
let is_partitioned_join = plan
12611296
.as_any()
12621297
.downcast_ref::<HashJoinExec>()

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ fn statistical_join_selection_subrule(
293293
|| partitioned_hash_join(hash_join).map(Some),
294294
|v| Ok(Some(v)),
295295
)?,
296-
PartitionMode::Partitioned => {
296+
PartitionMode::Partitioned | PartitionMode::LazyPartitioned => {
297297
let left = hash_join.left();
298298
let right = hash_join.right();
299299
// Don't swap null-aware anti joins as they have specific side requirements
@@ -302,7 +302,7 @@ fn statistical_join_selection_subrule(
302302
&& should_swap_join_order(&**left, &**right)?
303303
{
304304
hash_join
305-
.swap_inputs(PartitionMode::Partitioned)
305+
.swap_inputs(*hash_join.partition_mode())
306306
.map(Some)?
307307
} else {
308308
None
@@ -540,6 +540,9 @@ pub(crate) fn swap_join_according_to_unboundedness(
540540
(PartitionMode::Partitioned, _) => {
541541
hash_join.swap_inputs(PartitionMode::Partitioned)
542542
}
543+
(PartitionMode::LazyPartitioned, _) => {
544+
hash_join.swap_inputs(PartitionMode::LazyPartitioned)
545+
}
543546
(PartitionMode::CollectLeft, _) => {
544547
hash_join.swap_inputs(PartitionMode::CollectLeft)
545548
}

0 commit comments

Comments
 (0)