8 changes: 0 additions & 8 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -624,14 +624,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

-  val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.cast.allowIncompatible")
-      .doc(
-        "Comet is not currently fully compatible with Spark for all cast operations. " +
-          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
-      .booleanConf
-      .createWithDefault(false)
-
val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.doc(
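The removed flag is superseded by the expression-level `COMET_EXPR_ALLOW_INCOMPATIBLE`, which `castToProto` reads later in this diff. A minimal sketch of what the unified entry plausibly looks like inside `CometConf`, reusing the builder pattern of the entry removed above — the key string and doc wording are assumptions, not taken from this PR:

```scala
// Sketch only: the identifier COMET_EXPR_ALLOW_INCOMPATIBLE is confirmed by
// its use in castToProto below; the key name and doc text are assumptions.
val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
  conf("spark.comet.expression.allowIncompatible")
    .doc(
      "Comet is not currently fully compatible with Spark for all expressions. " +
        s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
    .booleanConf
    .createWithDefault(false)
```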
2 changes: 1 addition & 1 deletion docs/source/user-guide/compatibility.md
@@ -75,7 +75,7 @@ The `native_datafusion` scan has some additional limitations:

### S3 Support with `native_iceberg_compat`

-- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet
+- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet
will attempt to resolve the region if it is not provided.
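For example, the region can be supplied explicitly instead of relying on Comet's resolution. A minimal sketch, assuming Hadoop's S3A connector is in use; the application name, bucket, and region values are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Set the region up front via the Hadoop S3A connector so that Comet does
// not need to resolve it at runtime. All values below are placeholders.
val spark = SparkSession
  .builder()
  .appName("comet-s3-example")
  .config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
  .getOrCreate()

spark.read.parquet("s3a://my-bucket/path/to/table").show()
```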

## ANSI Mode
1 change: 0 additions & 1 deletion docs/source/user-guide/configs.md
@@ -28,7 +28,6 @@ Comet provides the following configuration settings.
|--------|-------------|---------------|
| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
| spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
-| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of the total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config, Comet will use this config as the number of shuffle threads per executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
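Read together, the two async-shuffle thread settings above imply a simple per-executor cap. A small illustrative calculation, using the defaults from the table and an assumed executor core count:

```scala
object ShuffleThreadMath {
  // Per-executor shuffle threads = cores * threads-per-task, capped by the max.
  val coresPerExecutor = 48 // assumed example value
  val threadsPerTask = 3    // spark.comet.columnar.shuffle.async.thread.num (default)
  val maxPerExecutor = 100  // spark.comet.columnar.shuffle.async.max.thread.num (default)

  // 48 * 3 = 144 exceeds 100, so Comet would use 100 threads on this executor.
  val effectiveThreads: Int = math.min(coresPerExecutor * threadsPerTask, maxPerExecutor)
}
```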
91 changes: 73 additions & 18 deletions spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -19,11 +19,17 @@

package org.apache.comet.expressions

+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType}

-import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported}
+import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
+import org.apache.comet.serde.ExprOuterClass.Expr
+import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType}
+import org.apache.comet.shims.CometExprShim

-object CometCast {
+object CometCast extends CometExpressionSerde[Cast] with CometExprShim {

def supportedTypes: Seq[DataType] =
Seq(
@@ -42,6 +48,51 @@ object CometCast {
// TODO add DataTypes.TimestampNTZType for Spark 3.4 and later
// https://github.com/apache/datafusion-comet/issues/378

+  override def getSupportLevel(cast: Cast): SupportLevel = {
+    isSupported(cast.child.dataType, cast.dataType, cast.timeZoneId, evalMode(cast))
+  }
+
+  override def convert(
+      cast: Cast,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val childExpr = exprToProtoInternal(cast.child, inputs, binding)
+    if (childExpr.isDefined) {
+      castToProto(cast, cast.timeZoneId, cast.dataType, childExpr.get, evalMode(cast))
+    } else {
+      withInfo(cast, cast.child)
+      None
+    }
+  }
+
+  /**
+   * Wrap an already serialized expression in a cast.
+   */
+  def castToProto(
+      expr: Expression,
+      timeZoneId: Option[String],
+      dt: DataType,
+      childExpr: Expr,
+      evalMode: CometEvalMode.Value): Option[Expr] = {
+    serializeDataType(dt) match {
+      case Some(dataType) =>
+        val castBuilder = ExprOuterClass.Cast.newBuilder()
+        castBuilder.setChild(childExpr)
+        castBuilder.setDatatype(dataType)
+        castBuilder.setEvalMode(evalModeToProto(evalMode))
+        castBuilder.setAllowIncompat(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get())
+        castBuilder.setTimezone(timeZoneId.getOrElse("UTC"))
+        Some(
+          ExprOuterClass.Expr
+            .newBuilder()
+            .setCast(castBuilder)
+            .build())
+      case _ =>
+        withInfo(expr, s"Unsupported datatype in castToProto: $dt")
+        None
+    }
+  }
+
def isSupported(
fromType: DataType,
toType: DataType,
@@ -62,7 +113,7 @@
case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType =>
Incompatible()
case _ =>
-Unsupported
+unsupported(fromType, toType)
}
case (_: DecimalType, _: DecimalType) =>
Compatible()
@@ -98,7 +149,7 @@
}
}
Compatible()
-case _ => Unsupported
+case _ => unsupported(fromType, toType)
}
}

@@ -136,7 +187,7 @@
// https://github.com/apache/datafusion-comet/issues/328
Incompatible(Some("Not all valid formats are supported"))
case _ =>
-Unsupported
+unsupported(DataTypes.StringType, toType)
}
}

@@ -171,13 +222,13 @@
isSupported(field.dataType, DataTypes.StringType, timeZoneId, evalMode) match {
case s: Incompatible =>
return s
-case Unsupported =>
-return Unsupported
+case u: Unsupported =>
+return u
case _ =>
}
}
Compatible()
-case _ => Unsupported
+case _ => unsupported(fromType, DataTypes.StringType)
}
}

@@ -187,21 +238,21 @@
DataTypes.IntegerType =>
// https://github.com/apache/datafusion-comet/issues/352
// this seems like an edge case that isn't important for us to support
-Unsupported
+unsupported(DataTypes.TimestampType, toType)
case DataTypes.LongType =>
// https://github.com/apache/datafusion-comet/issues/352
Compatible()
case DataTypes.StringType => Compatible()
case DataTypes.DateType => Compatible()
-case _ => Unsupported
+case _ => unsupported(DataTypes.TimestampType, toType)
}
}

private def canCastFromBoolean(toType: DataType): SupportLevel = toType match {
case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType |
DataTypes.FloatType | DataTypes.DoubleType =>
Compatible()
-case _ => Unsupported
+case _ => unsupported(DataTypes.BooleanType, toType)
}

private def canCastFromByte(toType: DataType): SupportLevel = toType match {
@@ -212,7 +263,7 @@
case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
Compatible()
case _ =>
-Unsupported
+unsupported(DataTypes.ByteType, toType)
}

private def canCastFromShort(toType: DataType): SupportLevel = toType match {
@@ -223,7 +274,7 @@
case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
Compatible()
case _ =>
-Unsupported
+unsupported(DataTypes.ShortType, toType)
}

private def canCastFromInt(toType: DataType): SupportLevel = toType match {
@@ -236,7 +287,7 @@
case _: DecimalType =>
Incompatible(Some("No overflow check"))
case _ =>
-Unsupported
+unsupported(DataTypes.IntegerType, toType)
}

private def canCastFromLong(toType: DataType): SupportLevel = toType match {
@@ -249,7 +300,7 @@
case _: DecimalType =>
Incompatible(Some("No overflow check"))
case _ =>
-Unsupported
+unsupported(DataTypes.LongType, toType)
}

private def canCastFromFloat(toType: DataType): SupportLevel = toType match {
@@ -259,7 +310,8 @@
case _: DecimalType =>
// https://github.com/apache/datafusion-comet/issues/1371
Incompatible(Some("There can be rounding differences"))
-case _ => Unsupported
+case _ =>
+unsupported(DataTypes.FloatType, toType)
}

private def canCastFromDouble(toType: DataType): SupportLevel = toType match {
@@ -269,14 +321,17 @@
case _: DecimalType =>
// https://github.com/apache/datafusion-comet/issues/1371
Incompatible(Some("There can be rounding differences"))
-case _ => Unsupported
+case _ => unsupported(DataTypes.DoubleType, toType)
}

private def canCastFromDecimal(toType: DataType): SupportLevel = toType match {
case DataTypes.FloatType | DataTypes.DoubleType | DataTypes.ByteType | DataTypes.ShortType |
DataTypes.IntegerType | DataTypes.LongType =>
Compatible()
-case _ => Unsupported
+case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported"))
}

+  private def unsupported(fromType: DataType, toType: DataType): Unsupported = {
+    Unsupported(Some(s"Cast from $fromType to $toType is not supported"))
+  }
}
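To show how the new `CometExpressionSerde` hooks fit together, here is a hypothetical caller that gates serialization on the reported support level. The object and method names are illustrative, and the opt-in handling of `Incompatible` mirrors the `allowIncompat` flag set in `castToProto`; this sketch is not part of the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast}

import org.apache.comet.CometConf
import org.apache.comet.expressions.CometCast
import org.apache.comet.serde.{Compatible, ExprOuterClass, Incompatible}

// Hypothetical usage sketch: decide whether to hand a Cast to Comet or
// fall back to Spark, based on CometCast.getSupportLevel.
object CastSerdeExample {
  def trySerializeCast(cast: Cast, inputs: Seq[Attribute]): Option[ExprOuterClass.Expr] =
    CometCast.getSupportLevel(cast) match {
      case _: Compatible =>
        CometCast.convert(cast, inputs, binding = true)
      case _: Incompatible if CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
        // Incompatible casts are serialized only when the user opts in.
        CometCast.convert(cast, inputs, binding = true)
      case _ =>
        None // Unsupported, or Incompatible without opt-in: fall back to Spark
    }
}
```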