
Commit 3ea6ad6

chore: Expand test coverage for CometWindowsExec (#2711)
1 parent 31a2160 commit 3ea6ad6

File tree: 4 files changed, +987 −172 lines


.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions

@@ -129,6 +129,7 @@ jobs:
   org.apache.comet.exec.CometAggregateSuite
   org.apache.comet.exec.CometExec3_4PlusSuite
   org.apache.comet.exec.CometExecSuite
+  org.apache.comet.exec.CometWindowExecSuite
   org.apache.comet.exec.CometJoinSuite
   org.apache.comet.CometArrayExpressionSuite
   org.apache.comet.CometCastSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions

@@ -94,6 +94,7 @@ jobs:
   org.apache.comet.exec.CometAggregateSuite
   org.apache.comet.exec.CometExec3_4PlusSuite
   org.apache.comet.exec.CometExecSuite
+  org.apache.comet.exec.CometWindowExecSuite
   org.apache.comet.exec.CometJoinSuite
   org.apache.comet.CometArrayExpressionSuite
   org.apache.comet.CometCastSuite
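
Both workflow files register the new suite under its fully qualified name, org.apache.comet.exec.CometWindowExecSuite. The suite itself is the fourth changed file in this commit and does not appear in this excerpt. As a sketch only, assuming the new suite mirrors CometExecSuite (same package, same CometTestBase parent; neither detail is confirmed by this excerpt), the minimal skeleton ScalaTest needs in order to discover it under that name would look like:

    package org.apache.comet.exec

    // Hypothetical skeleton of the new suite; the actual file added by this
    // commit is not shown above. CometTestBase as the parent class is an
    // assumption carried over from CometExecSuite, and its import is elided.
    class CometWindowExecSuite extends CometTestBase {
      // Window-function tests relocated out of CometExecSuite go here.
    }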

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 1 addition & 172 deletions
@@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.window.WindowExec
-import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -314,45 +313,6 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
-  test(
-    "fall back to Spark when the partition spec and order spec are not the same for window function") {
-    withTempView("test") {
-      sql("""
-        |CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES
-        |  (1, true), (1, false),
-        |(2, true), (3, false), (4, true) AS test(k, v)
-        |""".stripMargin)
-
-      val df = sql("""
-        SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg
-        |""".stripMargin)
-      checkSparkAnswer(df)
-    }
-  }
-
-  test("Native window operator should be CometUnaryExec") {
-    withTempView("testData") {
-      sql("""
-        |CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-        |(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
-        |(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
-        |(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
-        |(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
-        |(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
-        |(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
-        |(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
-        |(null, null, null, null, null, null),
-        |(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
-        |AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
-        |""".stripMargin)
-      val df1 = sql("""
-        |SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW)
-        |FROM testData ORDER BY cate, val
-        |""".stripMargin)
-      checkSparkAnswer(df1)
-    }
-  }
-
   test("subquery execution under CometTakeOrderedAndProjectExec should not fail") {
     assume(isSpark35Plus, "SPARK-45584 is fixed in Spark 3.5+")
 
@@ -374,32 +334,6 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
-  test("Window range frame with long boundary should not fail") {
-    val df =
-      Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2"))
-        .toDF("key", "value")
-
-    checkSparkAnswer(
-      df.select(
-        $"key",
-        count("key").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))))
-    checkSparkAnswer(
-      df.select(
-        $"key",
-        count("key").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))))
-  }
-
-  test("Unsupported window expression should fall back to Spark") {
-    checkAnswer(
-      spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
-      Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil)
-    checkAnswer(
-      spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
-      Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
-  }
-
   test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
     withSQLConf(
       CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
@@ -508,26 +442,6 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
-  test("Repeated shuffle exchange don't fail") {
-    Seq("true", "false").foreach { aqeEnabled =>
-      withSQLConf(
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
-        SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true",
-        CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
-        val df =
-          Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value")
-        val windowSpec = Window.partitionBy("key1", "key2").orderBy("value")
-
-        val windowed = df
-          // repartition by subset of window partitionBy keys which satisfies ClusteredDistribution
-          .repartition($"key1")
-          .select(lead($"key1", 1).over(windowSpec), lead($"value", 1).over(windowSpec))
-
-        checkSparkAnswer(windowed)
-      }
-    }
-  }
-
   test("try_sum should return null if overflow happens before merging") {
     val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v")
     val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2)
@@ -1789,7 +1703,7 @@ class CometExecSuite extends CometTestBase {
       spark
         .range(numRows)
         .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
-        .repartition(3) // Force repartition to test data will come to single partition
+        .repartition(3) // Move data across multiple partitions
         .write
         .saveAsTable("t1")

@@ -2060,91 +1974,6 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
-  test("aggregate window function for all types") {
-    val numValues = 2048
-
-    Seq(1, 100, numValues).foreach { numGroups =>
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempPath { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFile(path, numValues, numGroups, dictionaryEnabled)
-          withParquetTable(path.toUri.toString, "tbl") {
-            Seq(128, numValues + 100).foreach { batchSize =>
-              withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
-                (1 to 11).foreach { col =>
-                  val aggregateFunctions =
-                    List(s"COUNT(_$col)", s"MAX(_$col)", s"MIN(_$col)", s"SUM(_$col)")
-                  aggregateFunctions.foreach { function =>
-                    val df1 = sql(s"SELECT $function OVER() FROM tbl")
-                    checkSparkAnswerWithTolerance(df1, 1e-6)
-
-                    val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl")
-                    checkSparkAnswerWithTolerance(df2, 1e-6)
-
-                    val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl")
-                    checkSparkAnswerWithTolerance(df3, 1e-6)
-
-                    val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl")
-                    checkSparkAnswerWithTolerance(df4, 1e-6)
-                  }
-                }
-
-                // SUM doesn't work for Date type. org.apache.spark.sql.AnalysisException will be thrown.
-                val aggregateFunctionsWithoutSum = List("COUNT(_12)", "MAX(_12)", "MIN(_12)")
-                aggregateFunctionsWithoutSum.foreach { function =>
-                  val df1 = sql(s"SELECT $function OVER() FROM tbl")
-                  checkSparkAnswerWithTolerance(df1, 1e-6)
-
-                  val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl")
-                  checkSparkAnswerWithTolerance(df2, 1e-6)
-
-                  val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl")
-                  checkSparkAnswerWithTolerance(df3, 1e-6)
-
-                  val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl")
-                  checkSparkAnswerWithTolerance(df4, 1e-6)
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  test("Windows support") {
-    Seq("true", "false").foreach(aqeEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
-        withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls
-          val aggregateFunctions =
-            List(
-              "COUNT(_1)",
-              "COUNT(*)",
-              "MAX(_1)",
-              "MIN(_1)",
-              "SUM(_1)"
-            ) // TODO: Test all the aggregates
-
-          aggregateFunctions.foreach { function =>
-            val queries = Seq(
-              s"SELECT $function OVER() FROM t1",
-              s"SELECT $function OVER(order by _2) FROM t1",
-              s"SELECT $function OVER(order by _2 desc) FROM t1",
-              s"SELECT $function OVER(partition by _2 order by _2) FROM t1",
-              s"SELECT $function OVER(rows between 1 preceding and 1 following) FROM t1",
-              s"SELECT $function OVER(order by _2 rows between 1 preceding and current row) FROM t1",
-              s"SELECT $function OVER(order by _2 rows between current row and 1 following) FROM t1")
-
-            queries.foreach { query =>
-              checkSparkAnswerAndOperator(query)
-            }
-          }
-        }
-      })
-  }
-
   test("read CSV file") {
     Seq("", "csv").foreach { v1List =>
       withSQLConf(
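
Per the commit title, the window tests deleted above are relocated into the new CometWindowExecSuite rather than dropped. As an illustration, one of the removed tests would carry over essentially verbatim; the test body below is copied from the deleted hunk, and only its placement inside the suite skeleton sketched earlier is an assumption:

      // Inside the hypothetical CometWindowExecSuite body:
      test("Unsupported window expression should fall back to Spark") {
        // An empty OVER() clause puts every row in one unbounded frame, so the
        // aggregate repeats the same value (sum 6.0, avg 2.0) on every row.
        checkAnswer(
          spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
          Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil)
        checkAnswer(
          spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
          Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
      }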
