
Commit 0bda9d2

feat: Add support for explode and explode_outer for array inputs (#2836)
1 parent 769d76e commit 0bda9d2

15 files changed: +621 −17 lines changed


.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -132,6 +132,7 @@ jobs:
             org.apache.comet.exec.CometAggregateSuite
             org.apache.comet.exec.CometExec3_4PlusSuite
             org.apache.comet.exec.CometExecSuite
+            org.apache.comet.exec.CometGenerateExecSuite
             org.apache.comet.exec.CometWindowExecSuite
             org.apache.comet.exec.CometJoinSuite
             org.apache.comet.CometNativeSuite
```

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -97,6 +97,7 @@ jobs:
             org.apache.comet.exec.CometAggregateSuite
             org.apache.comet.exec.CometExec3_4PlusSuite
             org.apache.comet.exec.CometExecSuite
+            org.apache.comet.exec.CometGenerateExecSuite
             org.apache.comet.exec.CometWindowExecSuite
             org.apache.comet.exec.CometJoinSuite
             org.apache.comet.CometNativeSuite
```

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -270,6 +270,8 @@ object CometConf extends ShimCometConf {
     createExecEnabledConfig("union", defaultValue = true)
   val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("expand", defaultValue = true)
+  val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] =
+    createExecEnabledConfig("explode", defaultValue = true)
   val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("window", defaultValue = true)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
```

dev/diffs/3.4.3.diff

Lines changed: 42 additions & 5 deletions
```diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index d3544881af1..fbe1c4b9a87 100644
+index d3544881af1..07d1ed97925 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -1297,20 +1297,22 @@ index 4b3d3a4b805..56e1e0e6f16 100644
 
   setupTestData()
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-index 9e9d717db3b..c1a7caf56e0 100644
+index 9e9d717db3b..ec73082f458 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-@@ -17,7 +17,8 @@
+@@ -17,7 +17,10 @@
 
  package org.apache.spark.sql.execution
 
 -import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.comet.CometConf
++
 +import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row}
 +import org.apache.spark.sql.comet.CometProjectExec
  import org.apache.spark.sql.connector.SimpleWritableDataSource
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.internal.SQLConf
-@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase
   private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
     withClue(df.queryExecution) {
       val plan = df.queryExecution.executedPlan
@@ -1322,7 +1324,7 @@ index 9e9d717db3b..c1a7caf56e0 100644
       assert(actual == expected)
     }
   }
-@@ -112,7 +116,8 @@ abstract class RemoveRedundantProjectsSuiteBase
+@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase
     assertProjectExec(query, 1, 3)
   }
 
@@ -1332,6 +1334,41 @@ index 9e9d717db3b..c1a7caf56e0 100644
     val query = "select * from (select key, a, c, b from testView) as t1 join " +
       "(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
     assertProjectExec(query, 2, 2)
+@@ -134,12 +141,21 @@ abstract class RemoveRedundantProjectsSuiteBase
+     val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
+     df.collect()
+     val plan = df.queryExecution.executedPlan
+-    val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length
++    val numProjects = collectWithSubqueries(plan) {
++      case p: ProjectExec => p
++      case p: CometProjectExec => p
++    }.length
++
++    // Comet-specific change to get original Spark plan before applying
++    // a transformation to add a new ProjectExec
++    var sparkPlan: SparkPlan = null
++    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
++      val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
++      df.collect()
++      sparkPlan = df.queryExecution.executedPlan
++    }
+
+-    // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between
+-    // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of
+-    // the query will be incorrect.
+-    val newPlan = stripAQEPlan(plan) transform {
++    val newPlan = stripAQEPlan(sparkPlan) transform {
+       case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
+         g.copy(requiredChildOutput = requiredChildOutput.reverse,
+           child = ProjectExec(requiredChildOutput.reverse, child))
+@@ -151,6 +167,7 @@ abstract class RemoveRedundantProjectsSuiteBase
+     // The manually added ProjectExec node shouldn't be removed.
+     assert(collectWithSubqueries(newExecutedPlan) {
+       case p: ProjectExec => p
++      case p: CometProjectExec => p
+     }.size == numProjects + 1)
+
+     // Check the original plan's output and the new plan's output are the same.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
 index 30ce940b032..0d3f6c6c934 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
```
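The test changes above (repeated with minor variations for each supported Spark version below) boil down to two adjustments that any plan-inspection test needs once Comet is active: count both the Spark and Comet project operators, and capture a vanilla Spark plan with Comet disabled before hand-editing it, because the transform pattern-matches on Spark's `GenerateExec` and `ProjectExec` classes. A condensed sketch of the pattern, assuming the suite's helpers (`withSQLConf`, `collectWithSubqueries`, `stripAQEPlan`) and its `data` DataFrame are in scope:

```scala
import org.apache.comet.CometConf
import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

// Count projections whether or not Comet replaced them with its native operator.
val numProjects = collectWithSubqueries(plan) {
  case p: ProjectExec => p
  case p: CometProjectExec => p
}.length

// Re-run the query with Comet disabled so the captured plan still contains
// the Spark operators that the test's plan transformation matches on.
var sparkPlan: SparkPlan = null
withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
  val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
  df.collect()
  sparkPlan = df.queryExecution.executedPlan
}
val newPlan = stripAQEPlan(sparkPlan) // ...then transform as in the suite
```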

dev/diffs/3.5.7.diff

Lines changed: 44 additions & 4 deletions
```diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index a0e25ce4d8d..7db86212507 100644
+index a0e25ce4d8d..29d3b93f994 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -152,6 +152,8 @@
@@ -1274,18 +1274,21 @@ index de24b8c82b0..1f835481290 100644
 
   setupTestData()
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-index 9e9d717db3b..91a4f9a38d5 100644
+index 9e9d717db3b..73de2b84938 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,10 @@
+
  package org.apache.spark.sql.execution
 
++import org.apache.comet.CometConf
++
  import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 +import org.apache.spark.sql.comet.CometProjectExec
  import org.apache.spark.sql.connector.SimpleWritableDataSource
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.internal.SQLConf
-@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase
   private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
     withClue(df.queryExecution) {
       val plan = df.queryExecution.executedPlan
@@ -1297,6 +1300,43 @@ index 9e9d717db3b..91a4f9a38d5 100644
       assert(actual == expected)
     }
   }
+@@ -134,12 +140,26 @@ abstract class RemoveRedundantProjectsSuiteBase
+     val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
+     df.collect()
+     val plan = df.queryExecution.executedPlan
+-    val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length
++
++    val numProjects = collectWithSubqueries(plan) {
++      case p: ProjectExec => p
++      case p: CometProjectExec => p
++    }.length
+
+     // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between
+     // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of
+     // the query will be incorrect.
+-    val newPlan = stripAQEPlan(plan) transform {
++
++    // Comet-specific change to get original Spark plan before applying
++    // a transformation to add a new ProjectExec
++    var sparkPlan: SparkPlan = null
++    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
++      val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
++      df.collect()
++      sparkPlan = df.queryExecution.executedPlan
++    }
++
++    val newPlan = stripAQEPlan(sparkPlan) transform {
+       case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
+         g.copy(requiredChildOutput = requiredChildOutput.reverse,
+           child = ProjectExec(requiredChildOutput.reverse, child))
+@@ -151,6 +171,7 @@ abstract class RemoveRedundantProjectsSuiteBase
+     // The manually added ProjectExec node shouldn't be removed.
+     assert(collectWithSubqueries(newExecutedPlan) {
+       case p: ProjectExec => p
++      case p: CometProjectExec => p
+     }.size == numProjects + 1)
+
+     // Check the original plan's output and the new plan's output are the same.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
 index 005e764cc30..92ec088efab 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
```

dev/diffs/4.0.1.diff

Lines changed: 43 additions & 5 deletions
```diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index 22922143fc3..3c1f5d381ee 100644
+index 22922143fc3..477d4ec4194 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -1701,20 +1701,22 @@ index 47d5ff67b84..8dc8f65d4b1 100644
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-index b5bac8079c4..a3731888e12 100644
+index b5bac8079c4..9420dbdb936 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-@@ -17,7 +17,8 @@
+@@ -17,7 +17,10 @@
 
  package org.apache.spark.sql.execution
 
 -import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.comet.CometConf
++
 +import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row}
 +import org.apache.spark.sql.comet.CometProjectExec
  import org.apache.spark.sql.connector.SimpleWritableDataSource
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.internal.SQLConf
-@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase
   private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
     withClue(df.queryExecution) {
       val plan = df.queryExecution.executedPlan
@@ -1726,7 +1728,7 @@ index b5bac8079c4..a3731888e12 100644
       assert(actual == expected)
     }
   }
-@@ -112,7 +116,8 @@ abstract class RemoveRedundantProjectsSuiteBase
+@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase
     assertProjectExec(query, 1, 3)
   }
 
@@ -1736,6 +1738,42 @@ index b5bac8079c4..a3731888e12 100644
     val query = "select * from (select key, a, c, b from testView) as t1 join " +
       "(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
     assertProjectExec(query, 2, 2)
+@@ -134,12 +141,25 @@ abstract class RemoveRedundantProjectsSuiteBase
+     val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
+     df.collect()
+     val plan = df.queryExecution.executedPlan
+-    val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length
++    val numProjects = collectWithSubqueries(plan) {
++      case p: ProjectExec => p
++      case p: CometProjectExec => p
++    }.length
+
+     // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between
+     // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of
+     // the query will be incorrect.
+-    val newPlan = stripAQEPlan(plan) transform {
++
++    // Comet-specific change to get original Spark plan before applying
++    // a transformation to add a new ProjectExec
++    var sparkPlan: SparkPlan = null
++    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
++      val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
++      df.collect()
++      sparkPlan = df.queryExecution.executedPlan
++    }
++
++    val newPlan = stripAQEPlan(sparkPlan) transform {
+       case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
+         g.copy(requiredChildOutput = requiredChildOutput.reverse,
+           child = ProjectExec(requiredChildOutput.reverse, child))
+@@ -151,6 +171,7 @@ abstract class RemoveRedundantProjectsSuiteBase
+     // The manually added ProjectExec node shouldn't be removed.
+     assert(collectWithSubqueries(newExecutedPlan) {
+       case p: ProjectExec => p
++      case p: CometProjectExec => p
+     }.size == numProjects + 1)
+
+     // Check the original plan's output and the new plan's output are the same.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
 index 005e764cc30..92ec088efab 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
```

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true |
 | `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true |
 | `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true |
+| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true |
 | `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true |
 | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
 | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
```

docs/source/user-guide/latest/operators.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution.
 | ExpandExec | Yes | |
 | FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
 | FilterExec | Yes | |
+| GenerateExec | Yes | Supports `explode` generator only. |
 | GlobalLimitExec | Yes | |
 | HashAggregateExec | Yes | |
 | InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
```
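With GenerateExec now accelerated for `explode` over array inputs, queries like the following are eligible for the native path. An illustrative snippet (the column names and data are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("explode-demo").getOrCreate()
import spark.implicits._

val df = Seq(
  ("a", Seq(1, 2)),
  ("b", Seq.empty[Int])
).toDF("id", "xs")

// explode drops rows whose array is null or empty...
df.selectExpr("id", "explode(xs) as x").show()       // rows: (a,1), (a,2)

// ...while explode_outer keeps them, emitting a null element.
df.selectExpr("id", "explode_outer(xs) as x").show() // rows: (a,1), (a,2), (b,null)
```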
