From 451e7e84cb197370a0f8b641f214ef47ae040775 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 19 Dec 2025 07:48:31 -0700
Subject: [PATCH 1/3] Save

---
 .../apache/comet/rules/CometScanRule.scala    | 44 ++++++-------------
 .../operator/CometIcebergNativeScan.scala     | 36 ++++++++++++++-
 .../serde/operator/CometNativeScan.scala      | 29 ++++++++++--
 3 files changed, 74 insertions(+), 35 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 69bce75559..10b07f36d3 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -48,7 +48,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
-import org.apache.comet.serde.operator.CometNativeScan
+import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
 import org.apache.comet.shims.CometTypeShim
 
 /**
@@ -272,9 +272,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       return withInfos(scanExec, fallbackReasons.toSet)
     }
 
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
     val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
+      CometNativeScan.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
 
     if (!schemaSupported) {
       fallbackReasons += "Comet extension is not enabled for " +
@@ -586,40 +585,17 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
     }
 
-    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
     val schemaSupported =
-      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+      CometIcebergNativeScan.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
     val partitionSchemaSupported =
-      typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
-
-    def hasUnsupportedType(dataType: DataType): Boolean = {
-      dataType match {
-        case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))
-        case a: ArrayType => hasUnsupportedType(a.elementType)
-        case m: MapType =>
-          // maps containing complex types are not supported
-          isComplexType(m.keyType) || isComplexType(m.valueType) ||
-          hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
-        case dt if isStringCollationType(dt) => true
-        case _ => false
-      }
-    }
-
-    val knownIssues =
-      scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) ||
-        partitionSchema.exists(field => hasUnsupportedType(field.dataType))
-
-    if (knownIssues) {
-      fallbackReasons += "Schema contains data types that are not supported by " +
-        s"$SCAN_NATIVE_ICEBERG_COMPAT"
-    }
+      CometIcebergNativeScan.isSchemaSupported(partitionSchema, fallbackReasons)
 
     val cometExecEnabled = COMET_EXEC_ENABLED.get()
     if (!cometExecEnabled) {
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
     }
 
-    if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
+    if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
       fallbackReasons.isEmpty) {
       logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
       SCAN_NATIVE_ICEBERG_COMPAT
@@ -634,7 +610,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
   private def isDynamicPruningFilter(e: Expression): Boolean =
     e.exists(_.isInstanceOf[PlanExpression[_]])
 
-  def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
+  private def checkSchema(
+      scanExec: FileSourceScanExec,
+      scanImpl: String,
+      r: HadoopFsRelation): Unit = {
     val fallbackReasons = new ListBuffer[String]()
     val typeChecker = CometScanTypeChecker(scanImpl)
     val schemaSupported =
@@ -651,11 +630,16 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
   }
 }
 
+// TODO move these type checks into specific scan classes
 case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim {
 
   // this class is intended to be used with a specific scan impl
   assert(scanImpl != CometConf.SCAN_AUTO)
 
+  assert(
+    scanImpl != CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
+    "Call CometIcebergNativeScan.isSchemaSupported instead of using CometScanTypeChecker")
+
   override def isTypeSupported(
       dt: DataType,
       name: String,
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index dc7df531f6..3ab1f75412 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -20,6 +20,7 @@
 package org.apache.comet.serde.operator
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import org.json4s._
@@ -31,14 +32,20 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec}
 import org.apache.spark.sql.types._
 
-import org.apache.comet.ConfigEntry
+import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
+import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.IcebergReflection
 import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField}
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+import org.apache.comet.shims.CometTypeShim
 
-object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] with Logging {
+object CometIcebergNativeScan
+    extends CometOperatorSerde[CometBatchScanExec]
+    with DataTypeSupport
+    with CometTypeShim
+    with Logging {
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None
@@ -70,6 +77,31 @@
     }
   }
 
+  override def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    dt match {
+      case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
+        fallbackReasons += s"Cannot read $dt when " +
+          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+ false + case dt if isStringCollationType(dt) => + // we don't need specific support for collation in scans, but this + // is a convenient place to force the whole query to fall back to Spark for now + false + case m: MapType => + // maps containing complex types are not supported + isComplexType(m.keyType) || isComplexType(m.valueType) || + !isTypeSupported(m.keyType, name, fallbackReasons) || isTypeSupported( + m.valueType, + name, + fallbackReasons) + case _ => + super.isTypeSupported(dt, name, fallbackReasons) + } + } + /** * Converts an Iceberg partition value to JSON format expected by iceberg-rust. * diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 12be14450b..93e58e7bc4 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometSc import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.{ByteType, DataType, ShortType, StructField, StructType} -import org.apache.comet.{CometConf, ConfigEntry} +import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport} import org.apache.comet.CometConf.COMET_EXEC_ENABLED import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, withInfo} import org.apache.comet.objectstore.NativeConfig @@ -40,11 +40,34 @@ import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClas import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} +import org.apache.comet.shims.CometTypeShim /** * Validation and serde logic for `native_datafusion` scans. */ -object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { +object CometNativeScan + extends CometOperatorSerde[CometScanExec] + with DataTypeSupport + with CometTypeShim + with Logging { + + override def isTypeSupported( + dt: DataType, + name: String, + fallbackReasons: ListBuffer[String]): Boolean = { + dt match { + case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() => + fallbackReasons += "Cannot read $dt when " + + s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}." 
+ false + case dt if isStringCollationType(dt) => + // we don't need specific support for collation in scans, but this + // is a convenient place to force the whole query to fall back to Spark for now + false + case _ => + super.isTypeSupported(dt, name, fallbackReasons) + } + } /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */ def isSupported(scanExec: FileSourceScanExec): Boolean = { From af19001f4de47c512b7d2be320d64de75070c73a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 19 Dec 2025 07:55:32 -0700 Subject: [PATCH 2/3] save --- .../org/apache/comet/DataTypeSupport.scala | 7 +++- .../apache/comet/rules/CometScanRule.scala | 34 +++++++------------ .../operator/CometIcebergNativeScan.scala | 6 ---- .../serde/operator/CometNativeScan.scala | 8 +---- 4 files changed, 19 insertions(+), 36 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala index 9adf829580..901702c378 100644 --- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala +++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala @@ -24,8 +24,9 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.types._ import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE} +import org.apache.comet.shims.CometTypeShim -trait DataTypeSupport { +trait DataTypeSupport extends CometTypeShim { /** * Checks if this schema is supported by checking if each field in the schema is supported. @@ -49,6 +50,10 @@ trait DataTypeSupport { fallbackReasons: ListBuffer[String]): Boolean = { dt match { + case dt if isStringCollationType(dt) => + // we don't need specific support for collation in scans, but this + // is a convenient place to force the whole query to fall back to Spark for now + false case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | StringType | _: DecimalType | DateType | TimestampType | TimestampNTZType => diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 10b07f36d3..d2b897eec5 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -630,37 +630,27 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com } } -// TODO move these type checks into specific scan classes case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim { // this class is intended to be used with a specific scan impl assert(scanImpl != CometConf.SCAN_AUTO) - assert( - scanImpl != CometConf.SCAN_NATIVE_ICEBERG_COMPAT, - "Call CometIcebergNativeScan.isSchemaSupported instead of using CometScanTypeChecker") - override def isTypeSupported( dt: DataType, name: String, fallbackReasons: ListBuffer[String]): Boolean = { - dt match { - case ByteType | ShortType - if scanImpl != CometConf.SCAN_NATIVE_COMET && - !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() => - fallbackReasons += s"$scanImpl scan cannot read $dt when " + - s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}." 
- false - case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET => - false - case dt if isStringCollationType(dt) => - // we don't need specific support for collation in scans, but this - // is a convenient place to force the whole query to fall back to Spark for now - false - case s: StructType if s.fields.isEmpty => - false - case _ => - super.isTypeSupported(dt, name, fallbackReasons) + scanImpl match { + case CometConf.SCAN_NATIVE_DATAFUSION => + CometNativeScan.isTypeSupported(dt, name, fallbackReasons) + case CometConf.SCAN_NATIVE_ICEBERG_COMPAT => + CometIcebergNativeScan.isTypeSupported(dt, name, fallbackReasons) + case CometConf.SCAN_NATIVE_COMET => + dt match { + case _: StructType | _: ArrayType | _: MapType => + false + case _ => + super.isTypeSupported(dt, name, fallbackReasons) + } } } } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index 3ab1f75412..9e5c13494b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -39,12 +39,10 @@ import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField} import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} -import org.apache.comet.shims.CometTypeShim object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] with DataTypeSupport - with CometTypeShim with Logging { override def enabledConfig: Option[ConfigEntry[Boolean]] = None @@ -86,10 +84,6 @@ object CometIcebergNativeScan fallbackReasons += s"Cannot read $dt when " + s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}." false - case dt if isStringCollationType(dt) => - // we don't need specific support for collation in scans, but this - // is a convenient place to force the whole query to fall back to Spark for now - false case m: MapType => // maps containing complex types are not supported isComplexType(m.keyType) || isComplexType(m.valueType) || diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 93e58e7bc4..aacfe29d81 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -40,7 +40,6 @@ import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClas import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} -import org.apache.comet.shims.CometTypeShim /** * Validation and serde logic for `native_datafusion` scans. 
@@ -48,7 +47,6 @@ import org.apache.comet.shims.CometTypeShim
 object CometNativeScan
     extends CometOperatorSerde[CometScanExec]
     with DataTypeSupport
-    with CometTypeShim
     with Logging {
 
   override def isTypeSupported(
@@ -57,13 +55,9 @@ object CometNativeScan
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
       case ByteType | ShortType if !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += "Cannot read $dt when " +
+        fallbackReasons += s"Cannot read $dt when " +
           s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
         false
-      case dt if isStringCollationType(dt) =>
-        // we don't need specific support for collation in scans, but this
-        // is a convenient place to force the whole query to fall back to Spark for now
-        false
       case _ =>
         super.isTypeSupported(dt, name, fallbackReasons)
     }

From 1c81264edf1d715b4df77d2a742fff8d38ea33eb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 19 Dec 2025 07:56:47 -0700
Subject: [PATCH 3/3] Save

---
 .../main/scala/org/apache/comet/rules/CometScanRule.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index d2b897eec5..c0f5040fdd 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -610,10 +610,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
   private def isDynamicPruningFilter(e: Expression): Boolean =
     e.exists(_.isInstanceOf[PlanExpression[_]])
 
-  private def checkSchema(
-      scanExec: FileSourceScanExec,
-      scanImpl: String,
-      r: HadoopFsRelation): Unit = {
+  def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
     val fallbackReasons = new ListBuffer[String]()
     val typeChecker = CometScanTypeChecker(scanImpl)
     val schemaSupported =