Skip to content

Commit 4c6a148

Browse files
fix: Issue with JoinSelection and CrossJoinExec when stages have been resoled (#1322)
* fix: Issue with JoinSelection and CrossJoinExec when stage is resolved #1321 * fix clippy --------- Co-authored-by: Marko Milenković <milenkovicm@users.noreply.github.com>
1 parent f305a76 commit 4c6a148

File tree

1 file changed

+35
-1
lines changed

1 file changed

+35
-1
lines changed

ballista/scheduler/src/physical_optimizer/join_selection.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,9 @@ fn statistical_join_selection_subrule(
346346
} else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
347347
let left = cross_join.left();
348348
let right = cross_join.right();
349-
if should_swap_join_order(&**left, &**right)? {
349+
if right.properties().output_partitioning().partition_count() > 1 {
350+
None
351+
} else if should_swap_join_order(&**left, &**right)? {
350352
cross_join.swap_inputs().map(Some)?
351353
} else {
352354
None
@@ -670,6 +672,38 @@ mod test {
670672
);
671673
}
672674

675+
//
676+
// join selection should not change order of joins for
677+
// cross join as we're not able to insert new CoalescePartitions
678+
// when stages are created
679+
//
680+
#[tokio::test]
681+
async fn test_cross_join_with_swap() {
682+
use std::sync::Arc;
683+
684+
use datafusion::{
685+
config::ConfigOptions,
686+
physical_optimizer::PhysicalOptimizerRule,
687+
physical_plan::{displayable, joins::CrossJoinExec},
688+
};
689+
690+
use crate::physical_optimizer::join_selection::JoinSelection;
691+
692+
let (big, small) = create_big_and_small();
693+
694+
let join = Arc::new(CrossJoinExec::new(Arc::clone(&big), Arc::clone(&small)))
695+
as Arc<dyn ExecutionPlan>;
696+
697+
let optimized_join = JoinSelection::new()
698+
.optimize(join.clone(), &ConfigOptions::new())
699+
.unwrap();
700+
701+
assert_eq!(
702+
displayable(join.as_ref()).one_line().to_string(),
703+
displayable(optimized_join.as_ref()).one_line().to_string()
704+
);
705+
}
706+
673707
fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
674708
let big = Arc::new(StatisticsExec::new(
675709
big_statistics(),

0 commit comments

Comments
 (0)