7 changes: 6 additions & 1 deletion spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.types._
 
 import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE}
+import org.apache.comet.shims.CometTypeShim
 
-trait DataTypeSupport {
+trait DataTypeSupport extends CometTypeShim {
 
   /**
    * Checks if this schema is supported by checking if each field in the schema is supported.
@@ -49,6 +50,10 @@ trait DataTypeSupport {
       fallbackReasons: ListBuffer[String]): Boolean = {
 
     dt match {
+      case dt if isStringCollationType(dt) =>
+        // we don't need specific support for collation in scans, but this
+        // is a convenient place to force the whole query to fall back to Spark for now
+        false
       case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
           BinaryType | StringType | _: DecimalType | DateType | TimestampType |
           TimestampNTZType =>
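For context, a rough sketch of what the isStringCollationType helper mixed in from CometTypeShim might look like (a hypothetical reconstruction, not the PR's shim: on Spark versions with collation support, a collated string surfaces as a StringType instance that differs from the default StringType object, while on older Spark versions StringType is a singleton):

import org.apache.spark.sql.types.{DataType, StringType}

trait CometTypeShim {
  // Hypothetical sketch: a non-default collation is a StringType instance
  // that is not equal to the default StringType object; on Spark versions
  // without collations this is always false.
  def isStringCollationType(dt: DataType): Boolean = dt match {
    case s: StringType => s != StringType
    case _ => false
  }
}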
63 changes: 17 additions & 46 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -48,7 +48,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
-import org.apache.comet.serde.operator.CometNativeScan
+import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
 import org.apache.comet.shims.CometTypeShim
 
 /**
@@ -272,9 +272,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       return withInfos(scanExec, fallbackReasons.toSet)
     }
 
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
     val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
+      CometNativeScan.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
 
     if (!schemaSupported) {
       fallbackReasons += "Comet extension is not enabled for " +
@@ -586,40 +585,17 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
         fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
       }
 
-      val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
       val schemaSupported =
-        typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+        CometIcebergNativeScan.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
       val partitionSchemaSupported =
-        typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
-
-      def hasUnsupportedType(dataType: DataType): Boolean = {
-        dataType match {
-          case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))
-          case a: ArrayType => hasUnsupportedType(a.elementType)
-          case m: MapType =>
-            // maps containing complex types are not supported
-            isComplexType(m.keyType) || isComplexType(m.valueType) ||
-              hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
-          case dt if isStringCollationType(dt) => true
-          case _ => false
-        }
-      }
-
-      val knownIssues =
-        scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) ||
-          partitionSchema.exists(field => hasUnsupportedType(field.dataType))
-
-      if (knownIssues) {
-        fallbackReasons += "Schema contains data types that are not supported by " +
-          s"$SCAN_NATIVE_ICEBERG_COMPAT"
-      }
+        CometIcebergNativeScan.isSchemaSupported(partitionSchema, fallbackReasons)
 
       val cometExecEnabled = COMET_EXEC_ENABLED.get()
       if (!cometExecEnabled) {
         fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
       }
 
-      if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
+      if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
         fallbackReasons.isEmpty) {
         logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
         SCAN_NATIVE_ICEBERG_COMPAT
@@ -660,23 +636,18 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       dt: DataType,
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
-    dt match {
-      case ByteType | ShortType
-          if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
-        false
-      case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
-        false
-      case dt if isStringCollationType(dt) =>
-        // we don't need specific support for collation in scans, but this
-        // is a convenient place to force the whole query to fall back to Spark for now
-        false
-      case s: StructType if s.fields.isEmpty =>
-        false
-      case _ =>
-        super.isTypeSupported(dt, name, fallbackReasons)
+    scanImpl match {
+      case CometConf.SCAN_NATIVE_DATAFUSION =>
+        CometNativeScan.isTypeSupported(dt, name, fallbackReasons)
+      case CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+        CometIcebergNativeScan.isTypeSupported(dt, name, fallbackReasons)
+      case CometConf.SCAN_NATIVE_COMET =>
+        dt match {
+          case _: StructType | _: ArrayType | _: MapType =>
+            false
+          case _ =>
+            super.isTypeSupported(dt, name, fallbackReasons)
+        }
     }
   }
 }
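As a usage sketch of the rewritten checker (schema is an assumed StructType variable): CometScanTypeChecker is now a thin dispatcher over the per-implementation serde objects, so going through the checker or calling the object directly should agree:

import scala.collection.mutable.ListBuffer

val fallbackReasons = ListBuffer.empty[String]
// For native_datafusion, the checker delegates to CometNativeScan.
val viaChecker = CometScanTypeChecker(CometConf.SCAN_NATIVE_DATAFUSION)
  .isSchemaSupported(schema, fallbackReasons)
// Calling the serde object directly yields the same answer.
val direct = CometNativeScan.isSchemaSupported(schema, fallbackReasons)
assert(viaChecker == direct)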
spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -20,6 +20,7 @@
 package org.apache.comet.serde.operator
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import org.json4s._
@@ -31,14 +32,18 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec}
 import org.apache.spark.sql.types._
 
-import org.apache.comet.ConfigEntry
+import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
+import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.IcebergReflection
 import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField}
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
 
-object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] with Logging {
+object CometIcebergNativeScan
+    extends CometOperatorSerde[CometBatchScanExec]
+    with DataTypeSupport
+    with Logging {
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None
 
@@ -70,6 +75,27 @@ }
     }
   }
 
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"Cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+        false
+      case m: MapType =>
+        // maps containing complex types are not supported
+        !isComplexType(m.keyType) && !isComplexType(m.valueType) &&
+          isTypeSupported(m.keyType, name, fallbackReasons) &&
+          isTypeSupported(m.valueType, name, fallbackReasons)
+      case _ =>
+        super.isTypeSupported(dt, name, fallbackReasons)
+    }
+  }
+
   /**
    * Converts an Iceberg partition value to JSON format expected by iceberg-rust.
    *
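To illustrate the map rule above, a hedged example (the field name "m" is arbitrary, and the surrounding defaults are assumed to accept the primitive types involved):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.types._

val reasons = ListBuffer.empty[String]
// A map containing a complex type is rejected outright...
CometIcebergNativeScan.isTypeSupported(
  MapType(StringType, StructType(Seq(StructField("x", IntegerType)))),
  "m",
  reasons) // false: the map value is a complex type
// ...while a map of primitives just recurses into the key and value checks.
CometIcebergNativeScan.isTypeSupported(MapType(StringType, IntegerType), "m", reasons) // true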
spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometSc
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{ByteType, DataType, ShortType, StructField, StructType}
 
-import org.apache.comet.{CometConf, ConfigEntry}
+import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometConf.COMET_EXEC_ENABLED
 import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, withInfo}
 import org.apache.comet.objectstore.NativeConfig
@@ -44,7 +44,24 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
 /**
  * Validation and serde logic for `native_datafusion` scans.
  */
-object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
+object CometNativeScan
+    extends CometOperatorSerde[CometScanExec]
+    with DataTypeSupport
+    with Logging {
+
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"Cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+        false
+      case _ =>
+        super.isTypeSupported(dt, name, fallbackReasons)
+    }
+  }
+
   /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */
   def isSupported(scanExec: FileSourceScanExec): Boolean = {
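And a similar sketch of the byte/short guard shared by both native scans (assuming COMET_SCAN_ALLOW_INCOMPATIBLE resolves to false; the field name argument is arbitrary):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.types.ByteType

val reasons = ListBuffer.empty[String]
// ByteType is rejected and a fallback reason pointing at the compat guide
// is recorded, so the query falls back to Spark for this scan.
val supported = CometNativeScan.isTypeSupported(ByteType, "col", reasons)
// supported == false && reasons.nonEmpty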