Skip to content

Commit cf56fc1

Browse files
authored
fix: Do not replace SMJ with HJ for LeftSemi (#2687)
* fix: Do not replace SMJ with HJ for `LeftSemi`
1 parent: 2ee198a · commit: cf56fc1

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -635,15 +635,17 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
635635
plan
636636
}
637637
} else {
638-
val normalizedPlan = if (CometConf.COMET_REPLACE_SMJ.get()) {
639-
normalizePlan(plan).transformUp { case p =>
638+
val normalizedPlan = normalizePlan(plan)
639+
640+
val planWithJoinRewritten = if (CometConf.COMET_REPLACE_SMJ.get()) {
641+
normalizedPlan.transformUp { case p =>
640642
RewriteJoin.rewrite(p)
641643
}
642644
} else {
643-
normalizePlan(plan)
645+
normalizedPlan
644646
}
645647

646-
var newPlan = transform(normalizedPlan)
648+
var newPlan = transform(planWithJoinRewritten)
647649

648650
// if the plan cannot be run fully natively then explain why (when appropriate
649651
// config is enabled)

spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,11 +20,13 @@
2020
package org.apache.comet.rules
2121

2222
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper}
23-
import org.apache.spark.sql.catalyst.plans.LeftAnti
23+
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
2424
import org.apache.spark.sql.catalyst.plans.logical.Join
2525
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
2626
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
2727

28+
import org.apache.comet.CometSparkSessionExtensions.withInfo
29+
2830
/**
2931
* Adapted from equivalent rule in Apache Gluten.
3032
*
@@ -65,8 +67,13 @@ object RewriteJoin extends JoinSelectionHelper {
6567
def rewrite(plan: SparkPlan): SparkPlan = plan match {
6668
case smj: SortMergeJoinExec =>
6769
getSmjBuildSide(smj) match {
68-
case Some(BuildRight) if smj.joinType == LeftAnti =>
69-
// https://github.com/apache/datafusion-comet/issues/457
70+
case Some(BuildRight) if smj.joinType == LeftAnti || smj.joinType == LeftSemi =>
71+
// LeftAnti https://github.com/apache/datafusion-comet/issues/457
72+
// LeftSemi https://github.com/apache/datafusion-comet/issues/2667
73+
withInfo(
74+
smj,
75+
"Cannot rewrite SortMergeJoin to HashJoin: " +
76+
s"BuildRight with ${smj.joinType} is not supported")
7077
plan
7178
case Some(buildSide) =>
7279
ShuffledHashJoinExec(

0 commit comments

Comments
 (0)