Skip to content

Commit 237929d

Browse files
authored
fix: Remove fallback for maps containing complex types (#2943)
1 parent bba98c7 commit 237929d

File tree

7 files changed

+86
-66
lines changed

7 files changed

+86
-66
lines changed

spark/src/main/scala/org/apache/comet/DataTypeSupport.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,14 @@ object DataTypeSupport {
7979
case _: StructType | _: ArrayType | _: MapType => true
8080
case _ => false
8181
}
82+
83+
def hasTemporalType(t: DataType): Boolean = t match {
84+
case DataTypes.DateType | DataTypes.TimestampType | DataTypes.TimestampNTZType =>
85+
true
86+
case t: StructType => t.exists(f => hasTemporalType(f.dataType))
87+
case t: ArrayType => hasTemporalType(t.elementType)
88+
case t: MapType => hasTemporalType(t.keyType) || hasTemporalType(t.valueType)
89+
case _ => false
90+
}
91+
8292
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -592,34 +592,12 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
592592
val partitionSchemaSupported =
593593
typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
594594

595-
def hasUnsupportedType(dataType: DataType): Boolean = {
596-
dataType match {
597-
case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))
598-
case a: ArrayType => hasUnsupportedType(a.elementType)
599-
case m: MapType =>
600-
// maps containing complex types are not supported
601-
isComplexType(m.keyType) || isComplexType(m.valueType) ||
602-
hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
603-
case dt if isStringCollationType(dt) => true
604-
case _ => false
605-
}
606-
}
607-
608-
val knownIssues =
609-
scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) ||
610-
partitionSchema.exists(field => hasUnsupportedType(field.dataType))
611-
612-
if (knownIssues) {
613-
fallbackReasons += "Schema contains data types that are not supported by " +
614-
s"$SCAN_NATIVE_ICEBERG_COMPAT"
615-
}
616-
617595
val cometExecEnabled = COMET_EXEC_ENABLED.get()
618596
if (!cometExecEnabled) {
619597
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
620598
}
621599

622-
if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
600+
if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
623601
fallbackReasons.isEmpty) {
624602
logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
625603
SCAN_NATIVE_ICEBERG_COMPAT

spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@ import org.apache.spark.sql.execution.SparkPlan
3535
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3636
import org.apache.spark.sql.internal.SQLConf
3737

38-
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
38+
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
3939

4040
class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
4141

4242
var filename: String = null
4343

44+
/** Filename for data file with deeply nested complex types */
45+
var complexTypesFilename: String = null
46+
4447
/**
4548
* We use Asia/Kathmandu because it has a non-zero number of minutes as the offset, so is an
4649
* interesting edge case. Also, this timezone tends to be different from the default system
@@ -53,18 +56,20 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
5356
override def beforeAll(): Unit = {
5457
super.beforeAll()
5558
val tempDir = System.getProperty("java.io.tmpdir")
56-
filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
5759
val random = new Random(42)
60+
val dataGenOptions = DataGenOptions(
61+
generateNegativeZero = false,
62+
// override base date due to known issues with experimental scans
63+
baseDate = new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
64+
65+
// generate Parquet file with primitives, structs, and arrays, but no maps
66+
// and no nested complex types
67+
filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
5868
withSQLConf(
5969
CometConf.COMET_ENABLED.key -> "false",
6070
SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
6171
val schemaGenOptions =
6272
SchemaGenOptions(generateArray = true, generateStruct = true)
63-
val dataGenOptions = DataGenOptions(
64-
generateNegativeZero = false,
65-
// override base date due to known issues with experimental scans
66-
baseDate =
67-
new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
6873
ParquetGenerator.makeParquetFile(
6974
random,
7075
spark,
@@ -73,6 +78,30 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
7378
schemaGenOptions,
7479
dataGenOptions)
7580
}
81+
82+
// generate Parquet file with complex nested types
83+
complexTypesFilename =
84+
s"$tempDir/CometFuzzTestSuite_nested_${System.currentTimeMillis()}.parquet"
85+
withSQLConf(
86+
CometConf.COMET_ENABLED.key -> "false",
87+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
88+
val schemaGenOptions =
89+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = true)
90+
val schema = FuzzDataGenerator.generateNestedSchema(
91+
random,
92+
numCols = 10,
93+
minDepth = 2,
94+
maxDepth = 4,
95+
options = schemaGenOptions)
96+
ParquetGenerator.makeParquetFile(
97+
random,
98+
spark,
99+
complexTypesFilename,
100+
schema,
101+
1000,
102+
dataGenOptions)
103+
}
104+
76105
}
77106

78107
protected override def afterAll(): Unit = {
@@ -84,6 +113,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
84113
pos: Position): Unit = {
85114
Seq("native", "jvm").foreach { shuffleMode =>
86115
Seq(
116+
CometConf.SCAN_AUTO,
87117
CometConf.SCAN_NATIVE_COMET,
88118
CometConf.SCAN_NATIVE_DATAFUSION,
89119
CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>

spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
2828
import org.apache.spark.sql.types._
2929

3030
import org.apache.comet.DataTypeSupport.isComplexType
31-
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
31+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
3232
import org.apache.comet.testing.FuzzDataGenerator.{doubleNaNLiteral, floatNaNLiteral}
3333

3434
class CometFuzzTestSuite extends CometFuzzTestBase {
@@ -44,6 +44,17 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
4444
}
4545
}
4646

47+
test("select * with deeply nested complex types") {
48+
val df = spark.read.parquet(complexTypesFilename)
49+
df.createOrReplaceTempView("t1")
50+
val sql = "SELECT * FROM t1"
51+
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET) {
52+
checkSparkAnswerAndOperator(sql)
53+
} else {
54+
checkSparkAnswer(sql)
55+
}
56+
}
57+
4758
test("select * with limit") {
4859
val df = spark.read.parquet(filename)
4960
df.createOrReplaceTempView("t1")
@@ -179,7 +190,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
179190
case CometConf.SCAN_NATIVE_COMET =>
180191
// native_comet does not support reading complex types
181192
0
182-
case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION =>
193+
case _ =>
183194
CometConf.COMET_SHUFFLE_MODE.get() match {
184195
case "jvm" =>
185196
1
@@ -202,7 +213,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
202213
case CometConf.SCAN_NATIVE_COMET =>
203214
// native_comet does not support reading complex types
204215
0
205-
case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION =>
216+
case _ =>
206217
CometConf.COMET_SHUFFLE_MODE.get() match {
207218
case "jvm" =>
208219
1
@@ -272,12 +283,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
272283
}
273284

274285
private def testParquetTemporalTypes(
275-
outputTimestampType: ParquetOutputTimestampType.Value,
276-
generateArray: Boolean = true,
277-
generateStruct: Boolean = true): Unit = {
278-
279-
val schemaGenOptions =
280-
SchemaGenOptions(generateArray = generateArray, generateStruct = generateStruct)
286+
outputTimestampType: ParquetOutputTimestampType.Value): Unit = {
281287

282288
val dataGenOptions = DataGenOptions(generateNegativeZero = false)
283289

@@ -287,12 +293,23 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
287293
CometConf.COMET_ENABLED.key -> "false",
288294
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
289295
SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
296+
297+
// TODO test with MapType
298+
// https://github.com/apache/datafusion-comet/issues/2945
299+
val schema = StructType(
300+
Seq(
301+
StructField("c0", DataTypes.DateType),
302+
StructField("c1", DataTypes.createArrayType(DataTypes.DateType)),
303+
StructField(
304+
"c2",
305+
DataTypes.createStructType(Array(StructField("c3", DataTypes.DateType))))))
306+
290307
ParquetGenerator.makeParquetFile(
291308
random,
292309
spark,
293310
filename.toString,
311+
schema,
294312
100,
295-
schemaGenOptions,
296313
dataGenOptions)
297314
}
298315

@@ -309,18 +326,10 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
309326

310327
val df = spark.read.parquet(filename.toString)
311328
df.createOrReplaceTempView("t1")
312-
313-
def hasTemporalType(t: DataType): Boolean = t match {
314-
case DataTypes.DateType | DataTypes.TimestampType |
315-
DataTypes.TimestampNTZType =>
316-
true
317-
case t: StructType => t.exists(f => hasTemporalType(f.dataType))
318-
case t: ArrayType => hasTemporalType(t.elementType)
319-
case _ => false
320-
}
321-
322329
val columns =
323-
df.schema.fields.filter(f => hasTemporalType(f.dataType)).map(_.name)
330+
df.schema.fields
331+
.filter(f => DataTypeSupport.hasTemporalType(f.dataType))
332+
.map(_.name)
324333

325334
for (col <- columns) {
326335
checkSparkAnswer(s"SELECT $col FROM t1 ORDER BY $col")

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ abstract class CometTestBase
116116
sparkPlan = dfSpark.queryExecution.executedPlan
117117
}
118118
val dfComet = datasetOfRows(spark, df.logicalPlan)
119-
120119
if (withTol.isDefined) {
121120
checkAnswerWithTolerance(dfComet, expected, withTol.get)
122121
} else {

spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.spark.sql
2121

22-
import scala.collection.mutable.ListBuffer
23-
2422
import org.apache.spark.sql.catalyst.TableIdentifier
2523
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
2624
import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
@@ -29,7 +27,6 @@ import org.apache.spark.sql.types.DataTypes
2927

3028
import org.apache.comet.{CometConf, CometFuzzTestBase}
3129
import org.apache.comet.expressions.{CometCast, CometEvalMode}
32-
import org.apache.comet.rules.CometScanTypeChecker
3330
import org.apache.comet.serde.Compatible
3431

3532
class CometToPrettyStringSuite extends CometFuzzTestBase {
@@ -45,14 +42,14 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
4542
val plan = Project(Seq(prettyExpr), table)
4643
val analyzed = spark.sessionState.analyzer.execute(plan)
4744
val result: DataFrame = Dataset.ofRows(spark, analyzed)
48-
CometCast.isSupported(
45+
val supportLevel = CometCast.isSupported(
4946
field.dataType,
5047
DataTypes.StringType,
5148
Some(spark.sessionState.conf.sessionLocalTimeZone),
52-
CometEvalMode.TRY) match {
49+
CometEvalMode.TRY)
50+
supportLevel match {
5351
case _: Compatible
54-
if CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
55-
.isTypeSupported(field.dataType, field.name, ListBuffer.empty) =>
52+
if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
5653
checkSparkAnswerAndOperator(result)
5754
case _ => checkSparkAnswer(result)
5855
}

spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.spark.sql
2121

22-
import scala.collection.mutable.ListBuffer
23-
2422
import org.apache.spark.sql.catalyst.TableIdentifier
2523
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
2624
import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
@@ -32,7 +30,6 @@ import org.apache.spark.sql.types.DataTypes
3230

3331
import org.apache.comet.{CometConf, CometFuzzTestBase}
3432
import org.apache.comet.expressions.{CometCast, CometEvalMode}
35-
import org.apache.comet.rules.CometScanTypeChecker
3633
import org.apache.comet.serde.Compatible
3734

3835
class CometToPrettyStringSuite extends CometFuzzTestBase {
@@ -56,14 +53,14 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
5653
val plan = Project(Seq(prettyExpr), table)
5754
val analyzed = spark.sessionState.analyzer.execute(plan)
5855
val result: DataFrame = Dataset.ofRows(spark, analyzed)
59-
CometCast.isSupported(
56+
val supportLevel = CometCast.isSupported(
6057
field.dataType,
6158
DataTypes.StringType,
6259
Some(spark.sessionState.conf.sessionLocalTimeZone),
63-
CometEvalMode.TRY) match {
60+
CometEvalMode.TRY)
61+
supportLevel match {
6462
case _: Compatible
65-
if CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
66-
.isTypeSupported(field.dataType, field.name, ListBuffer.empty) =>
63+
if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
6764
checkSparkAnswerAndOperator(result)
6865
case _ => checkSparkAnswer(result)
6966
}

0 commit comments

Comments
 (0)