Skip to content

Commit 79516b6

Browse files
authored
feat: Add dynamic enabled and allowIncompat configs for all supported expressions (#2329)
1 parent 1553caa commit 79516b6

File tree

7 files changed

+100
-25
lines changed

7 files changed

+100
-25
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ object CometConf extends ShimCometConf {
6565

6666
val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
6767

68+
val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
69+
6870
val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
6971
.doc(
7072
"Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
@@ -228,8 +230,6 @@ object CometConf extends ShimCometConf {
228230
createExecEnabledConfig("window", defaultValue = true)
229231
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
230232
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
231-
val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
232-
createExecEnabledConfig("initCap", defaultValue = false)
233233

234234
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
235235
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
@@ -664,6 +664,26 @@ object CometConf extends ShimCometConf {
664664
.booleanConf
665665
.createWithDefault(defaultValue)
666666
}
667+
668+
def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
669+
getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
670+
}
671+
672+
def getExprEnabledConfigKey(name: String): String = {
673+
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.enabled"
674+
}
675+
676+
def isExprAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = {
677+
getBooleanConf(getExprAllowIncompatConfigKey(name), defaultValue = false, conf)
678+
}
679+
680+
def getExprAllowIncompatConfigKey(name: String): String = {
681+
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.allowIncompatible"
682+
}
683+
684+
def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
685+
conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true"
686+
}
667687
}
668688

669689
object ConfigHelpers {

docs/source/user-guide/latest/configs.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ Comet provides the following configuration settings.
4848
| spark.comet.exec.filter.enabled | Whether to enable filter by default. | true |
4949
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
5050
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
51-
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false |
5251
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
5352
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
5453
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |

docs/source/user-guide/latest/expressions.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,15 @@ Comet supports the following Spark expressions. Expressions that are marked as S
2323
natively in Comet and provide the same results as Spark, or will fall back to Spark for cases that would not
2424
be compatible.
2525

26-
Expressions that are not Spark-compatible are disabled by default and can be enabled by setting
27-
`spark.comet.expression.allowIncompatible=true`.
26+
All expressions are enabled by default, but can be disabled by setting
27+
`spark.comet.expression.EXPRNAME.enabled=false`, where `EXPRNAME` is the expression name as specified in
28+
the following tables, such as `Length` or `StartsWith`.
29+
30+
Expressions that are not Spark-compatible will fall back to Spark by default; such an expression can be enabled anyway by setting
31+
`spark.comet.expression.EXPRNAME.allowIncompatible=true`.
32+
33+
It is also possible to specify `spark.comet.expression.allowIncompatible=true` to enable all
34+
incompatible expressions.
2835

2936
## Conditional Expressions
3037

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -630,15 +630,24 @@ object QueryPlanSerde extends Logging with CometExprShim {
630630
expr: Expression,
631631
inputs: Seq[Attribute],
632632
binding: Boolean): Option[Expr] = {
633-
SQLConf.get
633+
val conf = SQLConf.get
634634

635635
def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = {
636+
val exprConfName = handler.getExprConfigName(expr)
637+
if (!CometConf.isExprEnabled(exprConfName)) {
638+
withInfo(
639+
expr,
640+
"Expression support is disabled. Set " +
641+
s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to enable it.")
642+
return None
643+
}
636644
handler.getSupportLevel(expr) match {
637645
case Unsupported(notes) =>
638646
withInfo(expr, notes.getOrElse(""))
639647
None
640648
case Incompatible(notes) =>
641-
if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
649+
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
650+
if (exprAllowIncompat || CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
642651
if (notes.isDefined) {
643652
logWarning(
644653
s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " +
@@ -650,8 +659,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
650659
withInfo(
651660
expr,
652661
s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
653-
s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " +
654-
s"${CometConf.COMPAT_GUIDE}.")
662+
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true, or set " +
663+
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to enable all " +
664+
s"incompatible expressions. ${CometConf.COMPAT_GUIDE}.")
655665
None
656666
}
657667
case Compatible(notes) =>
@@ -669,7 +679,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
669679
exprToProtoInternal(Literal(value, dataType), inputs, binding)
670680

671681
case UnaryExpression(child) if expr.prettyName == "trycast" =>
672-
val timeZoneId = SQLConf.get.sessionLocalTimeZone
682+
val timeZoneId = conf.sessionLocalTimeZone
673683
val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY)
674684
convert(cast, CometCast)
675685

@@ -1850,6 +1860,17 @@ trait CometOperatorSerde[T <: SparkPlan] {
18501860
*/
18511861
trait CometExpressionSerde[T <: Expression] {
18521862

1863+
/**
1864+
* Get a short name for the expression that can be used as part of a config key related to the
1865+
* expression, such as enabling or disabling that expression.
1866+
*
1867+
* @param expr
1868+
* The Spark expression.
1869+
* @return
1870+
* Short name for the expression, defaulting to the Spark class name
1871+
*/
1872+
def getExprConfigName(expr: T): String = expr.getClass.getSimpleName
1873+
18531874
/**
18541875
* Determine the support level of the expression based on its attributes.
18551876
*

spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,14 @@ object CometLower extends CometCaseConversionBase[Lower]("lower")
6868

6969
object CometInitCap extends CometScalarFunction[InitCap]("initcap") {
7070

71+
override def getSupportLevel(expr: InitCap): SupportLevel = {
72+
// Behavior differs from Spark. One example is that for the input "robert rose-smith", Spark
73+
// will produce "Robert Rose-smith", but Comet will produce "Robert Rose-Smith".
74+
// https://github.com/apache/datafusion-comet/issues/1052
75+
Incompatible(None)
76+
}
77+
7178
override def convert(expr: InitCap, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
72-
if (!CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
73-
withInfo(
74-
expr,
75-
"Comet initCap is not compatible with Spark yet. " +
76-
"See https://github.com/apache/datafusion-comet/issues/1052 ." +
77-
s"Set ${CometConf.COMET_EXEC_INITCAP_ENABLED.key}=true to enable it anyway.")
78-
return None
79-
}
8079
super.convert(expr, inputs, binding)
8180
}
8281
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3333
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
3434
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
3535
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec}
36-
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec}
36+
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
3737
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3838
import org.apache.spark.sql.expressions.Window
3939
import org.apache.spark.sql.functions._
@@ -1301,6 +1301,40 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
13011301
}
13021302
}
13031303

1304+
test("disable expression using dynamic config") {
1305+
def countSparkProjectExec(plan: SparkPlan) = {
1306+
plan.collect { case _: ProjectExec =>
1307+
true
1308+
}.length
1309+
}
1310+
withParquetTable(Seq(0, 1, 2).map(n => (n, n)), "tbl") {
1311+
val sql = "select _1+_2 from tbl"
1312+
val (_, cometPlan) = checkSparkAnswer(sql)
1313+
assert(0 == countSparkProjectExec(cometPlan))
1314+
withSQLConf(CometConf.getExprEnabledConfigKey("Add") -> "false") {
1315+
val (_, cometPlan) = checkSparkAnswer(sql)
1316+
assert(1 == countSparkProjectExec(cometPlan))
1317+
}
1318+
}
1319+
}
1320+
1321+
test("enable incompat expression using dynamic config") {
1322+
def countSparkProjectExec(plan: SparkPlan) = {
1323+
plan.collect { case _: ProjectExec =>
1324+
true
1325+
}.length
1326+
}
1327+
withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") {
1328+
val sql = "select initcap(_1) from tbl"
1329+
val (_, cometPlan) = checkSparkAnswer(sql)
1330+
assert(1 == countSparkProjectExec(cometPlan))
1331+
withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") {
1332+
val (_, cometPlan) = checkSparkAnswer(sql)
1333+
assert(0 == countSparkProjectExec(cometPlan))
1334+
}
1335+
}
1336+
}
1337+
13041338
test("signum") {
13051339
testDoubleScalarExpr("signum")
13061340
}

spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,7 @@ class CometStringExpressionSuite extends CometTestBase {
103103
s"insert into $table values(1, 'james smith'), (2, 'michael rose'), " +
104104
"(3, 'robert williams'), (4, 'rames rose'), (5, 'james smith'), " +
105105
"(6, 'robert rose-smith'), (7, 'james ähtäri')")
106-
if (CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
107-
// TODO: remove this if clause https://github.com/apache/datafusion-comet/issues/1052
108-
checkSparkAnswerAndOperator(s"SELECT initcap(name) FROM $table")
109-
} else {
110-
checkSparkAnswer(s"SELECT initcap(name) FROM $table")
111-
}
106+
checkSparkAnswer(s"SELECT initcap(name) FROM $table")
112107
}
113108
}
114109

0 commit comments

Comments
 (0)