14 changes: 14 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -79,6 +79,8 @@ object CometConf extends ShimCometConf {

val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";

val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.category(CATEGORY_EXEC)
.doc(
@@ -747,6 +749,18 @@ object CometConf extends ShimCometConf {
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible"
}

def isOperatorAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = {
getBooleanConf(getOperatorAllowIncompatConfigKey(name), defaultValue = false, conf)
}

def getOperatorAllowIncompatConfigKey(name: String): String = {
s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.$name.allowIncompatible"
}

def getOperatorAllowIncompatConfigKey(exprClass: Class[_]): String = {
s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible"
}

def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true"
}
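
For illustration only (not part of the diff): assuming COMET_PREFIX resolves to "spark.comet", the new helpers build per-operator keys of the form spark.comet.operator.<name>.allowIncompatible, where <name> is the Spark plan class's simple name. A minimal sketch, with an example operator class chosen arbitrarily:

// Hedged sketch, not part of this PR; the operator class is only an example.
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.comet.CometConf

val key = CometConf.getOperatorAllowIncompatConfigKey(classOf[SortMergeJoinExec])
// Assuming COMET_PREFIX == "spark.comet", key is
// "spark.comet.operator.SortMergeJoinExec.allowIncompatible".
// isOperatorAllowIncompat("SortMergeJoinExec") reads the same key from the active SQLConf
// and defaults to false when it is unset.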
@@ -29,6 +29,22 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
*/
trait CometOperatorSerde[T <: SparkPlan] {

/**
* Get the optional Comet configuration entry that is used to enable or disable native support
* for this operator.
*/
def enabledConfig: Option[ConfigEntry[Boolean]]

/**
* Determine the support level of the operator based on its attributes.
*
* @param operator
* The Spark operator.
* @return
* Support level (Compatible, Incompatible, or Unsupported).
*/
def getSupportLevel(operator: T): SupportLevel = Compatible(None)

/**
* Convert a Spark operator into a protocol buffer representation that can be passed into native
* code.
@@ -49,9 +65,4 @@ trait CometOperatorSerde[T <: SparkPlan] {
builder: Operator.Builder,
childOp: Operator*): Option[OperatorOuterClass.Operator]

/**
* Get the optional Comet configuration entry that is used to enable or disable native support
* for this operator.
*/
def enabledConfig: Option[ConfigEntry[Boolean]]
}
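
A hedged sketch (not part of this PR) of what a serde might look like once getSupportLevel is part of the trait. CoalesceExec is used only as a convenient Spark operator; the support logic, the note strings, and the decision to return None from convert are purely illustrative and say nothing about Comet's actual coalesce support.

// Illustrative sketch of a CometOperatorSerde implementation overriding getSupportLevel.
import org.apache.spark.sql.execution.CoalesceExec

import org.apache.comet.ConfigEntry
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel}
import org.apache.comet.serde.OperatorOuterClass.Operator

object CometCoalesceSketch extends CometOperatorSerde[CoalesceExec] {

  // No dedicated enable/disable flag in this sketch.
  override def enabledConfig: Option[ConfigEntry[Boolean]] = None

  // Illustrative attribute-based decision: flag single-partition coalesce as Incompatible.
  override def getSupportLevel(operator: CoalesceExec): SupportLevel =
    if (operator.numPartitions == 1) {
      Incompatible(Some("illustrative note about single-partition coalesce"))
    } else {
      Compatible(None)
    }

  override def convert(
      op: CoalesceExec,
      builder: Operator.Builder,
      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
    // A real serde would populate the protobuf builder here; the sketch declines to convert.
    None
  }
}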
154 changes: 75 additions & 79 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -47,7 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
import org.apache.comet.serde.Types.DataType._
import org.apache.comet.serde.literals.CometLiteral
import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder}
import org.apache.comet.serde.operator._
import org.apache.comet.shims.CometExprShim

/**
@@ -61,6 +61,15 @@ object QueryPlanSerde extends Logging with CometExprShim {
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(
classOf[ProjectExec] -> CometProject,
classOf[FilterExec] -> CometFilter,
classOf[LocalLimitExec] -> CometLocalLimit,
classOf[GlobalLimitExec] -> CometGlobalLimit,
classOf[ExpandExec] -> CometExpand,
classOf[HashAggregateExec] -> CometHashAggregate,
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
classOf[SortMergeJoinExec] -> CometSortMergeJoin,
classOf[SortExec] -> CometSort,
classOf[LocalTableScanExec] -> CometLocalTableScan)

@@ -924,51 +933,30 @@
val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
childOp.foreach(builder.addChildren)

// look for registered handler first
val serde = opSerdeMap.get(op.getClass)
serde match {
case Some(handler) if isOperatorEnabled(handler, op) =>
val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
val maybeConverted = opSerde.convert(op, builder, childOp: _*)
if (maybeConverted.isDefined) {
return maybeConverted
}
case _ =>
}

// now handle special cases that cannot be handled as a simple mapping from class name
// and see if operator can be used as a sink
op match {

// Fully native scan for V1
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
CometNativeScan.convert(scan, builder, childOp: _*)

case filter: FilterExec if CometConf.COMET_EXEC_FILTER_ENABLED.get(conf) =>
CometFilter.convert(filter, builder, childOp: _*)

case limit: LocalLimitExec if CometConf.COMET_EXEC_LOCAL_LIMIT_ENABLED.get(conf) =>
CometLocalLimit.convert(limit, builder, childOp: _*)

case globalLimitExec: GlobalLimitExec
if CometConf.COMET_EXEC_GLOBAL_LIMIT_ENABLED.get(conf) =>
CometGlobalLimit.convert(globalLimitExec, builder, childOp: _*)

case expand: ExpandExec if CometConf.COMET_EXEC_EXPAND_ENABLED.get(conf) =>
CometExpand.convert(expand, builder, childOp: _*)

case _: WindowExec if CometConf.COMET_EXEC_WINDOW_ENABLED.get(conf) =>
withInfo(op, "Window expressions are not supported")
None

case aggregate: HashAggregateExec if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
CometHashAggregate.convert(aggregate, builder, childOp: _*)

case aggregate: ObjectHashAggregateExec
if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
CometObjectHashAggregate.convert(aggregate, builder, childOp: _*)

case join: BroadcastHashJoinExec
if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) =>
CometBroadcastHashJoin.convert(join, builder, childOp: _*)

case join: ShuffledHashJoinExec if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) =>
CometShuffleHashJoin.convert(join, builder, childOp: _*)

case join: SortMergeJoinExec =>
if (CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf)) {
CometSortMergeJoin.convert(join, builder, childOp: _*)
} else {
withInfo(join, "SortMergeJoin is not enabled")
None
}

case op if isCometSink(op) =>
val supportedTypes =
op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
@@ -1023,28 +1011,58 @@
None
}

case op =>
opSerdeMap.get(op.getClass) match {
case Some(handler) =>
handler.enabledConfig.foreach { enabledConfig =>
if (!enabledConfig.get(op.conf)) {
withInfo(
op,
s"Native support for operator ${op.getClass.getSimpleName} is disabled. " +
s"Set ${enabledConfig.key}=true to enable it.")
return None
}
}
handler.asInstanceOf[CometOperatorSerde[SparkPlan]].convert(op, builder, childOp: _*)
case _ =>
// Emit warning if:
// 1. it is not Spark shuffle operator, which is handled separately
// 2. it is not a Comet operator
if (!op.nodeName.contains("Comet") && !op.isInstanceOf[ShuffleExchangeExec]) {
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
}
None
case _ =>
// Emit warning if:
// 1. it is not Spark shuffle operator, which is handled separately
// 2. it is not a Comet operator
if (serde.isEmpty && !op.nodeName.contains("Comet") &&
!op.isInstanceOf[ShuffleExchangeExec]) {
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
}
None
}
}

private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
val enabled = handler.enabledConfig.forall(_.get(op.conf))
val opName = op.getClass.getSimpleName
if (enabled) {
val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
opSerde.getSupportLevel(op) match {
case Unsupported(notes) =>
withInfo(op, notes.getOrElse(""))
false
case Incompatible(notes) =>
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
if (allowIncompat) {
if (notes.isDefined) {
logWarning(
s"Comet supports $opName when $incompatConf=true " +
s"but has notes: ${notes.get}")
}
true
} else {
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
withInfo(
op,
s"$opName is not fully compatible with Spark$optionalNotes. " +
s"To enable it anyway, set $incompatConf=true. " +
s"${CometConf.COMPAT_GUIDE}.")
false
}
case Compatible(notes) =>
if (notes.isDefined) {
logWarning(s"Comet supports $opName but has notes: ${notes.get}")
}
true
}
} else {
withInfo(
op,
s"Native support for operator $opName is disabled. " +
s"Set ${handler.enabledConfig.get.key}=true to enable it.")
false
}
}

@@ -1144,25 +1162,3 @@
}

}

sealed trait SupportLevel

/**
* Comet either supports this feature with full compatibility with Spark, or may have known
* differences in some specific edge cases that are unlikely to be an issue for most users.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Compatible(notes: Option[String] = None) extends SupportLevel

/**
* Comet supports this feature but results can be different from Spark.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** Comet does not support this feature */
case class Unsupported(notes: Option[String] = None) extends SupportLevel
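
A brief usage sketch (not part of the diff) of the gate in isOperatorEnabled above: when a registered handler reports Incompatible, conversion only proceeds if the per-operator allowIncompatible key is set, so a user would opt in at the session level. Here `spark` is assumed to be an active SparkSession and the operator is illustrative.

import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.comet.CometConf

val incompatKey = CometConf.getOperatorAllowIncompatConfigKey(classOf[SortMergeJoinExec])
spark.conf.set(incompatKey, "true")
// Without this, isOperatorEnabled records the incompatibility via withInfo and the
// operator falls back to Spark.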
42 changes: 42 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

sealed trait SupportLevel

/**
* Comet either supports this feature with full compatibility with Spark, or may have known
* differences in some specific edge cases that are unlikely to be an issue for most users.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Compatible(notes: Option[String] = None) extends SupportLevel

/**
* Comet supports this feature but results can be different from Spark.
*
* Any compatibility differences are noted in the
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
*/
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** Comet does not support this feature */
case class Unsupported(notes: Option[String] = None) extends SupportLevel
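
For illustration only (not part of the diff), the three levels and their optional notes can be constructed as follows; the note strings are made up.

import org.apache.comet.serde.{Compatible, Incompatible, Unsupported}

val fullyCompatible    = Compatible()                                           // no caveats
val compatibleWithNote = Compatible(Some("minor differences in rare edge cases"))
val knownDifferences   = Incompatible(Some("results may differ from Spark"))
val notImplemented     = Unsupported(Some("no native implementation"))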
@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.MapType

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator}
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto}

@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.ExpandExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.exprToProto

@@ -17,12 +17,13 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import org.apache.spark.sql.execution.FilterExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.exprToProto

@@ -17,12 +17,13 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import org.apache.spark.sql.execution.GlobalLimitExec

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator

object CometGlobalLimit extends CometOperatorSerde[GlobalLimitExec] {
@@ -17,17 +17,18 @@
* under the License.
*/

package org.apache.comet.serde
package org.apache.comet.serde.operator

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec}

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.{BuildSide, JoinType, Operator}
import org.apache.comet.serde.QueryPlanSerde.exprToProto
