2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2376,6 +2376,8 @@ class SparkContext(config: SparkConf) extends Logging {
}
Utils.tryLogNonFatalError {
// not calling cleanUpAsync here as we want cleanup to complete before exiting
// cleanUp returns immediately if STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN
// is false
FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
}
Utils.tryLogNonFatalError {
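Note that the shutdown path above calls the blocking FallbackStorage.cleanUp rather than cleanUpAsync, so by default stop() waits for the fallback directory to be removed. A hedged illustration of what the new flag means for an application; the session setup is hypothetical and not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: with waitOnShutdown = true (the default), stop() does not
// return until this app's fallback directory has been deleted; with false it
// returns promptly and the files are left for an external mechanism (such as a
// storage TTL) to reclaim.
val spark = SparkSession.builder()
  .appName("fallback-storage-demo") // hypothetical application name
  .getOrCreate()
try {
  // ... jobs whose shuffle blocks may be migrated to the fallback storage ...
} finally {
  spark.stop() // how long this blocks depends on waitOnShutdown
}
```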
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -623,6 +623,17 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.waitOnShutdown")
.doc("If true, Spark waits for all fallback storage data to be cleaned up " +
"when shutting down. This may defer the termination of the Spark application " +
"for a significant time. " +
s"Only used when $STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP is true. " +
"When false, use an external cleanup mechanism instead, for instance a TTL.")
.version("4.2.0")
.booleanConf
.createWithDefault(true)

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
.doc("The location for fallback storage during block manager decommissioning. " +
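For context, this is how a user might combine the new key with the existing decommission settings; the fallback path below is a placeholder, and only the waitOnShutdown key is introduced by this change:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  // placeholder path; any Hadoop-compatible filesystem location works here
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
  // new key from this change: do not block application shutdown on the cleanup;
  // rely on an external mechanism such as a bucket TTL instead
  .set("spark.storage.decommission.fallbackStorage.cleanUp.waitOnShutdown", "false")
```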
43 changes: 36 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -19,7 +19,8 @@ package org.apache.spark.storage

import java.io.DataInputStream
import java.nio.ByteBuffer
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.Future
import scala.reflect.ClassTag
@@ -31,14 +32,15 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
import org.apache.spark.util.Utils
import org.apache.spark.storage.FallbackStorage.{cleanupShufflesThread, stopped}
import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
* A fallback storage used by storage decommissioners.
Expand All @@ -52,6 +54,18 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
private val appId = conf.getAppId

// Ensure cleanup work only blocks Spark shutdown when configured to do so
ShutdownHookManager.addShutdownHook { () =>
// signal the cleanup thread to shut down once the queue is drained
stopped.set(true)

// only wait for the cleanup thread to finish when configured to do so
// the thread is a daemon, so the JVM can shut down while it is still running
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) {
cleanupShufflesThread.join()
}
}

// Visible for testing
def copy(
shuffleBlockInfo: ShuffleBlockInfo,
@@ -123,17 +137,26 @@ private[spark] object FallbackStorage extends Logging {
private case class CleanUp(
conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None)

private val stopped = new AtomicBoolean(false)

// a queue of shuffle cleanup requests and a daemon thread processing them
private val cleanupShufflesQueue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()
private val cleanupShufflesThread: Thread = new Thread(new Runnable {
override def run(): Unit = {
while (true) {
// if stopped and queue is empty, this thread terminates
while (!stopped.get() || !cleanupShufflesQueue.isEmpty) {
Utils.tryLogNonFatalError {
val cleanup = cleanupShufflesQueue.poll()
cleanUp(cleanup.conf, cleanup.hadoopConf, cleanup.shuffleId)
// wait up to a second for another cleanup request, then re-check the while condition
val cleanup = Option(cleanupShufflesQueue.poll(1L, TimeUnit.SECONDS))
cleanup.foreach { c => cleanUp(c.conf, c.hadoopConf, c.shuffleId) }
}
}
}
}, "fallback-storage-cleanup")
// this is a daemon thread, so a running cleanup does not by itself block JVM shutdown
// the shutdown hook above joins this thread when STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN
// is true
cleanupShufflesThread.setDaemon(true)
cleanupShufflesThread.start()

def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
@@ -161,13 +184,19 @@ private[spark] object FallbackStorage extends Logging {
*/
def cleanUpAsync(
conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
if (stopped.get()) {
logInfo("Not queueing cleanup due to shutdown")
} else {
cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
}
}

/** Clean up the generated fallback location for this app (and shuffle id if given). */
def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
(shuffleId.isDefined ||
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) &&
conf.contains("spark.app.id")) {
if (shuffleId.isDefined) {
logInfo(log"Cleaning up shuffle ${MDC(SHUFFLE_ID, shuffleId.get)}")
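The core of the FallbackStorage changes is a small producer/consumer shutdown protocol: a daemon worker drains a queue of cleanup requests, a stop flag rejects new requests and lets the loop exit once the queue is empty, and the shutdown hook optionally joins the worker so pending work finishes before the JVM exits. A self-contained sketch of that pattern, using hypothetical names rather than the Spark classes:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object CleanupWorkerSketch {
  private val stopped = new AtomicBoolean(false)
  private val queue = new LinkedBlockingQueue[String]()

  private val worker = new Thread(() => {
    // keep draining until asked to stop AND the queue is empty
    while (!stopped.get() || !queue.isEmpty) {
      // poll with a timeout so the stop flag is re-checked at least once per second
      Option(queue.poll(1L, TimeUnit.SECONDS)).foreach { item =>
        println(s"cleaning up $item")
      }
    }
  }, "cleanup-worker-sketch")
  worker.setDaemon(true) // a daemon thread never blocks JVM exit on its own
  worker.start()

  // mirrors cleanUpAsync: refuse new work once shutdown has begun
  def submit(item: String): Unit =
    if (!stopped.get()) queue.put(item)

  // mirrors the shutdown hook: always signal, only join when asked to wait
  def shutdown(waitForDrain: Boolean): Unit = {
    stopped.set(true)
    if (waitForDrain) worker.join()
  }
}
```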
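Finally, the guard at the top of cleanUp decides whether anything is actually deleted: per-shuffle requests run regardless of the new flag, while the whole-app cleanup on shutdown (no shuffle id) additionally requires waitOnShutdown to be true. The same decision written as a standalone predicate, a hypothetical helper rather than anything in the patch:

```scala
// Mirrors the condition in FallbackStorage.cleanUp (hypothetical helper).
def shouldCleanUp(
    fallbackPathDefined: Boolean, // spark.storage.decommission.fallbackStorage.path is set
    cleanUpEnabled: Boolean,      // ...fallbackStorage.cleanUp
    waitOnShutdown: Boolean,      // ...fallbackStorage.cleanUp.waitOnShutdown
    shuffleId: Option[Int],       // Some(id) for per-shuffle cleanup, None for the whole app
    appIdDefined: Boolean): Boolean =
  fallbackPathDefined && cleanUpEnabled &&
    (shuffleId.isDefined || waitOnShutdown) &&
    appIdDefined
```

For example, shouldCleanUp(true, true, false, None, true) is false, which is why the comment added to SparkContext.scala notes that the shutdown-time call returns immediately when the flag is false.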