Skip to content

Commit 3f375c8

Browse files
carsonwang authored and cloud-fan committed
[SPARK-28339][SQL] Rename Spark SQL adaptive execution configuration name
## What changes were proposed in this pull request?

The new adaptive execution framework introduced the configuration `spark.sql.runtime.reoptimization.enabled`. We now rename it back to `spark.sql.adaptive.enabled` as the umbrella configuration for adaptive execution.

## How was this patch tested?

Existing tests.

Closes apache#25102 from carsonwang/renameAE.

Authored-by: Carson Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 92e051c commit 3f375c8

File tree

8 files changed

+26
-35
lines changed

8 files changed

+26
-35
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,6 @@ object SQLConf {
300300
.bytesConf(ByteUnit.BYTE)
301301
.createWithDefault(64 * 1024 * 1024)
302302

303-
val RUNTIME_REOPTIMIZATION_ENABLED =
304-
buildConf("spark.sql.runtime.reoptimization.enabled")
305-
.doc("When true, enable runtime query re-optimization.")
306-
.booleanConf
307-
.createWithDefault(false)
308-
309303
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
310304
.doc("When true, enable adaptive query execution.")
311305
.booleanConf
@@ -1953,10 +1947,7 @@ class SQLConf extends Serializable with Logging {
19531947
def targetPostShuffleInputSize: Long =
19541948
getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
19551949

1956-
def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)
1957-
1958-
def adaptiveExecutionEnabled: Boolean =
1959-
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)
1950+
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
19601951

19611952
def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
19621953

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class QueryExecution(
7676
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
7777
SparkSession.setActiveSession(sparkSession)
7878
// Runtime re-optimization requires a unique instance of every node in the logical plan.
79-
val logicalPlan = if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
79+
val logicalPlan = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
8080
optimizedPlan.clone()
8181
} else {
8282
optimizedPlan

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
4444

4545
override def apply(plan: SparkPlan): SparkPlan = plan match {
4646
case _: ExecutedCommandExec => plan
47-
case _ if conf.runtimeReoptimizationEnabled && supportAdaptive(plan) =>
47+
case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) =>
4848
try {
4949
// Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
5050
// back to non-adaptive mode if adaptive execution is supported in any of the sub-queries.
@@ -57,13 +57,13 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
5757
AdaptiveSparkPlanExec(newPlan, session, subqueryMap, stageCache)
5858
} catch {
5959
case SubqueryAdaptiveNotSupportedException(subquery) =>
60-
logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} is enabled " +
60+
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
6161
s"but is not supported for sub-query: $subquery.")
6262
plan
6363
}
6464
case _ =>
65-
if (conf.runtimeReoptimizationEnabled) {
66-
logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} is enabled " +
65+
if (conf.adaptiveExecutionEnabled) {
66+
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
6767
s"but is not supported for query: $plan.")
6868
}
6969
plan

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
3737
*/
3838
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
3939
private def defaultNumPreShufflePartitions: Int =
40-
if (conf.runtimeReoptimizationEnabled) {
40+
if (conf.adaptiveExecutionEnabled) {
4141
conf.maxNumPostShufflePartitions
4242
} else {
4343
conf.numShufflePartitions

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ abstract class StreamExecution(
320320
logicalPlan
321321

322322
// Adaptive execution can change num shuffle partitions, disallow
323-
sparkSessionForStream.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "false")
323+
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
324324
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
325325
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
326326
offsetSeqMetadata = OffsetSeqMetadata(

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
255255

256256
val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
257257

258-
if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
259-
logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} " +
258+
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
259+
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
260260
"is not supported in streaming DataFrames/Datasets and will be disabled.")
261261
}
262262

sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
275275
.setAppName("test")
276276
.set(UI_ENABLED, false)
277277
.set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
278-
.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
278+
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
279279
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
280280
.set(
281281
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
3535
val planBefore = dfAdaptive.queryExecution.executedPlan
3636
assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)"))
3737
val result = dfAdaptive.collect()
38-
withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "false") {
38+
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
3939
val df = sql(query)
4040
QueryTest.sameRows(result.toSeq, df.collect().toSeq)
4141
}
@@ -82,7 +82,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
8282

8383
test("Change merge join to broadcast join") {
8484
withSQLConf(
85-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
85+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
8686
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
8787
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
8888
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
@@ -95,7 +95,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
9595

9696
test("Change merge join to broadcast join and reduce number of shuffle partitions") {
9797
withSQLConf(
98-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
98+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
9999
SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true",
100100
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
101101
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") {
@@ -119,7 +119,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
119119

120120
test("Scalar subquery") {
121121
withSQLConf(
122-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
122+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
123123
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
124124
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
125125
"SELECT * FROM testData join testData2 ON key = a " +
@@ -133,7 +133,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
133133

134134
test("Scalar subquery in later stages") {
135135
withSQLConf(
136-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
136+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
137137
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
138138
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
139139
"SELECT * FROM testData join testData2 ON key = a " +
@@ -147,7 +147,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
147147

148148
test("multiple joins") {
149149
withSQLConf(
150-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
150+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
151151
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
152152
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
153153
"""
@@ -168,7 +168,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
168168

169169
test("multiple joins with aggregate") {
170170
withSQLConf(
171-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
171+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
172172
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
173173
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
174174
"""
@@ -191,7 +191,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
191191

192192
test("multiple joins with aggregate 2") {
193193
withSQLConf(
194-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
194+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
195195
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
196196
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
197197
"""
@@ -214,7 +214,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
214214

215215
test("Exchange reuse") {
216216
withSQLConf(
217-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
217+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
218218
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
219219
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
220220
"SELECT value FROM testData join testData2 ON key = a " +
@@ -230,7 +230,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
230230

231231
test("Exchange reuse with subqueries") {
232232
withSQLConf(
233-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
233+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
234234
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
235235
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
236236
"SELECT a FROM testData join testData2 ON key = a " +
@@ -246,7 +246,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
246246

247247
test("Exchange reuse across subqueries") {
248248
withSQLConf(
249-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
249+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
250250
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
251251
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
252252
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -266,7 +266,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
266266

267267
test("Subquery reuse") {
268268
withSQLConf(
269-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
269+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
270270
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
271271
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
272272
"SELECT a FROM testData join testData2 ON key = a " +
@@ -285,7 +285,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
285285

286286
test("Broadcast exchange reuse across subqueries") {
287287
withSQLConf(
288-
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
288+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
289289
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000",
290290
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
291291
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -307,7 +307,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
307307
}
308308

309309
test("Union/Except/Intersect queries") {
310-
withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
310+
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
311311
runAdaptiveAndVerifyResult(
312312
"""
313313
|SELECT * FROM testData
@@ -322,7 +322,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
322322
}
323323

324324
test("Subquery de-correlation in Union queries") {
325-
withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
325+
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
326326
withTempView("a", "b") {
327327
Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")
328328
Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b")

0 commit comments

Comments (0)