1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -132,6 +132,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -97,6 +97,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
2 changes: 2 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -270,6 +270,8 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("union", defaultValue = true)
val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("expand", defaultValue = true)
val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("explode", defaultValue = true)
val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
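The new entry follows the same `createExecEnabledConfig` pattern as the surrounding operators, so the user-facing key is `spark.comet.exec.explode.enabled` (documented in the `configs.md` change later in this PR). A minimal sketch of toggling it at runtime; the session setup is assumed for illustration and is not part of this change:

```scala
import org.apache.comet.CometConf
import org.apache.spark.sql.SparkSession

// Assumes a SparkSession already running with the Comet plugin enabled.
val spark = SparkSession.builder().getOrCreate()

// The ConfigEntry exposes its key (spark.comet.exec.explode.enabled); the default is true.
spark.conf.set(CometConf.COMET_EXEC_EXPLODE_ENABLED.key, "false") // fall back to Spark for explode
spark.conf.set(CometConf.COMET_EXEC_EXPLODE_ENABLED.key, "true")  // re-enable native explode
```

Because the flag gates only the explode conversion, disabling it should leave the other Comet operators unaffected.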
47 changes: 42 additions & 5 deletions dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index d3544881af1..fbe1c4b9a87 100644
index d3544881af1..07d1ed97925 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -1297,20 +1297,22 @@ index 4b3d3a4b805..56e1e0e6f16 100644

setupTestData()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index 9e9d717db3b..c1a7caf56e0 100644
index 9e9d717db3b..ec73082f458 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -17,7 +17,8 @@
@@ -17,7 +17,10 @@

package org.apache.spark.sql.execution

-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.comet.CometConf
+
+import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row}
+import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.connector.SimpleWritableDataSource
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.internal.SQLConf
@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase
private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
withClue(df.queryExecution) {
val plan = df.queryExecution.executedPlan
@@ -1322,7 +1324,7 @@ index 9e9d717db3b..c1a7caf56e0 100644
assert(actual == expected)
}
}
@@ -112,7 +116,8 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase
assertProjectExec(query, 1, 3)
}

@@ -1332,6 +1334,41 @@ index 9e9d717db3b..c1a7caf56e0 100644
val query = "select * from (select key, a, c, b from testView) as t1 join " +
"(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
assertProjectExec(query, 2, 2)
@@ -134,12 +141,21 @@ abstract class RemoveRedundantProjectsSuiteBase
val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
df.collect()
val plan = df.queryExecution.executedPlan
- val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length
+ val numProjects = collectWithSubqueries(plan) {
+ case p: ProjectExec => p
+ case p: CometProjectExec => p
+ }.length
+
+ // Comet-specific change to get original Spark plan before applying
+ // a transformation to add a new ProjectExec
+ var sparkPlan: SparkPlan = null
+ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+ val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
+ df.collect()
+ sparkPlan = df.queryExecution.executedPlan
+ }

- // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between
- // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of
- // the query will be incorrect.
- val newPlan = stripAQEPlan(plan) transform {
+ val newPlan = stripAQEPlan(sparkPlan) transform {
case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
g.copy(requiredChildOutput = requiredChildOutput.reverse,
child = ProjectExec(requiredChildOutput.reverse, child))
@@ -151,6 +167,7 @@ abstract class RemoveRedundantProjectsSuiteBase
// The manually added ProjectExec node shouldn't be removed.
assert(collectWithSubqueries(newExecutedPlan) {
case p: ProjectExec => p
+ case p: CometProjectExec => p
}.size == numProjects + 1)

// Check the original plan's output and the new plan's output are the same.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 30ce940b032..0d3f6c6c934 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
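All three Spark diffs (3.4.3 above, 3.5.7 and 4.0.1 below) apply the same change to `RemoveRedundantProjectsSuite`: project nodes are counted whether they appear as Spark's `ProjectExec` or Comet's `CometProjectExec`, and the plan that later gets an extra `ProjectExec` injected is captured with Comet execution disabled, so the rewrite operates on a vanilla Spark plan. A standalone sketch of that pattern follows; the local session setup, the `CometPlugin` class name, and AQE being disabled are assumptions for illustration, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

object ProjectCountSketch {
  def main(args: Array[String]): Unit = {
    // Assumed setup: local session with the Comet plugin; AQE disabled so the
    // executed plan can be pattern-matched directly.
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.exec.enabled", "true")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    import spark.implicits._

    def explodeQuery() = Seq((1, 10, 100), (2, 20, 200)).toDF("key", "a", "b")
      .selectExpr("a", "b", "key", "explode(array(key, a, b)) as d")
      .filter("d > 0")

    // With Comet enabled, projections may show up as CometProjectExec rather than
    // ProjectExec, so count both node types.
    val cometDf = explodeQuery()
    cometDf.collect()
    val numProjects = cometDf.queryExecution.executedPlan.collect {
      case p: ProjectExec      => p
      case p: CometProjectExec => p
    }.length

    // Re-run the same query with Comet execution turned off to recover the plain
    // Spark plan, which is what the suite's GenerateExec/ProjectExec rewrite expects.
    spark.conf.set("spark.comet.exec.enabled", "false")
    val sparkDf = explodeQuery()
    sparkDf.collect()
    val sparkPlan: SparkPlan = sparkDf.queryExecution.executedPlan

    println(s"projects with Comet: $numProjects; vanilla plan:\n$sparkPlan")
    spark.stop()
  }
}
```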
48 changes: 44 additions & 4 deletions dev/diffs/3.5.7.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index a0e25ce4d8d..7db86212507 100644
index a0e25ce4d8d..29d3b93f994 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -1274,18 +1274,21 @@ index de24b8c82b0..1f835481290 100644

setupTestData()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index 9e9d717db3b..91a4f9a38d5 100644
index 9e9d717db3b..73de2b84938 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -18,6 +18,7 @@
@@ -17,7 +17,10 @@

package org.apache.spark.sql.execution

+import org.apache.comet.CometConf
+
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.connector.SimpleWritableDataSource
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.internal.SQLConf
@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase
private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
withClue(df.queryExecution) {
val plan = df.queryExecution.executedPlan
@@ -1297,6 +1300,43 @@ index 9e9d717db3b..91a4f9a38d5 100644
assert(actual == expected)
}
}
@@ -134,12 +140,26 @@ abstract class RemoveRedundantProjectsSuiteBase
val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
df.collect()
val plan = df.queryExecution.executedPlan
- val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length
+
+ val numProjects = collectWithSubqueries(plan) {
+ case p: ProjectExec => p
+ case p: CometProjectExec => p
+ }.length

// Create a new plan that reverse the GenerateExec output and add a new ProjectExec between
// GenerateExec and its child. This is to test if the ProjectExec is removed, the output of
// the query will be incorrect.
- val newPlan = stripAQEPlan(plan) transform {
+
+ // Comet-specific change to get original Spark plan before applying
+ // a transformation to add a new ProjectExec
+ var sparkPlan: SparkPlan = null
+ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+ val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
+ df.collect()
+ sparkPlan = df.queryExecution.executedPlan
+ }
+
+ val newPlan = stripAQEPlan(sparkPlan) transform {
case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
g.copy(requiredChildOutput = requiredChildOutput.reverse,
child = ProjectExec(requiredChildOutput.reverse, child))
@@ -151,6 +171,7 @@ abstract class RemoveRedundantProjectsSuiteBase
// The manually added ProjectExec node shouldn't be removed.
assert(collectWithSubqueries(newExecutedPlan) {
case p: ProjectExec => p
+ case p: CometProjectExec => p
}.size == numProjects + 1)

// Check the original plan's output and the new plan's output are the same.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 005e764cc30..92ec088efab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
48 changes: 43 additions & 5 deletions dev/diffs/4.0.1.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index 22922143fc3..3c1f5d381ee 100644
index 22922143fc3..477d4ec4194 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -1701,20 +1701,22 @@ index 47d5ff67b84..8dc8f65d4b1 100644
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index b5bac8079c4..a3731888e12 100644
index b5bac8079c4..9420dbdb936 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -17,7 +17,8 @@
@@ -17,7 +17,10 @@

package org.apache.spark.sql.execution

-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.comet.CometConf
+
+import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row}
+import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.connector.SimpleWritableDataSource
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.internal.SQLConf
@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase
private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
withClue(df.queryExecution) {
val plan = df.queryExecution.executedPlan
@@ -1726,7 +1728,7 @@ index b5bac8079c4..a3731888e12 100644
assert(actual == expected)
}
}
@@ -112,7 +116,8 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase
assertProjectExec(query, 1, 3)
}

@@ -1736,6 +1738,42 @@ index b5bac8079c4..a3731888e12 100644
val query = "select * from (select key, a, c, b from testView) as t1 join " +
"(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
assertProjectExec(query, 2, 2)
@@ -134,12 +141,25 @@ abstract class RemoveRedundantProjectsSuiteBase
val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
df.collect()
val plan = df.queryExecution.executedPlan
- val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length
+ val numProjects = collectWithSubqueries(plan) {
+ case p: ProjectExec => p
+ case p: CometProjectExec => p
+ }.length

// Create a new plan that reverse the GenerateExec output and add a new ProjectExec between
// GenerateExec and its child. This is to test if the ProjectExec is removed, the output of
// the query will be incorrect.
- val newPlan = stripAQEPlan(plan) transform {
+
+ // Comet-specific change to get original Spark plan before applying
+ // a transformation to add a new ProjectExec
+ var sparkPlan: SparkPlan = null
+ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+ val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
+ df.collect()
+ sparkPlan = df.queryExecution.executedPlan
+ }
+
+ val newPlan = stripAQEPlan(sparkPlan) transform {
case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
g.copy(requiredChildOutput = requiredChildOutput.reverse,
child = ProjectExec(requiredChildOutput.reverse, child))
@@ -151,6 +171,7 @@ abstract class RemoveRedundantProjectsSuiteBase
// The manually added ProjectExec node shouldn't be removed.
assert(collectWithSubqueries(newExecutedPlan) {
case p: ProjectExec => p
+ case p: CometProjectExec => p
}.size == numProjects + 1)

// Check the original plan's output and the new plan's output are the same.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 005e764cc30..92ec088efab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true |
| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true |
| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true |
| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true |
| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true |
| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/operators.md
@@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution.
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
| GenerateExec | Yes | Supports `explode` generator only. |
| GlobalLimitExec | Yes | |
| HashAggregateExec | Yes | |
| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
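Since the documented `GenerateExec` support covers only the `explode` generator, a query like the one below is the kind that becomes eligible for native execution, while other generators (for example `posexplode` or `inline`) would presumably still fall back to Spark. This is a hedged illustration, not taken from the PR's test suite:

```scala
import org.apache.spark.sql.SparkSession

// Assumes a session with the Comet plugin enabled (spark.plugins=org.apache.spark.CometPlugin).
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq((1, Seq("a", "b", "c")), (2, Seq("d"))).toDF("id", "items")

// explode is the one generator Comet can run natively; inspect the physical plan
// with explain() to confirm whether Comet operators were substituted.
val exploded = df.selectExpr("id", "explode(items) AS item")
exploded.explain()
exploded.show()
```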