Commit 6b63c71
Make FallbackStorage cleanup multithreaded
1 parent 774c887 commit 6b63c71

File tree

4 files changed: 50 additions & 39 deletions

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
@@ -623,6 +623,13 @@ package object config {
       .booleanConf
       .createWithDefault(false)

+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.threads")
+      .doc("Number of threads that clean up fallback storage data.")
+      .version("4.2.0")
+      .intConf
+      .createWithDefault(5)
+
   private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN =
     ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.waitOnShutdown")
       .doc("If true, Spark waits for all fallback storage data to be cleaned up " +

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 29 additions & 37 deletions
@@ -19,28 +19,27 @@ package org.apache.spark.storage

 import java.io.DataInputStream
 import java.nio.ByteBuffer
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean

-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.reflect.ClassTag

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
+import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
-import org.apache.spark.storage.FallbackStorage.{cleanupShufflesThread, stopped}
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}

 /**
  * A fallback storage used by storage decommissioners.
@@ -54,18 +53,6 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
   private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
   private val appId = conf.getAppId

-  // Ensure cleanup work only blocks Spark shutdown when configured so
-  ShutdownHookManager.addShutdownHook { () =>
-    // indicate cleanup thread to shut down once queue is drained
-    stopped.set(true)
-
-    // only wait for cleanup thread to finish when configured so
-    // thread is set daemon so JVM can shut down while it is running
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) {
-      cleanupShufflesThread.join()
-    }
-  }
-
   // Visible for testing
   def copy(
       shuffleBlockInfo: ShuffleBlockInfo,
@@ -139,25 +126,30 @@ private[spark] object FallbackStorage extends Logging {

   private val stopped = new AtomicBoolean(false)

-  // a queue of shuffle cleanup requests and a daemon thread processing them
-  private val cleanupShufflesQueue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()
-  private val cleanupShufflesThread: Thread = new Thread(new Runnable {
-    override def run(): Unit = {
-      // if stopped and queue is empty, this thread terminates
-      while (!stopped.get() || !cleanupShufflesQueue.isEmpty) {
-        Utils.tryLogNonFatalError {
-          // wait a second for another cleanup request, then check while condition
-          val cleanup = Option(cleanupShufflesQueue.poll(1L, TimeUnit.SECONDS))
-          cleanup.foreach { c => cleanUp(c.conf, c.hadoopConf, c.shuffleId) }
-        }
-      }
+  // a daemon thread pool for shuffle cleanups
+  private val cleanupShufflesNumThreads =
+    SparkEnv.get.conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS)
+  private val cleanupShufflesThreadPool =
+    ThreadUtils.newDaemonFixedThreadPool(cleanupShufflesNumThreads, "fallback-storage-cleanup")
+  private val cleanupShufflesExecutionContext =
+    ExecutionContext.fromExecutor(cleanupShufflesThreadPool)
+
+  // Ensure cleanup work only blocks Spark shutdown when configured so
+  private val cleanupShufflesWaitOnShutdown =
+    SparkEnv.get.conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)
+
+  ShutdownHookManager.addShutdownHook { () =>
+    // indicate the cleanup thread to terminate once the queue is drained
+    stopped.set(true)
+
+    // only wait for cleanups to finish when configured so
+    if (cleanupShufflesWaitOnShutdown) {
+      cleanupShufflesThreadPool.shutdown()
+      while (!cleanupShufflesThreadPool.awaitTermination(1L, TimeUnit.SECONDS)) {}
+    } else {
+      cleanupShufflesThreadPool.shutdownNow()
     }
-  }, "fallback-storage-cleanup")
-  // this is a daemon thread so JVM shutdown is not blocked by this thread running
-  // we block ShutdownHook above if STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN
-  // is true
-  cleanupShufflesThread.setDaemon(true)
-  cleanupShufflesThread.start()
+  }

   def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
@@ -187,7 +179,7 @@ private[spark] object FallbackStorage extends Logging {
     if (stopped.get()) {
       logInfo("Not queueing cleanup due to shutdown")
     } else {
-      cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
+      Future { cleanUp(conf, hadoopConf, shuffleId) }(cleanupShufflesExecutionContext)
     }
   }
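The core change above swaps the single polling thread and its BlockingQueue for a fixed daemon pool wrapped in an ExecutionContext, so each cleanup request runs as a fire-and-forget Future. A self-contained sketch of that pattern, using plain JDK executors in place of Spark's internal ThreadUtils helper (all names here are illustrative, not from the commit):

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object CleanupPoolSketch {
  // Daemon threads: a running cleanup never blocks JVM exit on its own;
  // the stop() logic below decides whether to wait for pending work.
  private val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "fallback-storage-cleanup")
      t.setDaemon(true)
      t
    }
  }
  private val pool = Executors.newFixedThreadPool(5, factory)
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Fire-and-forget submission, mirroring Future { cleanUp(...) } above.
  def submit(task: () => Unit): Future[Unit] = Future { task() }

  // Mirrors the shutdown hook: shutdown() lets already-queued tasks drain,
  // while shutdownNow() interrupts workers and drops pending tasks.
  def stop(waitOnShutdown: Boolean): Unit = {
    if (waitOnShutdown) {
      pool.shutdown()
      while (!pool.awaitTermination(1L, TimeUnit.SECONDS)) {}
    } else {
      pool.shutdownNow()
    }
  }
}

One consequence of the pool design: unlike the old single thread, up to N cleanups can now touch the fallback file system concurrently, which is what makes the thread count worth exposing as a config.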

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

Lines changed: 8 additions & 1 deletion
@@ -28,7 +28,7 @@ import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
@@ -61,6 +61,13 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
   }

+  override def beforeAll(): Unit = {
+    // some tests need a SparkEnv set
+    val sparkEnv = mock(classOf[SparkEnv])
+    when(sparkEnv.conf).thenReturn(getSparkConf())
+    SparkEnv.set(sparkEnv)
+  }
+
   test("fallback storage APIs - copy/exists") {
     val conf = new SparkConf(false)
       .set("spark.app.id", "testId")

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 6 additions & 1 deletion
@@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.roaringbitmap.RoaringBitmap

-import org.apache.spark.{MapOutputTracker, SparkFunSuite, TaskContext}
+import org.apache.spark.{MapOutputTracker, SparkConf, SparkEnv, SparkFunSuite, TaskContext}
 import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
@@ -59,6 +59,11 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
     mapOutputTracker = mock(classOf[MapOutputTracker])
     when(mapOutputTracker.getMapSizesForMergeResult(any(), any(), any()))
       .thenReturn(Seq.empty.iterator)
+
+    // some tests need a SparkEnv set
+    val sparkEnv = mock(classOf[SparkEnv])
+    when(sparkEnv.conf).thenReturn(new SparkConf())
+    SparkEnv.set(sparkEnv)
   }

   private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
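Both test suites install the same mock because the FallbackStorage companion object now reads SparkEnv.get.conf while initializing its thread pool; if nothing has called SparkEnv.set, SparkEnv.get returns null and the first touch of the object fails with a NullPointerException. A minimal sketch of the ordering this implies (illustrative; note the companion object is private[spark], so real callers live inside that package):

import org.mockito.Mockito.{mock, when}
import org.apache.spark.{SparkConf, SparkEnv}

// Register the stubbed SparkEnv before anything references FallbackStorage,
// since the companion object's fields are initialized from SparkEnv.get.conf
// the first time the object is loaded.
val sparkEnv = mock(classOf[SparkEnv])
when(sparkEnv.conf).thenReturn(new SparkConf())
SparkEnv.set(sparkEnv)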
