diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index 9adf829580..901702c378 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.types._
 
 import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE}
+import org.apache.comet.shims.CometTypeShim
 
-trait DataTypeSupport {
+trait DataTypeSupport extends CometTypeShim {
 
   /**
    * Checks if this schema is supported by checking if each field in the schema is supported.
@@ -49,6 +50,10 @@ trait DataTypeSupport {
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
+      case dt if isStringCollationType(dt) =>
+        // we don't need specific support for collation in scans, but this
+        // is a convenient place to force the whole query to fall back to Spark for now
+        false
       case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
           DoubleType | BinaryType | StringType | _: DecimalType | DateType | TimestampType |
           TimestampNTZType =>
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 69bce75559..c0f5040fdd 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -48,7 +48,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
-import org.apache.comet.serde.operator.CometNativeScan
+import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
 import org.apache.comet.shims.CometTypeShim
 
 /**
@@ -272,9 +272,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       return withInfos(scanExec, fallbackReasons.toSet)
     }
 
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
     val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
+      CometNativeScan.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
 
     if (!schemaSupported) {
       fallbackReasons += "Comet extension is not enabled for " +
@@ -586,40 +585,17 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
     }
 
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
     val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+      CometIcebergNativeScan.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
     val partitionSchemaSupported =
-      typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
-
-    def hasUnsupportedType(dataType: DataType): Boolean = {
-      dataType match {
-        case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))
-        case a: ArrayType => hasUnsupportedType(a.elementType)
-        case m: MapType =>
-          // maps containing complex types are not supported
-          isComplexType(m.keyType) || isComplexType(m.valueType) ||
-            hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
-        case dt if isStringCollationType(dt) => true
-        case _ => false
-      }
-    }
-
-    val knownIssues =
-      scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) ||
-        partitionSchema.exists(field => hasUnsupportedType(field.dataType))
-
-    if (knownIssues) {
-      fallbackReasons += "Schema contains data types that are not supported by " +
-        s"$SCAN_NATIVE_ICEBERG_COMPAT"
-    }
+      CometIcebergNativeScan.isSchemaSupported(partitionSchema, fallbackReasons)
 
     val cometExecEnabled = COMET_EXEC_ENABLED.get()
     if (!cometExecEnabled) {
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
     }
 
-    if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
+    if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
       fallbackReasons.isEmpty) {
       logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
       SCAN_NATIVE_ICEBERG_COMPAT
@@ -660,23 +636,18 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       dt: DataType,
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
-    dt match {
-      case ByteType | ShortType
-          if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
-        false
-      case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
-        false
-      case dt if isStringCollationType(dt) =>
-        // we don't need specific support for collation in scans, but this
-        // is a convenient place to force the whole query to fall back to Spark for now
-        false
-      case s: StructType if s.fields.isEmpty =>
-        false
-      case _ =>
-        super.isTypeSupported(dt, name, fallbackReasons)
+    scanImpl match {
+      case CometConf.SCAN_NATIVE_DATAFUSION =>
+        CometNativeScan.isTypeSupported(dt, name, fallbackReasons)
+      case CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+        CometIcebergNativeScan.isTypeSupported(dt, name, fallbackReasons)
+      case CometConf.SCAN_NATIVE_COMET =>
+        dt match {
+          case _: StructType | _: ArrayType | _: MapType =>
+            false
+          case _ =>
+            super.isTypeSupported(dt, name, fallbackReasons)
+        }
     }
   }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index dc7df531f6..9e5c13494b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -20,6 +20,7 @@ package org.apache.comet.serde.operator
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import org.json4s._
@@ -31,14 +32,18 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec}
 import org.apache.spark.sql.types._
 
-import org.apache.comet.ConfigEntry
+import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
+import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.IcebergReflection
 import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField}
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
 
-object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] with Logging {
+object CometIcebergNativeScan
+    extends CometOperatorSerde[CometBatchScanExec]
+    with DataTypeSupport
+    with Logging {
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None
 
@@ -70,6 +75,24 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     }
   }
 
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"Cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+        false
+      case m: MapType if isComplexType(m.keyType) || isComplexType(m.valueType) =>
+        // maps containing complex types are not supported
+        fallbackReasons += s"Unsupported map type: $dt"
+        false
+      case _ =>
+        super.isTypeSupported(dt, name, fallbackReasons)
+    }
+  }
+
   /**
    * Converts an Iceberg partition value to JSON format expected by iceberg-rust.
    *
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index 12be14450b..aacfe29d81 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometSc
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{ByteType, DataType, ShortType, StructField, StructType}
 
-import org.apache.comet.{CometConf, ConfigEntry}
+import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometConf.COMET_EXEC_ENABLED
 import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, withInfo}
 import org.apache.comet.objectstore.NativeConfig
@@ -44,7 +44,24 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
 /**
  * Validation and serde logic for `native_datafusion` scans.
  */
-object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
+object CometNativeScan
+    extends CometOperatorSerde[CometScanExec]
+    with DataTypeSupport
+    with Logging {
+
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"Cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+        false
+      case _ =>
+        super.isTypeSupported(dt, name, fallbackReasons)
+    }
+  }
 
   /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */
   def isSupported(scanExec: FileSourceScanExec): Boolean = {
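
Note on the pattern this diff adopts: each scan serde object now mixes in DataTypeSupport and overrides isTypeSupported to veto only the types that particular scan cannot read, delegating everything else to the trait's default recursion. Below is a minimal, self-contained sketch of that shape, not Comet's actual code: TypeSupportSketch, IcebergCompatSketch, and SketchDemo are hypothetical names, and the base trait's recursion is an assumption about what DataTypeSupport does for nested types.

import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.types._

// Hypothetical stand-in for org.apache.comet.DataTypeSupport: the default
// implementation recurses through nested types and accepts everything else.
trait TypeSupportSketch {
  def isTypeSupported(
      dt: DataType,
      name: String,
      fallbackReasons: ListBuffer[String]): Boolean = dt match {
    case ArrayType(elementType, _) =>
      isTypeSupported(elementType, name, fallbackReasons)
    case StructType(fields) =>
      fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons))
    case MapType(keyType, valueType, _) =>
      isTypeSupported(keyType, name, fallbackReasons) &&
        isTypeSupported(valueType, name, fallbackReasons)
    case _ => true
  }

  def isSchemaSupported(schema: StructType, fallbackReasons: ListBuffer[String]): Boolean =
    schema.fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons))
}

// A scan-specific object vetoes its own problem types and defers the rest to
// super, as the diff does in CometIcebergNativeScan.isTypeSupported.
object IcebergCompatSketch extends TypeSupportSketch {
  private def isComplex(dt: DataType): Boolean = dt match {
    case _: StructType | _: ArrayType | _: MapType => true
    case _ => false
  }

  override def isTypeSupported(
      dt: DataType,
      name: String,
      fallbackReasons: ListBuffer[String]): Boolean = dt match {
    case m: MapType if isComplex(m.keyType) || isComplex(m.valueType) =>
      // maps containing complex types are not supported
      fallbackReasons += s"Unsupported map type for field $name: ${m.simpleString}"
      false
    case _ => super.isTypeSupported(dt, name, fallbackReasons)
  }
}

object SketchDemo extends App {
  val reasons = ListBuffer.empty[String]
  val schema = StructType(
    Seq(
      StructField("id", LongType),
      StructField("m", MapType(StringType, StructType(Seq(StructField("x", IntegerType)))))))
  // The map value is a struct, so the check fails and a reason is recorded.
  println(IcebergCompatSketch.isSchemaSupported(schema, reasons)) // false
  reasons.foreach(println)
}

One property of this design worth noting: when the base trait recurses into a map's key or value, the recursive call dispatches back to the concrete object's override, so scan-specific vetoes (such as the ByteType/ShortType check guarded by COMET_SCAN_ALLOW_INCOMPATIBLE in the diff) also apply to nested fields.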