Skip to content

Commit ecaa4ae

Browse files
authored
chore: Refactor CometExecRule handling of BroadcastHashJoin and fix fallback reporting (#2856)
1 parent b5c4b33 commit ecaa4ae

File tree

98 files changed

+216
-258
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

98 files changed

+216
-258
lines changed

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 8 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -36,11 +36,10 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHash
3636
import org.apache.spark.sql.execution.window.WindowExec
3737
import org.apache.spark.sql.types._
3838

39-
import org.apache.comet.{CometConf, ExtendedExplainInfo}
39+
import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
4040
import org.apache.comet.CometSparkSessionExtensions._
4141
import org.apache.comet.rules.CometExecRule.allExecs
4242
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, Unsupported}
43-
import org.apache.comet.serde.OperatorOuterClass.Operator
4443
import org.apache.comet.serde.operator._
4544
import org.apache.comet.serde.operator.CometDataWritingCommand
4645

@@ -206,27 +205,20 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
206205
// broadcast exchange is forced to be enabled by Comet config.
207206
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
208207
val newChildren = plan.children.map {
209-
case b: BroadcastExchangeExec
210-
if isCometNative(b.child) &&
211-
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
212-
operator2Proto(b) match {
213-
case Some(nativeOp) =>
214-
val cometOp = CometBroadcastExchangeExec(b, b.output, b.mode, b.child)
215-
CometSinkPlaceHolder(nativeOp, b, cometOp)
216-
case None => b
217-
}
208+
case b: BroadcastExchangeExec =>
209+
convertToCometIfAllChildrenAreNative(b, CometBroadcastExchangeExec).getOrElse(b)
218210
case other => other
219211
}
220212
if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
221213
val newPlan = convertNode(plan.withNewChildren(newChildren))
222214
if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
223215
newPlan
224216
} else {
225-
if (isCometNative(newPlan)) {
226-
val reason =
227-
getCometBroadcastNotEnabledReason(conf).getOrElse("no reason available")
228-
withInfo(plan, s"Broadcast is not enabled: $reason")
229-
}
217+
// copy fallback reasons to the original plan
218+
newPlan
219+
.getTagValue(CometExplainInfo.EXTENSION_INFO)
220+
.foreach(reasons => withInfos(plan, reasons))
221+
// return the original plan
230222
plan
231223
}
232224
} else {
@@ -457,59 +449,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
457449
}
458450
}
459451

460-
/**
461-
* Fallback for handling sinks that have not been handled explicitly. This method should
462-
* eventually be removed once CometExecRule fully uses the operator serde framework.
463-
*/
464-
private def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {
465-
466-
def isCometSink(op: SparkPlan): Boolean = {
467-
op match {
468-
case _: CometSparkToColumnarExec => true
469-
case _: CometSinkPlaceHolder => true
470-
case _ => false
471-
}
472-
}
473-
474-
def isExchangeSink(op: SparkPlan): Boolean = {
475-
op match {
476-
case _: ShuffleExchangeExec => true
477-
case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
478-
case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
479-
true
480-
case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
481-
case BroadcastQueryStageExec(
482-
_,
483-
ReusedExchangeExec(_, _: CometBroadcastExchangeExec),
484-
_) =>
485-
true
486-
case _: BroadcastExchangeExec => true
487-
case _ => false
488-
}
489-
}
490-
491-
val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
492-
childOp.foreach(builder.addChildren)
493-
494-
op match {
495-
case op if isExchangeSink(op) =>
496-
CometExchangeSink.convert(op, builder, childOp: _*)
497-
498-
case op if isCometSink(op) =>
499-
CometScanWrapper.convert(op, builder, childOp: _*)
500-
501-
case _ =>
502-
// Emit warning if:
503-
// 1. it is not Spark shuffle operator, which is handled separately
504-
// 2. it is not a Comet operator
505-
if (!op.nodeName.contains("Comet") &&
506-
!op.isInstanceOf[ShuffleExchangeExec]) {
507-
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
508-
}
509-
None
510-
}
511-
}
512-
513452
/**
514453
* Convert a Spark plan to a Comet plan using the specified serde handler, but only if all
515454
* children are native.

spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala

Lines changed: 22 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ import org.apache.spark.sql.comet.util.Utils
3636
import org.apache.spark.sql.errors.QueryExecutionErrors
3737
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, SQLExecution}
3838
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
39-
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
39+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ReusedExchangeExec}
4040
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
4141
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
4242
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -45,7 +45,9 @@ import org.apache.spark.util.io.ChunkedByteBuffer
4545

4646
import com.google.common.base.Objects
4747

48-
import org.apache.comet.CometRuntimeException
48+
import org.apache.comet.{CometConf, CometRuntimeException, ConfigEntry}
49+
import org.apache.comet.serde.OperatorOuterClass
50+
import org.apache.comet.serde.operator.CometSink
4951
import org.apache.comet.shims.ShimCometBroadcastExchangeExec
5052

5153
/**
@@ -262,7 +264,24 @@ case class CometBroadcastExchangeExec(
262264
copy(child = newChild)
263265
}
264266

265-
object CometBroadcastExchangeExec {
267+
object CometBroadcastExchangeExec extends CometSink[BroadcastExchangeExec] {
268+
269+
/**
270+
* Exchange data is FFI safe because there is no use of mutable buffers involved.
271+
*
272+
* Source of broadcast exchange batches is ArrowStreamReader.
273+
*/
274+
override def isFfiSafe: Boolean = true
275+
276+
override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
277+
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED)
278+
279+
override def createExec(
280+
nativeOp: OperatorOuterClass.Operator,
281+
b: BroadcastExchangeExec): CometNativeExec = {
282+
CometSinkPlaceHolder(nativeOp, b, CometBroadcastExchangeExec(b, b.output, b.mode, b.child))
283+
}
284+
266285
private[comet] val executionContext = ExecutionContext.fromExecutorService(
267286
ThreadUtils.newDaemonCachedThreadPool(
268287
"comet-broadcast-exchange",

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ TakeOrderedAndProject
1010
: :- Project
1111
: : +- Filter
1212
: : +- BroadcastHashJoin
13-
: : :- BroadcastHashJoin
13+
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
1414
: : : :- CometColumnarToRow
1515
: : : : +- CometBroadcastHashJoin
1616
: : : : :- CometFilter

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ TakeOrderedAndProject
1010
: :- Project
1111
: : +- Filter
1212
: : +- BroadcastHashJoin
13-
: : :- BroadcastHashJoin
13+
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
1414
: : : :- CometColumnarToRow
1515
: : : : +- CometBroadcastHashJoin
1616
: : : : :- CometFilter

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q24a.native_iceberg_compat/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ Filter
99
: +- CometColumnarExchange
1010
: +- HashAggregate
1111
: +- Project
12-
: +- BroadcastHashJoin
12+
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
1313
: :- CometColumnarToRow
1414
: : +- CometProject
1515
: : +- CometBroadcastHashJoin
@@ -55,7 +55,7 @@ Filter
5555
+- CometColumnarExchange
5656
+- HashAggregate
5757
+- Project
58-
+- BroadcastHashJoin
58+
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
5959
:- CometColumnarToRow
6060
: +- CometProject
6161
: +- CometBroadcastHashJoin

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q24a/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ Filter
99
: +- CometColumnarExchange
1010
: +- HashAggregate
1111
: +- Project
12-
: +- BroadcastHashJoin
12+
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
1313
: :- CometColumnarToRow
1414
: : +- CometProject
1515
: : +- CometBroadcastHashJoin
@@ -55,7 +55,7 @@ Filter
5555
+- CometColumnarExchange
5656
+- HashAggregate
5757
+- Project
58-
+- BroadcastHashJoin
58+
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
5959
:- CometColumnarToRow
6060
: +- CometProject
6161
: +- CometBroadcastHashJoin

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q24b.native_iceberg_compat/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ Filter
99
: +- CometColumnarExchange
1010
: +- HashAggregate
1111
: +- Project
12-
: +- BroadcastHashJoin
12+
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
1313
: :- CometColumnarToRow
1414
: : +- CometProject
1515
: : +- CometBroadcastHashJoin
@@ -55,7 +55,7 @@ Filter
5555
+- CometColumnarExchange
5656
+- HashAggregate
5757
+- Project
58-
+- BroadcastHashJoin
58+
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
5959
:- CometColumnarToRow
6060
: +- CometProject
6161
: +- CometBroadcastHashJoin

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q24b/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ Filter
99
: +- CometColumnarExchange
1010
: +- HashAggregate
1111
: +- Project
12-
: +- BroadcastHashJoin
12+
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
1313
: :- CometColumnarToRow
1414
: : +- CometProject
1515
: : +- CometBroadcastHashJoin
@@ -55,7 +55,7 @@ Filter
5555
+- CometColumnarExchange
5656
+- HashAggregate
5757
+- Project
58-
+- BroadcastHashJoin
58+
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
5959
:- CometColumnarToRow
6060
: +- CometProject
6161
: +- CometBroadcastHashJoin

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28.native_iceberg_compat/extended.txt

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,8 @@
1-
BroadcastNestedLoopJoin
2-
:- BroadcastNestedLoopJoin
3-
: :- BroadcastNestedLoopJoin
4-
: : :- BroadcastNestedLoopJoin
5-
: : : :- BroadcastNestedLoopJoin
1+
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2+
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3+
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4+
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
5+
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate
88
: : : : : +- CometColumnarExchange

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28/extended.txt

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,8 @@
1-
BroadcastNestedLoopJoin
2-
:- BroadcastNestedLoopJoin
3-
: :- BroadcastNestedLoopJoin
4-
: : :- BroadcastNestedLoopJoin
5-
: : : :- BroadcastNestedLoopJoin
1+
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
2+
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
3+
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
4+
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
5+
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
66
: : : : :- CometColumnarToRow
77
: : : : : +- CometHashAggregate
88
: : : : : +- CometColumnarExchange

0 commit comments

Comments
 (0)