diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 367af5f199..02c8718945 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -624,14 +624,6 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.cast.allowIncompatible")
-      .doc(
-        "Comet is not currently fully compatible with Spark for all cast operations. " +
-          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.regexp.allowIncompatible")
       .doc(
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index e23dbf238f..5d464692f2 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -75,7 +75,7 @@ The `native_datafusion` scan has some additional limitations:
 
 ### S3 Support with `native_iceberg_compat`
 
-- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet
+- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet
   will attempt to resolve the region if it is not provided.
 
 ## ANSI Mode
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 434c1934fb..bbfdf29844 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -28,7 +28,6 @@ Comet provides the following configuration settings.
 |--------|-------------|---------------|
 | spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
 | spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
-| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false |
 | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 |
 | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
index 2610344bf1..3ea4882563 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -19,11 +19,17 @@
 
 package org.apache.comet.expressions
 
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
 import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType}
 
-import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported}
+import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
+import org.apache.comet.serde.ExprOuterClass.Expr
+import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType}
+import org.apache.comet.shims.CometExprShim
 
-object CometCast {
+object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
 
   def supportedTypes: Seq[DataType] =
     Seq(
@@ -42,6 +48,51 @@ object CometCast {
   // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later
   // https://github.com/apache/datafusion-comet/issues/378
 
+  override def getSupportLevel(cast: Cast): SupportLevel = {
+    isSupported(cast.child.dataType, cast.dataType, cast.timeZoneId, evalMode(cast))
+  }
+
+  override def convert(
+      cast: Cast,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val childExpr = exprToProtoInternal(cast.child, inputs, binding)
+    if (childExpr.isDefined) {
+      castToProto(cast, cast.timeZoneId, cast.dataType, childExpr.get, evalMode(cast))
+    } else {
+      withInfo(cast, cast.child)
+      None
+    }
+  }
+
+  /**
+   * Wrap an already serialized expression in a cast.
+   */
+  def castToProto(
+      expr: Expression,
+      timeZoneId: Option[String],
+      dt: DataType,
+      childExpr: Expr,
+      evalMode: CometEvalMode.Value): Option[Expr] = {
+    serializeDataType(dt) match {
+      case Some(dataType) =>
+        val castBuilder = ExprOuterClass.Cast.newBuilder()
+        castBuilder.setChild(childExpr)
+        castBuilder.setDatatype(dataType)
+        castBuilder.setEvalMode(evalModeToProto(evalMode))
+        castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get())
+        castBuilder.setTimezone(timeZoneId.getOrElse("UTC"))
+        Some(
+          ExprOuterClass.Expr
+            .newBuilder()
+            .setCast(castBuilder)
+            .build())
+      case _ =>
+        withInfo(expr, s"Unsupported datatype in castToProto: $dt")
+        None
+    }
+  }
+
   def isSupported(
       fromType: DataType,
       toType: DataType,
@@ -62,7 +113,7 @@ object CometCast {
           case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType =>
             Incompatible()
           case _ =>
-            Unsupported
+            unsupported(fromType, toType)
         }
       case (_: DecimalType, _: DecimalType) =>
         Compatible()
@@ -98,7 +149,7 @@ object CometCast {
           }
         }
         Compatible()
-      case _ => Unsupported
+      case _ => unsupported(fromType, toType)
     }
   }
 
@@ -136,7 +187,7 @@
         // https://github.com/apache/datafusion-comet/issues/328
         Incompatible(Some("Not all valid formats are supported"))
       case _ =>
-        Unsupported
+        unsupported(DataTypes.StringType, toType)
     }
   }
 
@@ -171,13 +222,13 @@
           isSupported(field.dataType, DataTypes.StringType, timeZoneId, evalMode) match {
             case s: Incompatible =>
               return s
-            case Unsupported =>
-              return Unsupported
+            case u: Unsupported =>
+              return u
             case _ =>
           }
         }
         Compatible()
-      case _ => Unsupported
+      case _ => unsupported(fromType, DataTypes.StringType)
     }
   }
 
@@ -187,13 +238,13 @@
           DataTypes.IntegerType =>
         // https://github.com/apache/datafusion-comet/issues/352
         // this seems like an edge case that isn't important for us to support
-        Unsupported
+        unsupported(DataTypes.TimestampType, toType)
       case DataTypes.LongType =>
         // https://github.com/apache/datafusion-comet/issues/352
         Compatible()
       case DataTypes.StringType => Compatible()
       case DataTypes.DateType => Compatible()
-      case _ => Unsupported
+      case _ => unsupported(DataTypes.TimestampType, toType)
     }
   }
 
@@ -201,7 +252,7 @@
     case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType |
         DataTypes.FloatType | DataTypes.DoubleType =>
       Compatible()
-    case _ => Unsupported
+    case _ => unsupported(DataTypes.BooleanType, toType)
   }
 
   private def canCastFromByte(toType: DataType): SupportLevel = toType match {
@@ -212,7 +263,7 @@
     case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
       Compatible()
     case _ =>
-      Unsupported
+      unsupported(DataTypes.ByteType, toType)
   }
 
   private def canCastFromShort(toType: DataType): SupportLevel = toType match {
@@ -223,7 +274,7 @@
     case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
       Compatible()
     case _ =>
-      Unsupported
+      unsupported(DataTypes.ShortType, toType)
   }
 
   private def canCastFromInt(toType: DataType): SupportLevel = toType match {
@@ -236,7 +287,7 @@
     case _: DecimalType =>
       Incompatible(Some("No overflow check"))
     case _ =>
-      Unsupported
+      unsupported(DataTypes.IntegerType, toType)
   }
 
   private def canCastFromLong(toType: DataType): SupportLevel = toType match {
@@ -249,7 +300,7 @@
     case _: DecimalType =>
       Incompatible(Some("No overflow check"))
     case _ =>
-      Unsupported
+      unsupported(DataTypes.LongType, toType)
   }
 
   private def canCastFromFloat(toType: DataType): SupportLevel = toType match {
@@ -259,7 +310,8 @@
     case _: DecimalType =>
       // https://github.com/apache/datafusion-comet/issues/1371
       Incompatible(Some("There can be rounding differences"))
-    case _ => Unsupported
+    case _ =>
+      unsupported(DataTypes.FloatType, toType)
   }
 
   private def canCastFromDouble(toType: DataType): SupportLevel = toType match {
@@ -269,14 +321,17 @@
     case _: DecimalType =>
       // https://github.com/apache/datafusion-comet/issues/1371
       Incompatible(Some("There can be rounding differences"))
-    case _ => Unsupported
+    case _ => unsupported(DataTypes.DoubleType, toType)
   }
 
   private def canCastFromDecimal(toType: DataType): SupportLevel = toType match {
     case DataTypes.FloatType | DataTypes.DoubleType | DataTypes.ByteType | DataTypes.ShortType |
         DataTypes.IntegerType | DataTypes.LongType =>
       Compatible()
-    case _ => Unsupported
+    case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported"))
   }
+
+  private def unsupported(fromType: DataType, toType: DataType): Unsupported = {
+    Unsupported(Some(s"Cast from $fromType to $toType is not supported"))
+  }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index bfb3b853b2..2719d8c052 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -178,6 +178,7 @@
     classOf[DateSub] -> CometDateSub,
     classOf[TruncDate] -> CometTruncDate,
     classOf[TruncTimestamp] -> CometTruncTimestamp,
+    classOf[Cast] -> CometCast,
    classOf[CreateNamedStruct] -> CometCreateNamedStruct,
     classOf[GetStructField] -> CometGetStructField,
     classOf[GetArrayStructFields] -> CometGetArrayStructFields,
@@ -541,81 +542,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     }
   }
 
-  /**
-   * Wrap an expression in a cast.
-   */
-  def castToProto(
-      expr: Expression,
-      timeZoneId: Option[String],
-      dt: DataType,
-      childExpr: Expr,
-      evalMode: CometEvalMode.Value): Option[Expr] = {
-    serializeDataType(dt) match {
-      case Some(dataType) =>
-        val castBuilder = ExprOuterClass.Cast.newBuilder()
-        castBuilder.setChild(childExpr)
-        castBuilder.setDatatype(dataType)
-        castBuilder.setEvalMode(evalModeToProto(evalMode))
-        castBuilder.setAllowIncompat(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get())
-        castBuilder.setTimezone(timeZoneId.getOrElse("UTC"))
-        Some(
-          ExprOuterClass.Expr
-            .newBuilder()
-            .setCast(castBuilder)
-            .build())
-      case _ =>
-        withInfo(expr, s"Unsupported datatype in castToProto: $dt")
-        None
-    }
-  }
-
-  def handleCast(
-      expr: Expression,
-      child: Expression,
-      inputs: Seq[Attribute],
-      binding: Boolean,
-      dt: DataType,
-      timeZoneId: Option[String],
-      evalMode: CometEvalMode.Value): Option[Expr] = {
-
-    val childExpr = exprToProtoInternal(child, inputs, binding)
-    if (childExpr.isDefined) {
-      val castSupport =
-        CometCast.isSupported(child.dataType, dt, timeZoneId, evalMode)
-
-      def getIncompatMessage(reason: Option[String]): String =
-        "Comet does not guarantee correct results for cast " +
-          s"from ${child.dataType} to $dt " +
-          s"with timezone $timeZoneId and evalMode $evalMode" +
-          reason.map(str => s" ($str)").getOrElse("")
-
-      castSupport match {
-        case Compatible(_) =>
-          castToProto(expr, timeZoneId, dt, childExpr.get, evalMode)
-        case Incompatible(reason) =>
-          if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
-            logWarning(getIncompatMessage(reason))
-            castToProto(expr, timeZoneId, dt, childExpr.get, evalMode)
-          } else {
-            withInfo(
-              expr,
-              s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " +
-                s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
-            None
-          }
-        case Unsupported =>
-          withInfo(
-            expr,
-            s"Unsupported cast from ${child.dataType} to $dt " +
-              s"with timezone $timeZoneId and evalMode $evalMode")
-          None
-      }
-    } else {
-      withInfo(expr, child)
-      None
-    }
-  }
-
   /**
    * Convert a Spark expression to a protocol-buffer representation of a native Comet/DataFusion
    * expression.
@@ -668,8 +594,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
     def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = {
       handler.getSupportLevel(expr) match {
-        case Unsupported =>
-          withInfo(expr, s"$expr is not supported.")
+        case Unsupported(notes) =>
+          withInfo(expr, notes.getOrElse(""))
           None
         case Incompatible(notes) =>
           if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
@@ -711,17 +637,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
       case UnaryExpression(child) if expr.prettyName == "trycast" =>
         val timeZoneId = SQLConf.get.sessionLocalTimeZone
-        handleCast(
-          expr,
-          child,
-          inputs,
-          binding,
-          expr.dataType,
-          Some(timeZoneId),
-          CometEvalMode.TRY)
-
-      case c @ Cast(child, dt, timeZoneId, _) =>
-        handleCast(expr, child, inputs, binding, dt, timeZoneId, evalMode(c))
+        val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY)
+        convert(cast, CometCast)
 
       case Literal(value, dataType)
           if supportedDataType(
@@ -871,33 +788,37 @@
         val child = expr.asInstanceOf[UnaryExpression].child
         val timezoneId = expr.asInstanceOf[TimeZoneAwareExpression].timeZoneId
 
-        handleCast(
-          expr,
-          child,
-          inputs,
-          binding,
+        val castSupported = CometCast.isSupported(
+          child.dataType,
           DataTypes.StringType,
           timezoneId,
-          CometEvalMode.TRY) match {
-          case Some(_) =>
-            exprToProtoInternal(child, inputs, binding) match {
-              case Some(p) =>
-                val toPrettyString = ExprOuterClass.ToPrettyString
+          CometEvalMode.TRY)
+
+        val isCastSupported = castSupported match {
+          case Compatible(_) => true
+          case Incompatible(_) => true
+          case _ => false
+        }
+
+        if (isCastSupported) {
+          exprToProtoInternal(child, inputs, binding) match {
+            case Some(p) =>
+              val toPrettyString = ExprOuterClass.ToPrettyString
                 .newBuilder()
-                  .setChild(p)
-                  .setTimezone(timezoneId.getOrElse("UTC"))
-                  .build()
-                Some(
-                  ExprOuterClass.Expr
-                    .newBuilder()
-                    .setToPrettyString(toPrettyString)
-                    .build())
-              case _ =>
-                withInfo(expr, child)
-                None
-            }
-          case None =>
-            None
+                .setChild(p)
+                .setTimezone(timezoneId.getOrElse("UTC"))
+                .build()
+              Some(
+                ExprOuterClass.Expr
+                  .newBuilder()
+                  .setToPrettyString(toPrettyString)
+                  .build())
+            case _ =>
+              withInfo(expr, child)
+              None
+          }
+        } else {
+          None
         }
 
       case SortOrder(child, direction, nullOrdering, _) =>
@@ -2083,7 +2004,7 @@ case class Compatible(notes: Option[String] = None) extends SupportLevel
 case class Incompatible(notes: Option[String] = None) extends SupportLevel
 
 /** Comet does not support this feature */
-object Unsupported extends SupportLevel
+case class Unsupported(notes: Option[String] = None) extends SupportLevel
 
 /**
  * Trait for providing serialization logic for operators.
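The hunks above are the heart of the refactor: `Unsupported` changes from a singleton object into a case class carrying an optional reason, so the generic `convert` dispatch can surface the type-specific fallback message built by `CometCast.unsupported` instead of the generic "$expr is not supported." text. The following self-contained Scala sketch illustrates that dispatch pattern; `ExprSerde`, `FakeCast`, and the `allowIncompat` boolean are simplified stand-ins for Comet's `CometExpressionSerde`, Spark's `Cast`, and the `COMET_EXPR_ALLOW_INCOMPATIBLE` config — it is an illustration, not Comet code.

// Sketch of the SupportLevel triage; stand-in names are not the real Comet classes.
sealed trait SupportLevel
case class Compatible(notes: Option[String] = None) extends SupportLevel
case class Incompatible(notes: Option[String] = None) extends SupportLevel
case class Unsupported(notes: Option[String] = None) extends SupportLevel // was: object Unsupported

// Stand-in for CometExpressionSerde[T]: classify an expression, then serialize it.
trait ExprSerde[T] {
  def getSupportLevel(expr: T): SupportLevel
  def convert(expr: T): Option[String] // String stands in for the protobuf Expr
}

case class FakeCast(from: String, to: String) // stand-in for Spark's Cast

object FakeCastSerde extends ExprSerde[FakeCast] {
  override def getSupportLevel(c: FakeCast): SupportLevel = (c.from, c.to) match {
    case ("int", "long") => Compatible()
    case ("float", "decimal") => Incompatible(Some("rounding differences"))
    case _ => Unsupported(Some(s"Cast from ${c.from} to ${c.to} is not supported"))
  }
  override def convert(c: FakeCast): Option[String] = Some(s"cast(${c.from} as ${c.to})")
}

object SupportLevelDemo extends App {
  val allowIncompat = false // stand-in for the expression-level allow-incompatible flag

  def tryConvert[T](expr: T, handler: ExprSerde[T]): Option[String] =
    handler.getSupportLevel(expr) match {
      case Compatible(_) => handler.convert(expr)
      case Incompatible(_) if allowIncompat => handler.convert(expr)
      case Incompatible(notes) =>
        // fall back to Spark, reporting why and which flag would allow the conversion
        println(s"falling back: incompatible${notes.map(n => s" ($n)").getOrElse("")}")
        None
      case Unsupported(notes) =>
        // notes now carry the type-specific reason instead of a generic message
        println(notes.getOrElse(s"$expr is not supported"))
        None
    }

  println(tryConvert(FakeCast("int", "long"), FakeCastSerde)) // Some(cast(int as long))
  println(tryConvert(FakeCast("interval", "string"), FakeCastSerde)) // None, with reason
}

This same Compatible/Incompatible/Unsupported triage is what `CometCast.getSupportLevel` now feeds into `QueryPlanSerde.convert`, which is why the bespoke `handleCast` path could be deleted.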
diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index e2d4f9968d..52a9370386 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Cast, Divide,
 import org.apache.spark.sql.types.{ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.expressions.CometEvalMode
-import org.apache.comet.serde.QueryPlanSerde.{castToProto, evalModeToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, serializeDataType}
+import org.apache.comet.expressions.{CometCast, CometEvalMode}
+import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, serializeDataType}
 import org.apache.comet.shims.CometEvalModeUtil
 
 trait MathBase {
@@ -274,7 +274,7 @@ object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with Mat
       }
 
       // cast result to long
-      castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY)
+      CometCast.castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY)
     } else {
       None
     }
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index de6284a2fd..15046fe092 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.types.{DataTypes, LongType, StringType}
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.expressions.{CometEvalMode, RegExp}
+import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp}
 import org.apache.comet.serde.ExprOuterClass.Expr
-import org.apache.comet.serde.QueryPlanSerde.{castToProto, createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
+import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
 
 object CometStringRepeat extends CometExpressionSerde[StringRepeat] {
 
@@ -187,7 +187,7 @@ trait CommonStringExprs {
       // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls
       // for invalid strings.
       // Left child is the binary expression.
-      castToProto(
+      CometCast.castToProto(
         expr,
         None,
         DataTypes.StringType,
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 33aadcf153..772cc064ad 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -419,7 +419,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("cast FloatType to DecimalType(10,2) - allow incompat") {
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       castTest(generateFloats(), DataTypes.createDecimalType(10, 2))
     }
   }
@@ -479,7 +479,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("cast DoubleType to DecimalType(10,2) - allow incompat") {
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       castTest(generateDoubles(), DataTypes.createDecimalType(10, 2))
     }
   }
@@ -646,7 +646,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("cast StringType to FloatType (partial support)") {
     withSQLConf(
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true",
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
       SQLConf.ANSI_ENABLED.key -> "false") {
       castTest(
         gen.generateStrings(dataSize, "0123456789.", 8).toDF("a"),
@@ -662,7 +662,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("cast StringType to DoubleType (partial support)") {
     withSQLConf(
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true",
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
       SQLConf.ANSI_ENABLED.key -> "false") {
       castTest(
         gen.generateStrings(dataSize, "0123456789.", 8).toDF("a"),
@@ -679,7 +679,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("cast StringType to DecimalType(10,2) (partial support)") {
     withSQLConf(
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true",
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
       SQLConf.ANSI_ENABLED.key -> "false") {
       val values = gen
         .generateStrings(dataSize, "0123456789.", 8)
@@ -768,7 +768,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   ignore("cast StringType to TimestampType") {
     // https://github.com/apache/datafusion-comet/issues/328
-    withSQLConf((CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key, "true")) {
+    withSQLConf((CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key, "true")) {
       val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ gen.generateStrings(
         dataSize,
         timestampPattern,
@@ -819,7 +819,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     // test for invalid inputs
     withSQLConf(
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       val values = Seq("-9?", "1-", "0.5")
       castTimestampTest(values.toDF("a"), DataTypes.TimestampType)
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 9789017041..bf1733b3ff 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -115,7 +115,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("Integral Division Overflow Handling Matches Spark Behavior") {
     withTable("t1") {
-      withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
         val value = Long.MinValue
         sql("create table t1(c1 long, c2 short) using parquet")
         sql(s"insert into t1 values($value, -1)")
@@ -533,7 +533,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("cast timestamp and timestamp_ntz") {
     withSQLConf(
       SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -555,7 +555,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("cast timestamp and timestamp_ntz to string") {
     withSQLConf(
       SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -577,7 +577,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("cast timestamp and timestamp_ntz to long, date") {
     withSQLConf(
       SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -665,7 +665,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("date_trunc with timestamp_ntz") {
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -700,7 +700,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("date_trunc with format array") {
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       val numRows = 1000
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
@@ -1367,7 +1367,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq("true", "false").foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary,
-        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+        CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
         withParquetTable(
           (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)),
           "tbl",
@@ -1856,7 +1856,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Seq(
         (
          s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table",
-          Set("make_interval is not supported")),
+          Set("Cast from CalendarIntervalType to StringType is not supported")),
         (
           "SELECT " +
             "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))"
@@ -1875,8 +1875,8 @@
             + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B "
             + "where A.c1 = B.c1 ",
           Set(
-            "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
-            "make_interval is not supported")),
+            "Cast from CalendarIntervalType to StringType is not supported",
+            "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled")),
         (s"select * from $table LIMIT 10 OFFSET 3", Set("Comet shuffle is not enabled")))
         .foreach(test => {
           val qry = test._1
@@ -1918,7 +1918,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
-        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+        CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
           sql(s"create table $table(col string, a int, b float) using parquet")
@@ -2024,7 +2024,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
-        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+        CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
           sql(s"create table $table(col string, a int, b float) using parquet")
@@ -2751,7 +2751,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     // this test requires native_comet scan due to unsigned u8/u16 issue
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path1 = new Path(dir.toURI.toString, "test1.parquet")
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index fac80853fa..6d8fa28b04 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -51,7 +51,7 @@ class CometStringExpressionSuite extends CometTestBase {
   }
 
   test("Chr") {
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       val table = "test"
       withTable(table) {
         sql(s"create table $table(col varchar(20)) using parquet")
@@ -64,7 +64,7 @@ class CometStringExpressionSuite extends CometTestBase {
 
   test("Chr with null character") {
     // test compatibility with Spark, spark supports chr(0)
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       val table = "test0"
      withTable(table) {
         sql(s"create table $table(c9 int, c4 int) using parquet")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index be7fe7ee52..6574d9568d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -978,7 +978,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("avg/sum overflow on decimal(38, _)") {
-    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       val table = "overflow_decimal_38"
       withTable(table) {
         sql(s"create table $table(a decimal(38, 2), b INT) using parquet")
@@ -1019,7 +1019,7 @@
   test("final decimal avg") {
     withSQLConf(
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true",
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
       CometConf.COMET_SHUFFLE_MODE.key -> "native") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) {
@@ -1090,7 +1090,7 @@
         withSQLConf(
           CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString,
           CometConf.COMET_SHUFFLE_MODE.key -> "native",
-          CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+          CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
          withTempDir { dir =>
             val path = new Path(dir.toURI.toString, "test")
             makeParquetFile(path, 1000, 20, dictionaryEnabled)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 339f90e81c..64e832826e 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -504,7 +504,7 @@ class CometExecSuite extends CometTestBase {
       withSQLConf(
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
         CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
-        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+        CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
        withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
          var column1 = s"CAST(max(_1) AS $subqueryType)"
          if (subqueryType == "BINARY") {
@@ -1431,7 +1431,7 @@ class CometExecSuite extends CometTestBase {
   test("SPARK-33474: Support typed literals as partition spec values") {
     withSQLConf(
       SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       withTable("t1") {
         val binaryStr = "Spark SQL"
         val binaryHexStr = Hex.hex(UTF8String.fromString(binaryStr).getBytes).toString
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 9154840c9c..5117859448 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -267,8 +267,9 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
       CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
       CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
-      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", // needed for Spark 4.0.0 / ANSI
-      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
+      // COMET_EXPR_ALLOW_INCOMPATIBLE is needed for Spark 4.0.0 / ANSI support
+      // as well as for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
+      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
       val qe = sql(queryString).queryExecution
       val plan = qe.executedPlan
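Most of the test churn in this patch is mechanical: every `withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true")` block becomes `withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true")`, since the cast-specific flag is removed. For readers unfamiliar with the idiom, here is a minimal sketch of the set-run-restore pattern that `withSQLConf` implements, using a plain mutable map as a stand-in for Spark's session configuration (the literal key is shown only for illustration; the suites reference it via `CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key`).

// Sketch of the withSQLConf set-run-restore idiom used throughout the test diffs.
// `conf` is a stand-in for Spark's session conf; only the shape mirrors the real API.
object WithConfDemo extends App {
  val conf = scala.collection.mutable.Map[String, String]()

  def withSQLConf[A](pairs: (String, String)*)(body: => A): A = {
    val saved = pairs.map { case (k, _) => k -> conf.get(k) } // remember prior values
    pairs.foreach { case (k, v) => conf(k) = v }              // apply overrides
    try body
    finally saved.foreach {                                   // restore on exit
      case (k, Some(v)) => conf(k) = v
      case (k, None) => conf.remove(k)
    }
  }

  // Tests now opt in to incompatible expressions with the expression-level flag.
  withSQLConf("spark.comet.expr.allowIncompatible" -> "true") {
    assert(conf("spark.comet.expr.allowIncompatible") == "true")
  }
  assert(!conf.contains("spark.comet.expr.allowIncompatible")) // restored afterwards
}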