Skip to content

Commit cc30361

Browse files
committed
fix: Do not replace SMJ with HJ for LeftSemi
1 parent 0ba787f commit cc30361

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -635,15 +635,20 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
635635
plan
636636
}
637637
} else {
638-
val normalizedPlan = if (CometConf.COMET_REPLACE_SMJ.get()) {
639-
normalizePlan(plan).transformUp { case p =>
638+
val normalizedPlan = normalizePlan(plan)
639+
640+
val planWithJoinRewritten = if (CometConf.COMET_REPLACE_SMJ.get()) {
641+
normalizedPlan.transformUp { case p =>
640642
RewriteJoin.rewrite(p)
641643
}
642644
} else {
643-
normalizePlan(plan)
645+
normalizedPlan
644646
}
645647

646-
var newPlan = transform(normalizedPlan)
648+
println(normalizedPlan)
649+
println(planWithJoinRewritten)
650+
651+
var newPlan = transform(planWithJoinRewritten)
647652

648653
// if the plan cannot be run fully natively then explain why (when appropriate
649654
// config is enabled)

spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
package org.apache.comet.rules
2121

2222
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper}
23-
import org.apache.spark.sql.catalyst.plans.LeftAnti
23+
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
2424
import org.apache.spark.sql.catalyst.plans.logical.Join
2525
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
2626
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
2727

28+
import org.apache.comet.CometSparkSessionExtensions.withInfo
29+
2830
/**
2931
* Adapted from equivalent rule in Apache Gluten.
3032
*
@@ -65,8 +67,12 @@ object RewriteJoin extends JoinSelectionHelper {
6567
def rewrite(plan: SparkPlan): SparkPlan = plan match {
6668
case smj: SortMergeJoinExec =>
6769
getSmjBuildSide(smj) match {
68-
case Some(BuildRight) if smj.joinType == LeftAnti =>
69-
// https://github.com/apache/datafusion-comet/issues/457
70+
case Some(BuildRight) if smj.joinType == LeftAnti || smj.joinType == LeftSemi =>
71+
withInfo(
72+
smj,
73+
"Cannot rewrite SortMergeJoin to HashJoin: BuildRight with LeftSemi/LeftAnti is not supported")
74+
// LeftAnti https://github.com/apache/datafusion-comet/issues/457
75+
// LeftSemi https://github.com/apache/datafusion-comet/issues/2667
7076
plan
7177
case Some(buildSide) =>
7278
ShuffledHashJoinExec(

0 commit comments

Comments
 (0)