Skip to content

Commit 9dd6fd5

Browse files
authored
chore: Ignore Spark SQL WholeStageCodegenSuite tests (#1859)
1 parent cdfdc21 commit 9dd6fd5

File tree

4 files changed

+62
-496
lines changed

4 files changed

+62
-496
lines changed

dev/diffs/3.4.3.diff

Lines changed: 14 additions & 140 deletions
Original file line number | Diff line number | Diff line change
@@ -1305,153 +1305,27 @@ index b14f4a405f6..ab7baf434a5 100644
13051305
spark.range(1).foreach { _ =>
13061306
columnarToRowExec.canonicalized
13071307
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
1308-
index ac710c32296..baae214c6ee 100644
1308+
index ac710c32296..2854b433dd3 100644
13091309
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
13101310
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
1311-
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
1311+
@@ -17,7 +17,7 @@
1312+
1313+
package org.apache.spark.sql.execution
13121314

1313-
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
1315+
-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
1316+
+import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row, SaveMode}
13141317
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
1315-
+import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortMergeJoinExec}
13161318
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
13171319
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
1318-
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
1319-
@@ -169,6 +170,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1320-
val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
1321-
assert(oneJoinDF.queryExecution.executedPlan.collect {
1322-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
1323-
+ case _: CometHashJoinExec => true
1324-
}.size === 1)
1325-
checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4)))
1326-
1327-
@@ -177,6 +179,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1328-
.join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3")
1329-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1330-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
1331-
+ case _: CometHashJoinExec => true
1332-
}.size === 2)
1333-
checkAnswer(twoJoinsDF,
1334-
Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4)))
1335-
@@ -193,6 +196,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1336-
assert(joinUniqueDF.queryExecution.executedPlan.collect {
1337-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true
1338-
case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true
1339-
+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
1340-
+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
1341-
}.size === 1)
1342-
checkAnswer(joinUniqueDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4),
1343-
Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9)))
1344-
@@ -203,6 +208,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1345-
assert(joinNonUniqueDF.queryExecution.executedPlan.collect {
1346-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true
1347-
case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true
1348-
+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
1349-
+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
1350-
}.size === 1)
1351-
checkAnswer(joinNonUniqueDF, Seq(Row(0, 0), Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1),
1352-
Row(1, 4), Row(1, 7), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null), Row(4, null)))
1353-
@@ -213,6 +220,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1354-
assert(joinWithNonEquiDF.queryExecution.executedPlan.collect {
1355-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true
1356-
case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true
1357-
+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
1358-
+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
1359-
}.size === 1)
1360-
checkAnswer(joinWithNonEquiDF, Seq(Row(0, 0), Row(0, 6), Row(0, 9), Row(1, 1),
1361-
Row(1, 7), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null), Row(null, 3), Row(null, 4),
1362-
@@ -224,6 +233,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1363-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1364-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true
1365-
case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true
1366-
+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
1367-
+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
1368-
}.size === 2)
1369-
checkAnswer(twoJoinsDF,
1370-
Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null), Row(4, 4, null),
1371-
@@ -241,6 +252,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1372-
val oneLeftOuterJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_outer")
1373-
assert(oneLeftOuterJoinDF.queryExecution.executedPlan.collect {
1374-
case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1375-
+ case _: CometSortMergeJoinExec => true
1376-
}.size === 1)
1377-
checkAnswer(oneLeftOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, null),
1378-
Row(5, null), Row(6, null), Row(7, null), Row(8, null), Row(9, null)))
1379-
@@ -249,6 +261,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1380-
val oneRightOuterJoinDF = df2.join(df3.hint("SHUFFLE_MERGE"), $"k2" === $"k3", "right_outer")
1381-
assert(oneRightOuterJoinDF.queryExecution.executedPlan.collect {
1382-
case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1383-
+ case _: CometSortMergeJoinExec => true
1384-
}.size === 1)
1385-
checkAnswer(oneRightOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(null, 4),
1386-
Row(null, 5)))
1387-
@@ -258,6 +271,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1388-
.join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
1389-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1390-
case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1391-
+ case _: CometSortMergeJoinExec => true
1392-
}.size === 2)
1393-
checkAnswer(twoJoinsDF,
1394-
Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, null, 4), Row(5, null, 5),
1395-
@@ -273,6 +287,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1396-
val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_semi")
1397-
assert(oneJoinDF.queryExecution.executedPlan.collect {
1398-
case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) => true
1399-
+ case _: CometSortMergeJoinExec => true
1400-
}.size === 1)
1401-
checkAnswer(oneJoinDF, Seq(Row(0), Row(1), Row(2), Row(3)))
1402-
1403-
@@ -280,8 +295,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1404-
val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2", "left_semi")
1405-
.join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
1406-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1407-
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
1408-
- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1409-
+ case _: SortMergeJoinExec => true
1410-
+ case _: CometSortMergeJoinExec => true
1411-
}.size === 2)
1412-
checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
1413-
}
1414-
@@ -295,6 +310,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1415-
val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti")
1416-
assert(oneJoinDF.queryExecution.executedPlan.collect {
1417-
case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) => true
1418-
+ case _: CometSortMergeJoinExec => true
1419-
}.size === 1)
1420-
checkAnswer(oneJoinDF, Seq(Row(4), Row(5), Row(6), Row(7), Row(8), Row(9)))
1421-
1422-
@@ -302,8 +318,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1423-
val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti")
1424-
.join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
1425-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1426-
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
1427-
- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1428-
+ case _: SortMergeJoinExec => true
1429-
+ case _: CometSortMergeJoinExec => true
1430-
}.size === 2)
1431-
checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
1432-
}
1433-
@@ -433,10 +449,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1434-
1435-
test("Sort should be included in WholeStageCodegen") {
1436-
val df = spark.range(3, 0, -1).toDF().sort(col("id"))
1437-
- val plan = df.queryExecution.executedPlan
1438-
- assert(plan.exists(p =>
1439-
- p.isInstanceOf[WholeStageCodegenExec] &&
1440-
- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
1441-
assert(df.collect() === Array(Row(1), Row(2), Row(3)))
1442-
}
1443-
1444-
@@ -616,7 +628,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1445-
.write.mode(SaveMode.Overwrite).parquet(path)
1446-
1447-
withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
1448-
- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
1449-
+ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
1450-
+ // Disable Comet native execution because this checks wholestage codegen.
1451-
+ "spark.comet.exec.enabled" -> "false") {
1452-
val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i")
1453-
val df = spark.read.parquet(path).selectExpr(projection: _*)
1320+
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SharedSparkSession
1321+
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
1322+
1323+
// Disable AQE because the WholeStageCodegenExec is added when running QueryStageExec
1324+
-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1325+
+class WholeStageCodegenSuite extends QueryTest with SharedSparkSession with IgnoreCometSuite
1326+
with DisableAdaptiveExecutionSuite {
14541327

1328+
import testImplicits._
14551329
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
14561330
index 593bd7bb4ba..32af28b0238 100644
14571331
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

dev/diffs/3.5.4.diff

Lines changed: 15 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -1476,76 +1476,27 @@ index b14f4a405f6..ab7baf434a5 100644
14761476
spark.range(1).foreach { _ =>
14771477
columnarToRowExec.canonicalized
14781478
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
1479-
index 5a413c77754..a6f97dccb67 100644
1479+
index 5a413c77754..207b66e1d7b 100644
14801480
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
14811481
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
1482-
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
1483-
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
1482+
@@ -17,7 +17,7 @@
1483+
1484+
package org.apache.spark.sql.execution
1485+
1486+
-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
1487+
+import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row, SaveMode}
14841488
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
14851489
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
1486-
+import org.apache.spark.sql.comet.{CometSortExec, CometSortMergeJoinExec}
14871490
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
1488-
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
1489-
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
1490-
@@ -235,6 +236,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1491-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1492-
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true
1493-
case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true
1494-
+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
1495-
}.size === 2)
1496-
checkAnswer(twoJoinsDF,
1497-
Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null), Row(4, 4, null),
1498-
@@ -358,6 +360,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1499-
.join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
1500-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1501-
case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1502-
+ case _: CometSortMergeJoinExec => true
1503-
}.size === 2)
1504-
checkAnswer(twoJoinsDF,
1505-
Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, null, 4), Row(5, null, 5),
1506-
@@ -380,8 +383,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1507-
val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2", "left_semi")
1508-
.join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
1509-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1510-
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
1511-
- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1512-
+ case _: SortMergeJoinExec => true
1513-
}.size === 2)
1514-
checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
1515-
}
1516-
@@ -402,8 +404,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1517-
val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti")
1518-
.join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
1519-
assert(twoJoinsDF.queryExecution.executedPlan.collect {
1520-
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
1521-
- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
1522-
+ case _: SortMergeJoinExec => true
1523-
}.size === 2)
1524-
checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
1525-
}
1526-
@@ -536,7 +537,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1527-
val plan = df.queryExecution.executedPlan
1528-
assert(plan.exists(p =>
1529-
p.isInstanceOf[WholeStageCodegenExec] &&
1530-
- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
1531-
+ p.asInstanceOf[WholeStageCodegenExec].collect {
1532-
+ case _: SortExec => true
1533-
+ case _: CometSortExec => true
1534-
+ }.nonEmpty))
1535-
assert(df.collect() === Array(Row(1), Row(2), Row(3)))
1536-
}
1537-
1538-
@@ -716,7 +720,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1539-
.write.mode(SaveMode.Overwrite).parquet(path)
1540-
1541-
withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
1542-
- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
1543-
+ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
1544-
+ // Disable Comet native execution because this checks wholestage codegen.
1545-
+ "spark.comet.exec.enabled" -> "false") {
1546-
val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i")
1547-
val df = spark.read.parquet(path).selectExpr(projection: _*)
1491+
@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSparkSession
1492+
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
1493+
1494+
// Disable AQE because the WholeStageCodegenExec is added when running QueryStageExec
1495+
-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
1496+
+class WholeStageCodegenSuite extends QueryTest with SharedSparkSession with IgnoreCometSuite
1497+
with DisableAdaptiveExecutionSuite {
15481498

1499+
import testImplicits._
15491500
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
15501501
index 2f8e401e743..a4f94417dcc 100644
15511502
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

0 commit comments

Comments
 (0)