Skip to content

Commit 9d107de

Browse files
committed
Make FallbackStorage cleanup multithreaded
1 parent 774c887 commit 9d107de

File tree

2 files changed

+36
-27
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,13 @@ package object config {
623623
.booleanConf
624624
.createWithDefault(false)
625625

626+
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS =
627+
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.threads")
628+
.doc("Number of threads that clean up fallback storage data.")
629+
.version("4.2.0")
630+
.intConf
631+
.createWithDefault(5)
632+
626633
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN =
627634
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.waitOnShutdown")
628635
.doc("If true, Spark waits for all fallback storage data to be cleaned up " +

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ package org.apache.spark.storage
1919

2020
import java.io.DataInputStream
2121
import java.nio.ByteBuffer
22-
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
22+
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
2323
import java.util.concurrent.atomic.AtomicBoolean
2424

25-
import scala.concurrent.Future
25+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
2626
import scala.reflect.ClassTag
2727

2828
import org.apache.hadoop.conf.Configuration
@@ -32,15 +32,15 @@ import org.apache.spark.{SparkConf, SparkException}
3232
import org.apache.spark.deploy.SparkHadoopUtil
3333
import org.apache.spark.internal.Logging
3434
import org.apache.spark.internal.LogKeys._
35-
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
35+
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
3636
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
3737
import org.apache.spark.network.util.JavaUtils
3838
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
3939
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
4040
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
4141
import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
42-
import org.apache.spark.storage.FallbackStorage.{cleanupShufflesThread, stopped}
43-
import org.apache.spark.util.{ShutdownHookManager, Utils}
42+
import org.apache.spark.storage.FallbackStorage.{cleanupShufflesThreadPool, stopped}
43+
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
4444

4545
/**
4646
* A fallback storage used by storage decommissioners.
@@ -59,10 +59,14 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
5959
// indicate cleanup thread to shut down once queue is drained
6060
stopped.set(true)
6161

62-
// only wait for cleanup thread to finish when configured so
63-
// thread is set daemon so JVM can shut down while it is running
64-
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) {
65-
cleanupShufflesThread.join()
62+
// only wait for cleanups to finish when configured so
63+
if (cleanupShufflesThreadPool.isDefined) {
64+
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) {
65+
cleanupShufflesThreadPool.get.shutdown()
66+
while (!cleanupShufflesThreadPool.get.awaitTermination(1L, TimeUnit.SECONDS)) { }
67+
} else {
68+
cleanupShufflesThreadPool.get.shutdownNow()
69+
}
6670
}
6771
}
6872

@@ -139,25 +143,23 @@ private[spark] object FallbackStorage extends Logging {
139143

140144
private val stopped = new AtomicBoolean(false)
141145

142-
// a queue of shuffle cleanup requests and a daemon thread processing them
143-
private val cleanupShufflesQueue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()
144-
private val cleanupShufflesThread: Thread = new Thread(new Runnable {
145-
override def run(): Unit = {
146-
// if stopped and queue is empty, this thread terminates
147-
while (!stopped.get() || !cleanupShufflesQueue.isEmpty) {
148-
Utils.tryLogNonFatalError {
149-
// wait a second for another cleanup request, then check while condition
150-
val cleanup = Option(cleanupShufflesQueue.poll(1L, TimeUnit.SECONDS))
151-
cleanup.foreach { c => cleanUp(c.conf, c.hadoopConf, c.shuffleId) }
152-
}
146+
// a daemon thread pool for shuffle cleanups
147+
private var cleanupShufflesThreadPool: Option[ThreadPoolExecutor] = None
148+
private var cleanupShufflesExecutionContext: Option[ExecutionContextExecutor] = None
149+
150+
private def getCleanupShufflesExecutionContext(conf: SparkConf): ExecutionContextExecutor = {
151+
FallbackStorage.synchronized {
152+
if (cleanupShufflesExecutionContext.isEmpty) {
153+
val threads = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS)
154+
cleanupShufflesThreadPool =
155+
Some(ThreadUtils.newDaemonFixedThreadPool(threads, "fallback-storage-cleanup"))
156+
cleanupShufflesExecutionContext =
157+
cleanupShufflesThreadPool.map(ExecutionContext.fromExecutor)
153158
}
159+
160+
cleanupShufflesExecutionContext.get
154161
}
155-
}, "fallback-storage-cleanup")
156-
// this is a daemon thread so JVM shutdown is not blocked by this thread running
157-
// we block ShutdownHook above if STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN
158-
// is true
159-
cleanupShufflesThread.setDaemon(true)
160-
cleanupShufflesThread.start()
162+
}
161163

162164
def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
163165
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
@@ -187,7 +189,7 @@ private[spark] object FallbackStorage extends Logging {
187189
if (stopped.get()) {
188190
logInfo("Not queueing cleanup due to shutdown")
189191
} else {
190-
cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
192+
Future { cleanUp(conf, hadoopConf, shuffleId) }(getCleanupShufflesExecutionContext(conf))
191193
}
192194
}
193195

0 commit comments

Comments (0)