Skip to content

Commit 8d772e5

Browse files
authored
Revert "feat: Finalize support for RightMark join + Mark join swap (#16488)" (#16597)
This reverts commit d73f0e8.
1 parent 1de4d0e commit 8d772e5

File tree

11 files changed

+52
-181
lines changed

11 files changed

+52
-181
lines changed

datafusion/common/src/join_type.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,6 @@ impl JoinType {
109109
| JoinType::RightSemi
110110
| JoinType::LeftAnti
111111
| JoinType::RightAnti
112-
| JoinType::LeftMark
113-
| JoinType::RightMark
114112
)
115113
}
116114
}

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ async fn test_left_mark_join_1k_filtered() {
305305
.await
306306
}
307307

308+
// TODO: add JoinTestType::HjSmj once SortMergeJoin supports RightMark joins
308309
#[tokio::test]
309310
async fn test_right_mark_join_1k() {
310311
JoinFuzzTestCase::new(
@@ -313,7 +314,7 @@ async fn test_right_mark_join_1k() {
313314
JoinType::RightMark,
314315
None,
315316
)
316-
.run_test(&[HjSmj, NljHj], false)
317+
.run_test(&[NljHj], false)
317318
.await
318319
}
319320

@@ -325,7 +326,7 @@ async fn test_right_mark_join_1k_filtered() {
325326
JoinType::RightMark,
326327
Some(Box::new(col_lt_col_filter)),
327328
)
328-
.run_test(&[HjSmj, NljHj], false)
329+
.run_test(&[NljHj], false)
329330
.await
330331
}
331332

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -371,61 +371,6 @@ async fn test_join_with_swap_semi() {
371371
}
372372
}
373373

374-
#[tokio::test]
375-
async fn test_join_with_swap_mark() {
376-
let join_types = [JoinType::LeftMark];
377-
for join_type in join_types {
378-
let (big, small) = create_big_and_small();
379-
380-
let join = HashJoinExec::try_new(
381-
Arc::clone(&big),
382-
Arc::clone(&small),
383-
vec![(
384-
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
385-
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
386-
)],
387-
None,
388-
&join_type,
389-
None,
390-
PartitionMode::Partitioned,
391-
NullEquality::NullEqualsNothing,
392-
)
393-
.unwrap();
394-
395-
let original_schema = join.schema();
396-
397-
let optimized_join = JoinSelection::new()
398-
.optimize(Arc::new(join), &ConfigOptions::new())
399-
.unwrap();
400-
401-
let swapped_join = optimized_join
402-
.as_any()
403-
.downcast_ref::<HashJoinExec>()
404-
.expect(
405-
"A proj is not required to swap columns back to their original order",
406-
);
407-
408-
assert_eq!(swapped_join.schema().fields().len(), 2);
409-
assert_eq!(
410-
swapped_join
411-
.left()
412-
.partition_statistics(None)
413-
.unwrap()
414-
.total_byte_size,
415-
Precision::Inexact(8192)
416-
);
417-
assert_eq!(
418-
swapped_join
419-
.right()
420-
.partition_statistics(None)
421-
.unwrap()
422-
.total_byte_size,
423-
Precision::Inexact(2097152)
424-
);
425-
assert_eq!(original_schema, swapped_join.schema());
426-
}
427-
}
428-
429374
/// Compare the input plan with the plan after running the probe order optimizer.
430375
macro_rules! assert_optimized {
431376
($EXPECTED_LINES: expr, $PLAN: expr) => {
@@ -632,10 +577,8 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
632577
join_type,
633578
case::left_semi(JoinType::LeftSemi),
634579
case::left_anti(JoinType::LeftAnti),
635-
case::left_mark(JoinType::LeftMark),
636580
case::right_semi(JoinType::RightSemi),
637-
case::right_anti(JoinType::RightAnti),
638-
case::right_mark(JoinType::RightMark)
581+
case::right_anti(JoinType::RightAnti)
639582
)]
640583
#[tokio::test]
641584
async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,10 +1648,7 @@ pub fn build_join_schema(
16481648
);
16491649

16501650
let (schema1, schema2) = match join_type {
1651-
JoinType::Right
1652-
| JoinType::RightSemi
1653-
| JoinType::RightAnti
1654-
| JoinType::RightMark => (left, right),
1651+
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
16551652
_ => (right, left),
16561653
};
16571654

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,6 @@ pub fn hash_join_swap_subrule(
539539
| JoinType::Left
540540
| JoinType::LeftSemi
541541
| JoinType::LeftAnti
542-
| JoinType::LeftMark
543542
)
544543
{
545544
input = swap_join_according_to_unboundedness(hash_join)?;
@@ -550,10 +549,10 @@ pub fn hash_join_swap_subrule(
550549

551550
/// This function swaps sides of a hash join to make it runnable even if one of
552551
/// its inputs is infinite. Note that this is not always possible; i.e.
553-
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`],
554-
/// [`JoinType::RightSemi`], and [`JoinType::RightMark`] can not run with an
555-
/// unbounded left side, even if we swap join sides. Therefore, we do not consider
556-
/// them here. This function is crate public as it is useful for downstream projects
552+
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
553+
/// [`JoinType::RightSemi`] cannot run with an unbounded left side, even if
554+
/// we swap join sides. Therefore, we do not consider them here.
555+
/// This function is crate public as it is useful for downstream projects
557556
/// to implement, or experiment with, their own join selection rules.
558557
pub(crate) fn swap_join_according_to_unboundedness(
559558
hash_join: &HashJoinExec,
@@ -563,11 +562,7 @@ pub(crate) fn swap_join_according_to_unboundedness(
563562
match (*partition_mode, *join_type) {
564563
(
565564
_,
566-
JoinType::Right
567-
| JoinType::RightSemi
568-
| JoinType::RightAnti
569-
| JoinType::RightMark
570-
| JoinType::Full,
565+
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
571566
) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
572567
(PartitionMode::Partitioned, _) => {
573568
hash_join.swap_inputs(PartitionMode::Partitioned)

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -618,16 +618,13 @@ impl HashJoinExec {
618618
partition_mode,
619619
self.null_equality(),
620620
)?;
621-
622-
// In case of Anti/Semi/Mark joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
621+
// For anti/semi joins, or when HashJoinExec has an embedded projection, the output column order is preserved, so there is no need to add a projection again
623622
if matches!(
624623
self.join_type(),
625624
JoinType::LeftSemi
626625
| JoinType::RightSemi
627626
| JoinType::LeftAnti
628627
| JoinType::RightAnti
629-
| JoinType::LeftMark
630-
| JoinType::RightMark
631628
) || self.projection.is_some()
632629
{
633630
Ok(Arc::new(new_join))

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,16 +371,14 @@ impl NestedLoopJoinExec {
371371
),
372372
)?;
373373

374-
// For Semi/Anti/Mark joins, swap result will produce same output schema,
374+
// For Semi/Anti joins, the swapped join produces the same output schema,
375375
// so there is no need to wrap it in an additional projection
376376
let plan: Arc<dyn ExecutionPlan> = if matches!(
377377
self.join_type(),
378378
JoinType::LeftSemi
379379
| JoinType::RightSemi
380380
| JoinType::LeftAnti
381381
| JoinType::RightAnti
382-
| JoinType::LeftMark
383-
| JoinType::RightMark
384382
) || self.projection.is_some()
385383
{
386384
Arc::new(new_join)

0 commit comments

Comments
 (0)