Skip to content

Commit 337a67f

Browse files
Karuppayya Rajendran (karuppayya)
authored and committed
[SPARK-52741][SQL] RemoveFiles ShuffleCleanup mode doesn't work with non-adaptive execution
### What changes were proposed in this pull request? Currently, shuffle cleanup only works for adaptive execution plans. Non-adaptive execution plans are not cleaned up. This change cleans them up as well. ### Why are the changes needed? - To clean up shuffle files of non-adaptive query executions - Consistency in behavior between adaptive and non-adaptive shuffle cleanup based on the cleanup mode ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Modified existing unit tests to cover this case ### Was this patch authored or co-authored using generative AI tooling? No Closes #51432 from karuppayya/SPARK-52741. Lead-authored-by: Karuppayya Rajendran <[email protected]> Co-authored-by: Karuppayya Rajendran <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4b72478 commit 337a67f

File tree

2 files changed

+45
-26
lines changed

2 files changed

+45
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE
3030
import org.apache.spark.internal.config.Tests.IS_TESTING
3131
import org.apache.spark.sql.classic.SparkSession
3232
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
33+
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
3334
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
3435
import org.apache.spark.sql.internal.SQLConf
3536
import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH
@@ -178,8 +179,11 @@ object SQLExecution extends Logging {
178179
val shuffleIds = queryExecution.executedPlan match {
179180
case ae: AdaptiveSparkPlanExec =>
180181
ae.context.shuffleIds.asScala.keys
181-
case _ =>
182-
Iterable.empty
182+
case nonAdaptivePlan =>
183+
nonAdaptivePlan.collect {
184+
case exec: ShuffleExchangeLike =>
185+
exec.shuffleId
186+
}
183187
}
184188
shuffleIds.foreach { shuffleId =>
185189
queryExecution.shuffleCleanupMode match {

sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -328,36 +328,51 @@ class QueryExecutionSuite extends SharedSparkSession {
328328
}
329329

330330
test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") {
331-
val plan = spark.range(100).repartition(10).logicalPlan
332-
val df = Dataset.ofRows(spark, plan, DoNotCleanup)
333-
df.collect()
334-
335-
val blockManager = spark.sparkContext.env.blockManager
336-
assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty)
337-
assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
338-
cleanupShuffles()
331+
Seq(true, false).foreach { adaptiveEnabled => {
332+
withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
333+
val plan = spark.range(100).repartition(10).logicalPlan
334+
val df = Dataset.ofRows(spark, plan, DoNotCleanup)
335+
df.collect()
336+
337+
val blockManager = spark.sparkContext.env.blockManager
338+
assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty)
339+
assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
340+
cleanupShuffles()
341+
}
342+
}
343+
}
339344
}
340345

341346
test("SPARK-47764: Cleanup shuffle dependencies - SkipMigration mode") {
342-
val plan = spark.range(100).repartition(10).logicalPlan
343-
val df = Dataset.ofRows(spark, plan, SkipMigration)
344-
df.collect()
345-
346-
val blockManager = spark.sparkContext.env.blockManager
347-
assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
348-
assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
349-
cleanupShuffles()
347+
Seq(true, false).foreach { adaptiveEnabled => {
348+
withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
349+
val plan = spark.range(100).repartition(10).logicalPlan
350+
val df = Dataset.ofRows(spark, plan, SkipMigration)
351+
df.collect()
352+
353+
val blockManager = spark.sparkContext.env.blockManager
354+
assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
355+
assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
356+
cleanupShuffles()
357+
}
358+
}
359+
}
350360
}
351361

352362
test("SPARK-47764: Cleanup shuffle dependencies - RemoveShuffleFiles mode") {
353-
val plan = spark.range(100).repartition(10).logicalPlan
354-
val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
355-
df.collect()
356-
357-
val blockManager = spark.sparkContext.env.blockManager
358-
assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
359-
assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
360-
cleanupShuffles()
363+
Seq(true, false).foreach { adaptiveEnabled => {
364+
withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
365+
val plan = spark.range(100).repartition(10).logicalPlan
366+
val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
367+
df.collect()
368+
369+
val blockManager = spark.sparkContext.env.blockManager
370+
assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
371+
assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
372+
cleanupShuffles()
373+
}
374+
}
375+
}
361376
}
362377

363378
test("SPARK-35378: Return UnsafeRow in CommandResultExecCheck execute methods") {

0 commit comments

Comments
 (0)