Commit f69739b

feat: Improve fallback mechanism for ANSI mode (#2211)
1 parent 775efbf commit f69739b

237 files changed: +25509 / -28192 lines


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 9 deletions
@@ -600,15 +600,6 @@ object CometConf extends ShimCometConf {
       .toSequence
       .createWithDefault(Seq("Range,InMemoryTableScan"))
 
-  val COMET_ANSI_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.ansi.enabled")
-    .internal()
-    .doc(
-      "Comet does not respect ANSI mode in most cases and by default will not accelerate " +
-        "queries when ansi mode is enabled. Enable this setting to test Comet's experimental " +
-        "support for ANSI mode. This should not be used in production.")
-    .booleanConf
-    .createWithDefault(COMET_ANSI_MODE_ENABLED_DEFAULT)
-
   val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.caseConversion.enabled")
       .doc(

common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 0 additions & 1 deletion
@@ -21,5 +21,4 @@ package org.apache.comet.shims
 
 trait ShimCometConf {
   protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false
-  protected val COMET_ANSI_MODE_ENABLED_DEFAULT = false
 }

common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 0 additions & 1 deletion
@@ -21,5 +21,4 @@ package org.apache.comet.shims
 
 trait ShimCometConf {
   protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
-  protected val COMET_ANSI_MODE_ENABLED_DEFAULT = true
 }

dev/diffs/3.4.3.diff

Lines changed: 19 additions & 8 deletions
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index d3544881af1..5cc127f064d 100644
+index d3544881af1..9c174496a4b 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -881,7 +881,7 @@ index b5b34922694..a72403780c4 100644
    protected val baseResourcePath = {
      // use the same way as `SQLQueryTestSuite` to get the resource path
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 525d97e4998..8a3e7457618 100644
+index 525d97e4998..5e04319dd97 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -894,6 +894,19 @@ index 525d97e4998..8a3e7457618 100644
      AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
        sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
      }
+@@ -4467,7 +4468,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+     val msg = intercept[SparkException] {
+       sql(query).collect()
+     }.getMessage
+-    assert(msg.contains(query))
++    if (!isCometEnabled) {
++      // Comet's error message does not include the original SQL query
++      // https://github.com/apache/datafusion-comet/issues/2215
++      assert(msg.contains(query))
++    }
+   }
+ }
+}
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
 index 48ad10992c5..51d1ee65422 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -2798,10 +2811,10 @@ index dd55fcfe42c..a1d390c93d0 100644
 
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..a1fb4abe681 100644
+index ed2e309fa07..a5ea58146ad 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2828,7 +2841,6 @@ index ed2e309fa07..a1fb4abe681 100644
 +    if (enableCometAnsiMode) {
 +      conf
 +        .set("spark.sql.ansi.enabled", "true")
-+        .set("spark.comet.ansi.enabled", "true")
 +    }
 +  }
      conf.set(
@@ -2920,10 +2932,10 @@ index a902cb3a69e..800a3acbe99 100644
 
    test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
  diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-index 07361cfdce9..b4d53dbe900 100644
+index 07361cfdce9..97dab2a3506 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-@@ -55,25 +55,55 @@ object TestHive
+@@ -55,25 +55,54 @@ object TestHive
      new SparkContext(
        System.getProperty("spark.sql.test.master", "local[1]"),
        "TestSQLContext",
@@ -2987,7 +2999,6 @@ index 07361cfdce9..b4d53dbe900 100644
 +    if (a != null && a.toBoolean) {
 +      conf
 +        .set("spark.sql.ansi.enabled", "true")
-+        .set("spark.comet.ansi.enabled", "true")
 +    }
 +  }
 

dev/diffs/3.5.6.diff

Lines changed: 19 additions & 8 deletions
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index 68e2c422a24..fb9c2e88fac 100644
+index 68e2c422a24..d971894ffe6 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -152,6 +152,8 @@
@@ -866,7 +866,7 @@ index c26757c9cff..d55775f09d7 100644
    protected val baseResourcePath = {
      // use the same way as `SQLQueryTestSuite` to get the resource path
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 793a0da6a86..6ccb9d62582 100644
+index 793a0da6a86..e48e74091cb 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -879,6 +879,19 @@ index 793a0da6a86..6ccb9d62582 100644
      AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
        sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
      }
+@@ -4497,7 +4498,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+     val msg = intercept[SparkException] {
+       sql(query).collect()
+     }.getMessage
+-    assert(msg.contains(query))
++    if (!isCometEnabled) {
++      // Comet's error message does not include the original SQL query
++      // https://github.com/apache/datafusion-comet/issues/2215
++      assert(msg.contains(query))
++    }
+   }
+ }
+}
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 index fa1a64460fc..1d2e215d6a3 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -2770,10 +2783,10 @@ index e937173a590..ca06132102d 100644
 
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..a1fb4abe681 100644
+index ed2e309fa07..a5ea58146ad 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2800,7 +2813,6 @@ index ed2e309fa07..a1fb4abe681 100644
 +    if (enableCometAnsiMode) {
 +      conf
 +        .set("spark.sql.ansi.enabled", "true")
-+        .set("spark.comet.ansi.enabled", "true")
 +    }
 +  }
      conf.set(
@@ -2935,10 +2947,10 @@ index 6160c3e5f6c..0956d7d9edc 100644
 
    test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
  diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-index 1d646f40b3e..7f2cdb8f061 100644
+index 1d646f40b3e..5babe505301 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-@@ -53,25 +53,55 @@ object TestHive
+@@ -53,25 +53,54 @@ object TestHive
      new SparkContext(
        System.getProperty("spark.sql.test.master", "local[1]"),
        "TestSQLContext",
@@ -3002,7 +3014,6 @@ index 1d646f40b3e..7f2cdb8f061 100644
 +    if (a != null && a.toBoolean) {
 +      conf
 +        .set("spark.sql.ansi.enabled", "true")
-+        .set("spark.comet.ansi.enabled", "true")
 +    }
 +  }
 

dev/diffs/4.0.0.diff

Lines changed: 20 additions & 9 deletions
@@ -1057,7 +1057,7 @@ index ad424b3a7cc..4ece0117a34 100644
    protected val baseResourcePath = {
      // use the same way as `SQLQueryTestSuite` to get the resource path
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index b3fce19979e..345acb4811a 100644
+index b3fce19979e..67edf5eb91c 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -1086,11 +1086,24 @@ index b3fce19979e..345acb4811a 100644
    test("SPARK-39175: Query context of Cast should be serialized to executors" +
 -    " when WSCG is off") {
 +    " when WSCG is off",
-+      IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
++      IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) {
     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
       SQLConf.ANSI_ENABLED.key -> "true") {
       withTable("t") {
-@@ -4497,7 +4500,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+@@ -4490,14 +4493,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+       assert(ex.isInstanceOf[SparkNumberFormatException] ||
+         ex.isInstanceOf[SparkDateTimeException] ||
+         ex.isInstanceOf[SparkRuntimeException])
+-      assert(ex.getMessage.contains(query))
++
++      if (!isCometEnabled) {
++        // Comet's error message does not include the original SQL query
++        // https://github.com/apache/datafusion-comet/issues/2215
++        assert(ex.getMessage.contains(query))
++      }
+     }
+   }
+ }
 }
 
   test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
@@ -3491,10 +3504,10 @@ index f0f3f94b811..d64e4e54e22 100644
 
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index 245219c1756..880406011d9 100644
+index 245219c1756..7d2ef1b9145 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -75,6 +75,32 @@ trait SharedSparkSessionBase
+@@ -75,6 +75,31 @@ trait SharedSparkSessionBase
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -3521,7 +3534,6 @@ index 245219c1756..880406011d9 100644
 +    if (enableCometAnsiMode) {
 +      conf
 +        .set("spark.sql.ansi.enabled", "true")
-+        .set("spark.comet.ansi.enabled", "true")
 +    }
 +  }
      conf.set(
@@ -3630,10 +3642,10 @@ index b67370f6eb9..746b3974b29 100644
    override def beforeEach(): Unit = {
      super.beforeEach()
  diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-index a394d0b7393..8411da928ab 100644
+index a394d0b7393..d29b3058897 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-@@ -53,24 +53,48 @@ object TestHive
+@@ -53,24 +53,47 @@ object TestHive
      new SparkContext(
        System.getProperty("spark.sql.test.master", "local[1]"),
        "TestSQLContext",
@@ -3690,7 +3702,6 @@ index a394d0b7393..8411da928ab 100644
 +    if (a != null && a.toBoolean) {
 +      conf
 +        .set("spark.sql.ansi.enabled", "true")
-+        .set("spark.comet.ansi.enabled", "true")
 +    }
 +  }
 +

docs/source/user-guide/compatibility.md

Lines changed: 13 additions & 4 deletions
@@ -75,10 +75,19 @@ The `native_datafusion` scan has some additional limitations:
 
 ## ANSI Mode
 
-Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
-Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,
-specify `spark.comet.ansi.enabled=true` in the Spark configuration. Comet's ANSI support is experimental and should not
-be used in production.
+Comet will fall back to Spark for the following expressions when ANSI mode is enabled, unless
+`spark.comet.expression.allowIncompatible=true`.
+
+- Add
+- Subtract
+- Multiply
+- Divide
+- IntegralDivide
+- Remainder
+- Round
+- Average
+- Sum
+- Cast (in some cases)
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.
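A note on this doc change: the new fallback is opt-out per expression rather than a global kill switch. Below is a minimal sketch of opting in, assuming a standard SparkSession builder with Comet already on the classpath; the two config keys come from this commit's docs, everything else is illustrative.

    // Sketch: run with ANSI mode on and accept Comet's known incompatibilities
    // so the expressions listed above stay accelerated instead of falling back.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("comet-ansi-fallback-demo") // hypothetical app name
      .config("spark.sql.ansi.enabled", "true")
      // Without the next line, Comet falls back to Spark for Add, Subtract,
      // Multiply, etc. whenever ANSI mode is enabled.
      .config("spark.comet.expression.allowIncompatible", "true")
      .getOrCreate()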

spark/src/main/scala/org/apache/comet/expressions/CometCast.scala

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ object CometCast {
       Compatible(Some("Only supports years between 262143 BC and 262142 AD"))
     case DataTypes.TimestampType if timeZoneId.exists(tz => tz != "UTC") =>
       Incompatible(Some(s"Cast will use UTC instead of $timeZoneId"))
-    case DataTypes.TimestampType if evalMode == "ANSI" =>
+    case DataTypes.TimestampType if evalMode == CometEvalMode.ANSI =>
       Incompatible(Some("ANSI mode not supported"))
     case DataTypes.TimestampType =>
       // https://github.com/apache/datafusion-comet/issues/328
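This one-line fix deserves a remark: `evalMode` is a `CometEvalMode` value, so comparing it to the string `"ANSI"` could never be true and the guard was dead code. A simplified sketch of the bug class follows; the enum here is illustrative, not Comet's actual definition.

    // Illustrative only: comparing an enum-like value to a String in Scala
    // compiles (with a warning) but is always false.
    object EvalMode extends Enumeration {
      val LEGACY, ANSI, TRY = Value
    }

    val evalMode = EvalMode.ANSI
    assert(!(evalMode == "ANSI"))       // the old, always-false comparison
    assert(evalMode == EvalMode.ANSI)   // the fixed comparison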

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 1 addition & 14 deletions
@@ -39,7 +39,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_EXEC_SHUFFLE_ENABLED}
+import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED
 import org.apache.comet.CometSparkSessionExtensions._
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -605,19 +605,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
   }
 
   private def _apply(plan: SparkPlan): SparkPlan = {
-    // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
-    // enabled.
-    if (isANSIEnabled(conf)) {
-      if (COMET_ANSI_MODE_ENABLED.get()) {
-        if (!isSpark40Plus) {
-          logWarning("Using Comet's experimental support for ANSI mode.")
-        }
-      } else {
-        logInfo("Comet extension disabled for ANSI mode")
-        return plan
-      }
-    }
-
     // We shouldn't transform Spark query plan if Comet is not loaded.
     if (!isCometLoaded(conf)) return plan
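With this global guard removed, the fallback decision moves from plan level to expression level: each expression is classified and only incompatible ones fall back. A hedged sketch of that control flow, using the SupportLevel hierarchy from QueryPlanSerde.scala (next file in this commit); the function and parameter names are illustrative, and the real checks live in Comet's serde layer, not in a single function like this.

    // Illustrative decision flow after this commit: accelerate Compatible
    // expressions, gate Incompatible ones behind the allow-incompatible flag,
    // and never accelerate Unsupported ones.
    def shouldAccelerate(level: SupportLevel, allowIncompatible: Boolean): Boolean =
      level match {
        case Compatible(_)   => true
        case Incompatible(_) => allowIncompatible // spark.comet.expression.allowIncompatible
        case Unsupported     => false
      }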

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 14 additions & 3 deletions
@@ -2368,13 +2368,24 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
 sealed trait SupportLevel
 
-/** We support this feature with full compatibility with Spark */
+/**
+ * Comet either supports this feature with full compatibility with Spark, or may have known
+ * differences in some specific edge cases that are unlikely to be an issue for most users.
+ *
+ * Any compatibility differences are noted in the
+ * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
+ */
 case class Compatible(notes: Option[String] = None) extends SupportLevel
 
-/** We support this feature but results can be different from Spark */
+/**
+ * Comet supports this feature but results can be different from Spark.
+ *
+ * Any compatibility differences are noted in the
+ * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
+ */
 case class Incompatible(notes: Option[String] = None) extends SupportLevel
 
-/** We do not support this feature */
+/** Comet does not support this feature */
 object Unsupported extends SupportLevel
 
 /**
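To make the three levels concrete, here is a small hypothetical classifier in the style an expression handler might use; the expression names and ANSI reasoning are drawn from the compatibility doc change above, not from this file.

    // Hypothetical classifier showing how the documented ANSI fallback maps
    // onto these SupportLevel values.
    def ansiSupport(exprName: String, ansiMode: Boolean): SupportLevel =
      exprName match {
        case "Add" | "Subtract" | "Multiply" | "Divide" if ansiMode =>
          Incompatible(Some("ANSI overflow semantics differ; see the Comet Compatibility Guide"))
        case "Add" | "Subtract" | "Multiply" | "Divide" =>
          Compatible()
        case _ =>
          Unsupported
      }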
