1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -129,6 +129,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometArrayExpressionSuite
org.apache.comet.CometCastSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -94,6 +94,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometArrayExpressionSuite
org.apache.comet.CometCastSuite
173 changes: 1 addition & 172 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -314,45 +313,6 @@ class CometExecSuite extends CometTestBase {
}
}

test(
"fall back to Spark when the partition spec and order spec are not the same for window function") {
withTempView("test") {
sql("""
|CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES
| (1, true), (1, false),
|(2, true), (3, false), (4, true) AS test(k, v)
|""".stripMargin)

val df = sql("""
SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg
|""".stripMargin)
checkSparkAnswer(df)
}
}

test("Native window operator should be CometUnaryExec") {
withTempView("testData") {
sql("""
|CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
|(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
|(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
|(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
|(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
|(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
|(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
|(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
|(null, null, null, null, null, null),
|(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
|AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
|""".stripMargin)
val df1 = sql("""
|SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW)
|FROM testData ORDER BY cate, val
|""".stripMargin)
checkSparkAnswer(df1)
}
}

test("subquery execution under CometTakeOrderedAndProjectExec should not fail") {
assume(isSpark35Plus, "SPARK-45584 is fixed in Spark 3.5+")

@@ -374,32 +334,6 @@ class CometExecSuite extends CometTestBase {
}
}

test("Window range frame with long boundary should not fail") {
val df =
Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2"))
.toDF("key", "value")

checkSparkAnswer(
df.select(
$"key",
count("key").over(
Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))))
checkSparkAnswer(
df.select(
$"key",
count("key").over(
Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))))
}

test("Unsupported window expression should fall back to Spark") {
checkAnswer(
spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil)
checkAnswer(
spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
}

test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
withSQLConf(
CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
@@ -508,26 +442,6 @@ class CometExecSuite extends CometTestBase {
}
}

test("Repeated shuffle exchange don't fail") {
Seq("true", "false").foreach { aqeEnabled =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df =
Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value")
val windowSpec = Window.partitionBy("key1", "key2").orderBy("value")

val windowed = df
// repartition by subset of window partitionBy keys which satisfies ClusteredDistribution
.repartition($"key1")
.select(lead($"key1", 1).over(windowSpec), lead($"value", 1).over(windowSpec))

checkSparkAnswer(windowed)
}
}
}

test("try_sum should return null if overflow happens before merging") {
val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v")
val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2)
@@ -1789,7 +1703,7 @@ class CometExecSuite extends CometTestBase {
spark
.range(numRows)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.repartition(3) // Move data across multiple partitions
.write
.saveAsTable("t1")

@@ -2060,91 +1974,6 @@ class CometExecSuite extends CometTestBase {
}
}

test("aggregate window function for all types") {
val numValues = 2048

Seq(1, 100, numValues).foreach { numGroups =>
Seq(true, false).foreach { dictionaryEnabled =>
withTempPath { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFile(path, numValues, numGroups, dictionaryEnabled)
withParquetTable(path.toUri.toString, "tbl") {
Seq(128, numValues + 100).foreach { batchSize =>
withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
(1 to 11).foreach { col =>
val aggregateFunctions =
List(s"COUNT(_$col)", s"MAX(_$col)", s"MIN(_$col)", s"SUM(_$col)")
aggregateFunctions.foreach { function =>
val df1 = sql(s"SELECT $function OVER() FROM tbl")
checkSparkAnswerWithTolerance(df1, 1e-6)

val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl")
checkSparkAnswerWithTolerance(df2, 1e-6)

val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl")
checkSparkAnswerWithTolerance(df3, 1e-6)

val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl")
checkSparkAnswerWithTolerance(df4, 1e-6)
}
}

// SUM doesn't work for Date type. org.apache.spark.sql.AnalysisException will be thrown.
val aggregateFunctionsWithoutSum = List("COUNT(_12)", "MAX(_12)", "MIN(_12)")
aggregateFunctionsWithoutSum.foreach { function =>
val df1 = sql(s"SELECT $function OVER() FROM tbl")
checkSparkAnswerWithTolerance(df1, 1e-6)

val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl")
checkSparkAnswerWithTolerance(df2, 1e-6)

val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl")
checkSparkAnswerWithTolerance(df3, 1e-6)

val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl")
checkSparkAnswerWithTolerance(df4, 1e-6)
}
}
}
}
}
}
}
}

test("Windows support") {
Seq("true", "false").foreach(aqeEnabled =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls
val aggregateFunctions =
List(
"COUNT(_1)",
"COUNT(*)",
"MAX(_1)",
"MIN(_1)",
"SUM(_1)"
) // TODO: Test all the aggregates

aggregateFunctions.foreach { function =>
val queries = Seq(
s"SELECT $function OVER() FROM t1",
s"SELECT $function OVER(order by _2) FROM t1",
s"SELECT $function OVER(order by _2 desc) FROM t1",
s"SELECT $function OVER(partition by _2 order by _2) FROM t1",
s"SELECT $function OVER(rows between 1 preceding and 1 following) FROM t1",
s"SELECT $function OVER(order by _2 rows between 1 preceding and current row) FROM t1",
s"SELECT $function OVER(order by _2 rows between current row and 1 following) FROM t1")

queries.foreach { query =>
checkSparkAnswerAndOperator(query)
}
}
}
})
}

test("read CSV file") {
Seq("", "csv").foreach { v1List =>
withSQLConf(