Skip to content

Commit 2274ee5

Browse files
authored
perf: Improve BroadcastExchangeExec conversion (#2417)
1 parent ba4e362 commit 2274ee5

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
154154
operator2Proto(op).map(fun).getOrElse(op)
155155
}
156156

157-
plan.transformUp {
157+
def convertNode(op: SparkPlan): SparkPlan = op match {
158158
// Fully native scan for V1
159159
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
160160
val nativeOp = QueryPlanSerde.operator2Proto(scan).get
@@ -446,7 +446,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
446446
case other => other
447447
}
448448
if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
449-
val newPlan = apply(plan.withNewChildren(newChildren))
449+
val newPlan = convertNode(plan.withNewChildren(newChildren))
450450
if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
451451
newPlan
452452
} else {
@@ -554,6 +554,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
554554
}
555555
}
556556
}
557+
558+
plan.transformUp { case op =>
559+
convertNode(op)
560+
}
557561
}
558562

559563
private def normalizePlan(plan: SparkPlan): SparkPlan = {

0 commit comments

Comments
 (0)