
Commit adcfce4

chore: Enable more Spark SQL tests (#1869)
1 parent a838068 commit adcfce4

File tree

8 files changed: +36 −6635 lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 5 additions & 5 deletions

@@ -74,14 +74,14 @@ jobs:
           maven_opts: "-Pspark-3.4 -Pscala-2.12"
           scan_impl: "native_comet"
 
-        - name: "Spark 3.5, JDK 11, Scala 2.12"
-          java_version: "17"
-          maven_opts: "-Pspark-3.5 -Pscala-2.12"
+        - name: "Spark 3.5.4, JDK 11, Scala 2.12"
+          java_version: "11"
+          maven_opts: "-Pspark-3.5 -Dspark.version=3.5.4 -Pscala-2.12"
           scan_impl: "native_comet"
 
-        - name: "Spark 3.5, JDK 17, Scala 2.13"
+        - name: "Spark 3.5.5, JDK 17, Scala 2.13"
           java_version: "17"
-          maven_opts: "-Pspark-3.5 -Pscala-2.13"
+          maven_opts: "-Pspark-3.5 -Dspark.version=3.5.5 -Pscala-2.13"
           scan_impl: "native_comet"
 
         - name: "Spark 3.5, JDK 17, Scala 2.12 native_datafusion"

.github/workflows/spark_sql_test.yml

Lines changed: 1 addition & 1 deletion

@@ -45,7 +45,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11]
-        spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.4'}, {short: '3.5', full: '3.5.5'}, {short: '3.5', full: '3.5.6'}]
+        spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.6'}]
         module:
           - {name: "catalyst", args1: "catalyst/test", args2: ""}
           - {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}

dev/diffs/3.4.3.diff

Lines changed: 9 additions & 119 deletions

@@ -284,7 +284,7 @@ index 56e9520fdab..917932336df 100644
       spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
 
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
-index a9f69ab28a1..5d9d4f2cb83 100644
+index a9f69ab28a1..760ea0e9565 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
 @@ -39,11 +39,12 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
@@ -343,16 +343,6 @@ index a9f69ab28a1..5d9d4f2cb83 100644
       }
       assert(exchanges.size == 2)
     }
-@@ -3325,7 +3328,8 @@ class DataFrameSuite extends QueryTest
-      assert(df2.isLocal)
-    }
- 
--  test("SPARK-35886: PromotePrecision should be subexpr replaced") {
-+  test("SPARK-35886: PromotePrecision should be subexpr replaced",
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     withTable("tbl") {
-       sql(
-         """
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 index 433b4741979..07148eee480 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
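The hunks removed above delete the `IgnoreComet` opt-outs that Comet's Spark diff had applied to these tests, so the tests run again when Comet is enabled. As a rough sketch (assumed here, not the verbatim definition from Comet's test sources), `IgnoreComet` is a ScalaTest tag that carries the skip reason, and the tag name lets a test run exclude tagged tests:

    import org.scalatest.Tag

    // Assumed sketch: the tag name "DisableComet" is illustrative, not
    // necessarily the name Comet actually registers.
    case class IgnoreComet(reason: String) extends Tag("DisableComet")

    // Usage, as seen in the patched suites:
    //   test("some Spark test", IgnoreComet("TODO: fix Comet for this test")) { ... }

Tagged tests can then be excluded the same way the workflow above excludes `org.apache.spark.tags.SlowSQLTest`, i.e. with ScalaTest's `-l <tag>` argument.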
@@ -1900,7 +1890,7 @@ index 07e2849ce6f..3e73645b638 100644
       ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
     )
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 104b4e416cd..81af723b4d0 100644
+index 104b4e416cd..0bd2e24e387 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 @@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -1952,17 +1942,7 @@ index 104b4e416cd..81af723b4d0 100644
         }
       }
     }
-@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
-      }
-    }
- 
--  test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
-+  test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
-+    IgnoreComet("IN predicate is not yet supported in Comet, see issue #36")) {
-     val schema = StructType(Seq(
-       StructField("a", IntegerType, nullable = false)
-     ))
-@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+@@ -1985,7 +1998,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       }
     }
 
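For context, the re-enabled SPARK-17091 test checks that an IN predicate is converted into a Parquet push-down filter. A minimal sketch of the behavior under test, with assumed paths and values (not taken from the suite itself):

    import org.apache.spark.sql.SparkSession

    object InPushdownExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("in-pushdown").getOrCreate()
        import spark.implicits._

        // Hypothetical location; any writable temp directory works.
        val path = java.nio.file.Files.createTempDirectory("parquet-in").resolve("t").toString
        spark.range(100).toDF("a").write.parquet(path)

        // If the filter is pushed down, the physical plan typically shows
        // something like `PushedFilters: [In(a, [5,15,25])]`.
        val df = spark.read.parquet(path).where($"a".isin(5, 15, 25))
        df.explain()
        assert(df.count() == 3)
        spark.stop()
      }
    }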
@@ -1972,7 +1952,7 @@ index 104b4e416cd..81af723b4d0 100644
       // block 1:
       //                     null count  min  max
       // page-0              0           0    99
-@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+@@ -2045,7 +2059,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       }
     }
 
@@ -1982,7 +1962,7 @@ index 104b4e416cd..81af723b4d0 100644
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         spark.range(100).selectExpr("id * 2 AS id")
-@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2277,7 +2292,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
       assert(pushedParquetFilters.exists(_.getClass === filterClass),
         s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
 
@@ -1995,7 +1975,7 @@ index 104b4e416cd..81af723b4d0 100644
       } else {
         assert(selectedFilters.isEmpty, "There is filter pushed down")
       }
-@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2337,7 +2356,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
       assert(pushedParquetFilters.exists(_.getClass === filterClass),
         s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
 
@@ -2520,94 +2500,18 @@ index 75f440caefc..36b1146bc3a 100644
       }.headOption.getOrElse {
         fail(s"No FileScan in query\n${df.queryExecution}")
       }
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
-index b597a244710..b2e8be41065 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
-@@ -21,6 +21,7 @@ import java.io.File
- 
- import org.apache.commons.io.FileUtils
- 
-+import org.apache.spark.sql.IgnoreComet
- import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
- import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, MemoryStream}
- import org.apache.spark.sql.internal.SQLConf
-@@ -91,7 +92,7 @@ class FlatMapGroupsWithStateDistributionSuite extends StreamTest
-   }
- 
-   test("SPARK-38204: flatMapGroupsWithState should require StatefulOpClusteredDistribution " +
--    "from children - without initial state") {
-+    "from children - without initial state", IgnoreComet("TODO: fix Comet for this test")) {
-     // function will return -1 on timeout and returns count of the state otherwise
-     val stateFunc =
-       (key: (String, String), values: Iterator[(String, String, Long)],
-@@ -243,7 +244,8 @@ class FlatMapGroupsWithStateDistributionSuite extends StreamTest
-   }
- 
-   test("SPARK-38204: flatMapGroupsWithState should require ClusteredDistribution " +
--    "from children if the query starts from checkpoint in 3.2.x - without initial state") {
-+    "from children if the query starts from checkpoint in 3.2.x - without initial state",
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     // function will return -1 on timeout and returns count of the state otherwise
-     val stateFunc =
-       (key: (String, String), values: Iterator[(String, String, Long)],
-@@ -335,7 +337,8 @@ class FlatMapGroupsWithStateDistributionSuite extends StreamTest
-   }
- 
-   test("SPARK-38204: flatMapGroupsWithState should require ClusteredDistribution " +
--    "from children if the query starts from checkpoint in prior to 3.2") {
-+    "from children if the query starts from checkpoint in prior to 3.2",
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     // function will return -1 on timeout and returns count of the state otherwise
-     val stateFunc =
-       (key: (String, String), values: Iterator[(String, String, Long)],
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
-index 6aa7d0945c7..38523536154 100644
+index 6aa7d0945c7..ad26ad833e2 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
-@@ -25,7 +25,7 @@ import org.scalatest.exceptions.TestFailedException
- 
- import org.apache.spark.SparkException
- import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
--import org.apache.spark.sql.{DataFrame, Encoder}
-+import org.apache.spark.sql.{DataFrame, Encoder, IgnoreCometSuite}
- import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
- import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
-@@ -46,8 +46,9 @@ case class RunningCount(count: Long)
+@@ -46,6 +46,7 @@ case class RunningCount(count: Long)
  
  case class Result(key: Long, count: Int)
  
 +// TODO: fix Comet to enable this suite
  @SlowSQLTest
--class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
-+class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with IgnoreCometSuite {
- 
-   import testImplicits._
- 
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
-index 2a2a83d35e1..e3b7b290b3e 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
-@@ -18,7 +18,7 @@
- package org.apache.spark.sql.streaming
- 
- import org.apache.spark.SparkException
--import org.apache.spark.sql.{AnalysisException, Dataset, KeyValueGroupedDataset}
-+import org.apache.spark.sql.{AnalysisException, Dataset, IgnoreComet, KeyValueGroupedDataset}
- import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
- import org.apache.spark.sql.execution.streaming.MemoryStream
- import org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper
-@@ -253,7 +253,8 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest
-     assert(e.message.contains(expectedError))
-   }
+ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
 
--  test("flatMapGroupsWithState - initial state - initial state has flatMapGroupsWithState") {
-+  test("flatMapGroupsWithState - initial state - initial state has flatMapGroupsWithState",
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     val initialStateDS = Seq(("keyInStateAndData", new RunningCount(1))).toDS()
-     val initialState: KeyValueGroupedDataset[String, RunningCount] =
-       initialStateDS.groupByKey(_._1).mapValues(_._2)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
 index ef5b8a769fe..84fe1bfabc9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
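The removed hunks above also drop a suite-wide opt-out: `FlatMapGroupsWithStateSuite` no longer mixes in `IgnoreCometSuite`; only the TODO comment remains in the patch. A hedged sketch of how such a mix-in could work (assumed; the environment check and ScalaTest base are illustrative, not Comet's actual code):

    import org.scalactic.source.Position
    import org.scalatest.Tag
    import org.scalatest.funsuite.AnyFunSuite

    // Assumed sketch: when Comet is enabled, register every test in the
    // mixing suite as ignored instead of running it.
    trait IgnoreCometSuite extends AnyFunSuite {
      private def cometEnabled: Boolean = sys.env.get("ENABLE_COMET").contains("true")

      override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(
          implicit pos: Position): Unit = {
        if (cometEnabled) {
          ignore(testName, testTags: _*)(testFun)(pos)
        } else {
          super.test(testName, testTags: _*)(testFun)(pos)
        }
      }
    }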
@@ -2912,20 +2816,6 @@ index 52abd248f3a..7a199931a08 100644
       case h: HiveTableScanExec => h.partitionPruningPred.collect {
         case d: DynamicPruningExpression => d.child
       }
-diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
-index 1966e1e64fd..cde97a0aafe 100644
---- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
-+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
-@@ -656,7 +656,8 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
-       Row(3, 4, 4, 3, null) :: Nil)
-   }
- 
--  test("single distinct multiple columns set") {
-+  test("single distinct multiple columns set",
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     checkAnswer(
-       spark.sql(
-         """
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 index 07361cfdce9..b4d53dbe900 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
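Similarly, the `AggregationQuerySuite` change above re-enables the "single distinct multiple columns set" test. The shape of query it covers is a single DISTINCT aggregate over several columns; a minimal assumed illustration (view name and data are made up, not taken from the suite):

    import org.apache.spark.sql.SparkSession

    object DistinctAggExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("distinct-agg").getOrCreate()

        spark.range(10)
          .selectExpr("id % 2 AS key1", "id % 3 AS key2")
          .createOrReplaceTempView("agg2")

        // One distinct aggregate over a multi-column set.
        spark.sql("SELECT COUNT(DISTINCT key1, key2) FROM agg2").show()
        spark.stop()
      }
    }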
