
Commit 921ef39

chore: Fallback to Spark for Windows
1 parent 941c300

6 files changed: +100 −108 lines


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 1 addition & 1 deletion
@@ -251,7 +251,7 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("expand", defaultValue = true)
   val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
-    createExecEnabledConfig("window", defaultValue = true)
+    createExecEnabledConfig("window", defaultValue = false)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
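
For context, `createExecEnabledConfig` is the helper behind the per-operator `spark.comet.exec.<name>.enabled` flags documented in configs.md below. A hypothetical sketch of its shape, inferred only from the key names and the generated "Whether to enable X by default." descriptions; the real helper in CometConf (and its `conf(...)` builder factory, assumed here) may differ:

  // Hypothetical reconstruction for illustration, not the actual Comet source:
  // builds a boolean ConfigEntry keyed spark.comet.exec.<exec>.enabled.
  private def createExecEnabledConfig(exec: String, defaultValue: Boolean): ConfigEntry[Boolean] =
    conf(s"spark.comet.exec.$exec.enabled")
      .doc(s"Whether to enable $exec by default.")
      .booleanConf
      .createWithDefault(defaultValue)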

docs/source/user-guide/latest/configs.md

Lines changed: 17 additions & 17 deletions
@@ -139,23 +139,23 @@ These settings can be used to determine which parts of the plan are accelerated
 <!--BEGIN:CONFIG_TABLE[enable_exec]-->
 | Config | Description | Default Value |
 |--------|-------------|---------------|
-| `spark.comet.exec.aggregate.enabled` | Whether to enable aggregate by default. | true |
-| `spark.comet.exec.broadcastExchange.enabled` | Whether to enable broadcastExchange by default. | true |
-| `spark.comet.exec.broadcastHashJoin.enabled` | Whether to enable broadcastHashJoin by default. | true |
-| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true |
-| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true |
-| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true |
-| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true |
-| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
-| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
-| `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
-| `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
-| `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
-| `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |
-| `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` | Experimental support for Sort Merge Join with filter | false |
-| `spark.comet.exec.takeOrderedAndProject.enabled` | Whether to enable takeOrderedAndProject by default. | true |
-| `spark.comet.exec.union.enabled` | Whether to enable union by default. | true |
-| `spark.comet.exec.window.enabled` | Whether to enable window by default. | true |
+| `spark.comet.exec.aggregate.enabled` | Whether to enable aggregate by default. | true |
+| `spark.comet.exec.broadcastExchange.enabled` | Whether to enable broadcastExchange by default. | true |
+| `spark.comet.exec.broadcastHashJoin.enabled` | Whether to enable broadcastHashJoin by default. | true |
+| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true |
+| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true |
+| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true |
+| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true |
+| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
+| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
+| `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true |
+| `spark.comet.exec.project.enabled` | Whether to enable project by default. | true |
+| `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true |
+| `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true |
+| `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` | Experimental support for Sort Merge Join with filter | false |
+| `spark.comet.exec.takeOrderedAndProject.enabled` | Whether to enable takeOrderedAndProject by default. | true |
+| `spark.comet.exec.union.enabled` | Whether to enable union by default. | true |
+| `spark.comet.exec.window.enabled` | Whether to enable window by default. | false |
 <!--END:CONFIG_TABLE-->

 ## Enabling or Disabling Individual Scalar Expressions
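
Since the window default is now false, users who want Comet's native window exec back must opt in explicitly. A minimal sketch; the session setup and app name are illustrative, and only the `spark.comet.exec.window.enabled` key comes from this commit:

import org.apache.spark.sql.SparkSession

// Illustrative session; the app name is a placeholder.
val spark = SparkSession
  .builder()
  .appName("comet-window-opt-in")
  .config("spark.comet.exec.window.enabled", "true")
  .getOrCreate()

// Or flip it at runtime on an existing session:
spark.conf.set("spark.comet.exec.window.enabled", "true")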

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 1 addition & 23 deletions
@@ -30,10 +30,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, TruncDate, TruncTimestamp}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
-import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec}
 import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -3097,27 +3096,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }

-  test("window query with rangeBetween") {
-
-    // values are int
-    val df = Seq(1, 2, 4, 3, 2, 1).toDF("value")
-    val window = Window.orderBy($"value".desc)
-
-    // ranges are long
-    val df2 = df.select(
-      $"value",
-      sum($"value").over(window.rangeBetween(Window.unboundedPreceding, 1L)),
-      sum($"value").over(window.rangeBetween(1L, Window.unboundedFollowing)))
-
-    // Comet does not support RANGE BETWEEN
-    // https://github.com/apache/datafusion-comet/issues/1246
-    val (_, cometPlan) = checkSparkAnswer(df2)
-    val cometWindowExecs = collect(cometPlan) { case w: CometWindowExec =>
-      w
-    }
-    assert(cometWindowExecs.isEmpty)
-  }
-
   test("vectorized reader: missing all struct fields") {
     Seq(true, false).foreach { offheapEnabled =>
       withSQLConf(

spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala

Lines changed: 0 additions & 53 deletions
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
 import org.apache.spark.sql.comet.CometHashAggregateExec
-import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{avg, count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
@@ -94,58 +93,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }

-  test("lead/lag should return the default value if the offset row does not exist") {
-    withSQLConf(
-      CometConf.COMET_ENABLED.key -> "true",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
-      checkSparkAnswer(sql("""
-        |SELECT
-        | lag(123, 100, 321) OVER (ORDER BY id) as lag,
-        | lead(123, 100, 321) OVER (ORDER BY id) as lead
-        |FROM (SELECT 1 as id) tmp
-        """.stripMargin))
-
-      checkSparkAnswer(sql("""
-        |SELECT
-        | lag(123, 100, a) OVER (ORDER BY id) as lag,
-        | lead(123, 100, a) OVER (ORDER BY id) as lead
-        |FROM (SELECT 1 as id, 2 as a) tmp
-        """.stripMargin))
-    }
-  }
-
-  // based on Spark's SQLWindowFunctionSuite test of the same name
-  test("window function: partition and order expressions") {
-    for (shuffleMode <- Seq("auto", "native", "jvm")) {
-      withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
-        val df =
-          Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10)).toDF(
-            "month",
-            "area",
-            "product")
-        df.createOrReplaceTempView("windowData")
-        val df2 = sql("""
-          |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
-          |from windowData
-          """.stripMargin)
-        checkSparkAnswer(df2)
-        val cometShuffles = collect(df2.queryExecution.executedPlan) {
-          case _: CometShuffleExchangeExec => true
-        }
-        if (shuffleMode == "jvm" || shuffleMode == "auto") {
-          assert(cometShuffles.length == 1)
-        } else {
-          // we fall back to Spark for shuffle because we do not support
-          // native shuffle with a LocalTableScan input, and we do not fall
-          // back to Comet columnar shuffle due to
-          // https://github.com/apache/datafusion-comet/issues/1248
-          assert(cometShuffles.isEmpty)
-        }
-      }
-    }
-  }
-
   test("multiple column distinct count") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -1697,7 +1697,9 @@ class CometExecSuite extends CometTestBase {

   test("TakeOrderedAndProjectExec") {
     Seq("true", "false").foreach(aqeEnabled =>
-      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
+        CometConf.COMET_EXEC_WINDOW_ENABLED.key -> "true") {
         withTable("t1") {
           val numRows = 10
           spark
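
This test now pins the window flag on so its plan keeps exercising Comet's window exec despite the new default. For readers unfamiliar with the pattern, here is a minimal standalone sketch of what a `withSQLConf`-style helper does; Spark's real helper lives in its test utilities, and `withSQLConfSketch` is a hypothetical name:

import org.apache.spark.sql.SparkSession

// Illustrative standalone version: set the given SQL confs, run the body,
// then restore whatever values were there before.
def withSQLConfSketch(spark: SparkSession)(pairs: (String, String)*)(body: => Unit): Unit = {
  // Remember current values (None if unset) so they can be restored afterwards.
  val previous = pairs.map { case (k, _) => k -> spark.conf.getOption(k) }
  pairs.foreach { case (k, v) => spark.conf.set(k, v) }
  try body
  finally previous.foreach {
    case (k, Some(v)) => spark.conf.set(k, v)
    case (k, None) => spark.conf.unset(k)
  }
}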

spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala

Lines changed: 78 additions & 13 deletions
@@ -24,8 +24,10 @@ import org.scalatest.Tag

 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, Row}
+import org.apache.spark.sql.comet.CometWindowExec
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions.{count, lead}
+import org.apache.spark.sql.functions.{count, lead, sum}
 import org.apache.spark.sql.internal.SQLConf

 import org.apache.comet.CometConf
@@ -39,12 +41,86 @@ class CometWindowExecSuite extends CometTestBase {
     super.test(testName, testTags: _*) {
       withSQLConf(
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_WINDOW_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) {
         testFun
       }
     }
   }

+  test("lead/lag should return the default value if the offset row does not exist") {
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+      CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+      checkSparkAnswer(sql("""
+        |SELECT
+        | lag(123, 100, 321) OVER (ORDER BY id) as lag,
+        | lead(123, 100, 321) OVER (ORDER BY id) as lead
+        |FROM (SELECT 1 as id) tmp
+        """.stripMargin))
+
+      checkSparkAnswer(sql("""
+        |SELECT
+        | lag(123, 100, a) OVER (ORDER BY id) as lag,
+        | lead(123, 100, a) OVER (ORDER BY id) as lead
+        |FROM (SELECT 1 as id, 2 as a) tmp
+        """.stripMargin))
+    }
+  }
+
+  test("window query with rangeBetween") {
+
+    // values are int
+    val df = Seq(1, 2, 4, 3, 2, 1).toDF("value")
+    val window = Window.orderBy($"value".desc)
+
+    // ranges are long
+    val df2 = df.select(
+      $"value",
+      sum($"value").over(window.rangeBetween(Window.unboundedPreceding, 1L)),
+      sum($"value").over(window.rangeBetween(1L, Window.unboundedFollowing)))
+
+    // Comet does not support RANGE BETWEEN
+    // https://github.com/apache/datafusion-comet/issues/1246
+    val (_, cometPlan) = checkSparkAnswer(df2)
+    val cometWindowExecs = collect(cometPlan) { case w: CometWindowExec =>
+      w
+    }
+    assert(cometWindowExecs.isEmpty)
+  }
+
+  // based on Spark's SQLWindowFunctionSuite test of the same name
+  test("window function: partition and order expressions") {
+    for (shuffleMode <- Seq("auto", "native", "jvm")) {
+      withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+        val df =
+          Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10)).toDF(
+            "month",
+            "area",
+            "product")
+        df.createOrReplaceTempView("windowData")
+        val df2 = sql("""
+          |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+          |from windowData
+          """.stripMargin)
+        checkSparkAnswer(df2)
+        val cometShuffles = collect(df2.queryExecution.executedPlan) {
+          case _: CometShuffleExchangeExec => true
+        }
+        if (shuffleMode == "jvm" || shuffleMode == "auto") {
+          assert(cometShuffles.length == 1)
+        } else {
+          // we fall back to Spark for shuffle because we do not support
+          // native shuffle with a LocalTableScan input, and we do not fall
+          // back to Comet columnar shuffle due to
+          // https://github.com/apache/datafusion-comet/issues/1248
+          assert(cometShuffles.isEmpty)
+        }
+      }
+    }
+  }
+
   test(
     "fall back to Spark when the partition spec and order spec are not the same for window function") {
     withTempView("test") {
@@ -289,7 +365,7 @@ class CometWindowExecSuite extends CometTestBase {

   // TODO: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW produces incorrect results
   // Returns wrong cnt values - ordering issue causes swapped values for rows with same partition
-  ignore("window: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW") {
+  test("window: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW") {
     withTempDir { dir =>
       (0 until 30)
         .map(i => (i % 3, i % 5, i))
@@ -310,7 +386,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING produces incorrect results
-  // Returns wrong sum_c values - ordering issue causes swapped values for rows with same partition
   ignore("window: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING") {
     withTempDir { dir =>
       (0 until 30)
@@ -354,7 +429,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: SUM with ROWS BETWEEN produces incorrect results
-  // Returns wrong sum_c values for some rows
   ignore("window: SUM with ROWS BETWEEN 2 PRECEDING AND CURRENT ROW") {
     withTempDir { dir =>
       (0 until 30)
@@ -530,7 +604,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: LAG produces incorrect results
-  // Returns wrong lag_c values - ordering issue in results
   ignore("window: LAG with default offset") {
     withTempDir { dir =>
       (0 until 30)
@@ -552,7 +625,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: LAG with offset 2 produces incorrect results
-  // Returns wrong lag_c_2 values - ordering issue in results
   ignore("window: LAG with offset 2 and default value") {
     withTempDir { dir =>
       (0 until 30)
@@ -574,7 +646,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: LEAD produces incorrect results
-  // Returns wrong lead_c values - ordering issue in results
   ignore("window: LEAD with default offset") {
     withTempDir { dir =>
       (0 until 30)
@@ -596,7 +667,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: LEAD with offset 2 produces incorrect results
-  // Returns wrong lead_c_2 values - ordering issue in results
   ignore("window: LEAD with offset 2 and default value") {
     withTempDir { dir =>
       (0 until 30)
@@ -662,7 +732,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: NTH_VALUE returns incorrect results - produces 0 instead of null for first row,
-  // and incorrect values for subsequent rows in partition
   ignore("window: NTH_VALUE with position 2") {
     withTempDir { dir =>
       (0 until 30)
@@ -706,7 +775,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: Multiple window functions with mixed frame types (RowFrame and RangeFrame)
-  // produces incorrect row_num values - ordering issue in results
   ignore("window: multiple window functions in single query") {
     withTempDir { dir =>
       (0 until 30)
@@ -933,7 +1001,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: ROWS BETWEEN with negative offset produces incorrect results
-  // Returns wrong values for avg_c calculation
   ignore("window: ROWS BETWEEN with negative offset") {
     withTempDir { dir =>
       (0 until 30)
@@ -955,7 +1022,6 @@ class CometWindowExecSuite extends CometTestBase {
   }

   // TODO: All ranking functions together produce incorrect row_num values
-  // Ordering issue causes row numbers to be swapped for rows with same partition/order values
   ignore("window: all ranking functions together") {
     withTempDir { dir =>
       (0 until 30)
@@ -980,5 +1046,4 @@ class CometWindowExecSuite extends CometTestBase {
       checkSparkAnswerAndOperator(df)
     }
   }
-
 }
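
Taken together, these suites verify which engine handled a window operator by collecting Comet operators from the executed plan. A minimal sketch of the same check outside a test suite, assuming a plain SparkSession with Comet on the classpath; note that with AQE enabled the plan may be wrapped, which is why the suites use AdaptiveSparkPlanHelper's collect rather than the plain TreeNode.collect shown here:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometWindowExec

val spark = SparkSession.builder().appName("window-plan-check").getOrCreate() // illustrative
import spark.implicits._

val df = (1 to 10).toDF("id").selectExpr("id", "sum(id) over (order by id) as running")
df.collect() // force execution so executedPlan reflects the final plan

// Non-empty only when spark.comet.exec.window.enabled=true and the window is supported.
val cometWindows = df.queryExecution.executedPlan.collect { case w: CometWindowExec => w }
println(s"Comet handled the window: ${cometWindows.nonEmpty}")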
