@@ -19,7 +19,8 @@ package org.apache.spark.storage
 
 import java.io.DataInputStream
 import java.nio.ByteBuffer
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.Future
 import scala.reflect.ClassTag
@@ -31,14 +32,15 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
+import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
-import org.apache.spark.util.Utils
+import org.apache.spark.storage.FallbackStorage.{cleanupShufflesThread, stopped}
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * A fallback storage used by storage decommissioners.
@@ -52,6 +54,18 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
   private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
   private val appId = conf.getAppId
 
+  // Ensure cleanup work only blocks Spark shutdown when configured to do so
+  ShutdownHookManager.addShutdownHook { () =>
+    // signal the cleanup thread to shut down once its queue is drained
+    stopped.set(true)
+
+    // only wait for the cleanup thread to finish when configured to do so;
+    // the thread is a daemon, so the JVM can shut down while it is still running
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) {
+      cleanupShufflesThread.join()
+    }
+  }
+
   // Visible for testing
   def copy(
       shuffleBlockInfo: ShuffleBlockInfo,
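
For context, a minimal sketch of how the new flag might be declared in org.apache.spark.internal.config. The actual entry is not part of this diff, so the key name, doc string, version, and default below are all assumptions, not the committed definition:

    // Hypothetical sketch only; the real ConfigEntry lives elsewhere and is not shown in this diff.
    private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN =
      ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.waitOnShutdown")  // assumed key
        .doc("Whether Spark shutdown blocks until the fallback storage cleanup " +
          "thread has drained its queue.")
        .booleanConf
        .createWithDefault(false)  // assumed default
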
@@ -123,17 +137,26 @@ private[spark] object FallbackStorage extends Logging {
   private case class CleanUp(
       conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None)
 
+  private val stopped = new AtomicBoolean(false)
+
+  // a queue of shuffle cleanup requests and a daemon thread processing them
   private val cleanupShufflesQueue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()
   private val cleanupShufflesThread: Thread = new Thread(new Runnable {
     override def run(): Unit = {
-      while (true) {
+      // once stopped is set and the queue is empty, this thread terminates
+      while (!stopped.get() || !cleanupShufflesQueue.isEmpty) {
         Utils.tryLogNonFatalError {
-          val cleanup = cleanupShufflesQueue.poll()
-          cleanUp(cleanup.conf, cleanup.hadoopConf, cleanup.shuffleId)
+          // wait up to a second for another cleanup request, then re-check the loop condition
+          val cleanup = Option(cleanupShufflesQueue.poll(1L, TimeUnit.SECONDS))
+          cleanup.foreach { c => cleanUp(c.conf, c.hadoopConf, c.shuffleId) }
         }
       }
     }
   }, "FallbackStorage-Cleanup")
+  // this is a daemon thread, so a running cleanup does not block JVM shutdown;
+  // instead, the shutdown hook above blocks when
+  // STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN is true
+  cleanupShufflesThread.setDaemon(true)
   cleanupShufflesThread.start()
@@ -161,13 +184,19 @@ private[spark] object FallbackStorage extends Logging {
    */
   def cleanUpAsync(
       conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
-    cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
+    if (stopped.get()) {
+      logInfo("Not queueing cleanup due to shutdown")
+    } else {
+      cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
+    }
   }
 
   /** Clean up the generated fallback location for this app (and shuffle id if given). */
   def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
         conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
+        (shuffleId.isDefined ||
+          conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) &&
         conf.contains("spark.app.id")) {
       if (shuffleId.isDefined) {
         logInfo(log"Cleaning up shuffle ${MDC(SHUFFLE_ID, shuffleId.get)}")
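
The net effect of the new guard: shuffle-level requests (shuffleId defined) remain always eligible, while the app-level sweep (shuffleId = None, which removes the whole app directory) only runs when the shutdown hook is known to wait for the cleanup thread. A hedged usage sketch, assuming the conf key from the sketch above:

    // Illustrative only
    FallbackStorage.cleanUpAsync(conf, hadoopConf, Some(0))  // shuffle 0: cleaned up regardless of the flag
    FallbackStorage.cleanUpAsync(conf, hadoopConf, None)     // whole app dir: skipped unless waitOnShutdown=true
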