Skip to content

Commit d73f0e8

Browse files
feat: Finalize support for RightMark join + Mark join swap (#16488)
* feat: Finalize support for `RightMark` join
* Update utils.rs
* add `join_selection` tests
* fmt
* Update join_selection.rs

---------

Co-authored-by: Oleks V <[email protected]>
1 parent 586a88c commit d73f0e8

File tree

11 files changed

+181
-52
lines changed

11 files changed

+181
-52
lines changed

datafusion/common/src/join_type.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ impl JoinType {
109109
| JoinType::RightSemi
110110
| JoinType::LeftAnti
111111
| JoinType::RightAnti
112+
| JoinType::LeftMark
113+
| JoinType::RightMark
112114
)
113115
}
114116
}

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ async fn test_left_mark_join_1k_filtered() {
305305
.await
306306
}
307307

308-
// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
309308
#[tokio::test]
310309
async fn test_right_mark_join_1k() {
311310
JoinFuzzTestCase::new(
@@ -314,7 +313,7 @@ async fn test_right_mark_join_1k() {
314313
JoinType::RightMark,
315314
None,
316315
)
317-
.run_test(&[NljHj], false)
316+
.run_test(&[HjSmj, NljHj], false)
318317
.await
319318
}
320319

@@ -326,7 +325,7 @@ async fn test_right_mark_join_1k_filtered() {
326325
JoinType::RightMark,
327326
Some(Box::new(col_lt_col_filter)),
328327
)
329-
.run_test(&[NljHj], false)
328+
.run_test(&[HjSmj, NljHj], false)
330329
.await
331330
}
332331

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,61 @@ async fn test_join_with_swap_semi() {
371371
}
372372
}
373373

374+
#[tokio::test]
375+
async fn test_join_with_swap_mark() {
376+
let join_types = [JoinType::LeftMark];
377+
for join_type in join_types {
378+
let (big, small) = create_big_and_small();
379+
380+
let join = HashJoinExec::try_new(
381+
Arc::clone(&big),
382+
Arc::clone(&small),
383+
vec![(
384+
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
385+
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
386+
)],
387+
None,
388+
&join_type,
389+
None,
390+
PartitionMode::Partitioned,
391+
NullEquality::NullEqualsNothing,
392+
)
393+
.unwrap();
394+
395+
let original_schema = join.schema();
396+
397+
let optimized_join = JoinSelection::new()
398+
.optimize(Arc::new(join), &ConfigOptions::new())
399+
.unwrap();
400+
401+
let swapped_join = optimized_join
402+
.as_any()
403+
.downcast_ref::<HashJoinExec>()
404+
.expect(
405+
"A proj is not required to swap columns back to their original order",
406+
);
407+
408+
assert_eq!(swapped_join.schema().fields().len(), 2);
409+
assert_eq!(
410+
swapped_join
411+
.left()
412+
.partition_statistics(None)
413+
.unwrap()
414+
.total_byte_size,
415+
Precision::Inexact(8192)
416+
);
417+
assert_eq!(
418+
swapped_join
419+
.right()
420+
.partition_statistics(None)
421+
.unwrap()
422+
.total_byte_size,
423+
Precision::Inexact(2097152)
424+
);
425+
assert_eq!(original_schema, swapped_join.schema());
426+
}
427+
}
428+
374429
/// Compare the input plan with the plan after running the probe order optimizer.
375430
macro_rules! assert_optimized {
376431
($EXPECTED_LINES: expr, $PLAN: expr) => {
@@ -577,8 +632,10 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
577632
join_type,
578633
case::left_semi(JoinType::LeftSemi),
579634
case::left_anti(JoinType::LeftAnti),
635+
case::left_mark(JoinType::LeftMark),
580636
case::right_semi(JoinType::RightSemi),
581-
case::right_anti(JoinType::RightAnti)
637+
case::right_anti(JoinType::RightAnti),
638+
case::right_mark(JoinType::RightMark)
582639
)]
583640
#[tokio::test]
584641
async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1652,7 +1652,10 @@ pub fn build_join_schema(
16521652
);
16531653

16541654
let (schema1, schema2) = match join_type {
1655-
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
1655+
JoinType::Right
1656+
| JoinType::RightSemi
1657+
| JoinType::RightAnti
1658+
| JoinType::RightMark => (left, right),
16561659
_ => (right, left),
16571660
};
16581661

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ pub fn hash_join_swap_subrule(
539539
| JoinType::Left
540540
| JoinType::LeftSemi
541541
| JoinType::LeftAnti
542+
| JoinType::LeftMark
542543
)
543544
{
544545
input = swap_join_according_to_unboundedness(hash_join)?;
@@ -549,10 +550,10 @@ pub fn hash_join_swap_subrule(
549550

550551
/// This function swaps sides of a hash join to make it runnable even if one of
551552
/// its inputs is infinite. Note that this is not always possible; i.e.
552-
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
553-
/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
554-
/// we swap join sides. Therefore, we do not consider them here.
555-
/// This function is crate public as it is useful for downstream projects
553+
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`],
554+
/// [`JoinType::RightSemi`], and [`JoinType::RightMark`] can not run with an
555+
/// unbounded left side, even if we swap join sides. Therefore, we do not consider
556+
/// them here. This function is crate public as it is useful for downstream projects
556557
/// to implement, or experiment with, their own join selection rules.
557558
pub(crate) fn swap_join_according_to_unboundedness(
558559
hash_join: &HashJoinExec,
@@ -562,7 +563,11 @@ pub(crate) fn swap_join_according_to_unboundedness(
562563
match (*partition_mode, *join_type) {
563564
(
564565
_,
565-
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
566+
JoinType::Right
567+
| JoinType::RightSemi
568+
| JoinType::RightAnti
569+
| JoinType::RightMark
570+
| JoinType::Full,
566571
) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
567572
(PartitionMode::Partitioned, _) => {
568573
hash_join.swap_inputs(PartitionMode::Partitioned)

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,13 +618,16 @@ impl HashJoinExec {
618618
partition_mode,
619619
self.null_equality(),
620620
)?;
621-
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
621+
622+
// In case of Anti/Semi/Mark joins, or if there is an embedded projection in HashJoinExec, the output column order is preserved, so there is no need to add a projection again
622623
if matches!(
623624
self.join_type(),
624625
JoinType::LeftSemi
625626
| JoinType::RightSemi
626627
| JoinType::LeftAnti
627628
| JoinType::RightAnti
629+
| JoinType::LeftMark
630+
| JoinType::RightMark
628631
) || self.projection.is_some()
629632
{
630633
Ok(Arc::new(new_join))

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,14 +371,16 @@ impl NestedLoopJoinExec {
371371
),
372372
)?;
373373

374-
// For Semi/Anti joins, swap result will produce same output schema,
374+
// For Semi/Anti/Mark joins, swap result will produce same output schema,
375375
// no need to wrap them into additional projection
376376
let plan: Arc<dyn ExecutionPlan> = if matches!(
377377
self.join_type(),
378378
JoinType::LeftSemi
379379
| JoinType::RightSemi
380380
| JoinType::LeftAnti
381381
| JoinType::RightAnti
382+
| JoinType::LeftMark
383+
| JoinType::RightMark
382384
) || self.projection.is_some()
383385
{
384386
Arc::new(new_join)

0 commit comments

Comments
 (0)