14 changes: 14 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -79,6 +79,8 @@ object CometConf extends ShimCometConf {

val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";

val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.category(CATEGORY_EXEC)
.doc(
@@ -747,6 +749,18 @@ object CometConf extends ShimCometConf {
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible"
}

def isOperatorAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = {
getBooleanConf(getOperatorAllowIncompatConfigKey(name), defaultValue = false, conf)
}

def getOperatorAllowIncompatConfigKey(name: String): String = {
s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.$name.allowIncompatible"
}

def getOperatorAllowIncompatConfigKey(exprClass: Class[_]): String = {
s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible"
}

def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true"
}
@@ -29,6 +29,22 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
*/
trait CometOperatorSerde[T <: SparkPlan] {

/**
* Get the optional Comet configuration entry that is used to enable or disable native support
* for this operator.
*/
def enabledConfig: Option[ConfigEntry[Boolean]]

/**
* Determine the support level of the operator based on its attributes.
*
* @param operator
* The Spark operator.
* @return
* Support level (Compatible, Incompatible, or Unsupported).
*/
def getSupportLevel(operator: T): SupportLevel = Compatible(None)

/**
* Convert a Spark operator into a protocol buffer representation that can be passed into native
* code.
@@ -49,9 +65,4 @@ trait CometOperatorSerde[T <: SparkPlan] {
builder: Operator.Builder,
childOp: Operator*): Option[OperatorOuterClass.Operator]

/**
* Get the optional Comet configuration entry that is used to enable or disable native support
* for this operator.
*/
def enabledConfig: Option[ConfigEntry[Boolean]]
}
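
For context, a hypothetical implementation of the trait might look like the following sketch (illustrative only, not part of this patch; the Incompatible condition and note are invented, and it assumes CometConf.COMET_EXEC_SORT_ENABLED exists as the enable flag):

import org.apache.spark.sql.execution.SortExec
import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.serde.{Compatible, CometOperatorSerde, Incompatible, SupportLevel}
import org.apache.comet.serde.OperatorOuterClass.Operator

object ExampleSortSerde extends CometOperatorSerde[SortExec] {

  // gate native execution behind an existing enable flag
  override def enabledConfig: Option[ConfigEntry[Boolean]] =
    Some(CometConf.COMET_EXEC_SORT_ENABLED)

  // report a (made-up) limitation so the planner can decide whether to fall back
  override def getSupportLevel(op: SortExec): SupportLevel =
    if (op.global) Compatible(None)
    else Incompatible(Some("illustrative note about a known difference"))

  // a real implementation would serialize the operator to protobuf here
  override def convert(
      op: SortExec,
      builder: Operator.Builder,
      childOp: Operator*): Option[Operator] =
    None
}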
133 changes: 69 additions & 64 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -47,7 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
import org.apache.comet.serde.Types.DataType._
import org.apache.comet.serde.literals.CometLiteral
import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
import org.apache.comet.serde.operator._
import org.apache.comet.shims.CometExprShim

/**
@@ -61,8 +61,18 @@ object QueryPlanSerde extends Logging with CometExprShim {
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(
classOf[ProjectExec] -> CometProject,
classOf[FilterExec] -> CometFilter,
classOf[LocalLimitExec] -> CometLocalLimit,
classOf[GlobalLimitExec] -> CometGlobalLimit,
classOf[ExpandExec] -> CometExpand,
classOf[HashAggregateExec] -> CometHashAggregate,
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
classOf[SortMergeJoinExec] -> CometSortMergeJoin,
classOf[SortExec] -> CometSort,
classOf[LocalTableScanExec] -> CometLocalTableScan)
classOf[LocalTableScanExec] -> CometLocalTableScan,
classOf[WindowExec] -> CometWindow)

private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[ArrayAppend] -> CometArrayAppend,
@@ -924,51 +934,26 @@ object QueryPlanSerde extends Logging with CometExprShim {
val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
childOp.foreach(builder.addChildren)

// look for a registered handler first
val serde = opSerdeMap.get(op.getClass)
serde match {
case Some(handler) if isOperatorEnabled(handler, op) =>
val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
val maybeConverted = opSerde.convert(op, builder, childOp: _*)
if (maybeConverted.isDefined) {
return maybeConverted
}
case _ =>
}

// now handle special cases that cannot be expressed as a simple class-name mapping,
// and check whether the operator can be used as a sink
op match {

// Fully native scan for V1
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
CometNativeScan.convert(scan, builder, childOp: _*)

case filter: FilterExec if CometConf.COMET_EXEC_FILTER_ENABLED.get(conf) =>
CometFilter.convert(filter, builder, childOp: _*)

case limit: LocalLimitExec if CometConf.COMET_EXEC_LOCAL_LIMIT_ENABLED.get(conf) =>
CometLocalLimit.convert(limit, builder, childOp: _*)

case globalLimitExec: GlobalLimitExec
if CometConf.COMET_EXEC_GLOBAL_LIMIT_ENABLED.get(conf) =>
CometGlobalLimit.convert(globalLimitExec, builder, childOp: _*)

case expand: ExpandExec if CometConf.COMET_EXEC_EXPAND_ENABLED.get(conf) =>
CometExpand.convert(expand, builder, childOp: _*)

case _: WindowExec if CometConf.COMET_EXEC_WINDOW_ENABLED.get(conf) =>
withInfo(op, "Window expressions are not supported")
None

case aggregate: HashAggregateExec if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
CometHashAggregate.convert(aggregate, builder, childOp: _*)

case aggregate: ObjectHashAggregateExec
if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
CometObjectHashAggregate.convert(aggregate, builder, childOp: _*)

case join: BroadcastHashJoinExec
if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) =>
CometBroadcastHashJoin.convert(join, builder, childOp: _*)

case join: ShuffledHashJoinExec if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) =>
CometShuffleHashJoin.convert(join, builder, childOp: _*)

case join: SortMergeJoinExec =>
if (CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf)) {
CometSortMergeJoin.convert(join, builder, childOp: _*)
} else {
withInfo(join, "SortMergeJoin is not enabled")
None
}

case op if isCometSink(op) =>
val supportedTypes =
op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
@@ -1048,6 +1033,49 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
}

private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
val enabled = handler.enabledConfig.forall(_.get(op.conf))
val opName = op.getClass.getSimpleName
if (enabled) {
val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
opSerde.getSupportLevel(op) match {
case Unsupported(notes) =>
withInfo(op, notes.getOrElse(""))
false
case Incompatible(notes) =>
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
if (allowIncompat) {
if (notes.isDefined) {
logWarning(
s"Comet supports $opName when $incompatConf=true " +
s"but has notes: ${notes.get}")
}
true
} else {
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
withInfo(
op,
s"$opName is not fully compatible with Spark$optionalNotes. " +
s"To enable it anyway, set $incompatConf=true. " +
s"${CometConf.COMPAT_GUIDE}.")
false
}
case Compatible(notes) =>
if (notes.isDefined) {
logWarning(s"Comet supports $opName but has notes: ${notes.get}")
}
true
}
} else {
withInfo(
op,
s"Native support for operator $opName is disabled. " +
s"Set ${handler.enabledConfig.get.key}=true to enable it.")
false
}
}
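
In practice, the opt-in this method checks would be set on the Spark session, e.g. (operator name illustrative; the key prefix follows spark.comet.operator as defined in CometConf above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// with this set, an Incompatible operator is still converted (with a logged warning);
// without it, Comet falls back to Spark and tags the plan via withInfo
spark.conf.set("spark.comet.operator.SortMergeJoinExec.allowIncompatible", "true")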

/**
* Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
* native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
@@ -1066,7 +1094,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
case _: CollectLimitExec => true
case _: UnionExec => true
case _: TakeOrderedAndProjectExec => true
case _: WindowExec => true
Review comment (Member, author): Removing this line fixes #2737. I wish I could explain why, but I cannot.

Review comment (Contributor): WindowExec was added here because it was related to TakeOrderedAndProjectExec, which was added previously, but you are right: it is most likely the reason why the operator doesn't fall back to Spark cleanly.

case _ => false
}
}
@@ -1144,25 +1171,3 @@ object QueryPlanSerde extends Logging with CometExprShim {
}

}

sealed trait SupportLevel

/**
* Comet either supports this feature with full compatibility with Spark, or may have known
* differences in some specific edge cases that are unlikely to be an issue for most users.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Compatible(notes: Option[String] = None) extends SupportLevel

/**
* Comet supports this feature but results can be different from Spark.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** Comet does not support this feature */
case class Unsupported(notes: Option[String] = None) extends SupportLevel
42 changes: 42 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

sealed trait SupportLevel

/**
* Comet either supports this feature with full compatibility with Spark, or may have known
* differences in some specific edge cases that are unlikely to be an issue for most users.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Compatible(notes: Option[String] = None) extends SupportLevel

/**
* Comet supports this feature but results can be different from Spark.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** Comet does not support this feature */
case class Unsupported(notes: Option[String] = None) extends SupportLevel
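
A minimal sketch of consuming these case classes, mirroring the branching in QueryPlanSerde.isOperatorEnabled:

import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported}

def describe(level: SupportLevel): String = level match {
  case Compatible(notes) => s"native (notes: ${notes.getOrElse("none")})"
  case Incompatible(notes) => s"native only when allowIncompatible is set (notes: ${notes.getOrElse("none")})"
  case Unsupported(notes) => s"falls back to Spark (notes: ${notes.getOrElse("none")})"
}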
@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.MapType

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator}
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto}

@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.ExpandExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.exprToProto

@@ -17,12 +17,13 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import org.apache.spark.sql.execution.FilterExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.exprToProto

@@ -17,12 +17,13 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import org.apache.spark.sql.execution.GlobalLimitExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator

object CometGlobalLimit extends CometOperatorSerde[GlobalLimitExec] {
@@ -17,17 +17,18 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec}

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.{BuildSide, JoinType, Operator}
import org.apache.comet.serde.QueryPlanSerde.exprToProto

@@ -17,12 +17,13 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import org.apache.spark.sql.execution.LocalLimitExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator

object CometLocalLimit extends CometOperatorSerde[LocalLimitExec] {
@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
Expand All @@ -35,6 +35,7 @@ import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.CometParquetUtils
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}