Skip to content

Commit 51d4279

Browse files
committed
[SPARK-55974][CORE][YARN] Relaunch new executors if executor launching takes too long
1 parent 587dfa4 commit 51d4279

File tree

8 files changed

+238
-9
lines changed

8 files changed

+238
-9
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ private[spark] trait ExecutorAllocationClient {
3737
*/
3838
def isExecutorActive(id: String): Boolean
3939

40+
/**
41+
* Hook for subclasses to be notified when an executor has registered with the driver.
42+
* YarnSchedulerBackend overrides this to send ExecutorRegistered to the AM for
43+
* executor launch timeout tracking.
44+
*/
45+
def onExecutorRegistered(executorId: String, resourceProfileId: Int): Unit
46+
4047
/**
4148
* Update the cluster manager on our scheduling needs. Three bits of information are included
4249
* to help it make decisions.

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ private[spark] object CoarseGrainedClusterMessages {
123123
// indicating the executor starts to decommission.
124124
object ExecutorDecommissionSigReceived extends CoarseGrainedClusterMessage
125125

126+
// Driver notifies AM when an executor has registered with the driver. Used for executor launch
127+
// timeout tracking: AM stops tracking the executor for timeout once it receives this message.
128+
case class ExecutorRegistered(executorId: String, resourceProfileId: Int)
129+
extends CoarseGrainedClusterMessage
130+
126131
case class RemoveWorker(workerId: String, host: String, message: String)
127132
extends CoarseGrainedClusterMessage
128133

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
318318
decommissionExecutors(Array((executorId, v._1)), v._2, v._3)
319319
unknownExecutorsPendingDecommission.invalidate(executorId)
320320
})
321+
CoarseGrainedSchedulerBackend.this.onExecutorRegistered(executorId, resourceProfileId)
321322
context.reply(true)
322323
}
323324

@@ -754,6 +755,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
754755
!executorsPendingDecommission.contains(id)
755756
}
756757

758+
override def onExecutorRegistered(executorId: String, resourceProfileId: Int): Unit = {}
759+
757760
/**
758761
* Get the max number of tasks that can be concurrent launched based on the ResourceProfile
759762
* could be used, even if some of them are being used at the moment.

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,9 @@ private[spark] class ApplicationMaster(
848848
case None =>
849849
logWarning("Container allocator is not ready to find executor loss reasons yet.")
850850
}
851+
852+
case ExecutorRegistered(executorId, resourceProfileId) =>
853+
Option(allocator).foreach(_.onExecutorRegistered(executorId, resourceProfileId))
851854
}
852855

853856
override def onDisconnected(remoteAddress: RpcAddress): Unit = {

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,14 @@ private[yarn] class YarnAllocator(
9696
@GuardedBy("this")
9797
private val launchingExecutorContainerIds = collection.mutable.HashSet[ContainerId]()
9898

99+
// ResourceProfileId -> (executorId -> Option[(firstSeenMs, isRelaunching)])
100+
// Tracks all running executors and their launch timeout state. Value: Some((firstSeenMs,
101+
// isRelaunching)) = not yet confirmed by driver (firstSeenMs = launch time, isRelaunching =
102+
// true if replacement already requested after timeout); None = confirmed by driver; entry
103+
// removed when executor/container is lost.
99104
@GuardedBy("this")
100-
private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]()
105+
private val runningExecutorsPerResourceProfileId =
106+
new HashMap[Int, mutable.HashMap[String, Option[(Long, Boolean)]]]()
101107

102108
@GuardedBy("this")
103109
private val numExecutorsStartingPerResourceProfileId = new HashMap[Int, AtomicInteger]
@@ -175,6 +181,8 @@ private[yarn] class YarnAllocator(
175181

176182
private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
177183

184+
private val maxDelayLaunchMillis = sparkConf.get(CONTAINER_LAUNCH_TIMEOUT)
185+
178186
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
179187
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
180188

@@ -216,7 +224,8 @@ private[yarn] class YarnAllocator(
216224
private def initDefaultProfile(): Unit = synchronized {
217225
allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
218226
new HashMap[String, mutable.Set[ContainerId]]()
219-
runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
227+
runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID,
228+
mutable.HashMap[String, Option[(Long, Boolean)]]())
220229
numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
221230
val initTargetExecNum = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
222231
targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initTargetExecNum
@@ -280,8 +289,44 @@ private[yarn] class YarnAllocator(
280289
new HashMap[String, mutable.Set[ContainerId]]())
281290
}
282291

283-
private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = synchronized {
284-
runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[String]())
292+
private def getOrUpdateRunningExecutorForRPId(
293+
rpId: Int): mutable.HashMap[String, Option[(Long, Boolean)]] = synchronized {
294+
runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
295+
mutable.HashMap[String, Option[(Long, Boolean)]]())
296+
}
297+
298+
/** Count executors in the given map that are marked for relaunch (isRelaunching = true). */
299+
private def countRelaunching(map: mutable.HashMap[String, Option[(Long, Boolean)]]): Int = {
300+
map.count { case (_, v) => v.exists { case (_, isRelaunching) => isRelaunching } }
301+
}
302+
303+
def getNumExecutorsMissLaunched: Int = synchronized {
304+
runningExecutorsPerResourceProfileId.values.map { m =>
305+
m.count { case (_, v) => v.exists { case (_, isRelaunching) => !isRelaunching } }
306+
}.sum
307+
}
308+
309+
/**
310+
* Updates launch timeout tracking: marks unconfirmed executors that exceed maxDelayLaunchMillis
311+
* as needing relaunch. Returns the count of executors for which replacement has been requested.
312+
*/
313+
private def updateAndGetRelaunchingCount(rpId: Int): Int = {
314+
val running = getOrUpdateRunningExecutorForRPId(rpId)
315+
val now = clock.getTimeMillis()
316+
running.keys.toSeq.foreach { executorId =>
317+
running(executorId).foreach { case (firstSeen, isRelaunching) =>
318+
if (!isRelaunching && now - firstSeen > maxDelayLaunchMillis) {
319+
val hostOpt = executorIdToContainer
320+
.get(executorId)
321+
.flatMap(c => allocatedContainerToHostMap.get(c.getId))
322+
allocatorNodeHealthTracker.handleResourceAllocationFailure(hostOpt)
323+
running(executorId) = Some((firstSeen, true))
324+
logWarning(s"Requesting new resources since launching executor " +
325+
s"$executorId takes more than $maxDelayLaunchMillis ms")
326+
}
327+
}
328+
}
329+
countRelaunching(running)
285330
}
286331

287332
private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = synchronized {
@@ -428,6 +473,12 @@ private[yarn] class YarnAllocator(
428473
}
429474
}
430475

476+
private[yarn] def onExecutorRegistered(executorId: String, resourceProfileId: Int): Unit = {
477+
synchronized {
478+
getOrUpdateRunningExecutorForRPId(resourceProfileId).update(executorId, None)
479+
}
480+
}
481+
431482
/**
432483
* Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
433484
* equal to maxExecutors.
@@ -500,10 +551,12 @@ private[yarn] class YarnAllocator(
500551
val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
501552
val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
502553
val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size
554+
val relaunching = updateAndGetRelaunchingCount(rpId)
503555
val running = getOrUpdateRunningExecutorForRPId(rpId).size
504556
logDebug(s"Updating resource requests for ResourceProfile id: $rpId, target: " +
505-
s"$targetNum, pending: $pending, running: $running, executorsStarting: $starting")
506-
(rpId, targetNum - pending - running - starting)
557+
s"$targetNum, pending: $pending, running: $running, executorsStarting: $starting, " +
558+
s"relaunchingExecutors: $relaunching")
559+
(rpId, targetNum - pending - running - starting + relaunching)
507560
}.toMap
508561

509562
missingPerProfile.foreach { case (rpId, missing) =>
@@ -776,7 +829,8 @@ private[yarn] class YarnAllocator(
776829
val containerCores = rp.getExecutorCores.getOrElse(defaultCores)
777830

778831
val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
779-
if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
832+
val relaunchingCount = countRelaunching(getOrUpdateRunningExecutorForRPId(rpId))
833+
if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId) + relaunchingCount) {
780834
getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
781835
launchingExecutorContainerIds.add(containerId)
782836
if (launchContainers) {
@@ -819,7 +873,8 @@ private[yarn] class YarnAllocator(
819873
} else {
820874
logInfo(log"Skip launching executorRunnable as running executors count: " +
821875
log"${MDC(LogKeys.COUNT, rpRunningExecs)} reached target executors count: " +
822-
log"${MDC(LogKeys.NUM_EXECUTOR_TARGET, getOrUpdateTargetNumExecutorsForRPId(rpId))}.")
876+
log"${MDC(LogKeys.NUM_EXECUTOR_TARGET, getOrUpdateTargetNumExecutorsForRPId(rpId))} " +
877+
log"and relaunching count: ${MDC(LogKeys.COUNT, relaunchingCount)}.")
823878
internalReleaseContainer(container)
824879
}
825880
}
@@ -829,7 +884,7 @@ private[yarn] class YarnAllocator(
829884
container: Container): Unit = synchronized {
830885
val containerId = container.getId
831886
if (launchingExecutorContainerIds.contains(containerId)) {
832-
getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
887+
getOrUpdateRunningExecutorForRPId(rpId)(executorId) = Some((clock.getTimeMillis(), false))
833888
executorIdToContainer(executorId) = container
834889
containerIdToExecutorIdAndResourceProfileId(containerId) = (executorId, rpId)
835890

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config/package.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,16 @@ package object config extends Logging {
269269
.intConf
270270
.createWithDefault(25)
271271

272+
private[spark] val CONTAINER_LAUNCH_TIMEOUT =
273+
ConfigBuilder("spark.yarn.containerLaunchTimeout")
274+
.doc("Maximum time to wait for an executor to successfully launch before considering " +
275+
"it stuck and requesting a replacement. This timeout helps detect stuck " +
276+
"executor launches in YARN mode. If an executor takes longer than this timeout to " +
277+
"launch, it will be marked for relaunch and the host may be marked as unhealthy.")
278+
.version("3.2.0-sdi-136")
279+
.timeConf(TimeUnit.MILLISECONDS)
280+
.createWithDefaultString("10min")
281+
272282
private[spark] val MAX_REPORTER_THREAD_FAILURES =
273283
ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
274284
.version("1.2.0")

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ private[spark] abstract class YarnSchedulerBackend(
170170
yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
171171
}
172172

173+
override protected def onExecutorRegistered(executorId: String, resourceProfileId: Int): Unit = {
174+
amEndpoint.foreach(_.send(ExecutorRegisteredWithDriver(executorId, resourceProfileId)))
175+
}
176+
173177
override def sufficientResourcesRegistered(): Boolean = {
174178
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
175179
}

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,4 +904,146 @@ class YarnAllocatorSuite extends SparkFunSuite
904904
handler.getNumExecutorsRunning should be(0)
905905
handler.getNumExecutorsStarting should be(0)
906906
}
907+
908+
test("SPARK-55974: should not mark executors for relaunch before timeout") {
909+
val timeoutMs = 5000L
910+
val (handler, _) = createAllocator(
911+
maxExecutors = 2,
912+
additionalConfigs = Map(CONTAINER_LAUNCH_TIMEOUT.key -> timeoutMs.toString))
913+
handler.updateResourceRequests()
914+
handler.getNumContainersPendingAllocate should be (2)
915+
916+
val container1 = createContainer("host1")
917+
val container2 = createContainer("host2")
918+
919+
handler.handleAllocatedContainers(Array(container1, container2).toImmutableArraySeq)
920+
handler.getNumExecutorsRunning should be (2)
921+
922+
// Executors becoming missing (not confirmed by driver)
923+
handler.updateResourceRequests()
924+
925+
// Advance time but not enough to trigger timeout
926+
clock.advance(timeoutMs - 1000L)
927+
handler.updateResourceRequests()
928+
929+
handler.getNumExecutorsRunning should be (2)
930+
handler.getNumContainersPendingAllocate should be (0)
931+
}
932+
933+
test("SPARK-55974: should mark executors for relaunch after timeout") {
934+
val timeoutMs = 5000L
935+
val (handler, _) = createAllocator(
936+
maxExecutors = 2,
937+
additionalConfigs = Map(CONTAINER_LAUNCH_TIMEOUT.key -> timeoutMs.toString))
938+
handler.updateResourceRequests()
939+
handler.getNumContainersPendingAllocate should be (2)
940+
941+
val container1 = createContainer("host1")
942+
val container2 = createContainer("host2")
943+
944+
handler.handleAllocatedContainers(Array(container1, container2).toImmutableArraySeq)
945+
handler.getNumExecutorsRunning should be (2)
946+
947+
// Executors becoming missing (not confirmed by driver)
948+
handler.updateResourceRequests()
949+
950+
// Advance time past timeout
951+
clock.advance(timeoutMs + 1000L)
952+
handler.updateResourceRequests()
953+
954+
// Should relaunch new containers
955+
handler.getNumContainersPendingAllocate should be (2)
956+
handler.getNumExecutorsRunning should be (2)
957+
}
958+
959+
test("SPARK-55974: should handle mixed active and missing executors") {
960+
val timeoutMs = 5000L
961+
val (handler, _) = createAllocator(
962+
maxExecutors = 3,
963+
additionalConfigs = Map(CONTAINER_LAUNCH_TIMEOUT.key -> timeoutMs.toString))
964+
handler.updateResourceRequests()
965+
handler.getNumContainersPendingAllocate should be (3)
966+
967+
val container1 = createContainer("host1")
968+
val container2 = createContainer("host2")
969+
val container3 = createContainer("host3")
970+
971+
handler.handleAllocatedContainers(Array(container1, container2, container3).toImmutableArraySeq)
972+
handler.getNumExecutorsRunning should be (3)
973+
974+
// Only 1 executor is active, 2 are missing
975+
handler.onExecutorRegistered("1", defaultRPId)
976+
handler.updateResourceRequests()
977+
978+
// Advance time past timeout
979+
clock.advance(timeoutMs + 1000L)
980+
handler.updateResourceRequests()
981+
982+
// Should relaunch 2 new containers
983+
handler.getNumContainersPendingAllocate should be (2)
984+
handler.getNumExecutorsRunning should be (3)
985+
}
986+
987+
test("SPARK-55974: should reset tracking when executors become active again") {
988+
val timeoutMs = 5000L
989+
val (handler, _) = createAllocator(
990+
maxExecutors = 2,
991+
additionalConfigs = Map(CONTAINER_LAUNCH_TIMEOUT.key -> timeoutMs.toString))
992+
handler.updateResourceRequests()
993+
handler.getNumContainersPendingAllocate should be (2)
994+
995+
val container1 = createContainer("host1")
996+
val container2 = createContainer("host2")
997+
998+
handler.handleAllocatedContainers(Array(container1, container2).toImmutableArraySeq)
999+
handler.getNumExecutorsRunning should be (2)
1000+
1001+
handler.updateResourceRequests()
1002+
handler.getNumExecutorsMissLaunched should be (2)
1003+
1004+
clock.advance(timeoutMs - 1000L)
1005+
handler.updateResourceRequests()
1006+
handler.getNumExecutorsMissLaunched should be (2)
1007+
1008+
// Executors become active (confirmed by driver)
1009+
handler.onExecutorRegistered("1", defaultRPId)
1010+
handler.onExecutorRegistered("2", defaultRPId)
1011+
handler.updateResourceRequests()
1012+
handler.getNumExecutorsMissLaunched should be (0)
1013+
1014+
// Should not request new containers since executors are active again
1015+
handler.getNumContainersPendingAllocate should be (0)
1016+
}
1017+
1018+
test("SPARK-55974: executor launch timeout - should handle multiple timeout cycles") {
1019+
val timeoutMs = 3000L
1020+
val (handler, _) = createAllocator(
1021+
maxExecutors = 1,
1022+
additionalConfigs = Map(CONTAINER_LAUNCH_TIMEOUT.key -> timeoutMs.toString))
1023+
handler.updateResourceRequests()
1024+
handler.getNumContainersPendingAllocate should be (1)
1025+
1026+
val container = createContainer("host1")
1027+
handler.handleAllocatedContainers(Array(container).toImmutableArraySeq)
1028+
handler.getNumExecutorsRunning should be (1)
1029+
1030+
// First timeout cycle
1031+
handler.updateResourceRequests()
1032+
clock.advance(timeoutMs + 1000L)
1033+
handler.updateResourceRequests()
1034+
handler.getNumContainersPendingAllocate should be (1)
1035+
1036+
// Simulate new container allocated
1037+
val newContainer = createContainer("host2")
1038+
handler.handleAllocatedContainers(Array(newContainer).toImmutableArraySeq)
1039+
handler.getNumExecutorsRunning should be (2)
1040+
1041+
// Second timeout cycle
1042+
handler.updateResourceRequests()
1043+
clock.advance(timeoutMs + 1000L)
1044+
handler.updateResourceRequests()
1045+
1046+
// Should request another container
1047+
handler.getNumContainersPendingAllocate should be (1)
1048+
}
9071049
}

0 commit comments

Comments
 (0)