diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 02b544e2d5..74ac9c6956 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -129,6 +129,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometArrayExpressionSuite org.apache.comet.CometCastSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 3a1b82d044..0478a35eb1 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -94,6 +94,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometArrayExpressionSuite org.apache.comet.CometCastSuite diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 56174c7fc0..cba0329df8 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE @@ -314,45 +313,6 @@ class CometExecSuite extends CometTestBase { } } - test( - "fall back to Spark when the partition spec and order spec are not the same for window function") { - withTempView("test") { - sql(""" - |CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES - | (1, true), (1, false), - |(2, true), (3, false), (4, true) AS test(k, v) - |""".stripMargin) - - val df = sql(""" - SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg - |""".stripMargin) - checkSparkAnswer(df) - } - } - - test("Native window operator should be CometUnaryExec") { - withTempView("testData") { - sql(""" - |CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES - |(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), - |(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), - |(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"), - |(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"), - |(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"), - |(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"), - |(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"), - |(null, null, null, null, null, null), - |(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) - |AS testData(val, val_long, val_double, val_date, val_timestamp, cate) - |""".stripMargin) - val df1 = sql(""" - |SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) - |FROM testData ORDER BY cate, val - |""".stripMargin) - checkSparkAnswer(df1) - } 
- } - test("subquery execution under CometTakeOrderedAndProjectExec should not fail") { assume(isSpark35Plus, "SPARK-45584 is fixed in Spark 3.5+") @@ -374,32 +334,6 @@ class CometExecSuite extends CometTestBase { } } - test("Window range frame with long boundary should not fail") { - val df = - Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2")) - .toDF("key", "value") - - checkSparkAnswer( - df.select( - $"key", - count("key").over( - Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L)))) - checkSparkAnswer( - df.select( - $"key", - count("key").over( - Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0)))) - } - - test("Unsupported window expression should fall back to Spark") { - checkAnswer( - spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"), - Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil) - checkAnswer( - spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"), - Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil) - } - test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") { withSQLConf( CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true", @@ -508,26 +442,6 @@ class CometExecSuite extends CometTestBase { } } - test("Repeated shuffle exchange don't fail") { - Seq("true", "false").foreach { aqeEnabled => - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled, - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true", - CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { - val df = - Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value") - val windowSpec = Window.partitionBy("key1", "key2").orderBy("value") - - val windowed = df - // repartition by subset of window partitionBy keys which satisfies ClusteredDistribution - .repartition($"key1") - .select(lead($"key1", 1).over(windowSpec), lead($"value", 1).over(windowSpec)) - - checkSparkAnswer(windowed) - } - } - } - test("try_sum should return null if overflow happens before merging") { val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v") val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2) @@ -1789,7 +1703,7 @@ class CometExecSuite extends CometTestBase { spark .range(numRows) .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b") - .repartition(3) // Force repartition to test data will come to single partition + .repartition(3) // Move data across multiple partitions .write .saveAsTable("t1") @@ -2060,91 +1974,6 @@ class CometExecSuite extends CometTestBase { } } - test("aggregate window function for all types") { - val numValues = 2048 - - Seq(1, 100, numValues).foreach { numGroups => - Seq(true, false).foreach { dictionaryEnabled => - withTempPath { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFile(path, numValues, numGroups, dictionaryEnabled) - withParquetTable(path.toUri.toString, "tbl") { - Seq(128, numValues + 100).foreach { batchSize => - withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { - (1 to 11).foreach { col => - val aggregateFunctions = - List(s"COUNT(_$col)", s"MAX(_$col)", s"MIN(_$col)", s"SUM(_$col)") - aggregateFunctions.foreach { function => - val df1 = sql(s"SELECT $function OVER() FROM tbl") - checkSparkAnswerWithTolerance(df1, 1e-6) - - val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl") - checkSparkAnswerWithTolerance(df2, 1e-6) - - val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl") - checkSparkAnswerWithTolerance(df3, 1e-6) - - val df4 = sql(s"SELECT $function 
OVER(partition by _2 order by _2) FROM tbl") - checkSparkAnswerWithTolerance(df4, 1e-6) - } - } - - // SUM doesn't work for Date type. org.apache.spark.sql.AnalysisException will be thrown. - val aggregateFunctionsWithoutSum = List("COUNT(_12)", "MAX(_12)", "MIN(_12)") - aggregateFunctionsWithoutSum.foreach { function => - val df1 = sql(s"SELECT $function OVER() FROM tbl") - checkSparkAnswerWithTolerance(df1, 1e-6) - - val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl") - checkSparkAnswerWithTolerance(df2, 1e-6) - - val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl") - checkSparkAnswerWithTolerance(df3, 1e-6) - - val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl") - checkSparkAnswerWithTolerance(df4, 1e-6) - } - } - } - } - } - } - } - } - - test("Windows support") { - Seq("true", "false").foreach(aqeEnabled => - withSQLConf( - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { - withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls - val aggregateFunctions = - List( - "COUNT(_1)", - "COUNT(*)", - "MAX(_1)", - "MIN(_1)", - "SUM(_1)" - ) // TODO: Test all the aggregates - - aggregateFunctions.foreach { function => - val queries = Seq( - s"SELECT $function OVER() FROM t1", - s"SELECT $function OVER(order by _2) FROM t1", - s"SELECT $function OVER(order by _2 desc) FROM t1", - s"SELECT $function OVER(partition by _2 order by _2) FROM t1", - s"SELECT $function OVER(rows between 1 preceding and 1 following) FROM t1", - s"SELECT $function OVER(order by _2 rows between 1 preceding and current row) FROM t1", - s"SELECT $function OVER(order by _2 rows between current row and 1 following) FROM t1") - - queries.foreach { query => - checkSparkAnswerAndOperator(query) - } - } - } - }) - } - test("read CSV file") { Seq("", "csv").foreach { v1List => withSQLConf( diff --git a/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala new file mode 100644 index 0000000000..488829f996 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala @@ -0,0 +1,984 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.comet.exec
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CometTestBase, Row}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{count, lead}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.comet.CometConf
+
+class CometWindowExecSuite extends CometTestBase {
+
+  import testImplicits._
+
+  // Run every test in this suite with Comet shuffle enabled and the auto-selected
+  // native scan implementation.
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) {
+        testFun
+      }
+    }
+  }
+
+  test(
+    "fall back to Spark when the partition spec and order spec are not the same for window function") {
+    withTempView("test_agg") {
+      sql("""
+        |CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES
+        | (1, true), (1, false),
+        |(2, true), (3, false), (4, true) AS test(k, v)
+        |""".stripMargin)
+
+      val df = sql("""
+        |SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg
+        |""".stripMargin)
+      checkSparkAnswer(df)
+    }
+  }
+
+  test("Native window operator should be CometUnaryExec") {
+    withTempView("testData") {
+      sql("""
+        |CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
+        |(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
+        |(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
+        |(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
+        |(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
+        |(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
+        |(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
+        |(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
+        |(null, null, null, null, null, null),
+        |(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
+        |AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
+        |""".stripMargin)
+      val df1 = sql("""
+        |SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW)
+        |FROM testData ORDER BY cate, val
+        |""".stripMargin)
+      checkSparkAnswer(df1)
+    }
+  }
+
+  test("Window range frame with long boundary should not fail") {
+    val df =
+      Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2"))
+        .toDF("key", "value")
+
+    checkSparkAnswer(
+      df.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))))
+    checkSparkAnswer(
+      df.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))))
+  }
+
+  test("Unsupported window expression should fall back to Spark") {
+    checkAnswer(
+      spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"),
+      Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil)
+    checkAnswer(
+      spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
+      Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
+  }
+
+  test("Repeated shuffle exchanges don't fail") {
+    Seq("true", "false").foreach { aqeEnabled =>
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
+        SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true",
+        CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+        val df =
+          Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value")
+        val 
windowSpec = Window.partitionBy("key1", "key2").orderBy("value") + + val windowed = df + // repartition by subset of window partitionBy keys which satisfies ClusteredDistribution + .repartition($"key1") + .select(lead($"key1", 1).over(windowSpec), lead($"value", 1).over(windowSpec)) + + checkSparkAnswer(windowed) + } + } + } + + test("aggregate window function for all types") { + val numValues = 2048 + + Seq(1, 100, numValues).foreach { numGroups => + Seq(true, false).foreach { dictionaryEnabled => + withTempPath { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFile(path, numValues, numGroups, dictionaryEnabled) + withParquetTable(path.toUri.toString, "tbl") { + Seq(128, numValues + 100).foreach { batchSize => + withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { + (1 to 11).foreach { col => + val aggregateFunctions = + List(s"COUNT(_$col)", s"MAX(_$col)", s"MIN(_$col)", s"SUM(_$col)") + aggregateFunctions.foreach { function => + val df1 = sql(s"SELECT $function OVER() FROM tbl") + checkSparkAnswerWithTolerance(df1, 1e-6) + + val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl") + checkSparkAnswerWithTolerance(df2, 1e-6) + + val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl") + checkSparkAnswerWithTolerance(df3, 1e-6) + + val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl") + checkSparkAnswerWithTolerance(df4, 1e-6) + } + } + + // SUM doesn't work for Date type. org.apache.spark.sql.AnalysisException will be thrown. + val aggregateFunctionsWithoutSum = List("COUNT(_12)", "MAX(_12)", "MIN(_12)") + aggregateFunctionsWithoutSum.foreach { function => + val df1 = sql(s"SELECT $function OVER() FROM tbl") + checkSparkAnswerWithTolerance(df1, 1e-6) + + val df2 = sql(s"SELECT $function OVER(order by _2) FROM tbl") + checkSparkAnswerWithTolerance(df2, 1e-6) + + val df3 = sql(s"SELECT $function OVER(order by _2 desc) FROM tbl") + checkSparkAnswerWithTolerance(df3, 1e-6) + + val df4 = sql(s"SELECT $function OVER(partition by _2 order by _2) FROM tbl") + checkSparkAnswerWithTolerance(df4, 1e-6) + } + } + } + } + } + } + } + } + + test("Windows support") { + Seq("true", "false").foreach(aqeEnabled => + withSQLConf( + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls + val aggregateFunctions = + List( + "COUNT(_1)", + "COUNT(*)", + "MAX(_1)", + "MIN(_1)", + "SUM(_1)" + ) // TODO: Test all the aggregates + + aggregateFunctions.foreach { function => + val queries = Seq( + s"SELECT $function OVER() FROM t1", + s"SELECT $function OVER(order by _2) FROM t1", + s"SELECT $function OVER(order by _2 desc) FROM t1", + s"SELECT $function OVER(partition by _2 order by _2) FROM t1", + s"SELECT $function OVER(rows between 1 preceding and 1 following) FROM t1", + s"SELECT $function OVER(order by _2 rows between 1 preceding and current row) FROM t1", + s"SELECT $function OVER(order by _2 rows between current row and 1 following) FROM t1") + + queries.foreach { query => + checkSparkAnswerAndOperator(query) + } + } + } + }) + } + + test("window: simple COUNT(*) without frame") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql("SELECT a, b, c, COUNT(*) OVER () as cnt FROM window_test") 
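+      // checkSparkAnswerAndOperator verifies both that the result matches Spark and that
+      // the plan runs entirely on Comet operators (no fallback to Spark).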
+ checkSparkAnswerAndOperator(df) + } + } + + test("window: simple SUM with PARTITION BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql("SELECT a, b, c, SUM(c) OVER (PARTITION BY a) as sum_c FROM window_test") + checkSparkAnswerAndOperator(df) + } + } + + // TODO: AVG with PARTITION BY and ORDER BY not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: AVG with PARTITION BY and ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = + sql("SELECT a, b, c, AVG(c) OVER (PARTITION BY a ORDER BY b) as avg_c FROM window_test") + checkSparkAnswerAndOperator(df) + } + } + + test("window: MIN and MAX with ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + MIN(c) OVER (ORDER BY b) as min_c, + MAX(c) OVER (ORDER BY b) as max_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW produces incorrect results + // Returns wrong cnt values - ordering issue causes swapped values for rows with same partition + ignore("window: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + COUNT(*) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cnt + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING produces incorrect results + // Returns wrong sum_c values - ordering issue causes swapped values for rows with same partition + ignore("window: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as sum_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: AVG with ROWS BETWEEN produces incorrect results + // Returns wrong avg_c values - calculation appears to be off + ignore("window: AVG with ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + AVG(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as 
avg_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: SUM with ROWS BETWEEN produces incorrect results + // Returns wrong sum_c values for some rows + ignore("window: SUM with ROWS BETWEEN 2 PRECEDING AND CURRENT ROW") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as sum_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: COUNT with ROWS BETWEEN not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: COUNT with ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + COUNT(*) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) as cnt + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: MAX with ROWS BETWEEN UNBOUNDED not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: MAX with ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + MAX(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: ROW_NUMBER not supported + // Falls back to Spark Window operator + ignore("window: ROW_NUMBER with PARTITION BY and ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, c) as row_num + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: RANK not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: RANK with PARTITION BY and ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + RANK() OVER (PARTITION BY a ORDER BY b) as rnk + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: DENSE_RANK not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: DENSE_RANK with PARTITION BY and ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + 
.parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + DENSE_RANK() OVER (PARTITION BY a ORDER BY b) as dense_rnk + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: PERCENT_RANK not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: PERCENT_RANK with PARTITION BY and ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + PERCENT_RANK() OVER (PARTITION BY a ORDER BY b) as pct_rnk + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: NTILE not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: NTILE with PARTITION BY and ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + NTILE(4) OVER (PARTITION BY a ORDER BY b) as ntile_4 + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: LAG produces incorrect results + // Returns wrong lag_c values - ordering issue in results + ignore("window: LAG with default offset") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + LAG(c) OVER (PARTITION BY a ORDER BY b) as lag_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: LAG with offset 2 produces incorrect results + // Returns wrong lag_c_2 values - ordering issue in results + ignore("window: LAG with offset 2 and default value") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + LAG(c, 2, -1) OVER (PARTITION BY a ORDER BY b) as lag_c_2 + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: LEAD produces incorrect results + // Returns wrong lead_c values - ordering issue in results + ignore("window: LEAD with default offset") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + LEAD(c) OVER (PARTITION BY a ORDER BY b) as lead_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: LEAD with offset 2 produces incorrect results + // Returns wrong lead_c_2 values - ordering issue in results + ignore("window: LEAD with offset 2 and default value") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + 
spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
+      val df = sql("""
+        SELECT a, b, c,
+          LEAD(c, 2, -1) OVER (PARTITION BY a ORDER BY b) as lead_c_2
+        FROM window_test
+        """)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // TODO: FIRST_VALUE support is unverified, so this test stays ignored.
+  // The nullable column is generated as Option[Int]; building the tuples with a bare `null`
+  // types the column as Any, and DataFrame creation fails with
+  // org.apache.spark.SparkUnsupportedOperationException: [ENCODER_NOT_FOUND] Not found an encoder of the type Any
+  ignore("window: FIRST_VALUE with default ignore nulls") {
+    withTempDir { dir =>
+      (0 until 30)
+        .map(i => (i % 3, i % 5, if (i % 7 == 0) None else Some(i)))
+        .toDF("a", "b", "c")
+        .repartition(3)
+        .write
+        .mode("overwrite")
+        .parquet(dir.toString)
+
+      spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
+      val df = sql("""
+        SELECT a, b, c,
+          FIRST_VALUE(c) OVER (PARTITION BY a ORDER BY b) as first_c
+        FROM window_test
+        """)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // TODO: LAST_VALUE support is unverified, so this test stays ignored.
+  // As above, the nullable column is generated as Option[Int] to avoid
+  // org.apache.spark.SparkUnsupportedOperationException: [ENCODER_NOT_FOUND] Not found an encoder of the type Any
+  ignore("window: LAST_VALUE with ROWS frame") {
+    withTempDir { dir =>
+      (0 until 30)
+        .map(i => (i % 3, i % 5, if (i % 7 == 0) None else Some(i)))
+        .toDF("a", "b", "c")
+        .repartition(3)
+        .write
+        .mode("overwrite")
+        .parquet(dir.toString)
+
+      spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
+      val df = sql("""
+        SELECT a, b, c,
+          LAST_VALUE(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_c
+        FROM window_test
+        """)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // TODO: NTH_VALUE returns incorrect results - produces 0 instead of null for the first row,
+  // and incorrect values for subsequent rows in the partition
+  ignore("window: NTH_VALUE with position 2") {
+    withTempDir { dir =>
+      (0 until 30)
+        .map(i => (i % 3, i % 5, i))
+        .toDF("a", "b", "c")
+        .repartition(3)
+        .write
+        .mode("overwrite")
+        .parquet(dir.toString)
+
+      spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
+      val df = sql("""
+        SELECT a, b, c,
+          NTH_VALUE(c, 2) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as nth_c
+        FROM window_test
+        """)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // TODO: CUME_DIST not supported - falls back to Spark Window operator
+  // Error: "Partitioning and sorting specifications must be the same"
+  ignore("window: CUME_DIST with PARTITION BY and ORDER BY") {
+    withTempDir { dir =>
+      (0 until 30)
+        .map(i => (i % 3, i % 5, i))
+        .toDF("a", "b", "c")
+        .repartition(3)
+        .write
+        .mode("overwrite")
+        .parquet(dir.toString)
+
+      spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
+      val df = sql("""
+        SELECT a, b, c,
+          CUME_DIST() OVER (PARTITION BY a ORDER BY b) as cume_dist
+        FROM window_test
+        """)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // TODO: Multiple window functions with mixed frame types (RowFrame and RangeFrame)
+  // produce incorrect row_num values - ordering issue in results
+  ignore("window: multiple window functions in single query") {
+    withTempDir { dir =>
+      (0 until 30)
+        .map(i => (i % 3, i % 5, i))
+        .toDF("a", "b", "c")
+        .repartition(3)
+        .write
+        .mode("overwrite")
+        .parquet(dir.toString)
+
+      spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
+      val df = sql("""
+        SELECT a, b, c,
+          ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) as row_num,
+          RANK() OVER (PARTITION BY a ORDER BY b) as rnk,
+          SUM(c) OVER (PARTITION BY a ORDER BY b) as sum_c,
+          AVG(c) OVER (PARTITION BY a ORDER BY b) 
as avg_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: Different window specifications not fully supported + // Falls back to Spark Project and Window operators + ignore("window: different window specifications in single query") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b) as sum_by_a, + SUM(c) OVER (PARTITION BY b ORDER BY a) as sum_by_b, + COUNT(*) OVER () as total_count + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: ORDER BY DESC with aggregation not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: ORDER BY DESC with aggregation") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b DESC) as sum_c_desc + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: Multiple PARTITION BY columns not supported + // Falls back to Spark Window operator + ignore("window: multiple PARTITION BY columns") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i % 2, i)) + .toDF("a", "b", "c", "d") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, d, c, + SUM(c) OVER (PARTITION BY a, b ORDER BY d) as sum_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: Multiple ORDER BY columns not supported + // Falls back to Spark Window operator + ignore("window: multiple ORDER BY columns") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i % 2, i)) + .toDF("a", "b", "c", "d") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, d, c, + ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, d, c) as row_num + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: RANGE BETWEEN with numeric ORDER BY not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: RANGE BETWEEN with numeric ORDER BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i, i * 2)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING) as sum_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i, i * 2)) + .toDF("a", "b", "c") + .repartition(3) + .write + 
.mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as sum_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: Complex expressions in window functions not fully supported + // Falls back to Spark Project operator + ignore("window: complex expression in window function") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c * 2 + 1) OVER (PARTITION BY a ORDER BY b) as sum_expr + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: Window function with WHERE clause not supported + // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same" + ignore("window: window function with WHERE clause") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + SUM(c) OVER (PARTITION BY a ORDER BY b) as sum_c + FROM window_test + WHERE a > 0 + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: Window function with GROUP BY not fully supported + // Falls back to Spark Project and Window operators + ignore("window: window function with GROUP BY") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, SUM(c) as total_c, + RANK() OVER (ORDER BY SUM(c) DESC) as rnk + FROM window_test + GROUP BY a, b + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: ROWS BETWEEN with negative offset produces incorrect results + // Returns wrong values for avg_c calculation + ignore("window: ROWS BETWEEN with negative offset") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + AVG(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING) as avg_c + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + + // TODO: All ranking functions together produce incorrect row_num values + // Ordering issue causes row numbers to be swapped for rows with same partition/order values + ignore("window: all ranking functions together") { + withTempDir { dir => + (0 until 30) + .map(i => (i % 3, i % 5, i)) + .toDF("a", "b", "c") + .repartition(3) + .write + .mode("overwrite") + .parquet(dir.toString) + + spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") + val df = sql(""" + SELECT a, b, c, + ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) as row_num, + RANK() OVER (PARTITION BY a ORDER BY b) as rnk, + DENSE_RANK() OVER (PARTITION BY a ORDER BY b) as dense_rnk, + PERCENT_RANK() OVER (PARTITION BY a ORDER BY b) as pct_rnk, + CUME_DIST() OVER (PARTITION BY a ORDER BY b) as cume_dist, + NTILE(3) 
OVER (PARTITION BY a ORDER BY b) as ntile_3 + FROM window_test + """) + checkSparkAnswerAndOperator(df) + } + } + +}