
Commit 1dfd2e1

Make FallbackStorage cleanup shuffles async

1 parent ffb1624

2 files changed, +33 -1 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 0 deletions

@@ -2375,6 +2375,7 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
     Utils.tryLogNonFatalError {
+      // not calling cleanUpAsync here as we want cleanup to complete before exiting
       FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
     }
     Utils.tryLogNonFatalError {
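As the added comment notes, the shutdown path in SparkContext deliberately keeps the blocking cleanUp rather than the new cleanUpAsync: work handed to a background queue can be lost if the JVM exits before the worker drains it. A minimal standalone sketch of that failure mode (illustrative names only, not Spark code):

import java.util.concurrent.LinkedBlockingQueue

object ShutdownSketch {
  // Hypothetical stand-ins for FallbackStorage's queue and worker thread.
  private val queue = new LinkedBlockingQueue[() => Unit]()

  private val worker = new Thread(new Runnable {
    override def run(): Unit = while (true) queue.take()()
  }, "cleanup-worker")
  worker.setDaemon(true) // a daemon thread does not keep the JVM alive
  worker.start()

  def cleanUpAsync(task: () => Unit): Unit = queue.put(task)

  def main(args: Array[String]): Unit = {
    // Queued work may never run: main can return before the worker gets to it.
    cleanUpAsync(() => println("async cleanup, may be skipped at exit"))
    // A direct synchronous call always finishes before we return.
    println("sync cleanup, completes before the JVM exits")
  }
}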

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 32 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.DataInputStream
 import java.nio.ByteBuffer
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 
 import scala.concurrent.Future
 import scala.reflect.ClassTag
@@ -107,7 +108,7 @@ private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
     message match {
       case RemoveShuffle(shuffleId) =>
-        FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
+        FallbackStorage.cleanUpAsync(conf, hadoopConf, Some(shuffleId))
       case _ => // no-op
     }
     Future{true.asInstanceOf[T]}
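This is the behavioral core of the change: the RPC endpoint used to block inside ask until the filesystem deletion finished; it now enqueues the request and replies immediately. A reduced sketch of that fire-and-forget shape (simplified signatures, not Spark's RPC types):

import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future

object EndpointSketch {
  private val pending = new LinkedBlockingQueue[Int]()

  // Hand the shuffle id to the cleanup queue and answer right away;
  // the caller no longer waits on the (potentially slow) deletion itself.
  def ask(shuffleId: Int): Future[Boolean] = {
    pending.put(shuffleId)
    Future.successful(true)
  }
}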
@@ -118,6 +119,23 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
+  /** Shuffle data can be cleaned up asynchronously by adding it to cleanupShufflesQueue. */
+  private case class CleanUp(
+      conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None)
+
+  private val cleanupShufflesQueue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()
+  private val cleanupShufflesThread: Thread = new Thread(new Runnable {
+    override def run(): Unit = {
+      while (true) {
+        Utils.tryLogNonFatalError {
+          val cleanup = cleanupShufflesQueue.take() // blocks until work arrives; poll() would return null and busy-spin
+          cleanUp(cleanup.conf, cleanup.hadoopConf, cleanup.shuffleId)
+        }
+      }
+    }
+  }, "FallbackStorage-Cleanup")
+  cleanupShufflesThread.start()
+
   def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
       Some(new FallbackStorage(conf))
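The pattern added here is a single-consumer work queue: producers put a CleanUp message and return, while one dedicated thread drains the queue and does the actual I/O, with per-item error isolation so one failure cannot kill the loop. A self-contained sketch of the same pattern, with simplified names and no Spark dependencies:

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.util.control.NonFatal

object CleanupQueueSketch {
  private case class CleanUp(appId: String, shuffleId: Option[Int])

  private val queue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val item = queue.take() // blocks; no busy-waiting
        try doCleanUp(item)     // one bad item must not kill the loop
        catch { case NonFatal(e) => println(s"cleanup failed: $e") }
      }
    }
  }, "cleanup-worker")
  worker.setDaemon(true)
  worker.start()

  def cleanUpAsync(appId: String, shuffleId: Option[Int] = None): Unit =
    queue.put(CleanUp(appId, shuffleId))

  private def doCleanUp(c: CleanUp): Unit =
    println(s"cleaning ${c.appId}, shuffle=${c.shuffleId.getOrElse("all")}")

  def main(args: Array[String]): Unit = {
    cleanUpAsync("app-123", Some(7))
    cleanUpAsync("app-123")
    Thread.sleep(200) // give the daemon worker time to drain the queue
  }
}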
@@ -138,11 +156,24 @@ private[spark] object FallbackStorage extends Logging {
     }
   }
 
+  /**
+   * Asynchronously clean up the generated fallback location for this app (and shuffle id if given).
+   */
+  def cleanUpAsync(
+      conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
+    cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
+  }
+
   /** Clean up the generated fallback location for this app (and shuffle id if given). */
   def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
       conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
       conf.contains("spark.app.id")) {
+      if (shuffleId.isDefined) {
+        logInfo(log"Cleaning up shuffle ${MDC(SHUFFLE_ID, shuffleId.get)}")
+      } else {
+        logInfo(log"Cleaning up app shuffle data")
+      }
       val fallbackPath = shuffleId.foldLeft(
         new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
       ) { case (path, shuffleId) => new Path(path, shuffleId.toString) }
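One detail worth noting in cleanUp: fallbackPath is built with foldLeft over an Option, which runs the function zero or one times, so the shuffle id becomes an extra path segment only when it is present. A standalone sketch of the same trick, using plain strings instead of Hadoop's Path (names here are illustrative):

object PathSketch {
  def fallbackPath(base: String, appId: String, shuffleId: Option[Int]): String =
    shuffleId.foldLeft(s"$base/$appId") { case (path, id) => s"$path/$id" }

  def main(args: Array[String]): Unit = {
    println(fallbackPath("/fallback", "app-123", None))    // /fallback/app-123
    println(fallbackPath("/fallback", "app-123", Some(7))) // /fallback/app-123/7
  }
}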
