diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index 9adf829580..9f8fc77eba 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -79,4 +79,14 @@ object DataTypeSupport {
     case _: StructType | _: ArrayType | _: MapType => true
     case _ => false
   }
+
+  def hasTemporalType(t: DataType): Boolean = t match {
+    case DataTypes.DateType | DataTypes.TimestampType | DataTypes.TimestampNTZType =>
+      true
+    case t: StructType => t.exists(f => hasTemporalType(f.dataType))
+    case t: ArrayType => hasTemporalType(t.elementType)
+    case t: MapType => hasTemporalType(t.keyType) || hasTemporalType(t.valueType)
+    case _ => false
+  }
+
 }
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 69bce75559..01e385b0ae 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -592,34 +592,12 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     val partitionSchemaSupported =
       typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
 
-    def hasUnsupportedType(dataType: DataType): Boolean = {
-      dataType match {
-        case s: StructType => s.exists(field => hasUnsupportedType(field.dataType))
-        case a: ArrayType => hasUnsupportedType(a.elementType)
-        case m: MapType =>
-          // maps containing complex types are not supported
-          isComplexType(m.keyType) || isComplexType(m.valueType) ||
-            hasUnsupportedType(m.keyType) || hasUnsupportedType(m.valueType)
-        case dt if isStringCollationType(dt) => true
-        case _ => false
-      }
-    }
-
-    val knownIssues =
-      scanExec.requiredSchema.exists(field => hasUnsupportedType(field.dataType)) ||
-        partitionSchema.exists(field => hasUnsupportedType(field.dataType))
-
-    if (knownIssues) {
-      fallbackReasons += "Schema contains data types that are not supported by " +
-        s"$SCAN_NATIVE_ICEBERG_COMPAT"
-    }
-
     val cometExecEnabled = COMET_EXEC_ENABLED.get()
     if (!cometExecEnabled) {
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
     }
 
-    if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
+    if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
       fallbackReasons.isEmpty) {
       logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
       SCAN_NATIVE_ICEBERG_COMPAT
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
index 1c0636780e..74858ed614 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -35,12 +35,15 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal.SQLConf
 
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
 
 class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
 
   var filename: String = null
 
+  /** Filename for data file with deeply nested complex types */
+  var complexTypesFilename: String = null
+
   /**
    * We use Asia/Kathmandu because it has a non-zero number of minutes as the offset, so is an
    * interesting edge case. Also, this timezone tends to be different from the default system
@@ -53,18 +56,20 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
   override def beforeAll(): Unit = {
     super.beforeAll()
     val tempDir = System.getProperty("java.io.tmpdir")
-    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
     val random = new Random(42)
+    val dataGenOptions = DataGenOptions(
+      generateNegativeZero = false,
+      // override base date due to known issues with experimental scans
+      baseDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-05-25 12:34:56").getTime)
+
+    // generate Parquet file with primitives, structs, and arrays, but no maps
+    // and no nested complex types
+    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "false",
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
       val schemaGenOptions =
         SchemaGenOptions(generateArray = true, generateStruct = true)
-      val dataGenOptions = DataGenOptions(
-        generateNegativeZero = false,
-        // override base date due to known issues with experimental scans
-        baseDate =
-          new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
       ParquetGenerator.makeParquetFile(
         random,
         spark,
@@ -73,6 +78,30 @@
         schemaGenOptions,
         dataGenOptions)
     }
+
+    // generate Parquet file with complex nested types
+    complexTypesFilename =
+      s"$tempDir/CometFuzzTestSuite_nested_${System.currentTimeMillis()}.parquet"
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "false",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
+      val schemaGenOptions =
+        SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = true)
+      val schema = FuzzDataGenerator.generateNestedSchema(
+        random,
+        numCols = 10,
+        minDepth = 2,
+        maxDepth = 4,
+        options = schemaGenOptions)
+      ParquetGenerator.makeParquetFile(
+        random,
+        spark,
+        complexTypesFilename,
+        schema,
+        1000,
+        dataGenOptions)
+    }
+
   }
 
   protected override def afterAll(): Unit = {
@@ -84,6 +113,7 @@
       pos: Position): Unit = {
     Seq("native", "jvm").foreach { shuffleMode =>
       Seq(
+        CometConf.SCAN_AUTO,
         CometConf.SCAN_NATIVE_COMET,
         CometConf.SCAN_NATIVE_DATAFUSION,
         CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 59680bd6bc..833314a5c6 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.types._
 
 import org.apache.comet.DataTypeSupport.isComplexType
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 import org.apache.comet.testing.FuzzDataGenerator.{doubleNaNLiteral, floatNaNLiteral}
 
 class CometFuzzTestSuite extends CometFuzzTestBase {
@@ -44,6 +44,17 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     }
   }
 
+  test("select * with deeply nested complex types") {
+    val df = spark.read.parquet(complexTypesFilename)
+    df.createOrReplaceTempView("t1")
+    val sql = "SELECT * FROM t1"
+    if (CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET) {
+      checkSparkAnswerAndOperator(sql)
+    } else {
+      checkSparkAnswer(sql)
+    }
+  }
+
   test("select * with limit") {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
@@ -179,7 +190,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         case CometConf.SCAN_NATIVE_COMET =>
           // native_comet does not support reading complex types
           0
-        case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION =>
+        case _ =>
           CometConf.COMET_SHUFFLE_MODE.get() match {
             case "jvm" =>
               1
@@ -202,7 +213,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         case CometConf.SCAN_NATIVE_COMET =>
           // native_comet does not support reading complex types
           0
-        case CometConf.SCAN_NATIVE_ICEBERG_COMPAT | CometConf.SCAN_NATIVE_DATAFUSION =>
+        case _ =>
           CometConf.COMET_SHUFFLE_MODE.get() match {
             case "jvm" =>
               1
@@ -272,12 +283,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }
 
   private def testParquetTemporalTypes(
-      outputTimestampType: ParquetOutputTimestampType.Value,
-      generateArray: Boolean = true,
-      generateStruct: Boolean = true): Unit = {
-
-    val schemaGenOptions =
-      SchemaGenOptions(generateArray = generateArray, generateStruct = generateStruct)
+      outputTimestampType: ParquetOutputTimestampType.Value): Unit = {
 
     val dataGenOptions =
       DataGenOptions(generateNegativeZero = false)
@@ -287,12 +293,23 @@
       CometConf.COMET_ENABLED.key -> "false",
       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
+
+      // TODO test with MapType
+      // https://github.com/apache/datafusion-comet/issues/2945
+      val schema = StructType(
+        Seq(
+          StructField("c0", DataTypes.DateType),
+          StructField("c1", DataTypes.createArrayType(DataTypes.DateType)),
+          StructField(
+            "c2",
+            DataTypes.createStructType(Array(StructField("c3", DataTypes.DateType))))))
+
       ParquetGenerator.makeParquetFile(
         random,
         spark,
         filename.toString,
+        schema,
         100,
-        schemaGenOptions,
         dataGenOptions)
     }
 
@@ -309,18 +326,10 @@
 
     val df = spark.read.parquet(filename.toString)
     df.createOrReplaceTempView("t1")
-
-    def hasTemporalType(t: DataType): Boolean = t match {
-      case DataTypes.DateType | DataTypes.TimestampType |
-          DataTypes.TimestampNTZType =>
-        true
-      case t: StructType => t.exists(f => hasTemporalType(f.dataType))
-      case t: ArrayType => hasTemporalType(t.elementType)
-      case _ => false
-    }
-
     val columns =
-      df.schema.fields.filter(f => hasTemporalType(f.dataType)).map(_.name)
+      df.schema.fields
+        .filter(f => DataTypeSupport.hasTemporalType(f.dataType))
+        .map(_.name)
 
     for (col <- columns) {
       checkSparkAnswer(s"SELECT $col FROM t1 ORDER BY $col")
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index bc9e521d38..8011e5e70d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -116,7 +116,6 @@ abstract class CometTestBase
       sparkPlan = dfSpark.queryExecution.executedPlan
     }
     val dfComet = datasetOfRows(spark, df.logicalPlan)
-
     if (withTol.isDefined) {
       checkAnswerWithTolerance(dfComet, expected, withTol.get)
     } else {
diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
index 991d02014e..70119f44a7 100644
--- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
@@ -29,7 +27,6 @@ import org.apache.spark.sql.types.DataTypes
 
 import org.apache.comet.{CometConf, CometFuzzTestBase}
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
-import org.apache.comet.rules.CometScanTypeChecker
 import org.apache.comet.serde.Compatible
 
 class CometToPrettyStringSuite extends CometFuzzTestBase {
@@ -45,14 +42,14 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
         val plan = Project(Seq(prettyExpr), table)
         val analyzed = spark.sessionState.analyzer.execute(plan)
         val result: DataFrame = Dataset.ofRows(spark, analyzed)
-        CometCast.isSupported(
+        val supportLevel = CometCast.isSupported(
           field.dataType,
           DataTypes.StringType,
           Some(spark.sessionState.conf.sessionLocalTimeZone),
-          CometEvalMode.TRY) match {
+          CometEvalMode.TRY)
+        supportLevel match {
           case _: Compatible
-              if CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
-                .isTypeSupported(field.dataType, field.name, ListBuffer.empty) =>
+              if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
             checkSparkAnswerAndOperator(result)
           case _ => checkSparkAnswer(result)
         }
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
index f842e3f559..b0f40edf76 100644
--- a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
@@ -32,7 +30,6 @@ import org.apache.spark.sql.types.DataTypes
 
 import org.apache.comet.{CometConf, CometFuzzTestBase}
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
-import org.apache.comet.rules.CometScanTypeChecker
 import org.apache.comet.serde.Compatible
 
 class CometToPrettyStringSuite extends CometFuzzTestBase {
@@ -56,14 +53,14 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
         val plan = Project(Seq(prettyExpr), table)
         val analyzed = spark.sessionState.analyzer.execute(plan)
         val result: DataFrame = Dataset.ofRows(spark, analyzed)
-        CometCast.isSupported(
+        val supportLevel = CometCast.isSupported(
          field.dataType,
           DataTypes.StringType,
           Some(spark.sessionState.conf.sessionLocalTimeZone),
-          CometEvalMode.TRY) match {
+          CometEvalMode.TRY)
+        supportLevel match {
           case _: Compatible
-              if CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
-                .isTypeSupported(field.dataType, field.name, ListBuffer.empty) =>
+              if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
             checkSparkAnswerAndOperator(result)
           case _ => checkSparkAnswer(result)
         }
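
For reference, below is a minimal, self-contained sketch of the hasTemporalType recursion that this patch promotes from a test-local helper in CometFuzzTestSuite into the shared DataTypeSupport object. The NestedTemporalCheck object and its main method are illustrative only and not part of the patch; the function body mirrors the added code above.

import org.apache.spark.sql.types._

object NestedTemporalCheck {

  // Mirrors DataTypeSupport.hasTemporalType as added by this patch: returns
  // true when a date/timestamp type appears anywhere in a (possibly nested)
  // Spark SQL data type.
  def hasTemporalType(t: DataType): Boolean = t match {
    case DataTypes.DateType | DataTypes.TimestampType | DataTypes.TimestampNTZType =>
      true
    case s: StructType => s.exists(f => hasTemporalType(f.dataType))
    case a: ArrayType => hasTemporalType(a.elementType)
    case m: MapType => hasTemporalType(m.keyType) || hasTemporalType(m.valueType)
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    // A date nested two levels deep: struct<m: map<string, array<date>>>
    val nested = StructType(
      Seq(
        StructField(
          "m",
          DataTypes.createMapType(
            DataTypes.StringType,
            DataTypes.createArrayType(DataTypes.DateType)))))
    assert(hasTemporalType(nested)) // found via map value -> array element
    assert(!hasTemporalType(DataTypes.createArrayType(DataTypes.IntegerType)))
  }
}

The MapType case is what the removed test-local version lacked, so the shared helper also detects temporal values nested inside map keys and values.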