Skip to content

Commit 95c8c7d

Browse files
authored
minor: Small refactor in CometExecRule to remove confusing code and fix more fallback reporting (#2860)
1 parent 60f420e commit 95c8c7d

File tree

13 files changed

+88
-95
lines changed

13 files changed

+88
-95
lines changed

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -191,22 +191,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
191191

192192
// For AQE broadcast stage on a Comet broadcast exchange
193193
case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
194-
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
194+
convertToComet(s, CometExchangeSink).getOrElse(s)
195195

196196
case s @ BroadcastQueryStageExec(
197197
_,
198198
ReusedExchangeExec(_, _: CometBroadcastExchangeExec),
199199
_) =>
200-
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
200+
convertToComet(s, CometExchangeSink).getOrElse(s)
201201

202202
// `CometBroadcastExchangeExec`'s broadcast output is not compatible with Spark's broadcast
203203
// exchange. It is only used for Comet native execution. We only transform Spark broadcast
204204
// exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
205205
// broadcast exchange is forced to be enabled by Comet config.
206206
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
207207
val newChildren = plan.children.map {
208-
case b: BroadcastExchangeExec =>
209-
convertToCometIfAllChildrenAreNative(b, CometBroadcastExchangeExec).getOrElse(b)
208+
case b: BroadcastExchangeExec if b.children.forall(_.isInstanceOf[CometNativeExec]) =>
209+
convertToComet(b, CometBroadcastExchangeExec).getOrElse(b)
210210
case other => other
211211
}
212212
if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
@@ -227,42 +227,49 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
227227

228228
// For AQE shuffle stage on a Comet shuffle exchange
229229
case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
230-
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
230+
convertToComet(s, CometExchangeSink).getOrElse(s)
231231

232232
// For AQE shuffle stage on a reused Comet shuffle exchange
233233
// Note that we don't need to handle `ReusedExchangeExec` for non-AQE case, because
234234
// the query plan won't be re-optimized/planned in non-AQE mode.
235235
case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
236-
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
236+
convertToComet(s, CometExchangeSink).getOrElse(s)
237237

238238
case s: ShuffleExchangeExec =>
239239
convertToComet(s, CometShuffleExchangeExec).getOrElse(s)
240240

241241
case op =>
242-
val handler = allExecs
243-
.get(op.getClass)
244-
.map(_.asInstanceOf[CometOperatorSerde[SparkPlan]])
245-
handler match {
246-
case Some(handler) =>
247-
return convertToCometIfAllChildrenAreNative(op, handler).getOrElse(op)
248-
case _ =>
242+
// if all children are native (or if this is a leaf node) then see if there is a
243+
// registered handler for creating a fully native plan
244+
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
245+
val handler = allExecs
246+
.get(op.getClass)
247+
.map(_.asInstanceOf[CometOperatorSerde[SparkPlan]])
248+
handler match {
249+
case Some(handler) =>
250+
return convertToComet(op, handler).getOrElse(op)
251+
case _ =>
252+
}
249253
}
250254

251255
op match {
252256
case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
253-
_: BroadcastQueryStageExec | _: AdaptiveSparkPlanExec =>
257+
_: BroadcastQueryStageExec | _: AdaptiveSparkPlanExec | _: ExecutedCommandExec |
258+
_: V2CommandExec =>
254259
// Some execs should never be replaced. We include
255260
// these cases specially here so we do not add a misleading 'info' message
256261
op
257-
case _: ExecutedCommandExec | _: V2CommandExec =>
258-
// Some execs that comet will not accelerate, such as command execs.
259-
op
260262
case _ =>
261-
if (!hasExplainInfo(op)) {
262-
// An operator that is not supported by Comet
263+
// The operator was not converted to a Comet plan. Possible reasons for this happening:
264+
// 1. Comet does not support this operator.
265+
// 2. The operator could not be supported based on query context and current
266+
// configs. In this case, it should have already been tagged with fallback
267+
// reasons.
268+
// 3. The operator has children that could not be converted, so execution
269+
// has already fallen back to Spark.
270+
if (op.children.forall(_.isInstanceOf[CometNativeExec]) && !hasExplainInfo(op)) {
263271
withInfo(op, s"${op.nodeName} is not supported")
264272
} else {
265-
// Already has fallback reason, do not override it
266273
op
267274
}
268275
}
@@ -449,26 +456,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
449456
}
450457
}
451458

452-
/**
453-
* Convert a Spark plan to a Comet plan using the specified serde handler, but only if all
454-
* children are native.
455-
*/
456-
private def convertToCometIfAllChildrenAreNative(
457-
op: SparkPlan,
458-
handler: CometOperatorSerde[_]): Option[SparkPlan] = {
459-
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
460-
convertToComet(op, handler)
461-
} else {
462-
None
463-
}
464-
}
465-
466459
/** Convert a Spark plan to a Comet plan using the specified serde handler */
467460
private def convertToComet(op: SparkPlan, handler: CometOperatorSerde[_]): Option[SparkPlan] = {
468461
val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
469462
if (isOperatorEnabled(serde, op)) {
470463
val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
471-
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
464+
if (op.children.nonEmpty && op.children.forall(_.isInstanceOf[CometNativeExec])) {
472465
val childOp = op.children.map(_.asInstanceOf[CometNativeExec].nativeOp)
473466
childOp.foreach(builder.addChildren)
474467
return serde

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28.native_iceberg_compat/extended.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
55
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28/extended.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
55
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88.native_iceberg_compat/extended.txt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
5-
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
6-
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
5+
: : : :- BroadcastNestedLoopJoin
6+
: : : : :- BroadcastNestedLoopJoin
77
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
88
: : : : : : :- CometColumnarToRow
99
: : : : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88/extended.txt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
5-
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
6-
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
5+
: : : :- BroadcastNestedLoopJoin
6+
: : : : :- BroadcastNestedLoopJoin
77
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
88
: : : : : : :- CometColumnarToRow
99
: : : : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28.native_iceberg_compat/extended.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
55
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28/extended.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
55
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88.native_iceberg_compat/extended.txt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
5-
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
6-
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
5+
: : : :- BroadcastNestedLoopJoin
6+
: : : : :- BroadcastNestedLoopJoin
77
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
88
: : : : : : :- CometColumnarToRow
99
: : : : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88/extended.txt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
5-
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
6-
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
5+
: : : :- BroadcastNestedLoopJoin
6+
: : : : :- BroadcastNestedLoopJoin
77
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
88
: : : : : : :- CometColumnarToRow
99
: : : : : : : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28.native_iceberg_compat/extended.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2-
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3-
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4-
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
1+
BroadcastNestedLoopJoin
2+
:- BroadcastNestedLoopJoin
3+
: :- BroadcastNestedLoopJoin
4+
: : :- BroadcastNestedLoopJoin
55
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate

0 commit comments

Comments
 (0)