
Commit 26ba0ed

ivoson authored and cloud-fan committed
[SPARK-53564][CORE] Avoid DAGScheduler exits due to blockManager RPC timeout in DAGSchedulerEventProcessLoop
### What changes were proposed in this pull request?

Currently there are a few blocking RPC requests used in `DAGSchedulerEventProcessLoop`:

1. [blockManagerMaster.getLocations(blockIds)](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L451), used in `getCacheLocs` to get RDD cache locations.
2. [blockManagerMaster.removeShufflePushMergerLocation](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2754), used in `removeExecutorAndUnregisterOutputs`.
3. [blockManagerMaster.removeExecutor(execId)](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2765), used in `removeExecutorAndUnregisterOutputs`.

An `RpcTimeoutException` can be thrown if slow events are blocking the `BlockManagerMasterEndpoint`, and the exception is not handled in the `DAGSchedulerEventProcessLoop`. Once this happens, the DAGScheduler exits. This PR proposes to catch and handle the `RpcTimeoutException` properly instead of crashing the application. There are two scenarios:

1. Make the requests in `removeExecutorAndUnregisterOutputs` asynchronous, since we don't rely on the response, and let `BlockManagerMasterEndpoint` deal with any errors.
2. Abort the stage if an RPC timeout happens during `submitStage`.

### Why are the changes needed?

To keep an RPC timeout from crashing the Spark application.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52335 from ivoson/SPARK-53564.

Authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 37f8df4 commit 26ba0ed
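
Editorial aside: the reason an unhandled `RpcTimeoutException` is fatal is that the DAGScheduler processes events on a single event-loop thread, and an exception escaping a handler tears the loop down. A minimal, self-contained toy loop in the same spirit (not Spark's `EventLoop` class) shows the failure mode:

```scala
// Toy event loop (hypothetical, not Spark's EventLoop): one escaped
// exception from a handler terminates the processing thread for good.
import java.util.concurrent.LinkedBlockingQueue

class ToyEventLoop(handle: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  private val thread = new Thread(() => {
    try {
      while (true) handle(queue.take()) // no per-event recovery...
    } catch {
      case e: Exception => println(s"event loop terminated: $e") // ...so one escape kills it
    }
  })
  def start(): Unit = thread.start()
  def post(event: String): Unit = queue.put(event)
}

object ToyEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new ToyEventLoop(e =>
      if (e == "boom") throw new RuntimeException("simulated rpc timeout")
      else println(s"handled: $e"))
    loop.start()
    loop.post("ok")
    loop.post("boom")          // kills the loop...
    loop.post("never handled") // ...so this event is silently dropped
    Thread.sleep(500)
  }
}
```

This is why the patch either catches the timeout inside the handler or avoids blocking calls on the event thread altogether.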

File tree

5 files changed (+99 −19 lines changed):

- core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
- core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
- core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala


core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala

Lines changed: 1 addition & 1 deletion

@@ -28,7 +28,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 /**
  * An exception thrown if RpcTimeout modifies a `TimeoutException`.
  */
-private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
+private[spark] class RpcTimeoutException(message: String, cause: TimeoutException)
   extends TimeoutException(message) { initCause(cause) }
 
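Editorial note: widening the modifier from `private[rpc]` to `private[spark]` is what lets code under `org.apache.spark.scheduler` name the exception in a `catch` pattern. A minimal sketch of the scoping rule with toy declarations (not Spark's actual files):

```scala
// Toy illustration of Scala's package-qualified access modifiers.
// private[rpc]   => visible only within org.apache.spark.rpc
// private[spark] => visible anywhere under org.apache.spark
package org.apache.spark.rpc {
  private[spark] class WidelyVisible      // analogous to the new modifier
  // private[rpc] class NarrowlyVisible  // would not be usable below
}

package org.apache.spark.scheduler {
  object Consumer {
    // Legal only because the class is private[spark], not private[rpc].
    val ok = new org.apache.spark.rpc.WidelyVisible
  }
}
```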

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 3 deletions

@@ -49,6 +49,7 @@ import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile}
 import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
 import org.apache.spark.rpc.RpcTimeout
+import org.apache.spark.rpc.RpcTimeoutException
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
@@ -771,7 +772,14 @@ private[spark] class DAGScheduler(
     def visit(rdd: RDD[_]): Unit = {
       if (!visited(rdd)) {
         visited += rdd
-        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
+        val rddHasUncachedPartitions = try {
+          getCacheLocs(rdd).contains(Nil)
+        } catch {
+          case e: RpcTimeoutException =>
+            logWarning(log"Failed to get cache locations for RDD ${MDC(RDD_ID, rdd.id)} due " +
+              log"to rpc timeout, assuming not fully cached.", e)
+            true
+        }
         if (rddHasUncachedPartitions) {
           for (dep <- rdd.dependencies) {
             dep match {
@@ -1463,7 +1471,7 @@ private[spark] class DAGScheduler(
       abortStage(stage, reason, None)
     } else {
       val missing = getMissingParentStages(stage).sortBy(_.id)
-      logDebug("missing: " + missing)
+      logInfo(log"Missing parents found for ${MDC(STAGE, stage)}: ${MDC(MISSING_PARENT_STAGES, missing)}")
       if (missing.isEmpty) {
         logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_ID, stage.rdd)}), " +
           log"which has no missing parents")
@@ -2807,7 +2815,7 @@ private[spark] class DAGScheduler(
       hostToUnregisterOutputs.foreach(
         host => blockManagerMaster.removeShufflePushMergerLocation(host))
     }
-    blockManagerMaster.removeExecutor(execId)
+    blockManagerMaster.removeExecutorAsync(execId)
     clearCacheLocs()
   }
   if (fileLost) {
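
Why the `true` fallback above is safe (an editorial observation): claiming an RDD has uncached partitions is conservative. At worst the scheduler resubmits a parent stage it could have skipped; it never skips work it actually needs. A self-contained toy model of that choice (hypothetical names, not the patch's code):

```scala
// Toy model of the conservative fallback: when the location lookup fails,
// report "has uncached partitions" so the caller schedules recomputation.
// Correctness is preserved; only redundant work is risked.
import scala.util.{Failure, Success, Try}

object ConservativeFallback {
  def hasUncachedPartitions(lookup: () => Seq[Seq[String]]): Boolean =
    Try(lookup()) match {
      case Success(locs) => locs.contains(Nil) // some partition has no location
      case Failure(_)    => true               // unknown => assume uncached
    }

  def main(args: Array[String]): Unit = {
    // A failing lookup yields the safe answer...
    assert(hasUncachedPartitions(() => throw new RuntimeException("rpc timeout")))
    // ...while a fully cached lookup does not trigger resubmission.
    assert(!hasUncachedPartitions(() => Seq(Seq("hostA"), Seq("hostB"))))
    println("conservative fallback behaves as expected")
  }
}
```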

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 3 additions & 2 deletions

@@ -167,11 +167,12 @@ class BlockManagerMaster(
 
   /**
    * Remove the host from the candidate list of shuffle push mergers. This can be
-   * triggered if there is a FetchFailedException on the host
+   * triggered if there is a FetchFailedException on the host. Non-blocking.
    * @param host
    */
   def removeShufflePushMergerLocation(host: String): Unit = {
-    driverEndpoint.askSync[Unit](RemoveShufflePushMergerLocation(host))
+    logInfo(log"Request to remove shuffle push merger location ${MDC(HOST, host)}")
+    driverEndpoint.ask[Unit](RemoveShufflePushMergerLocation(host))
   }
 
   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
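
For readers unfamiliar with the two RPC flavors: `askSync` blocks the calling thread until a reply arrives (or an `RpcTimeoutException` is thrown), while `ask` returns immediately with a `Future`. A self-contained sketch of the difference using plain `scala.concurrent` futures (Spark's `RpcEndpointRef` API is analogous but not shown here):

```scala
// Sketch of askSync-vs-ask using plain futures (hypothetical stand-ins).
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AskVsAskSync {
  // Stand-in for a slow BlockManagerMasterEndpoint handler.
  private def slowReply(): String = { Thread.sleep(5000); "done" }

  def main(args: Array[String]): Unit = {
    // askSync-style: block for the reply and throw on timeout. This is the
    // call shape that could crash the DAGScheduler event loop.
    try Await.result(Future(slowReply()), 100.millis)
    catch { case _: TimeoutException => println("askSync-style call timed out") }

    // ask-style: return immediately; any failure surfaces on the Future,
    // to be logged or handled by whoever owns it.
    val reply: Future[String] = Future(slowReply())
    reply.failed.foreach(e => println(s"async failure, handled off the event loop: $e"))
    println("ask-style call returned immediately")
  }
}
```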

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 8 additions & 1 deletion

@@ -530,7 +530,13 @@ class BlockManagerMasterEndpoint(
     blockManagerInfo.get(candidateBMId).foreach { bm =>
       val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
       val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-      bm.storageEndpoint.ask[Boolean](replicateMsg)
+      try {
+        bm.storageEndpoint.ask[Boolean](replicateMsg)
+      } catch {
+        case e: Exception =>
+          logWarning(log"Failed to request replication of ${MDC(BLOCK_ID, blockId)} " +
+            log"from ${MDC(BLOCK_MANAGER_ID, candidateBMId)}", e)
+      }
     }
   }
 }
@@ -554,6 +560,7 @@ class BlockManagerMasterEndpoint(
   private def removeExecutor(execId: String): Unit = {
     logInfo(log"Trying to remove executor ${MDC(EXECUTOR_ID, execId)} from BlockManagerMaster.")
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
+    logInfo(log"Removed ${MDC(EXECUTOR_ID, execId)} successfully in removeExecutor")
   }
 
   /**
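
One subtlety worth noting (an editorial observation, not a claim from the patch): since `ask` returns a `Future`, a `try`/`catch` like the one above guards only exceptions thrown synchronously while initiating the send; failures of the reply itself surface on the future. A self-contained sketch of that distinction:

```scala
// Sketch: try/catch around a Future-returning call catches only the
// synchronous throw (e.g. building or dispatching the request), not the
// asynchronous failure, which must be observed on the future itself.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SyncVsAsyncFailure {
  // Throws synchronously for a bad request, otherwise fails asynchronously.
  def send(valid: Boolean): Future[Boolean] = {
    require(valid, "synchronous failure: endpoint unreachable")
    Future { throw new RuntimeException("asynchronous failure: remote error") }
  }

  def main(args: Array[String]): Unit = {
    // This path is what a try/catch around the call can guard:
    try send(valid = false)
    catch { case e: IllegalArgumentException => println(s"caught synchronously: $e") }

    // An asynchronous failure never reaches that try/catch; it must be
    // observed on the Future (or left to the endpoint's own logging).
    send(valid = true).failed.foreach(e => println(s"observed on the future: $e"))
    Thread.sleep(200) // give the callback a moment in this sketch
  }
}
```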

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 76 additions & 12 deletions

@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.{ArrayList => JArrayList, Collections => JCollections, Properties}
 import java.util.concurrent.{CountDownLatch, Delayed, LinkedBlockingQueue, ScheduledFuture, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -44,6 +44,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+import org.apache.spark.rpc.RpcTimeoutException
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
@@ -327,7 +328,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       }.getOrElse(Seq())
     }.toIndexedSeq
   }
-  override def removeExecutor(execId: String): Unit = {
+  override def removeExecutorAsync(execId: String): Unit = {
     // don't need to propagate to the driver, which we don't have
   }
 
@@ -751,7 +752,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // Now the executor on hostA is lost
     runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed")))
     // Executor is removed but shuffle files are not unregistered
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
 
     // The MapOutputTracker has all the shuffle files
@@ -764,9 +765,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     complete(taskSets(1), Seq(
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
     ))
-    // blockManagerMaster.removeExecutor is not called again
+    // blockManagerMaster.removeExecutorAsync is not called again
     // but shuffle files are unregistered
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
 
     // Shuffle files for hostA-exec should be lost
@@ -1010,7 +1011,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     complete(taskSets(1), Seq(
       (Success, 42),
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)))
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
@@ -1050,7 +1051,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       case _ => false
     }
     runEvent(ExecutorLost("hostA-exec", event))
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     if (expectFileLoss) {
       if (expectHostFileLoss) {
         verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
@@ -1085,7 +1086,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     submit(reduceRdd, Array(0))
     completeShuffleMapStageSuccessfully(0, 0, 1)
     runEvent(ExecutorLost("hostA-exec", event))
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
@@ -2187,7 +2188,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // fail the third stage because hostA went down
     completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    // blockManagerMaster.removeExecutorAsync("hostA-exec")
     // have DAGScheduler try again
     scheduler.resubmitFailedStages()
     complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
@@ -2213,7 +2214,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // pretend stage 2 failed because hostA went down
     completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    // blockManagerMaster.removeExecutorAsync("hostA-exec")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
     scheduler.resubmitFailedStages()
     assertLocations(taskSets(3), Seq(Seq("hostD")))
@@ -5072,7 +5073,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       ExecutorExited(-100, false, "Container marked as failed")))
 
     // Shuffle push merger executor should not be removed and the shuffle files are not unregistered
-    verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(blockManagerMaster, times(0))
+      .removeExecutorAsync(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
     verify(mapOutputTracker,
       times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
 
@@ -5084,7 +5086,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     // Verify that we are not removing the executor,
     // and that we are only removing the outputs on the host
-    verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(blockManagerMaster, times(0))
+      .removeExecutorAsync(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
     verify(blockManagerMaster, times(1)).removeShufflePushMergerLocation("hostA")
     verify(mapOutputTracker,
       times(1)).removeOutputsOnHost("hostA")
@@ -5472,6 +5475,67 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-53564: RpcTimeout while submitting stage should not fail the job/application") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    shuffleMapRdd.cache()
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+
+    val callCount = new AtomicInteger(0)
+    doAnswer(invocation => {
+      if (callCount.incrementAndGet() == 1) {
+        // First call for finding missing parents, throw RpcTimeoutException
+        throw new RpcTimeoutException("RpcTimeout", null)
+      } else {
+        invocation.callRealMethod()
+      }
+    }).when(blockManagerMaster)
+      .getLocations(any(classOf[Array[BlockId]]))
+
+    submit(reduceRdd, Array(0))
+    assert(scheduler.stageIdToStage.size === 2)
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42))
+    assertDataStructuresEmpty()
+    verify(blockManagerMaster, times(2))
+      .getLocations(any(classOf[Array[BlockId]]))
+  }
+
+  test("SPARK-53564: Resubmit missing parent when failed to get cache locations") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+    shuffleMapRdd1.cache()
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(1))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep1), tracker = mapOutputTracker)
+
+    // Cache for shuffleMapRdd1 are available
+    cacheLocations(shuffleMapRdd1.id -> 0) = Seq(makeBlockManagerId("hostA"))
+    cacheLocations(shuffleMapRdd1.id -> 1) = Seq(makeBlockManagerId("hostB"))
+
+    val callCount = new AtomicInteger(0)
+    doAnswer(invocation => {
+      if (callCount.incrementAndGet() == 1) {
+        // First call for finding missing parents, throw RpcTimeoutException
+        throw new RpcTimeoutException("RpcTimeout", null)
+      } else {
+        invocation.callRealMethod()
+      }
+    }).when(blockManagerMaster)
+      .getLocations(any(classOf[Array[BlockId]]))
+
+    submit(reduceRdd, Array(0))
+    // All 3 stages should be submitted even though caches are available for shuffleMapRdd1
+    // but failed to get cache locations.
+    assert(scheduler.stageIdToStage.size === 3)
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeShuffleMapStageSuccessfully(1, 0, 2)
+    completeAndCheckAnswer(taskSets(2), Seq((Success, 42)), Map(0 -> 42))
+    assertDataStructuresEmpty()
+    verify(blockManagerMaster, times(2))
+      .getLocations(any(classOf[Array[BlockId]]))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
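
The fail-once-then-recover stubbing used by the new tests is a reusable Mockito pattern. A self-contained sketch with a hypothetical class (plain mockito-core on the classpath; a `RuntimeException` stands in for `RpcTimeoutException` to keep it dependency-free):

```scala
// Hypothetical class and names, mirroring the tests' stubbing pattern:
// the first call throws (simulating a transient RPC timeout), later calls
// fall through to the real method, and the call count is verified.
import java.util.concurrent.atomic.AtomicInteger

import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.{doAnswer, spy, times, verify}

class Lookup {
  def locations(id: String): Seq[String] = Seq(s"host-for-$id")
}

object FailOnceStubbing {
  def main(args: Array[String]): Unit = {
    val lookup = spy(new Lookup)
    val calls = new AtomicInteger(0)
    doAnswer(invocation => {
      if (calls.incrementAndGet() == 1) {
        throw new RuntimeException("simulated rpc timeout") // fail only once
      } else {
        invocation.callRealMethod()
      }
    }).when(lookup).locations(anyString())

    // First call fails like a timed-out RPC; the retry succeeds.
    try lookup.locations("rdd-1")
    catch { case e: RuntimeException => println(s"first call failed: $e") }
    println("second call: " + lookup.locations("rdd-1"))
    verify(lookup, times(2)).locations(anyString())
  }
}
```

Using `doAnswer(...).when(spyObj)` rather than `when(spyObj.method(...))` matters for spies: it installs the stub without invoking the real method during setup.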
