@@ -623,6 +623,13 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.threads")
.doc("Number of threads that clean up fallback storage data.")
.version("4.2.0")
.intConf
.createWithDefault(5)

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp.waitOnShutdown")
.doc("If true, Spark waits for all fallback storage data to be cleaned up " +
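For orientation, here is a hedged sketch of how the new `cleanUp.threads` setting could be combined with the existing fallback-storage keys when building a `SparkConf`. The path and the thread count below are placeholder values, not part of this change.

```scala
import org.apache.spark.SparkConf

// Illustrative only: "spark.storage.decommission.fallbackStorage.cleanUp.threads"
// is the key introduced by this diff; the other keys are pre-existing Spark configs,
// and the path/thread-count values are placeholders.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.fallbackStorage.path", "hdfs://namenode/spark-fallback/")
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
  .set("spark.storage.decommission.fallbackStorage.cleanUp.threads", "8")
```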
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala (29 additions & 37 deletions)
@@ -19,28 +19,27 @@ package org.apache.spark.storage

import java.io.DataInputStream
import java.nio.ByteBuffer
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS, STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
import org.apache.spark.storage.FallbackStorage.{cleanupShufflesThread, stopped}
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}

/**
* A fallback storage used by storage decommissioners.
@@ -54,18 +53,6 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
private val appId = conf.getAppId

// Ensure cleanup work only blocks Spark shutdown when configured so
ShutdownHookManager.addShutdownHook { () =>
// indicate cleanup thread to shut down once queue is drained
stopped.set(true)

// only wait for cleanup thread to finish when configured so
// thread is set daemon so JVM can shut down while it is running
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)) {
cleanupShufflesThread.join()
}
}

// Visible for testing
def copy(
shuffleBlockInfo: ShuffleBlockInfo,
@@ -139,25 +126,30 @@ private[spark] object FallbackStorage extends Logging {

private val stopped = new AtomicBoolean(false)

// a queue of shuffle cleanup requests and a daemon thread processing them
private val cleanupShufflesQueue: BlockingQueue[CleanUp] = new LinkedBlockingQueue[CleanUp]()
private val cleanupShufflesThread: Thread = new Thread(new Runnable {
override def run(): Unit = {
// if stopped and queue is empty, this thread terminates
while (!stopped.get() || !cleanupShufflesQueue.isEmpty) {
Utils.tryLogNonFatalError {
// wait a second for another cleanup request, then check while condition
val cleanup = Option(cleanupShufflesQueue.poll(1L, TimeUnit.SECONDS))
cleanup.foreach { c => cleanUp(c.conf, c.hadoopConf, c.shuffleId) }
}
}
// a daemon thread pool for shuffle cleanups
private val cleanupShufflesNumThreads =
SparkEnv.get.conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_THREADS)
private val cleanupShufflesThreadPool =
ThreadUtils.newDaemonFixedThreadPool(cleanupShufflesNumThreads, "fallback-storage-cleanup")
private val cleanupShufflesExecutionContext =
ExecutionContext.fromExecutor(cleanupShufflesThreadPool)

// Ensure cleanup work only blocks Spark shutdown when so configured
private val cleanupShufflesWaitOnShutdown =
SparkEnv.get.conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN)

ShutdownHookManager.addShutdownHook { () =>
// signal that no new cleanup tasks should be submitted
stopped.set(true)

// only wait for in-flight cleanups to finish when so configured
if (cleanupShufflesWaitOnShutdown) {
cleanupShufflesThreadPool.shutdown()
while (!cleanupShufflesThreadPool.awaitTermination(1L, TimeUnit.SECONDS)) {}
} else {
cleanupShufflesThreadPool.shutdownNow()
}
}, "fallback-storage-cleanup")
// this is a daemon thread so JVM shutdown is not blocked by this thread running
// we block ShutdownHook above if STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP_WAIT_ON_SHUTDOWN
// is true
cleanupShufflesThread.setDaemon(true)
cleanupShufflesThread.start()
}

def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
@@ -187,7 +179,7 @@ private[spark] object FallbackStorage extends Logging {
if (stopped.get()) {
logInfo("Not queueing cleanup due to shutdown")
} else {
cleanupShufflesQueue.put(CleanUp(conf, hadoopConf, shuffleId))
Future { cleanUp(conf, hadoopConf, shuffleId) }(cleanupShufflesExecutionContext)
}
}

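As a standalone illustration of the pattern this file moves to (a daemon fixed thread pool driving `Future`s instead of a single polling thread over a `LinkedBlockingQueue`), here is a self-contained sketch; the object and method names are invented for the example and are not the actual Spark internals.

```scala
// Self-contained sketch of the cleanup pattern adopted above: a daemon fixed
// thread pool wrapped in an ExecutionContext, a stop flag gating new
// submissions, and a shutdown path that either drains or drops pending work.
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}

object CleanupPoolSketch {
  private val stopped = new AtomicBoolean(false)

  private val pool = Executors.newFixedThreadPool(5, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "cleanup-sketch")
      t.setDaemon(true) // daemon threads never block JVM exit on their own
      t
    }
  })

  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  def submit(shuffleId: Int)(work: Int => Unit): Unit = {
    // mirror the guard in the diff above: refuse new work once stopped
    if (!stopped.get()) Future(work(shuffleId))
  }

  def shutdown(waitForCompletion: Boolean): Unit = {
    stopped.set(true)
    if (waitForCompletion) {
      pool.shutdown() // let queued cleanups finish first
      while (!pool.awaitTermination(1L, TimeUnit.SECONDS)) {}
    } else {
      pool.shutdownNow() // drop pending cleanups immediately
    }
  }
}
```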
@@ -28,7 +28,7 @@ import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
@@ -61,6 +61,13 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
}

override def beforeAll(): Unit = {
// some tests need a SparkEnv set
val sparkEnv = mock(classOf[SparkEnv])
when(sparkEnv.conf).thenReturn(getSparkConf())
SparkEnv.set(sparkEnv)
}

test("fallback storage APIs - copy/exists") {
val conf = new SparkConf(false)
.set("spark.app.id", "testId")
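Because the `FallbackStorage` companion object now reads `SparkEnv.get.conf` during its initialization, suites that touch it need a `SparkEnv` installed before the object first loads; the `beforeAll` above does that with a mock. A minimal standalone version of the same setup, with illustrative values:

```scala
import org.apache.spark.{SparkConf, SparkEnv}
import org.mockito.Mockito.{mock, when}

// Sketch of the test setup above: install a SparkEnv whose conf can answer
// the config lookups that FallbackStorage's object initializer performs.
val sparkEnv = mock(classOf[SparkEnv])
when(sparkEnv.conf).thenReturn(new SparkConf())
SparkEnv.set(sparkEnv)
```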
@@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.{MapOutputTracker, SparkFunSuite, TaskContext}
import org.apache.spark.{MapOutputTracker, SparkConf, SparkEnv, SparkFunSuite, TaskContext}
import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
import org.apache.spark.network._
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
@@ -59,6 +59,11 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
mapOutputTracker = mock(classOf[MapOutputTracker])
when(mapOutputTracker.getMapSizesForMergeResult(any(), any(), any()))
.thenReturn(Seq.empty.iterator)

// some tests need a SparkEnv set
val sparkEnv = mock(classOf[SparkEnv])
when(sparkEnv.conf).thenReturn(new SparkConf())
SparkEnv.set(sparkEnv)
}

private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)