Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,34 @@ pub fn adjust_input_keys_ordering(
)
.map(Transformed::yes);
}
PartitionMode::LazyPartitioned => {
// LazyPartitioned mode uses the same key reordering as Partitioned,
// but with LazyPartitioned mode preserved
let join_constructor = |new_conditions: (
Vec<(PhysicalExprRef, PhysicalExprRef)>,
Vec<SortOptions>,
)| {
HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
new_conditions.0,
filter.clone(),
join_type,
projection.clone(),
PartitionMode::LazyPartitioned,
*null_equality,
*null_aware,
)
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
on,
&[],
&join_constructor,
)
.map(Transformed::yes);
}
PartitionMode::CollectLeft => {
// Push down requirements to the right side
requirements.children[1].data = match join_type {
Expand Down Expand Up @@ -624,7 +652,10 @@ pub fn reorder_join_keys_to_inputs(
..
}) = plan_any.downcast_ref::<HashJoinExec>()
{
if matches!(mode, PartitionMode::Partitioned) {
if matches!(
mode,
PartitionMode::Partitioned | PartitionMode::LazyPartitioned
) {
let (join_keys, positions) = reorder_current_join_keys(
extract_join_keys(on),
Some(left.output_partitioning()),
Expand All @@ -645,7 +676,7 @@ pub fn reorder_join_keys_to_inputs(
filter.clone(),
join_type,
projection.clone(),
PartitionMode::Partitioned,
*mode,
*null_equality,
*null_aware,
)?));
Expand Down Expand Up @@ -1257,6 +1288,10 @@ pub fn ensure_distribution(
//
// CollectLeft/CollectRight modes are safe because one side is collected
// to a single partition which eliminates partition-to-partition mapping.
//
// LazyPartitioned mode is also safe from this issue because the build side
// is not pre-partitioned; instead, rows are filtered locally during hash
// table construction. Only the probe side is hash-partitioned.
let is_partitioned_join = plan
.as_any()
.downcast_ref::<HashJoinExec>()
Expand Down
7 changes: 5 additions & 2 deletions datafusion/physical-optimizer/src/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ fn statistical_join_selection_subrule(
|| partitioned_hash_join(hash_join).map(Some),
|v| Ok(Some(v)),
)?,
PartitionMode::Partitioned => {
PartitionMode::Partitioned | PartitionMode::LazyPartitioned => {
let left = hash_join.left();
let right = hash_join.right();
// Don't swap null-aware anti joins as they have specific side requirements
Expand All @@ -302,7 +302,7 @@ fn statistical_join_selection_subrule(
&& should_swap_join_order(&**left, &**right)?
{
hash_join
.swap_inputs(PartitionMode::Partitioned)
.swap_inputs(*hash_join.partition_mode())
.map(Some)?
} else {
None
Expand Down Expand Up @@ -540,6 +540,9 @@ pub(crate) fn swap_join_according_to_unboundedness(
(PartitionMode::Partitioned, _) => {
hash_join.swap_inputs(PartitionMode::Partitioned)
}
(PartitionMode::LazyPartitioned, _) => {
hash_join.swap_inputs(PartitionMode::LazyPartitioned)
}
(PartitionMode::CollectLeft, _) => {
hash_join.swap_inputs(PartitionMode::CollectLeft)
}
Expand Down
Loading