Commit 3008f4a

hsiang-candygrove and Andy Grove authored
feat: More warning info for users (#1667)
* feat: More warning info for users
* Remove unused import
* Collect feedback reasons
* isTypeSupported -> validateTypeSupported
* Better name for array element and Map key-value
* Evaluate conf in individual test scope
* validateTypeSupported -> isTypeSupported
* Switch back to SQLConf
* Update spark/src/main/scala/org/apache/comet/DataTypeSupport.scala

  Thanks 👍

  Co-authored-by: Andy Grove <[email protected]>
* Use the given conf
* Fix unit test
* Revert "Fix unit test"

  This reverts commit 7d000c0.
* Don't depend on empty fallbackReasons to use Comet exec
* Missing early return

---------

Co-authored-by: Andy Grove <[email protected]>
1 parent: d8293f0 · commit: 3008f4a

10 files changed: +166 −97 lines

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 4 additions & 1 deletion
@@ -21,6 +21,8 @@ package org.apache.comet
 
 import java.nio.ByteOrder
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
@@ -193,7 +195,8 @@ object CometSparkSessionExtensions extends Logging {
     // operators can have a chance to be converted to columnar. Leaf operators that output
     // columnar batches, such as Spark's vectorized readers, will also be converted to native
     // comet batches.
-    if (CometSparkToColumnarExec.isSchemaSupported(op.schema)) {
+    val fallbackReasons = new ListBuffer[String]()
+    if (CometSparkToColumnarExec.isSchemaSupported(op.schema, fallbackReasons)) {
       op match {
         // Convert Spark DS v1 scan to Arrow format
         case scan: FileSourceScanExec =>

spark/src/main/scala/org/apache/comet/DataTypeSupport.scala

Lines changed: 52 additions & 20 deletions
@@ -19,9 +19,13 @@
 
 package org.apache.comet
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import org.apache.comet.DataTypeSupport.{usingParquetExecWithIncompatTypes, ARRAY_ELEMENT, MAP_KEY, MAP_VALUE}
+
 trait DataTypeSupport {
 
   /**
@@ -33,20 +37,25 @@ trait DataTypeSupport {
   * @return
   *   true if the datatype is supported
   */
-  def isAdditionallySupported(dt: DataType): Boolean = false
+  def isAdditionallySupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = false
 
-  private def isGloballySupported(dt: DataType): Boolean = dt match {
-    case ByteType | ShortType
-        if CometSparkSessionExtensions.usingDataFusionParquetExec(SQLConf.get) &&
-          !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-      false
-    case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
-        BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
-      true
-    case t: DataType if t.typeName == "timestamp_ntz" =>
-      true
-    case _ => false
-  }
+  private def isGloballySupported(dt: DataType, fallbackReasons: ListBuffer[String]): Boolean =
+    dt match {
+      case ByteType | ShortType if usingParquetExecWithIncompatTypes(SQLConf.get) =>
+        fallbackReasons += s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false"
+        false
+      case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
+          BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
+        true
+      case t: DataType if t.typeName == "timestamp_ntz" =>
+        true
+      case other =>
+        fallbackReasons += s"Unsupported type: $other"
+        false
+    }
 
   /**
   * Checks if this schema is supported by checking if each field in the struct is supported.
@@ -56,23 +65,46 @@ trait DataTypeSupport {
   * @return
   *   true if all fields in the struct are supported
   */
-  def isSchemaSupported(struct: StructType): Boolean = {
-    struct.fields.map(_.dataType).forall(isTypeSupported)
+  def isSchemaSupported(struct: StructType, fallbackReasons: ListBuffer[String]): Boolean = {
+    struct.fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons))
   }
 
-  def isTypeSupported(dt: DataType): Boolean = {
-    if (isGloballySupported(dt) || isAdditionallySupported(dt)) {
+  def isTypeSupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
+    if (isGloballySupported(dt, fallbackReasons) || isAdditionallySupported(
+        dt,
+        name,
+        fallbackReasons)) {
       // If complex types are supported, we additionally want to recurse into their children
       dt match {
-        case StructType(fields) => fields.map(_.dataType).forall(isTypeSupported)
-        case ArrayType(elementType, _) => isTypeSupported(elementType)
+        case StructType(fields) =>
+          fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons))
+        case ArrayType(elementType, _) =>
+          isTypeSupported(elementType, ARRAY_ELEMENT, fallbackReasons)
         case MapType(keyType, valueType, _) =>
-          isTypeSupported(keyType) && isTypeSupported(valueType)
+          isTypeSupported(keyType, MAP_KEY, fallbackReasons) && isTypeSupported(
+            valueType,
+            MAP_VALUE,
+            fallbackReasons)
         // Not a complex type
        case _ => true
      }
    } else {
+      fallbackReasons += s"Unsupported ${name} of type ${dt}"
      false
    }
  }
 }
+
+object DataTypeSupport {
+  val ARRAY_ELEMENT = "array element"
+  val MAP_KEY = "map key"
+  val MAP_VALUE = "map value"
+
+  def usingParquetExecWithIncompatTypes(conf: SQLConf): Boolean = {
+    CometSparkSessionExtensions.usingDataFusionParquetExec(conf) &&
+    !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+  }
+}
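Note on the pattern above: isTypeSupported now threads a shared ListBuffer through every recursive call, so each rejected type can say why it fell back instead of silently returning false. Below is a minimal, self-contained Scala sketch of that pattern — the toy DataType hierarchy and its tiny support matrix are hypothetical stand-ins for illustration, not Comet's real classes or support rules:

import scala.collection.mutable.ListBuffer

// Simplified stand-ins for Spark's DataType hierarchy (hypothetical).
sealed trait DataType
case object IntegerType extends DataType
case object StringType extends DataType
case object BinaryType extends DataType
final case class ArrayType(elementType: DataType) extends DataType
final case class StructField(name: String, dataType: DataType)
final case class StructType(fields: Seq[StructField]) extends DataType

object FallbackReasonSketch {
  val ARRAY_ELEMENT = "array element"

  // Same shape as the new isTypeSupported(dt, name, fallbackReasons):
  // every rejection appends a human-readable reason to the shared buffer.
  def isTypeSupported(dt: DataType, name: String, reasons: ListBuffer[String]): Boolean =
    dt match {
      case IntegerType | StringType => true
      case StructType(fields) =>
        // forall short-circuits, so only the first failing field reports.
        fields.forall(f => isTypeSupported(f.dataType, f.name, reasons))
      case ArrayType(elem) => isTypeSupported(elem, ARRAY_ELEMENT, reasons)
      case other =>
        reasons += s"Unsupported $name of type $other"
        false
    }

  def main(args: Array[String]): Unit = {
    val reasons = ListBuffer.empty[String]
    val schema = StructType(
      Seq(StructField("id", IntegerType), StructField("payload", ArrayType(BinaryType))))
    val ok = schema.fields.forall(f => isTypeSupported(f.dataType, f.name, reasons))
    println(s"supported=$ok; reasons=${reasons.mkString(", ")}")
    // prints: supported=false; reasons=Unsupported array element of type BinaryType
  }
}

Because forall and && short-circuit, only the first failing child at each level contributes a reason, which matches the recursion in the real trait.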

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 19 additions & 10 deletions
@@ -86,23 +86,25 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
         val fallbackReasons = new ListBuffer[String]()
         if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
           fallbackReasons += s"Unsupported file format ${r.fileFormat}"
+          return withInfo(scanExec, fallbackReasons.mkString(", "))
         }
 
         val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
         if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
           fallbackReasons +=
             s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
+          return withInfo(scanExec, fallbackReasons.mkString(", "))
         }
 
         val (schemaSupported, partitionSchemaSupported) = scanImpl match {
           case CometConf.SCAN_NATIVE_DATAFUSION =>
             (
-              CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema),
-              CometNativeScanExec.isSchemaSupported(r.partitionSchema))
+              CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
+              CometNativeScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
           case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
             (
-              CometScanExec.isSchemaSupported(scanExec.requiredSchema),
-              CometScanExec.isSchemaSupported(r.partitionSchema))
+              CometScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
+              CometScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
         }
 
         if (!schemaSupported) {
@@ -112,7 +114,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
           fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
         }
 
-        if (fallbackReasons.isEmpty) {
+        if (schemaSupported && partitionSchemaSupported) {
           CometScanExec(scanExec, session)
         } else {
           withInfo(scanExec, fallbackReasons.mkString(", "))
@@ -128,19 +130,23 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
       scanExec.scan match {
         case scan: ParquetScan =>
           val fallbackReasons = new ListBuffer[String]()
-          if (!CometBatchScanExec.isSchemaSupported(scan.readDataSchema)) {
+          val schemaSupported =
+            CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
+          if (!schemaSupported) {
             fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
           }
 
-          if (!CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema)) {
+          val partitionSchemaSupported =
+            CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
+          if (!partitionSchemaSupported) {
             fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
           }
 
           if (scan.pushedAggregate.nonEmpty) {
             fallbackReasons += "Comet does not support pushed aggregate"
           }
 
-          if (fallbackReasons.isEmpty) {
+          if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
            val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
            CometBatchScanExec(
              scanExec.copy(scan = cometScan),
@@ -158,12 +164,15 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
             s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
          }
 
-          if (!CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema())) {
+          val schemaSupported =
+            CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
+
+          if (!schemaSupported) {
            fallbackReasons += "Comet extension is not enabled for " +
              s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
          }
 
-          if (fallbackReasons.isEmpty) {
+          if (s.isCometEnabled && schemaSupported) {
            // When reading from Iceberg, we automatically enable type promotion
            SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
            CometBatchScanExec(
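Two behavioral fixes land in this rule: early returns after the file-format and native-exec checks (the commit's "Missing early return"), and gating on explicit Boolean results rather than fallbackReasons.isEmpty (the commit's "Don't depend on empty fallbackReasons to use Comet exec"), since the schema helpers now append to the same buffer. A minimal sketch of the revised control flow — the Boolean parameters and string results are hypothetical stand-ins for Spark plans and Comet's real API:

import scala.collection.mutable.ListBuffer

object ScanRuleFlowSketch {
  def planScan(
      fileFormatSupported: Boolean,
      schemaSupported: Boolean,
      partitionSchemaSupported: Boolean): String = {
    val fallbackReasons = new ListBuffer[String]()

    if (!fileFormatSupported) {
      fallbackReasons += "Unsupported file format"
      // The "missing early return": later checks are pointless once the
      // format itself is rejected.
      return s"fallback(${fallbackReasons.mkString(", ")})"
    }
    if (!schemaSupported) fallbackReasons += "Unsupported schema"
    if (!partitionSchemaSupported) fallbackReasons += "Unsupported partitioning schema"

    // Gate on the explicit Boolean results, not on fallbackReasons.isEmpty:
    // helper calls now append to the shared buffer too, so emptiness is no
    // longer a reliable proxy for "all checks passed".
    if (schemaSupported && partitionSchemaSupported) "CometScanExec"
    else s"fallback(${fallbackReasons.mkString(", ")})"
  }

  def main(args: Array[String]): Unit = {
    println(planScan(fileFormatSupported = true, schemaSupported = false,
      partitionSchemaSupported = true))
    // prints: fallback(Unsupported schema)
  }
}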

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 14 additions & 4 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.comet
 
+import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.RDD
@@ -36,6 +37,7 @@ import org.apache.spark.util.collection._
 import com.google.common.base.Objects
 
 import org.apache.comet.DataTypeSupport
+import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE}
 import org.apache.comet.parquet.CometParquetFileFormat
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
@@ -229,11 +231,19 @@ object CometNativeScanExec extends DataTypeSupport {
     batchScanExec
   }
 
-  override def isAdditionallySupported(dt: DataType): Boolean = {
+  override def isAdditionallySupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
-      case a: ArrayType => isTypeSupported(a.elementType)
-      case m: MapType => isTypeSupported(m.keyType) && isTypeSupported(m.valueType)
+      case s: StructType =>
+        s.fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons))
+      case a: ArrayType => isTypeSupported(a.elementType, ARRAY_ELEMENT, fallbackReasons)
+      case m: MapType =>
+        isTypeSupported(m.keyType, MAP_KEY, fallbackReasons) && isTypeSupported(
+          m.valueType,
+          MAP_VALUE,
+          fallbackReasons)
      case _ => false
    }
  }
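One consequence of the MapType branch above is worth noting: && short-circuits, so when a map key is rejected the value is never inspected and only the key's reason lands in the buffer. A tiny self-contained sketch (check is a hypothetical helper, not part of Comet) demonstrating this:

import scala.collection.mutable.ListBuffer

object ShortCircuitSketch {
  // Toy check: record a reason whenever the check fails.
  def check(ok: Boolean, label: String, reasons: ListBuffer[String]): Boolean = {
    if (!ok) reasons += s"Unsupported $label"
    ok
  }

  def main(args: Array[String]): Unit = {
    val reasons = ListBuffer.empty[String]
    // Mirrors the MapType branch: && short-circuits, so a failing key
    // suppresses the value check and its reason.
    val mapOk = check(ok = false, "map key", reasons) &&
      check(ok = false, "map value", reasons)
    println(s"mapOk=$mapOk reasons=${reasons.mkString(", ")}")
    // prints: mapOk=false reasons=Unsupported map key
  }
}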

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 15 additions & 5 deletions
@@ -19,7 +19,7 @@
 
 package org.apache.spark.sql.comet
 
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
 import scala.concurrent.duration.NANOSECONDS
 import scala.reflect.ClassTag
 
@@ -45,6 +45,7 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection._
 
 import org.apache.comet.{CometConf, DataTypeSupport, MetricsSupport}
+import org.apache.comet.DataTypeSupport.{ARRAY_ELEMENT, MAP_KEY, MAP_VALUE}
 import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
 
 /**
@@ -478,12 +479,21 @@ case class CometScanExec(
 
 object CometScanExec extends DataTypeSupport {
 
-  override def isAdditionallySupported(dt: DataType): Boolean = {
+  override def isAdditionallySupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = {
     if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
       dt match {
-        case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
-        case a: ArrayType => isTypeSupported(a.elementType)
-        case m: MapType => isTypeSupported(m.keyType) && isTypeSupported(m.valueType)
+        case s: StructType =>
+          s.fields.forall(f => isTypeSupported(f.dataType, f.name, fallbackReasons))
+        case a: ArrayType =>
+          isTypeSupported(a.elementType, ARRAY_ELEMENT, fallbackReasons)
+        case m: MapType =>
+          isTypeSupported(m.keyType, MAP_KEY, fallbackReasons) && isTypeSupported(
+            m.valueType,
+            MAP_VALUE,
+            fallbackReasons)
        case _ => false
      }
    } else {

spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala

Lines changed: 6 additions & 1 deletion
@@ -19,6 +19,8 @@
 
 package org.apache.spark.sql.comet
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -135,7 +137,10 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 }
 
 object CometSparkToColumnarExec extends DataTypeSupport {
-  override def isAdditionallySupported(dt: DataType): Boolean = dt match {
+  override def isAdditionallySupported(
+      dt: DataType,
+      name: String,
+      fallbackReasons: ListBuffer[String]): Boolean = dt match {
    case _: StructType => true
    case _ => false
  }

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -60,8 +60,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   private val timestampPattern = "0123456789/:T" + whitespaceChars
 
   lazy val usingParquetExecWithIncompatTypes: Boolean =
-    CometSparkSessionExtensions.usingDataFusionParquetExec(conf) &&
-      !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+    DataTypeSupport.usingParquetExecWithIncompatTypes(conf)
 
   test("all valid cast combinations covered") {
     val names = testNames
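The suite now reuses the predicate extracted into DataTypeSupport's companion object instead of restating the conjunction. A hedged usage sketch — it assumes a Spark/Comet classpath and reads whatever SQLConf is current:

import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.DataTypeSupport

object IncompatTypesCheckSketch {
  def main(args: Array[String]): Unit = {
    // True only when the DataFusion-based Parquet exec is active and
    // CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE is disabled for this conf —
    // the predicate previously duplicated between the trait and this suite.
    val incompat = DataTypeSupport.usingParquetExecWithIncompatTypes(SQLConf.get)
    println(s"usingParquetExecWithIncompatTypes = $incompat")
  }
}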
