Skip to content

Commit 4df3b6e

Browse files
authored
chore: Operator serde refactor part 2 (apache#2741)
1 parent fa55a30 commit 4df3b6e

File tree

13 files changed

+168
-96
lines changed

13 files changed

+168
-96
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ object CometConf extends ShimCometConf {
7979

8080
val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
8181

82+
val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";
83+
8284
val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
8385
.category(CATEGORY_EXEC)
8486
.doc(
@@ -747,6 +749,18 @@ object CometConf extends ShimCometConf {
747749
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible"
748750
}
749751

752+
def isOperatorAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = {
753+
getBooleanConf(getOperatorAllowIncompatConfigKey(name), defaultValue = false, conf)
754+
}
755+
756+
def getOperatorAllowIncompatConfigKey(name: String): String = {
757+
s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.$name.allowIncompatible"
758+
}
759+
760+
def getOperatorAllowIncompatConfigKey(exprClass: Class[_]): String = {
761+
s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible"
762+
}
763+
750764
def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
751765
conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true"
752766
}

spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,22 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
2929
*/
3030
trait CometOperatorSerde[T <: SparkPlan] {
3131

32+
/**
33+
* Get the optional Comet configuration entry that is used to enable or disable native support
34+
* for this operator.
35+
*/
36+
def enabledConfig: Option[ConfigEntry[Boolean]]
37+
38+
/**
39+
* Determine the support level of the operator based on its attributes.
40+
*
41+
* @param operator
42+
* The Spark operator.
43+
* @return
44+
* Support level (Compatible, Incompatible, or Unsupported).
45+
*/
46+
def getSupportLevel(operator: T): SupportLevel = Compatible(None)
47+
3248
/**
3349
* Convert a Spark operator into a protocol buffer representation that can be passed into native
3450
* code.
@@ -49,9 +65,4 @@ trait CometOperatorSerde[T <: SparkPlan] {
4965
builder: Operator.Builder,
5066
childOp: Operator*): Option[OperatorOuterClass.Operator]
5167

52-
/**
53-
* Get the optional Comet configuration entry that is used to enable or disable native support
54-
* for this operator.
55-
*/
56-
def enabledConfig: Option[ConfigEntry[Boolean]]
5768
}

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 75 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
4747
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
4848
import org.apache.comet.serde.Types.DataType._
4949
import org.apache.comet.serde.literals.CometLiteral
50-
import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
50+
import org.apache.comet.serde.operator._
5151
import org.apache.comet.shims.CometExprShim
5252

5353
/**
@@ -61,6 +61,15 @@ object QueryPlanSerde extends Logging with CometExprShim {
6161
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
6262
Map(
6363
classOf[ProjectExec] -> CometProject,
64+
classOf[FilterExec] -> CometFilter,
65+
classOf[LocalLimitExec] -> CometLocalLimit,
66+
classOf[GlobalLimitExec] -> CometGlobalLimit,
67+
classOf[ExpandExec] -> CometExpand,
68+
classOf[HashAggregateExec] -> CometHashAggregate,
69+
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
70+
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
71+
classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
72+
classOf[SortMergeJoinExec] -> CometSortMergeJoin,
6473
classOf[SortExec] -> CometSort,
6574
classOf[LocalTableScanExec] -> CometLocalTableScan)
6675

@@ -924,51 +933,30 @@ object QueryPlanSerde extends Logging with CometExprShim {
924933
val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
925934
childOp.foreach(builder.addChildren)
926935

936+
// look for registered handler first
937+
val serde = opSerdeMap.get(op.getClass)
938+
serde match {
939+
case Some(handler) if isOperatorEnabled(handler, op) =>
940+
val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
941+
val maybeConverted = opSerde.convert(op, builder, childOp: _*)
942+
if (maybeConverted.isDefined) {
943+
return maybeConverted
944+
}
945+
case _ =>
946+
}
947+
948+
// now handle special cases that cannot be handled as a simple mapping from class name
949+
// and see if operator can be used as a sink
927950
op match {
928951

929952
// Fully native scan for V1
930953
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
931954
CometNativeScan.convert(scan, builder, childOp: _*)
932955

933-
case filter: FilterExec if CometConf.COMET_EXEC_FILTER_ENABLED.get(conf) =>
934-
CometFilter.convert(filter, builder, childOp: _*)
935-
936-
case limit: LocalLimitExec if CometConf.COMET_EXEC_LOCAL_LIMIT_ENABLED.get(conf) =>
937-
CometLocalLimit.convert(limit, builder, childOp: _*)
938-
939-
case globalLimitExec: GlobalLimitExec
940-
if CometConf.COMET_EXEC_GLOBAL_LIMIT_ENABLED.get(conf) =>
941-
CometGlobalLimit.convert(globalLimitExec, builder, childOp: _*)
942-
943-
case expand: ExpandExec if CometConf.COMET_EXEC_EXPAND_ENABLED.get(conf) =>
944-
CometExpand.convert(expand, builder, childOp: _*)
945-
946956
case _: WindowExec if CometConf.COMET_EXEC_WINDOW_ENABLED.get(conf) =>
947957
withInfo(op, "Window expressions are not supported")
948958
None
949959

950-
case aggregate: HashAggregateExec if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
951-
CometHashAggregate.convert(aggregate, builder, childOp: _*)
952-
953-
case aggregate: ObjectHashAggregateExec
954-
if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
955-
CometObjectHashAggregate.convert(aggregate, builder, childOp: _*)
956-
957-
case join: BroadcastHashJoinExec
958-
if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) =>
959-
CometBroadcastHashJoin.convert(join, builder, childOp: _*)
960-
961-
case join: ShuffledHashJoinExec if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) =>
962-
CometShuffleHashJoin.convert(join, builder, childOp: _*)
963-
964-
case join: SortMergeJoinExec =>
965-
if (CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf)) {
966-
CometSortMergeJoin.convert(join, builder, childOp: _*)
967-
} else {
968-
withInfo(join, "SortMergeJoin is not enabled")
969-
None
970-
}
971-
972960
case op if isCometSink(op) =>
973961
val supportedTypes =
974962
op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
@@ -1023,28 +1011,58 @@ object QueryPlanSerde extends Logging with CometExprShim {
10231011
None
10241012
}
10251013

1026-
case op =>
1027-
opSerdeMap.get(op.getClass) match {
1028-
case Some(handler) =>
1029-
handler.enabledConfig.foreach { enabledConfig =>
1030-
if (!enabledConfig.get(op.conf)) {
1031-
withInfo(
1032-
op,
1033-
s"Native support for operator ${op.getClass.getSimpleName} is disabled. " +
1034-
s"Set ${enabledConfig.key}=true to enable it.")
1035-
return None
1036-
}
1037-
}
1038-
handler.asInstanceOf[CometOperatorSerde[SparkPlan]].convert(op, builder, childOp: _*)
1039-
case _ =>
1040-
// Emit warning if:
1041-
// 1. it is not Spark shuffle operator, which is handled separately
1042-
// 2. it is not a Comet operator
1043-
if (!op.nodeName.contains("Comet") && !op.isInstanceOf[ShuffleExchangeExec]) {
1044-
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
1045-
}
1046-
None
1014+
case _ =>
1015+
// Emit warning if:
1016+
// 1. it is not Spark shuffle operator, which is handled separately
1017+
// 2. it is not a Comet operator
1018+
if (serde.isEmpty && !op.nodeName.contains("Comet") &&
1019+
!op.isInstanceOf[ShuffleExchangeExec]) {
1020+
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
10471021
}
1022+
None
1023+
}
1024+
}
1025+
1026+
private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
1027+
val enabled = handler.enabledConfig.forall(_.get(op.conf))
1028+
val opName = op.getClass.getSimpleName
1029+
if (enabled) {
1030+
val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
1031+
opSerde.getSupportLevel(op) match {
1032+
case Unsupported(notes) =>
1033+
withInfo(op, notes.getOrElse(""))
1034+
false
1035+
case Incompatible(notes) =>
1036+
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
1037+
val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
1038+
if (allowIncompat) {
1039+
if (notes.isDefined) {
1040+
logWarning(
1041+
s"Comet supports $opName when $incompatConf=true " +
1042+
s"but has notes: ${notes.get}")
1043+
}
1044+
true
1045+
} else {
1046+
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
1047+
withInfo(
1048+
op,
1049+
s"$opName is not fully compatible with Spark$optionalNotes. " +
1050+
s"To enable it anyway, set $incompatConf=true. " +
1051+
s"${CometConf.COMPAT_GUIDE}.")
1052+
false
1053+
}
1054+
case Compatible(notes) =>
1055+
if (notes.isDefined) {
1056+
logWarning(s"Comet supports $opName but has notes: ${notes.get}")
1057+
}
1058+
true
1059+
}
1060+
} else {
1061+
withInfo(
1062+
op,
1063+
s"Native support for operator $opName is disabled. " +
1064+
s"Set ${handler.enabledConfig.get.key}=true to enable it.")
1065+
false
10481066
}
10491067
}
10501068

@@ -1144,25 +1162,3 @@ object QueryPlanSerde extends Logging with CometExprShim {
11441162
}
11451163

11461164
}
1147-
1148-
sealed trait SupportLevel
1149-
1150-
/**
1151-
* Comet either supports this feature with full compatibility with Spark, or may have known
1152-
* differences in some specific edge cases that are unlikely to be an issue for most users.
1153-
*
1154-
* Any compatibility differences are noted in the
1155-
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
1156-
*/
1157-
case class Compatible(notes: Option[String] = None) extends SupportLevel
1158-
1159-
/**
1160-
* Comet supports this feature but results can be different from Spark.
1161-
*
1162-
* Any compatibility differences are noted in the
1163-
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
1164-
*/
1165-
case class Incompatible(notes: Option[String] = None) extends SupportLevel
1166-
1167-
/** Comet does not support this feature */
1168-
case class Unsupported(notes: Option[String] = None) extends SupportLevel
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
sealed trait SupportLevel
23+
24+
/**
25+
* Comet either supports this feature with full compatibility with Spark, or may have known
26+
* differences in some specific edge cases that are unlikely to be an issue for most users.
27+
*
28+
* Any compatibility differences are noted in the
29+
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
30+
*/
31+
case class Compatible(notes: Option[String] = None) extends SupportLevel
32+
33+
/**
34+
* Comet supports this feature but results can be different from Spark.
35+
*
36+
* Any compatibility differences are noted in the
37+
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
38+
*/
39+
case class Incompatible(notes: Option[String] = None) extends SupportLevel
40+
41+
/** Comet does not support this feature */
42+
case class Unsupported(notes: Option[String] = None) extends SupportLevel

spark/src/main/scala/org/apache/comet/serde/CometAggregate.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometAggregate.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.serde
20+
package org.apache.comet.serde.operator
2121

2222
import scala.jdk.CollectionConverters._
2323

@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.MapType
2828

2929
import org.apache.comet.{CometConf, ConfigEntry}
3030
import org.apache.comet.CometSparkSessionExtensions.withInfo
31+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
3132
import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator}
3233
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto}
3334

spark/src/main/scala/org/apache/comet/serde/CometExpand.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometExpand.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.serde
20+
package org.apache.comet.serde.operator
2121

2222
import scala.jdk.CollectionConverters._
2323

@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.ExpandExec
2626

2727
import org.apache.comet.{CometConf, ConfigEntry}
2828
import org.apache.comet.CometSparkSessionExtensions.withInfo
29+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
2930
import org.apache.comet.serde.OperatorOuterClass.Operator
3031
import org.apache.comet.serde.QueryPlanSerde.exprToProto
3132

spark/src/main/scala/org/apache/comet/serde/CometFilter.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.serde
20+
package org.apache.comet.serde.operator
2121

2222
import org.apache.spark.sql.execution.FilterExec
2323

2424
import org.apache.comet.{CometConf, ConfigEntry}
2525
import org.apache.comet.CometSparkSessionExtensions.withInfo
26+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
2627
import org.apache.comet.serde.OperatorOuterClass.Operator
2728
import org.apache.comet.serde.QueryPlanSerde.exprToProto
2829

spark/src/main/scala/org/apache/comet/serde/CometGlobalLimit.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometGlobalLimit.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.serde
20+
package org.apache.comet.serde.operator
2121

2222
import org.apache.spark.sql.execution.GlobalLimitExec
2323

2424
import org.apache.comet.{CometConf, ConfigEntry}
2525
import org.apache.comet.CometSparkSessionExtensions.withInfo
26+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
2627
import org.apache.comet.serde.OperatorOuterClass.Operator
2728

2829
object CometGlobalLimit extends CometOperatorSerde[GlobalLimitExec] {

spark/src/main/scala/org/apache/comet/serde/CometHashJoin.scala renamed to spark/src/main/scala/org/apache/comet/serde/operator/CometHashJoin.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.serde
20+
package org.apache.comet.serde.operator
2121

2222
import scala.jdk.CollectionConverters._
2323

2424
import org.apache.spark.sql.catalyst.expressions.Expression
2525
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
26-
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
26+
import org.apache.spark.sql.catalyst.plans._
2727
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec}
2828

2929
import org.apache.comet.{CometConf, ConfigEntry}
3030
import org.apache.comet.CometSparkSessionExtensions.withInfo
31+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
3132
import org.apache.comet.serde.OperatorOuterClass.{BuildSide, JoinType, Operator}
3233
import org.apache.comet.serde.QueryPlanSerde.exprToProto
3334

0 commit comments

Comments
 (0)