@@ -2465,17 +2465,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         val df = spark.read.parquet(dir.toString())
 
         checkSparkAnswerAndOperator(df.select("nested1.id"))
-
         checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
-
-        // unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ...
-        // checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
+        checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
       }
     }
   }
 
-  // TODO this is not using DataFusion's ParquetExec for some reason
-  ignore("get_struct_field with DataFusion ParquetExec - read entire struct") {
+  test("get_struct_field with DataFusion ParquetExec - read entire struct") {
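+    // assume() cancels this test when the DataFusion-based parquet scan is not enabled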
+    assume(usingDataSourceExec(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2501,13 +2498,19 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
 
           val df = spark.read.parquet(dir.toString())
-          checkSparkAnswerAndOperator(df.select("nested1"))
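+          // the empty useV1SourceList entry exercises the v2 scan, where Comet
+          // falls back to Spark, so only the result is compared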
+          if (v1List.isEmpty) {
+            checkSparkAnswer(df.select("nested1"))
+          } else {
+            checkSparkAnswerAndOperator(df.select("nested1"))
+          }
         }
       }
     }
   }
 
-  ignore("read map[int, int] from parquet") {
+  test("read map[int, int] from parquet") {
+    assume(usingDataSourceExec(conf))
+
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2523,15 +2526,63 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Seq("", "parquet").foreach { v1List =>
         withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
           val df = spark.read.parquet(dir.toString())
-          checkSparkAnswerAndOperator(df.select("map1"))
-          checkSparkAnswerAndOperator(df.select(map_keys(col("map1"))))
-          checkSparkAnswerAndOperator(df.select(map_values(col("map1"))))
+          if (v1List.isEmpty) {
+            checkSparkAnswer(df.select("map1"))
+          } else {
+            checkSparkAnswerAndOperator(df.select("map1"))
+          }
+          // we fall back to Spark for map_keys and map_values
+          checkSparkAnswer(df.select(map_keys(col("map1"))))
+          checkSparkAnswer(df.select(map_values(col("map1"))))
         }
       }
     }
   }
 
-  ignore("read array[int] from parquet") {
+  // repro for https://github.com/apache/datafusion-comet/issues/1754
+  ignore("read map[struct, struct] from parquet") {
+    assume(usingDataSourceExec(conf))
+
+    withTempPath { dir =>
+      // create input file with Comet disabled
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val df = spark
+          .range(5)
+          .withColumn("id2", col("id"))
+          .withColumn("id3", col("id"))
+          // Spark does not allow null as a key but does allow null as a
+          // value, and the entire map can be null
+          .select(
+            when(
+              col("id") > 1,
+              map(
+                struct(col("id"), col("id2"), col("id3")),
+                when(col("id") > 2, struct(col("id"), col("id2"), col("id3"))))).alias("map1"))
+        df.write.parquet(dir.toString())
+      }
+
+      Seq("", "parquet").foreach { v1List =>
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
+          val df = spark.read.parquet(dir.toString())
+          df.createOrReplaceTempView("tbl")
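+          // the view backs the SQL field-extraction queries below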
+          if (v1List.isEmpty) {
+            checkSparkAnswer(df.select("map1"))
+          } else {
+            checkSparkAnswerAndOperator(df.select("map1"))
+          }
+          // we fall back to Spark for map_keys and map_values
+          checkSparkAnswer(df.select(map_keys(col("map1"))))
+          checkSparkAnswer(df.select(map_values(col("map1"))))
+          checkSparkAnswer(spark.sql("SELECT map_keys(map1).id2 FROM tbl"))
+          checkSparkAnswer(spark.sql("SELECT map_values(map1).id2 FROM tbl"))
+        }
+      }
+    }
+  }
+
+  test("read array[int] from parquet") {
+    assume(usingDataSourceExec(conf))
+
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2546,8 +2597,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Seq("", "parquet").foreach { v1List =>
         withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
           val df = spark.read.parquet(dir.toString())
-          checkSparkAnswerAndOperator(df.select("array1"))
-          checkSparkAnswerAndOperator(df.select(element_at(col("array1"), lit(1))))
+          if (v1List.isEmpty) {
+            checkSparkAnswer(df.select("array1"))
+            checkSparkAnswer(df.select(element_at(col("array1"), lit(1))))
+          } else {
+            checkSparkAnswerAndOperator(df.select("array1"))
+            checkSparkAnswerAndOperator(df.select(element_at(col("array1"), lit(1))))
+          }
         }
       }
     }
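
A minimal standalone sketch (not part of this change) of the V1/V2 toggle these tests iterate over: Spark's spark.sql.sources.useV1SourceList conf decides whether parquet reads go through the V1 scan (FileSourceScanExec) or the V2 scan (BatchScanExec). It assumes plain Spark without Comet; the object and path names are illustrative only.

import org.apache.spark.sql.SparkSession

object V1V2ScanToggle extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()

  // write a small parquet file with an array column, as in the tests above
  val dir = java.nio.file.Files.createTempDirectory("toggle").resolve("data").toString
  spark.range(5).selectExpr("array(id, id + 1) AS array1").write.parquet(dir)

  // "parquet" in useV1SourceList selects the V1 scan; an empty list
  // routes reads through the V2 scan
  Seq("", "parquet").foreach { v1List =>
    spark.conf.set("spark.sql.sources.useV1SourceList", v1List)
    val plan = spark.read.parquet(dir).queryExecution.executedPlan
    println(s"useV1SourceList='$v1List' -> " + plan.collectLeaves().map(_.nodeName).mkString(", "))
  }

  spark.stop()
}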