From 2187820cc7a4424b5113b5156d13a11acf99cdad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 11:02:38 -0600 Subject: [PATCH 01/18] Move SupportLevel to QueryPlanSerde --- .../main/scala/org/apache/comet/GenerateDocs.scala | 3 ++- .../org/apache/comet/expressions/CometCast.scala | 11 +---------- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 11 +++++++++++ .../test/scala/org/apache/comet/CometCastSuite.scala | 3 ++- .../apache/spark/sql/CometToPrettyStringSuite.scala | 3 ++- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala index d8cc62cf9b..56fe75179c 100644 --- a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala +++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.catalyst.expressions.Cast -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.{Compatible, Incompatible} /** * Utility for generating markdown documentation from the configs. diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 337eae11db..fcf22c4a04 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -21,16 +21,7 @@ package org.apache.comet.expressions import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType} -sealed trait SupportLevel - -/** We support this feature with full compatibility with Spark */ -case class Compatible(notes: Option[String] = None) extends SupportLevel - -/** We support this feature but results can be different from Spark */ -case class Incompatible(notes: Option[String] = None) extends SupportLevel - -/** We do not support this feature */ -object Unsupported extends SupportLevel +import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported} object CometCast { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 23cf9d313e..3402ca58dd 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2387,6 +2387,17 @@ object QueryPlanSerde extends Logging with CometExprShim { } } +sealed trait SupportLevel + +/** We support this feature with full compatibility with Spark */ +case class Compatible(notes: Option[String] = None) extends SupportLevel + +/** We support this feature but results can be different from Spark */ +case class Incompatible(notes: Option[String] = None) extends SupportLevel + +/** We do not support this feature */ +object Unsupported extends SupportLevel + /** * Trait for providing serialization logic for operators. 
*/ diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index fbf38e2e03..8bad71e081 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType} import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.Compatible class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala index 7a5af87619..d030106c3e 100644 --- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala +++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql import org.apache.comet.CometConf -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.Compatible import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.TableIdentifier From 1c735e3f37fe39b9d673fd7cb1c3cfb3b3181495 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 11:10:49 -0600 Subject: [PATCH 02/18] improve fallback reporting --- .../apache/comet/serde/QueryPlanSerde.scala | 20 ++++--- .../scala/org/apache/comet/serde/arrays.scala | 52 ++++++++++++++----- .../org/apache/comet/serde/unixtime.scala | 5 +- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3402ca58dd..5525a8d8da 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -642,11 +642,12 @@ object QueryPlanSerde extends Logging with CometExprShim { SQLConf.get def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { - handler match { - case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => + handler.getSupportLevel(expr) match { + case incompat: Incompatible if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => + val optionalNotes = incompat.notes.map(str => s" ($str)").getOrElse("") withInfo( expr, - s"$expr is not fully compatible with Spark. To enable it anyway, set " + + s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, set " + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.") None case _ => @@ -2435,6 +2436,16 @@ trait CometOperatorSerde[T <: SparkPlan] { */ trait CometExpressionSerde[T <: Expression] { + /** + * Determine the support level of the expression based on its attributes. + * + * @param expr + * The Spark expression. + * @return + * Support level (Compatible, Incompatible, or Unsupported). + */ + def getSupportLevel(expr: T): SupportLevel = Compatible(None) + /** * Convert a Spark expression into a protocol buffer representation that can be passed into * native code. 
@@ -2485,9 +2496,6 @@ trait CometAggregateExpressionSerde { conf: SQLConf): Option[ExprOuterClass.AggExpr] } -/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */ -trait IncompatExpr {} - /** Serde for scalar function. */ case class CometScalarFunction[T <: Expression](name: String) extends CometExpressionSerde[T] { override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 2a77d5fa14..d2624ca23c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -93,7 +93,10 @@ object CometArrayRemove extends CometExpressionSerde[ArrayRemove] with CometExpr } } -object CometArrayAppend extends CometExpressionSerde[ArrayAppend] with IncompatExpr { +object CometArrayAppend extends CometExpressionSerde[ArrayAppend] { + + override def getSupportLevel(expr: ArrayAppend): SupportLevel = Incompatible(None) + override def convert( expr: ArrayAppend, inputs: Seq[Attribute], @@ -149,7 +152,10 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] { } } -object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with IncompatExpr { +object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] { + + override def getSupportLevel(expr: ArrayDistinct): SupportLevel = Incompatible(None) + override def convert( expr: ArrayDistinct, inputs: Seq[Attribute], @@ -162,7 +168,10 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with Incom } } -object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] with IncompatExpr { +object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] { + + override def getSupportLevel(expr: ArrayIntersect): SupportLevel = Incompatible(None) + override def convert( expr: ArrayIntersect, inputs: Seq[Attribute], @@ -201,7 +210,10 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] { } } -object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with IncompatExpr { +object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { + + override def getSupportLevel(expr: ArraysOverlap): SupportLevel = Incompatible(None) + override def convert( expr: ArraysOverlap, inputs: Seq[Attribute], @@ -218,7 +230,10 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with Incom } } -object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatExpr { +object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] { + + override def getSupportLevel(expr: ArrayRepeat): SupportLevel = Incompatible(None) + override def convert( expr: ArrayRepeat, inputs: Seq[Attribute], @@ -232,7 +247,10 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatE } } -object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatExpr { +object CometArrayCompact extends CometExpressionSerde[Expression] { + + override def getSupportLevel(expr: Expression): SupportLevel = Incompatible(None) + override def convert( expr: Expression, inputs: Seq[Attribute], @@ -252,10 +270,9 @@ object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatE } } -object CometArrayExcept - extends CometExpressionSerde[ArrayExcept] - with CometExprShim - with IncompatExpr { +object CometArrayExcept extends CometExpressionSerde[ArrayExcept] with CometExprShim { + 
+ override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible(None) @tailrec def isTypeSupported(dt: DataType): Boolean = { @@ -292,7 +309,10 @@ object CometArrayExcept } } -object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr { +object CometArrayJoin extends CometExpressionSerde[ArrayJoin] { + + override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(None) + override def convert( expr: ArrayJoin, inputs: Seq[Attribute], @@ -326,7 +346,10 @@ object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr } } -object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatExpr { +object CometArrayInsert extends CometExpressionSerde[ArrayInsert] { + + override def getSupportLevel(expr: ArrayInsert): SupportLevel = Incompatible(None) + override def convert( expr: ArrayInsert, inputs: Seq[Attribute], @@ -361,7 +384,10 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatE } } -object CometArrayUnion extends CometExpressionSerde[ArrayUnion] with IncompatExpr { +object CometArrayUnion extends CometExpressionSerde[ArrayUnion] { + + override def getSupportLevel(expr: ArrayUnion): SupportLevel = Incompatible(None) + override def convert( expr: ArrayUnion, inputs: Seq[Attribute], diff --git a/spark/src/main/scala/org/apache/comet/serde/unixtime.scala b/spark/src/main/scala/org/apache/comet/serde/unixtime.scala index 0171d3e1a2..198c7d3101 100644 --- a/spark/src/main/scala/org/apache/comet/serde/unixtime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/unixtime.scala @@ -27,7 +27,10 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn // TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799 // https://github.com/apache/datafusion/issues/16594 -object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] with IncompatExpr { +object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] { + + override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None) + override def convert( expr: FromUnixTime, inputs: Seq[Attribute], From 320e54ec56789ff299115fa05d49e97e7e0fa85a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 11:16:59 -0600 Subject: [PATCH 03/18] improve reporting --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5525a8d8da..a3dcd12015 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -643,6 +643,9 @@ object QueryPlanSerde extends Logging with CometExprShim { def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { handler.getSupportLevel(expr) match { + case Unsupported => + withInfo(expr, s"$expr is not supported.") + None case incompat: Incompatible if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => val optionalNotes = incompat.notes.map(str => s" ($str)").getOrElse("") withInfo( @@ -650,7 +653,10 @@ object QueryPlanSerde extends Logging with CometExprShim { s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, set " + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. 
${CometConf.COMPAT_GUIDE}.") None - case _ => + case compat: Compatible => + if (compat.notes.isDefined) { + logWarning(s"Comet supports $expr but has notes: ${compat.notes.get}") + } handler.convert(expr, inputs, binding) } } From 89c29118d1a37627fc444227f20b8a0092f67acf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 11:24:22 -0600 Subject: [PATCH 04/18] fix --- .../apache/comet/serde/QueryPlanSerde.scala | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a3dcd12015..90e2831fe8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -646,16 +646,24 @@ object QueryPlanSerde extends Logging with CometExprShim { case Unsupported => withInfo(expr, s"$expr is not supported.") None - case incompat: Incompatible if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => - val optionalNotes = incompat.notes.map(str => s" ($str)").getOrElse("") - withInfo( - expr, - s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.") - None - case compat: Compatible => - if (compat.notes.isDefined) { - logWarning(s"Comet supports $expr but has notes: ${compat.notes.get}") + case Incompatible(notes) => + if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { + if (notes.isDefined) { + logWarning( + s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true but has notes: ${notes.get}") + } + handler.convert(expr, inputs, binding) + } else { + val optionalNotes = notes.map(str => s" ($str)").getOrElse("") + withInfo( + expr, + s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, set " + + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.") + None + } + case Compatible(notes) => + if (notes.isDefined) { + logWarning(s"Comet supports $expr but has notes: ${notes.get}") } handler.convert(expr, inputs, binding) } From 7ef3abf88fed6c785df4e2dc0061f1fd36334bc9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 11:26:31 -0600 Subject: [PATCH 05/18] format --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 90e2831fe8..109e5aa940 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -650,15 +650,17 @@ object QueryPlanSerde extends Logging with CometExprShim { if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { if (notes.isDefined) { logWarning( - s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true but has notes: ${notes.get}") + s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " + + s"but has notes: ${notes.get}") } handler.convert(expr, inputs, binding) } else { val optionalNotes = notes.map(str => s" ($str)").getOrElse("") withInfo( expr, - s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. 
${CometConf.COMPAT_GUIDE}.") + s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " + + s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " + + s"${CometConf.COMPAT_GUIDE}.") None } case Compatible(notes) => From df62af61fe562f7c0c10a9c5961d127703836fda Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 13:13:46 -0600 Subject: [PATCH 06/18] Improve ANSI fallback --- .../scala/org/apache/comet/CometConf.scala | 9 --- .../apache/comet/shims/ShimCometConf.scala | 1 - .../apache/comet/shims/ShimCometConf.scala | 1 - dev/diffs/3.4.3.diff | 17 ++++- dev/diffs/3.5.6.diff | 17 ++++- dev/diffs/4.0.0.diff | 19 +++++- .../apache/comet/rules/CometExecRule.scala | 15 +---- .../org/apache/comet/serde/arithmetic.scala | 62 +++++++++++++++++++ .../org/apache/comet/CometCastSuite.scala | 4 +- .../apache/comet/CometExpressionSuite.scala | 10 +-- 10 files changed, 116 insertions(+), 39 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 0c259be439..367af5f199 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -600,15 +600,6 @@ object CometConf extends ShimCometConf { .toSequence .createWithDefault(Seq("Range,InMemoryTableScan")) - val COMET_ANSI_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.ansi.enabled") - .internal() - .doc( - "Comet does not respect ANSI mode in most cases and by default will not accelerate " + - "queries when ansi mode is enabled. Enable this setting to test Comet's experimental " + - "support for ANSI mode. This should not be used in production.") - .booleanConf - .createWithDefault(COMET_ANSI_MODE_ENABLED_DEFAULT) - val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.caseConversion.enabled") .doc( diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala index dc84a7525a..6147c18ddb 100644 --- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala +++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala @@ -21,5 +21,4 @@ package org.apache.comet.shims trait ShimCometConf { protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false - protected val COMET_ANSI_MODE_ENABLED_DEFAULT = false } diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala index 13da6bc107..0eb57c52b4 100644 --- a/common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala +++ b/common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala @@ -21,5 +21,4 @@ package org.apache.comet.shims trait ShimCometConf { protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true - protected val COMET_ANSI_MODE_ENABLED_DEFAULT = true } diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 9277e8ea6e..717f8467c4 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index d3544881af1..5cc127f064d 100644 +index d3544881af1..9c174496a4b 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -881,7 +881,7 @@ index b5b34922694..a72403780c4 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 525d97e4998..8a3e7457618 100644 +index 525d97e4998..5e04319dd97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -894,6 +894,19 @@ index 525d97e4998..8a3e7457618 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -4467,7 +4468,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + val msg = intercept[SparkException] { + sql(query).collect() + }.getMessage +- assert(msg.contains(query)) ++ if (!isCometEnabled) { ++ // Comet's error message does not include the original SQL query ++ // https://github.com/apache/datafusion-comet/issues/2215 ++ assert(msg.contains(query)) ++ } + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 48ad10992c5..51d1ee65422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff index a60e38f801..bb2ba07c2c 100644 --- a/dev/diffs/3.5.6.diff +++ b/dev/diffs/3.5.6.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index 68e2c422a24..fb9c2e88fac 100644 +index 68e2c422a24..d971894ffe6 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -866,7 +866,7 @@ index c26757c9cff..d55775f09d7 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 793a0da6a86..6ccb9d62582 100644 +index 793a0da6a86..e48e74091cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -879,6 +879,19 @@ index 793a0da6a86..6ccb9d62582 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -4497,7 +4498,11 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + val msg = intercept[SparkException] { + sql(query).collect() + }.getMessage +- assert(msg.contains(query)) ++ if (!isCometEnabled) { ++ // Comet's error message does not include the original SQL query ++ // https://github.com/apache/datafusion-comet/issues/2215 ++ assert(msg.contains(query)) ++ } + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala index fa1a64460fc..1d2e215d6a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala diff --git a/dev/diffs/4.0.0.diff b/dev/diffs/4.0.0.diff index 4a4b958c48..6799999b8a 100644 --- a/dev/diffs/4.0.0.diff +++ b/dev/diffs/4.0.0.diff @@ -1057,7 +1057,7 @@ index ad424b3a7cc..4ece0117a34 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index b3fce19979e..345acb4811a 100644 +index b3fce19979e..67edf5eb91c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -1086,11 +1086,24 @@ index b3fce19979e..345acb4811a 100644 test("SPARK-39175: Query context of Cast should be serialized to executors" + - " when WSCG is off") { + " when WSCG is off", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/2218")) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { -@@ -4497,7 +4500,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -4490,14 +4493,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + assert(ex.isInstanceOf[SparkNumberFormatException] || + ex.isInstanceOf[SparkDateTimeException] || + ex.isInstanceOf[SparkRuntimeException]) +- assert(ex.getMessage.contains(query)) ++ ++ if (!isCometEnabled) { ++ // Comet's error message does not include the original SQL query ++ // https://github.com/apache/datafusion-comet/issues/2215 ++ assert(ex.getMessage.contains(query)) ++ } + } + } + } } test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " + diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index c7af3ab770..2c858d7ef0 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType} import org.apache.comet.{CometConf, ExtendedExplainInfo} -import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_EXEC_SHUFFLE_ENABLED} +import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -605,19 +605,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } private def _apply(plan: SparkPlan): SparkPlan = { - // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is - // enabled. - if (isANSIEnabled(conf)) { - if (COMET_ANSI_MODE_ENABLED.get()) { - if (!isSpark40Plus) { - logWarning("Using Comet's experimental support for ANSI mode.") - } - } else { - logInfo("Comet extension disabled for ANSI mode") - return plan - } - } - // We shouldn't transform Spark query plan if Comet is not loaded. 
if (!isCometLoaded(conf)) return plan diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index de9275b9cf..e2d4f9968d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -86,6 +86,15 @@ trait MathBase { } object CometAdd extends CometExpressionSerde[Add] with MathBase { + + override def getSupportLevel(expr: Add): SupportLevel = { + if (expr.evalMode == EvalMode.ANSI) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( expr: Add, inputs: Seq[Attribute], @@ -107,6 +116,15 @@ object CometAdd extends CometExpressionSerde[Add] with MathBase { } object CometSubtract extends CometExpressionSerde[Subtract] with MathBase { + + override def getSupportLevel(expr: Subtract): SupportLevel = { + if (expr.evalMode == EvalMode.ANSI) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( expr: Subtract, inputs: Seq[Attribute], @@ -128,6 +146,15 @@ object CometSubtract extends CometExpressionSerde[Subtract] with MathBase { } object CometMultiply extends CometExpressionSerde[Multiply] with MathBase { + + override def getSupportLevel(expr: Multiply): SupportLevel = { + if (expr.evalMode == EvalMode.ANSI) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( expr: Multiply, inputs: Seq[Attribute], @@ -149,6 +176,15 @@ object CometMultiply extends CometExpressionSerde[Multiply] with MathBase { } object CometDivide extends CometExpressionSerde[Divide] with MathBase { + + override def getSupportLevel(expr: Divide): SupportLevel = { + if (expr.evalMode == EvalMode.ANSI) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( expr: Divide, inputs: Seq[Attribute], @@ -174,6 +210,15 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase { } object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with MathBase { + + override def getSupportLevel(expr: IntegralDivide): SupportLevel = { + if (expr.evalMode == EvalMode.ANSI) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( expr: IntegralDivide, inputs: Seq[Attribute], @@ -237,6 +282,15 @@ object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with Mat } object CometRemainder extends CometExpressionSerde[Remainder] with MathBase { + + override def getSupportLevel(expr: Remainder): SupportLevel = { + if (expr.evalMode == EvalMode.ANSI) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( expr: Remainder, inputs: Seq[Attribute], @@ -264,6 +318,14 @@ object CometRemainder extends CometExpressionSerde[Remainder] with MathBase { object CometRound extends CometExpressionSerde[Round] { + override def getSupportLevel(expr: Round): SupportLevel = { + if (expr.ansiEnabled) { + Incompatible(Some("ANSI mode is not supported")) + } else { + Compatible(None) + } + } + override def convert( r: Round, inputs: Seq[Attribute], diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 8bad71e081..04b5fd164b 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ 
b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -1217,8 +1217,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { if (testAnsi) { // with ANSI enabled, we should produce the same exception as Spark withSQLConf( - (SQLConf.ANSI_ENABLED.key, "true"), - (CometConf.COMET_ANSI_MODE_ENABLED.key, "true")) { + SQLConf.ANSI_ENABLED.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { // cast() should throw exception on invalid inputs when ansi mode is enabled val df = data.withColumn("converted", col("a").cast(toType)) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 42058036e4..9789017041 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1334,7 +1334,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { testAbsAnsiOverflow(Seq((Byte.MaxValue, Byte.MinValue))) testAbsAnsiOverflow(Seq((Short.MaxValue, Short.MinValue))) testAbsAnsiOverflow(Seq((Int.MaxValue, Int.MinValue))) @@ -1944,7 +1944,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { def withAnsiMode(enabled: Boolean)(f: => Unit): Unit = { withSQLConf( SQLConf.ANSI_ENABLED.key -> enabled.toString, - CometConf.COMET_ANSI_MODE_ENABLED.key -> enabled.toString, + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> enabled.toString, CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true")(f) } @@ -2098,7 +2098,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { def withAnsiMode(enabled: Boolean)(f: => Unit): Unit = { withSQLConf( SQLConf.ANSI_ENABLED.key -> enabled.toString, - CometConf.COMET_ANSI_MODE_ENABLED.key -> enabled.toString, + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> enabled.toString, CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true")(f) } @@ -2161,7 +2161,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding", SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_ANSI_MODE_ENABLED.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true") { for (n <- Seq("2147483647", "-2147483648")) { @@ -2672,7 +2672,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { ansiEnabled => withSQLConf( - CometConf.COMET_ANSI_MODE_ENABLED.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString(), // Prevent the optimizer from collapsing an extract value of a create array SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> SimplifyExtractValueOps.ruleName) { From 76420875a9c24f26bde66ea6afb1b2b23e5de426 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 13:15:21 -0600 Subject: [PATCH 07/18] update wording --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5f8417b79e..ce02ab288e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2368,13 +2368,13 @@ object QueryPlanSerde extends Logging with CometExprShim { sealed trait SupportLevel -/** We support this feature with full compatibility with Spark */ +/** Comet supports this feature with full (or close enough) compatibility with Spark */ case class Compatible(notes: Option[String] = None) extends SupportLevel -/** We support this feature but results can be different from Spark */ +/** Comet supports this feature but results can be different from Spark */ case class Incompatible(notes: Option[String] = None) extends SupportLevel -/** We do not support this feature */ +/** Comet does not support this feature */ object Unsupported extends SupportLevel /** From 0a0e1d300d48fde7dfa289716304a4dc7678a408 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 13:16:54 -0600 Subject: [PATCH 08/18] fix --- .../src/main/scala/org/apache/comet/expressions/CometCast.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index fcf22c4a04..2610344bf1 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -130,7 +130,7 @@ object CometCast { Compatible(Some("Only supports years between 262143 BC and 262142 AD")) case DataTypes.TimestampType if timeZoneId.exists(tz => tz != "UTC") => Incompatible(Some(s"Cast will use UTC instead of $timeZoneId")) - case DataTypes.TimestampType if evalMode == "ANSI" => + case DataTypes.TimestampType if evalMode == CometEvalMode.ANSI => Incompatible(Some("ANSI mode not supported")) case DataTypes.TimestampType => // https://github.com/apache/datafusion-comet/issues/328 From afbca2748cdde32a9f1ad1af3cbb1b5db2127979 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 14:01:20 -0600 Subject: [PATCH 09/18] refactor Cast serde --- .../scala/org/apache/comet/CometConf.scala | 8 ---- .../apache/comet/expressions/CometCast.scala | 41 ++++++++++++++++++- .../apache/comet/serde/QueryPlanSerde.scala | 15 +++---- .../org/apache/comet/CometCastSuite.scala | 14 +++---- .../apache/comet/CometExpressionSuite.scala | 20 ++++----- .../comet/CometStringExpressionSuite.scala | 4 +- .../comet/exec/CometAggregateSuite.scala | 6 +-- .../apache/comet/exec/CometExecSuite.scala | 4 +- .../sql/comet/CometPlanStabilitySuite.scala | 2 +- 9 files changed, 72 insertions(+), 42 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 367af5f199..02c8718945 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -624,14 +624,6 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) - val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = - conf("spark.comet.cast.allowIncompatible") - .doc( - "Comet is not currently fully compatible with Spark for all cast operations. " + - s"Set this config to true to allow them anyway. 
$COMPAT_GUIDE.") - .booleanConf - .createWithDefault(false) - val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = conf("spark.comet.regexp.allowIncompatible") .doc( diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 2610344bf1..8e82ab2f9c 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -19,11 +19,16 @@ package org.apache.comet.expressions +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast} import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType} -import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported} +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported} +import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType} +import org.apache.comet.shims.CometExprShim -object CometCast { +object CometCast extends CometExpressionSerde[Cast] with CometExprShim { def supportedTypes: Seq[DataType] = Seq( @@ -42,6 +47,38 @@ object CometCast { // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later // https://github.com/apache/datafusion-comet/issues/378 + override def getSupportLevel(cast: Cast): SupportLevel = { + isSupported(cast.child.dataType, cast.dataType, cast.timeZoneId, evalMode(cast)) + } + + override def convert( + cast: Cast, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childExpr = exprToProtoInternal(cast.child, inputs, binding) + if (childExpr.isDefined) { + serializeDataType(cast.dataType) match { + case Some(dataType) => + val castBuilder = ExprOuterClass.Cast.newBuilder() + castBuilder.setChild(childExpr.get) + castBuilder.setDatatype(dataType) + castBuilder.setEvalMode(evalModeToProto(evalMode(cast))) + castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) + castBuilder.setTimezone(cast.timeZoneId.getOrElse("UTC")) + Some( + ExprOuterClass.Expr + .newBuilder() + .setCast(castBuilder) + .build()) + case _ => + withInfo(cast, s"Unsupported datatype: ${cast.dataType}") + None + } + } else { + None + } + } + def isSupported( fromType: DataType, toType: DataType, diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index ce02ab288e..ade001dbb8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -169,7 +169,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[DateAdd] -> CometDateAdd, classOf[DateSub] -> CometDateSub, classOf[TruncDate] -> CometTruncDate, - classOf[TruncTimestamp] -> CometTruncTimestamp) + classOf[TruncTimestamp] -> CometTruncTimestamp, + classOf[Cast] -> CometCast) /** * Mapping of Spark aggregate expression class to Comet expression handler. @@ -517,6 +518,8 @@ object QueryPlanSerde extends Logging with CometExprShim { } } + // TODO this needs to be removed + /** * Wrap an expression in a cast. 
*/ @@ -532,7 +535,7 @@ object QueryPlanSerde extends Logging with CometExprShim { castBuilder.setChild(childExpr) castBuilder.setDatatype(dataType) castBuilder.setEvalMode(evalModeToProto(evalMode)) - castBuilder.setAllowIncompat(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) + castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) castBuilder.setTimezone(timeZoneId.getOrElse("UTC")) Some( ExprOuterClass.Expr @@ -545,6 +548,7 @@ object QueryPlanSerde extends Logging with CometExprShim { } } + // TODO this needs to be removed def handleCast( expr: Expression, child: Expression, @@ -569,14 +573,14 @@ object QueryPlanSerde extends Logging with CometExprShim { case Compatible(_) => castToProto(expr, timeZoneId, dt, childExpr.get, evalMode) case Incompatible(reason) => - if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { + if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { logWarning(getIncompatMessage(reason)) castToProto(expr, timeZoneId, dt, childExpr.get, evalMode) } else { withInfo( expr, s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " + - s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true") None } case Unsupported => @@ -696,9 +700,6 @@ object QueryPlanSerde extends Logging with CometExprShim { Some(timeZoneId), CometEvalMode.TRY) - case c @ Cast(child, dt, timeZoneId, _) => - handleCast(expr, child, inputs, binding, dt, timeZoneId, evalMode(c)) - case EqualTo(left, right) => createBinaryExpr( expr, diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 04b5fd164b..621c71e3e1 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -419,7 +419,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast FloatType to DecimalType(10,2) - allow incompat") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { castTest(generateFloats(), DataTypes.createDecimalType(10, 2)) } } @@ -479,7 +479,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast DoubleType to DecimalType(10,2) - allow incompat") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { castTest(generateDoubles(), DataTypes.createDecimalType(10, 2)) } } @@ -646,7 +646,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast StringType to FloatType (partial support)") { withSQLConf( - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", SQLConf.ANSI_ENABLED.key -> "false") { castTest( gen.generateStrings(dataSize, "0123456789.", 8).toDF("a"), @@ -662,7 +662,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast StringType to DoubleType (partial support)") { withSQLConf( - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", SQLConf.ANSI_ENABLED.key -> "false") { castTest( gen.generateStrings(dataSize, "0123456789.", 8).toDF("a"), @@ -679,7 +679,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast StringType to DecimalType(10,2) (partial support)") { withSQLConf( - 
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", SQLConf.ANSI_ENABLED.key -> "false") { val values = gen .generateStrings(dataSize, "0123456789.", 8) @@ -768,7 +768,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { ignore("cast StringType to TimestampType") { // https://github.com/apache/datafusion-comet/issues/328 - withSQLConf((CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key, "true")) { + withSQLConf((CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key, "true")) { val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ gen.generateStrings( dataSize, timestampPattern, @@ -819,7 +819,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // test for invalid inputs withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val values = Seq("-9?", "1-", "0.5") castTimestampTest(values.toDF("a"), DataTypes.TimestampType) } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 9789017041..f98b18bf46 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -115,7 +115,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("Integral Division Overflow Handling Matches Spark Behavior") { withTable("t1") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val value = Long.MinValue sql("create table t1(c1 long, c2 short) using parquet") sql(s"insert into t1 values($value, -1)") @@ -533,7 +533,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -555,7 +555,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz to string") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -577,7 +577,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz to long, date") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -665,7 +665,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with timestamp_ntz") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { 
dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -700,7 +700,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with format array") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val numRows = 1000 Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => @@ -1367,7 +1367,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq("true", "false").foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable( (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), "tbl", @@ -1918,7 +1918,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") @@ -2024,7 +2024,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") @@ -2751,7 +2751,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // this test requires native_comet scan due to unsigned u8/u16 issue withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path1 = new Path(dir.toURI.toString, "test1.parquet") diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index fac80853fa..6d8fa28b04 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -51,7 +51,7 @@ class CometStringExpressionSuite extends CometTestBase { } test("Chr") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col varchar(20)) using parquet") @@ -64,7 +64,7 @@ class CometStringExpressionSuite extends CometTestBase { test("Chr with null character") { // test compatibility with Spark, spark supports chr(0) - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test0" withTable(table) { sql(s"create table $table(c9 int, c4 int) using parquet") diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index be7fe7ee52..6574d9568d 100644 --- 
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -978,7 +978,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("avg/sum overflow on decimal(38, _)") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "overflow_decimal_38" withTable(table) { sql(s"create table $table(a decimal(38, 2), b INT) using parquet") @@ -1019,7 +1019,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("final decimal avg") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { @@ -1090,7 +1090,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, CometConf.COMET_SHUFFLE_MODE.key -> "native", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") makeParquetFile(path, 1000, 20, dictionaryEnabled) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 339f90e81c..64e832826e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -504,7 +504,7 @@ class CometExecSuite extends CometTestBase { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { var column1 = s"CAST(max(_1) AS $subqueryType)" if (subqueryType == "BINARY") { @@ -1431,7 +1431,7 @@ class CometExecSuite extends CometTestBase { test("SPARK-33474: Support typed literals as partition spec values") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withTable("t1") { val binaryStr = "Spark SQL" val binaryHexStr = Hex.hex(UTF8String.fromString(binaryStr).getBytes).toString diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index aa80251490..9245a107dd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -273,7 +273,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString, CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", // needed for 
v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { val qe = sql(queryString).queryExecution val plan = qe.executedPlan From 3cf99e1082ec296974579b0b7da141e631654916 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 14:02:47 -0600 Subject: [PATCH 10/18] fix --- .../org/apache/spark/sql/comet/CometPlanStabilitySuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index aa80251490..f331248598 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -273,6 +273,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString, CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true", + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", // needed for ANSI mode CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { val qe = sql(queryString).queryExecution From 59804633a54f3a3248cdd6e2bf13b93e1e896a95 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 14:34:45 -0600 Subject: [PATCH 11/18] remove handleCast --- .../apache/comet/expressions/CometCast.scala | 31 +++- .../apache/comet/serde/QueryPlanSerde.scala | 141 ++++-------------- .../org/apache/comet/serde/arithmetic.scala | 6 +- .../org/apache/comet/serde/strings.scala | 6 +- 4 files changed, 67 insertions(+), 117 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 8e82ab2f9c..c6a25e21ca 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -19,12 +19,13 @@ package org.apache.comet.expressions -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported} +import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType} import org.apache.comet.shims.CometExprShim @@ -79,6 +80,34 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } } + /** + * Wrap an already serialized expression in a cast. 
+ */ + def castToProto( + expr: Expression, + timeZoneId: Option[String], + dt: DataType, + childExpr: Expr, + evalMode: CometEvalMode.Value): Option[Expr] = { + serializeDataType(dt) match { + case Some(dataType) => + val castBuilder = ExprOuterClass.Cast.newBuilder() + castBuilder.setChild(childExpr) + castBuilder.setDatatype(dataType) + castBuilder.setEvalMode(evalModeToProto(evalMode)) + castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) + castBuilder.setTimezone(timeZoneId.getOrElse("UTC")) + Some( + ExprOuterClass.Expr + .newBuilder() + .setCast(castBuilder) + .build()) + case _ => + withInfo(expr, s"Unsupported datatype in castToProto: $dt") + None + } + } + def isSupported( fromType: DataType, toType: DataType, diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index ade001dbb8..57f76219f0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -518,84 +518,6 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - // TODO this needs to be removed - - /** - * Wrap an expression in a cast. - */ - def castToProto( - expr: Expression, - timeZoneId: Option[String], - dt: DataType, - childExpr: Expr, - evalMode: CometEvalMode.Value): Option[Expr] = { - serializeDataType(dt) match { - case Some(dataType) => - val castBuilder = ExprOuterClass.Cast.newBuilder() - castBuilder.setChild(childExpr) - castBuilder.setDatatype(dataType) - castBuilder.setEvalMode(evalModeToProto(evalMode)) - castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) - castBuilder.setTimezone(timeZoneId.getOrElse("UTC")) - Some( - ExprOuterClass.Expr - .newBuilder() - .setCast(castBuilder) - .build()) - case _ => - withInfo(expr, s"Unsupported datatype in castToProto: $dt") - None - } - } - - // TODO this needs to be removed - def handleCast( - expr: Expression, - child: Expression, - inputs: Seq[Attribute], - binding: Boolean, - dt: DataType, - timeZoneId: Option[String], - evalMode: CometEvalMode.Value): Option[Expr] = { - - val childExpr = exprToProtoInternal(child, inputs, binding) - if (childExpr.isDefined) { - val castSupport = - CometCast.isSupported(child.dataType, dt, timeZoneId, evalMode) - - def getIncompatMessage(reason: Option[String]): String = - "Comet does not guarantee correct results for cast " + - s"from ${child.dataType} to $dt " + - s"with timezone $timeZoneId and evalMode $evalMode" + - reason.map(str => s" ($str)").getOrElse("") - - castSupport match { - case Compatible(_) => - castToProto(expr, timeZoneId, dt, childExpr.get, evalMode) - case Incompatible(reason) => - if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { - logWarning(getIncompatMessage(reason)) - castToProto(expr, timeZoneId, dt, childExpr.get, evalMode) - } else { - withInfo( - expr, - s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true") - None - } - case Unsupported => - withInfo( - expr, - s"Unsupported cast from ${child.dataType} to $dt " + - s"with timezone $timeZoneId and evalMode $evalMode") - None - } - } else { - withInfo(expr, child) - None - } - } - /** * Convert a Spark expression to a protocol-buffer representation of a native Comet/DataFusion * expression. 
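Callers that previously used the serde-side helper now call CometCast.castToProto directly, as the arithmetic and string serdes later in this patch do. A minimal sketch of the calling pattern, assuming a hypothetical unary expression MyLongOp with a child field (illustrative only, not part of the diff):

    object CometMyLongOp extends CometExpressionSerde[MyLongOp] {
      override def convert(
          expr: MyLongOp,
          inputs: Seq[Attribute],
          binding: Boolean): Option[ExprOuterClass.Expr] = {
        exprToProtoInternal(expr.child, inputs, binding) match {
          case Some(childProto) =>
            // wrap the serialized child in a cast to long, as CometIntegralDivide does
            CometCast.castToProto(expr, None, LongType, childProto, CometEvalMode.LEGACY)
          case None =>
            // record the untranslatable child so the fallback reason shows up in the plan
            withInfo(expr, expr.child)
            None
        }
      }
    }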
@@ -689,16 +611,11 @@ object QueryPlanSerde extends Logging with CometExprShim { val value = cast.eval() exprToProtoInternal(Literal(value, dataType), inputs, binding) + // TODO move this to shim layer case UnaryExpression(child) if expr.prettyName == "trycast" => val timeZoneId = SQLConf.get.sessionLocalTimeZone - handleCast( - expr, - child, - inputs, - binding, - expr.dataType, - Some(timeZoneId), - CometEvalMode.TRY) + val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY) + convert(cast, CometCast) case EqualTo(left, right) => createBinaryExpr( @@ -884,33 +801,37 @@ object QueryPlanSerde extends Logging with CometExprShim { val child = expr.asInstanceOf[UnaryExpression].child val timezoneId = expr.asInstanceOf[TimeZoneAwareExpression].timeZoneId - handleCast( - expr, - child, - inputs, - binding, + // TODO this is duplicating logic in the `convert` method + val castSupported = CometCast.isSupported( + child.dataType, DataTypes.StringType, timezoneId, - CometEvalMode.TRY) match { - case Some(_) => - exprToProtoInternal(child, inputs, binding) match { - case Some(p) => - val toPrettyString = ExprOuterClass.ToPrettyString + CometEvalMode.TRY) + val x = castSupported match { + case Compatible(_) => true + case Incompatible(notes) => true + case _ => false + } + + if (x) { + exprToProtoInternal(child, inputs, binding) match { + case Some(p) => + val toPrettyString = ExprOuterClass.ToPrettyString + .newBuilder() + .setChild(p) + .setTimezone(timezoneId.getOrElse("UTC")) + .build() + Some( + ExprOuterClass.Expr .newBuilder() - .setChild(p) - .setTimezone(timezoneId.getOrElse("UTC")) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToPrettyString(toPrettyString) - .build()) - case _ => - withInfo(expr, child) - None - } - case None => - None + .setToPrettyString(toPrettyString) + .build()) + case _ => + withInfo(expr, child) + None + } + } else { + None } case StructsToJson(options, child, timezoneId) => diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index e2d4f9968d..52a9370386 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Cast, Divide, import org.apache.spark.sql.types.{ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType} import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.QueryPlanSerde.{castToProto, evalModeToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, serializeDataType} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, serializeDataType} import org.apache.comet.shims.CometEvalModeUtil trait MathBase { @@ -274,7 +274,7 @@ object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with Mat } // cast result to long - castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY) + CometCast.castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY) } else { None } diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index de6284a2fd..15046fe092 100644 --- 
a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -26,9 +26,9 @@ import org.apache.spark.sql.types.{DataTypes, LongType, StringType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.expressions.{CometEvalMode, RegExp} +import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{castToProto, createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} +import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} object CometStringRepeat extends CometExpressionSerde[StringRepeat] { @@ -187,7 +187,7 @@ trait CommonStringExprs { // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls // for invalid strings. // Left child is the binary expression. - castToProto( + CometCast.castToProto( expr, None, DataTypes.StringType, From aa90d494155d47fef423792b50591176e521fabf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 14:50:16 -0600 Subject: [PATCH 12/18] cleanup --- .../apache/comet/expressions/CometCast.scala | 18 +----------------- .../apache/comet/serde/QueryPlanSerde.scala | 1 - 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index c6a25e21ca..67541ca572 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -58,23 +58,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { binding: Boolean): Option[ExprOuterClass.Expr] = { val childExpr = exprToProtoInternal(cast.child, inputs, binding) if (childExpr.isDefined) { - serializeDataType(cast.dataType) match { - case Some(dataType) => - val castBuilder = ExprOuterClass.Cast.newBuilder() - castBuilder.setChild(childExpr.get) - castBuilder.setDatatype(dataType) - castBuilder.setEvalMode(evalModeToProto(evalMode(cast))) - castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) - castBuilder.setTimezone(cast.timeZoneId.getOrElse("UTC")) - Some( - ExprOuterClass.Expr - .newBuilder() - .setCast(castBuilder) - .build()) - case _ => - withInfo(cast, s"Unsupported datatype: ${cast.dataType}") - None - } + castToProto(cast, cast.timeZoneId, cast.dataType, childExpr.get, evalMode(cast)) } else { None } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 57f76219f0..55c05f40c5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -611,7 +611,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val value = cast.eval() exprToProtoInternal(Literal(value, dataType), inputs, binding) - // TODO move this to shim layer case UnaryExpression(child) if expr.prettyName == "trycast" => val timeZoneId = SQLConf.get.sessionLocalTimeZone val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY) From 81f11e281f912208ed0f2dd4009f9102098e01d1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 26 Aug 2025 16:52:55 -0600 Subject: [PATCH 13/18] Add notes to Unsupported --- 
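This commit gives Unsupported an optional reason, mirroring Compatible and Incompatible, so fallback reporting can say why an expression stays on Spark. A minimal sketch of consuming the three support levels, assuming the definitions introduced in the diff below (illustrative only, not part of the commit):

    CometCast.isSupported(
      DataTypes.CalendarIntervalType, DataTypes.StringType, None, CometEvalMode.LEGACY) match {
      case Compatible(_) => // serialize the cast and run it natively
      case Incompatible(notes) => // allowed only when CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE is set
      case Unsupported(notes) => // fall back to Spark; notes carries the reason
    }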
.../apache/comet/expressions/CometCast.scala | 33 ++++++++++--------- .../apache/comet/serde/QueryPlanSerde.scala | 6 ++-- .../apache/comet/CometExpressionSuite.scala | 6 ++-- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 67541ca572..dff9ff0ae9 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -60,6 +60,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { if (childExpr.isDefined) { castToProto(cast, cast.timeZoneId, cast.dataType, childExpr.get, evalMode(cast)) } else { + withInfo(cast, cast.child) None } } @@ -112,7 +113,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType => Incompatible() case _ => - Unsupported + Unsupported(Some(s"Cast from $fromType to $toType is not supported")) } case (_: DecimalType, _: DecimalType) => Compatible() @@ -148,7 +149,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } } Compatible() - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from $fromType to $toType is not supported")) } } @@ -186,7 +187,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { // https://github.com/apache/datafusion-comet/issues/328 Incompatible(Some("Not all valid formats are supported")) case _ => - Unsupported + Unsupported(Some(s"Cast from String to $toType is not supported")) } } @@ -221,13 +222,13 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { isSupported(field.dataType, DataTypes.StringType, timeZoneId, evalMode) match { case s: Incompatible => return s - case Unsupported => - return Unsupported + case u: Unsupported => + return u case _ => } } Compatible() - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from $fromType to String is not supported")) } } @@ -237,13 +238,13 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { DataTypes.IntegerType => // https://github.com/apache/datafusion-comet/issues/352 // this seems like an edge case that isn't important for us to support - Unsupported + Unsupported(Some(s"Cast from Timestamp to $toType is not supported")) case DataTypes.LongType => // https://github.com/apache/datafusion-comet/issues/352 Compatible() case DataTypes.StringType => Compatible() case DataTypes.DateType => Compatible() - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from Timestamp to $toType is not supported")) } } @@ -251,7 +252,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType => Compatible() - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from Boolean to $toType is not supported")) } private def canCastFromByte(toType: DataType): SupportLevel = toType match { @@ -262,7 +263,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => Compatible() case _ => - Unsupported + Unsupported(Some(s"Cast from Byte to $toType is not supported")) } private def canCastFromShort(toType: DataType): SupportLevel = toType match { @@ -273,7 +274,7 @@ object CometCast extends CometExpressionSerde[Cast] 
with CometExprShim { case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => Compatible() case _ => - Unsupported + Unsupported(Some(s"Cast from Short to $toType is not supported")) } private def canCastFromInt(toType: DataType): SupportLevel = toType match { @@ -286,7 +287,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => Incompatible(Some("No overflow check")) case _ => - Unsupported + Unsupported(Some(s"Cast from Int to $toType is not supported")) } private def canCastFromLong(toType: DataType): SupportLevel = toType match { @@ -299,7 +300,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => Incompatible(Some("No overflow check")) case _ => - Unsupported + Unsupported(Some(s"Cast from Long to $toType is not supported")) } private def canCastFromFloat(toType: DataType): SupportLevel = toType match { @@ -309,7 +310,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => // https://github.com/apache/datafusion-comet/issues/1371 Incompatible(Some("There can be rounding differences")) - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from Float to $toType is not supported")) } private def canCastFromDouble(toType: DataType): SupportLevel = toType match { @@ -319,14 +320,14 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => // https://github.com/apache/datafusion-comet/issues/1371 Incompatible(Some("There can be rounding differences")) - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from Double to $toType is not supported")) } private def canCastFromDecimal(toType: DataType): SupportLevel = toType match { case DataTypes.FloatType | DataTypes.DoubleType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType => Compatible() - case _ => Unsupported + case _ => Unsupported(Some(s"Cast from Decimal to $toType is not supported")) } } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 55c05f40c5..dd269dddb2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -570,8 +570,8 @@ object QueryPlanSerde extends Logging with CometExprShim { def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { handler.getSupportLevel(expr) match { - case Unsupported => - withInfo(expr, s"$expr is not supported.") + case Unsupported(notes) => + withInfo(expr, notes.getOrElse(s"$expr is not supported")) None case Incompatible(notes) => if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { @@ -2296,7 +2296,7 @@ case class Compatible(notes: Option[String] = None) extends SupportLevel case class Incompatible(notes: Option[String] = None) extends SupportLevel /** Comet does not support this feature */ -object Unsupported extends SupportLevel +case class Unsupported(notes: Option[String]) extends SupportLevel /** * Trait for providing serialization logic for operators.
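With getSupportLevel driving the dispatch in convert, individual serdes no longer need to hand-roll their own fallback messages. A minimal sketch of a serde opting in, where MyExpr and the reason strings are hypothetical and only the SupportLevel values come from this patch:

    object CometMyExpr extends CometExpressionSerde[MyExpr] {
      // declare per-input support so QueryPlanSerde.convert can report a precise reason
      override def getSupportLevel(expr: MyExpr): SupportLevel = expr.child.dataType match {
        case _: DecimalType =>
          Incompatible(Some("no overflow check for decimal inputs"))
        case DataTypes.CalendarIntervalType =>
          Unsupported(Some("interval inputs are not supported"))
        case _ =>
          Compatible()
      }

      override def convert(
          expr: MyExpr,
          inputs: Seq[Attribute],
          binding: Boolean): Option[ExprOuterClass.Expr] =
        // expression-specific serialization would go here
        exprToProtoInternal(expr.child, inputs, binding)
    }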
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index f98b18bf46..1e86b9a0f1 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1856,7 +1856,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq( ( s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table", - Set("make_interval is not supported")), + Set("Cast from CalendarIntervalType to String is not supported")), ( "SELECT " + "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))" @@ -1875,8 +1875,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B " + "where A.c1 = B.c1 ", Set( - "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled", - "make_interval is not supported")), + "Cast from CalendarIntervalType to String is not supported", + "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled")), (s"select * from $table LIMIT 10 OFFSET 3", Set("Comet shuffle is not enabled"))) .foreach(test => { val qry = test._1 From 9669ef912e231cb34b4707d05f194c11b00d0ff6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 27 Aug 2025 13:14:52 -0600 Subject: [PATCH 14/18] prep for review --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e99c8e1781..7ac97ac67e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -800,19 +800,19 @@ object QueryPlanSerde extends Logging with CometExprShim { val child = expr.asInstanceOf[UnaryExpression].child val timezoneId = expr.asInstanceOf[TimeZoneAwareExpression].timeZoneId - // TODO this is duplicating logic in the `convert` method val castSupported = CometCast.isSupported( child.dataType, DataTypes.StringType, timezoneId, CometEvalMode.TRY) - val x = castSupported match { + + val isCastSupported = castSupported match { case Compatible(_) => true - case Incompatible(notes) => true + case Incompatible(_) => true case _ => false } - if (x) { + if (isCastSupported) { exprToProtoInternal(child, inputs, binding) match { case Some(p) => val toPrettyString = ExprOuterClass.ToPrettyString From 03a60fc5b2a194456acc3aa5b7bd90f7c3ec08d0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 27 Aug 2025 17:42:57 -0600 Subject: [PATCH 15/18] Update QueryPlanSerde.scala Co-authored-by: Oleks V --- .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 7ac97ac67e..f1d4e4adbe 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2307,7 +2307,7 @@ case class Compatible(notes: Option[String] = None) extends SupportLevel case class Incompatible(notes: Option[String] = None) extends SupportLevel /** Comet does not support this feature */ -case class Unsupported(notes: 
Option[String]) extends SupportLevel +case class Unsupported(notes: Option[String] = None) extends SupportLevel /** * Trait for providing serialization logic for operators. From 3736758d45f320ac7298041420cf61569fcfb248 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 29 Aug 2025 17:12:18 -0600 Subject: [PATCH 16/18] address feedback --- .../apache/comet/expressions/CometCast.scala | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index dff9ff0ae9..7bc352831a 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -113,7 +113,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType => Incompatible() case _ => - Unsupported(Some(s"Cast from $fromType to $toType is not supported")) + unsupported(fromType, toType) } case (_: DecimalType, _: DecimalType) => Compatible() @@ -149,7 +149,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } } Compatible() - case _ => Unsupported(Some(s"Cast from $fromType to $toType is not supported")) + case _ => unsupported(fromType, toType) } } @@ -187,7 +187,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { // https://github.com/apache/datafusion-comet/issues/328 Incompatible(Some("Not all valid formats are supported")) case _ => - Unsupported(Some(s"Cast from String to $toType is not supported")) + unsupported(DataTypes.StringType, toType) } } @@ -228,7 +228,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } } Compatible() - case _ => Unsupported(Some(s"Cast from $fromType to String is not supported")) + case _ => unsupported(fromType, DataTypes.StringType) } } @@ -238,13 +238,13 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { DataTypes.IntegerType => // https://github.com/apache/datafusion-comet/issues/352 // this seems like an edge case that isn't important for us to support - Unsupported(Some(s"Cast from Timestamp to $toType is not supported")) + unsupported(DataTypes.TimestampType, toType) case DataTypes.LongType => // https://github.com/apache/datafusion-comet/issues/352 Compatible() case DataTypes.StringType => Compatible() case DataTypes.DateType => Compatible() - case _ => Unsupported(Some(s"Cast from Timestamp to $toType is not supported")) + case _ => unsupported(DataTypes.TimestampType, toType) } } @@ -252,7 +252,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType => Compatible() - case _ => Unsupported(Some(s"Cast from Boolean to $toType is not supported")) + case _ => unsupported(DataTypes.BooleanType, toType) } private def canCastFromByte(toType: DataType): SupportLevel = toType match { @@ -263,7 +263,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => Compatible() case _ => - Unsupported(Some(s"Cast from Byte to $toType is not supported")) + unsupported(DataTypes.ByteType, toType) } private def canCastFromShort(toType: DataType): SupportLevel = toType match { @@ -274,7 +274,7 @@ object CometCast extends 
CometExpressionSerde[Cast] with CometExprShim { case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => Compatible() case _ => - Unsupported(Some(s"Cast from Short to $toType is not supported")) + unsupported(DataTypes.ShortType, toType) } private def canCastFromInt(toType: DataType): SupportLevel = toType match { @@ -287,7 +287,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => Incompatible(Some("No overflow check")) case _ => - Unsupported(Some(s"Cast from Int to $toType is not supported")) + unsupported(DataTypes.IntegerType, toType) } private def canCastFromLong(toType: DataType): SupportLevel = toType match { @@ -300,7 +300,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => Incompatible(Some("No overflow check")) case _ => - Unsupported(Some(s"Cast from Long to $toType is not supported")) + unsupported(DataTypes.LongType, toType) } private def canCastFromFloat(toType: DataType): SupportLevel = toType match { @@ -310,7 +310,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => // https://github.com/apache/datafusion-comet/issues/1371 Incompatible(Some("There can be rounding differences")) - case _ => Unsupported(Some(s"Cast from Float to $toType is not supported")) + unsupported(DataTypes.FloatType, toType) } private def canCastFromDouble(toType: DataType): SupportLevel = toType match { @@ -320,14 +320,17 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => // https://github.com/apache/datafusion-comet/issues/1371 Incompatible(Some("There can be rounding differences")) - case _ => Unsupported(Some(s"Cast from Double to $toType is not supported")) + case _ => unsupported(DataTypes.DoubleType, toType) } private def canCastFromDecimal(toType: DataType): SupportLevel = toType match { case DataTypes.FloatType | DataTypes.DoubleType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType => Compatible() - case _ => Unsupported(Some(s"Cast from Decimal to $toType is not supported")) + case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported")) } + private def unsupported(fromType: DataType, toType: DataType): Unsupported = { + Unsupported(Some(s"Cast from $fromType to $toType is not supported")) + } } From 9fc884c0ef04decf2773f01ef220a22b91e56603 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 1 Sep 2025 09:23:45 -0600 Subject: [PATCH 17/18] fix regression --- docs/source/user-guide/compatibility.md | 2 +- docs/source/user-guide/configs.md | 1 - .../src/main/scala/org/apache/comet/expressions/CometCast.scala | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index e23dbf238f..5d464692f2 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -75,7 +75,7 @@ The `native_datafusion` scan has some additional limitations: ### S3 Support with `native_iceberg_compat` -- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet +- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet will attempt to resolve the region if it is not provided. 
## ANSI Mode diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 434c1934fb..bbfdf29844 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -28,7 +28,6 @@ Comet provides the following configuration settings. |--------|-------------|---------------| | spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 | | spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false | -| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false | | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. 
| 3 | diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 7bc352831a..3ea4882563 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -310,6 +310,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case _: DecimalType => // https://github.com/apache/datafusion-comet/issues/1371 Incompatible(Some("There can be rounding differences")) + case _ => unsupported(DataTypes.FloatType, toType) } From 1b724af9cf64d50ebaaaed77a2fada3bb317b8d7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Sep 2025 16:45:55 -0600 Subject: [PATCH 18/18] fix regression --- .../test/scala/org/apache/comet/CometExpressionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 1e86b9a0f1..bf1733b3ff 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1856,7 +1856,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq( ( s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table", - Set("Cast from CalendarIntervalType to String is not supported")), + Set("Cast from CalendarIntervalType to StringType is not supported")), ( "SELECT " + "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))" @@ -1875,7 +1875,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B " + "where A.c1 = B.c1 ", Set( - "Cast from CalendarIntervalType to String is not supported", + "Cast from CalendarIntervalType to StringType is not supported", "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled")), (s"select * from $table LIMIT 10 OFFSET 3", Set("Comet shuffle is not enabled"))) .foreach(test => {