
Commit b56e9c6

attilapiros authored and squito committed
[SPARK-16630][YARN] Blacklist a node if executors won't launch on it
## What changes were proposed in this pull request?

This change extends YARN resource allocation handling with blacklisting functionality.

This handles cases when a node is messed up or misconfigured such that a container won't launch on it. Before this change, blacklisting only focused on task execution; this change introduces YarnAllocatorBlacklistTracker, which tracks allocation failures per host (when enabled via "spark.yarn.blacklist.executor.launch.blacklisting.enabled").

## How was this patch tested?

### With unit tests

Including a new suite: YarnAllocatorBlacklistTrackerSuite.

### Manually

It was tested on a cluster by deleting the Spark jars on one of the nodes.

#### Behaviour before these changes

Starting Spark as:

```
spark2-shell --master yarn --deploy-mode client --num-executors 4 --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6"
```

Log is:

```
18/04/12 06:49:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://apiros-1.gce.test.com:8020/user/systest/.sparkStaging/application_1523459048274_0016
18/04/12 06:49:39 INFO util.ShutdownHookManager: Shutdown hook called
```

#### Behaviour after these changes

Starting Spark as:

```
spark2-shell --master yarn --deploy-mode client --num-executors 4 --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6" --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true"
```

And the log is:

```
18/04/13 05:37:43 INFO yarn.YarnAllocator: Will request 1 executor container(s), each with 1 core(s) and 4505 MB memory (including 409 MB of overhead)
18/04/13 05:37:43 INFO yarn.YarnAllocator: Submitted 1 unlocalized container requests.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Launching container container_1523459048274_0025_01_000008 on host apiros-4.gce.test.com for executor with ID 6
18/04/13 05:37:43 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Completed container container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com (state: COMPLETE, exit status: 1)
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
18/04/13 05:37:43 WARN yarn.YarnAllocator: Container marked as failed: container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1523459048274_0025_01_000007
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
    at org.apache.hadoop.util.Shell.run(Shell.java:507)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

Where the most important part is:

```
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
```

And execution was continued (no shutdown called).

### Testing the blacklisting of the whole cluster

Starting Spark with YARN blacklisting enabled, then removing the Spark core jar one by one from all the cluster nodes, and then executing a simple Spark job which fails. Checking the YARN log, the expected exit status is contained:

```
18/06/15 01:07:10 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Due to executor failures all available nodes are blacklisted)
18/06/15 01:07:13 INFO util.ShutdownHookManager: Shutdown hook called
```

Author: “attilapiros” <[email protected]>

Closes apache#21068 from attilapiros/SPARK-16630.
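To make the mechanism concrete, below is a minimal, hypothetical sketch of per-host allocation-failure counting with a cluster-wide give-up condition. It is not the actual YarnAllocatorBlacklistTracker; the class name, the maxFailuresPerHost parameter, and the demo host names are illustrative only.

```scala
import scala.collection.mutable

// Hypothetical tracker: maxFailuresPerHost stands in for the configured error limit.
class AllocationBlacklist(maxFailuresPerHost: Int) {
  private val failuresByHost = mutable.Map.empty[String, Int].withDefaultValue(0)
  private val blacklisted = mutable.Set.empty[String]

  // Record one failed container launch on the given host; blacklist the host
  // once its failure count reaches the configured limit.
  def recordAllocationFailure(host: String): Unit = {
    failuresByHost(host) += 1
    if (failuresByHost(host) >= maxFailuresPerHost) {
      blacklisted += host
    }
  }

  def isBlacklisted(host: String): Boolean = blacklisted.contains(host)

  // True when every node in the cluster is blacklisted, i.e. the condition under
  // which the application master should give up rather than retry forever.
  def isAllNodesBlacklisted(numClusterNodes: Int): Boolean =
    numClusterNodes > 0 && blacklisted.size >= numClusterNodes
}

object AllocationBlacklistDemo extends App {
  val tracker = new AllocationBlacklist(maxFailuresPerHost = 2)
  tracker.recordAllocationFailure("apiros-4.gce.test.com")
  tracker.recordAllocationFailure("apiros-4.gce.test.com")
  println(tracker.isBlacklisted("apiros-4.gce.test.com"))      // true
  println(tracker.isAllNodesBlacklisted(numClusterNodes = 4))  // false
}
```

The real tracker additionally reconciles this with the scheduler's task-level node blacklist and pushes blacklist updates to the YARN ResourceManager through the AM-RM client, as the diffs below show.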
1 parent c0cad59 commit b56e9c6

File tree

12 files changed: +479, -56 lines


core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -371,7 +371,7 @@ private[scheduler] class BlacklistTracker (
 
 }
 
-private[scheduler] object BlacklistTracker extends Logging {
+private[spark] object BlacklistTracker extends Logging {
 
   private val DEFAULT_TIMEOUT = "1h"
 
```
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -170,8 +170,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         if (executorDataMap.contains(executorId)) {
           executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
           context.reply(true)
-        } else if (scheduler.nodeBlacklist != null &&
-          scheduler.nodeBlacklist.contains(hostname)) {
+        } else if (scheduler.nodeBlacklist.contains(hostname)) {
           // If the cluster manager gives us an executor on a blacklisted node (because it
           // already started allocating those resources before we informed it of our blacklist,
           // or if it ignored our blacklist), then we reject that executor immediately.
```

core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -19,7 +19,6 @@ package org.apache.spark
 
 import java.util.concurrent.{ExecutorService, TimeUnit}
 
-import scala.collection.Map
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -73,6 +72,7 @@ class HeartbeatReceiverSuite
     sc = spy(new SparkContext(conf))
     scheduler = mock(classOf[TaskSchedulerImpl])
     when(sc.taskScheduler).thenReturn(scheduler)
+    when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
     when(scheduler.sc).thenReturn(sc)
     heartbeatReceiverClock = new ManualClock
     heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -241,7 +241,7 @@ class HeartbeatReceiverSuite
     } === Some(true))
   }
 
-  private def getTrackedExecutors: Map[String, Long] = {
+  private def getTrackedExecutors: collection.Map[String, Long] = {
     // We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
     // so exclude it from the map. See SPARK-10800.
     heartbeatReceiver.invokePrivate(_executorLastSeen()).
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(
 
   protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty))
   }
 
   protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
```

docs/running-on-yarn.md

Lines changed: 10 additions & 0 deletions
```diff
@@ -411,6 +411,16 @@ To use a custom metrics.properties for the application master and executors, upd
   name matches both the include and the exclude pattern, this file will be excluded eventually.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.blacklist.executor.launch.blacklisting.enabled</code></td>
+  <td>false</td>
+  <td>
+  Flag to enable blacklisting of nodes having YARN resource allocation problems.
+  The error limit for blacklisting can be configured by
+  <code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
+  </td>
+</tr>
+
 </table>
 
 # Important notes
```
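As a usage note, here is a minimal sketch of enabling the new flag programmatically, the Scala equivalent of the --conf flags used in the commit message. The maxFailedExecutorsPerNode value of 2 is purely illustrative, not a recommended setting.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Turn on blacklisting of nodes with YARN allocation problems; the per-node
// failure limit below is an illustrative value, not a default or recommendation.
val conf = new SparkConf()
  .set("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "true")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")

val spark = SparkSession.builder()
  .master("yarn")
  .config(conf)
  .getOrCreate()
```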

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -789,6 +789,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
 
     taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.nodeBlacklist).thenReturn(Set[String]())
     when(taskScheduler.sc).thenReturn(sc)
 
     externalShuffleClient = mock[MesosExternalShuffleClient]
```

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -515,6 +515,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           finish(FinalApplicationStatus.FAILED,
             ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
             s"Max number of executor failures ($maxNumExecutorFailures) reached")
+        } else if (allocator.isAllNodeBlacklisted) {
+          finish(FinalApplicationStatus.FAILED,
+            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+            "Due to executor failures all available nodes are blacklisted")
         } else {
           logDebug("Sending progress")
           allocator.allocateResources()
```
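For orientation, the decision this reporter loop now makes can be sketched roughly as below. This is a simplified, hypothetical shape, not the ApplicationMaster code itself, which (as the diff above shows) reports failure by calling finish() with YARN's FinalApplicationStatus.

```scala
object AmFailureCheck {
  sealed trait AmDecision
  case object ContinueAllocating extends AmDecision
  final case class FailApplication(reason: String) extends AmDecision

  // Decide whether to keep asking YARN for containers or fail the application.
  def checkFailures(
      failedExecutors: Int,
      maxNumExecutorFailures: Int,
      allNodesBlacklisted: Boolean): AmDecision = {
    if (failedExecutors >= maxNumExecutorFailures) {
      FailApplication(s"Max number of executor failures ($maxNumExecutorFailures) reached")
    } else if (allNodesBlacklisted) {
      // New with this change: stop early instead of retrying on a fully blacklisted cluster.
      FailApplication("Due to executor failures all available nodes are blacklisted")
    } else {
      ContinueAllocating
    }
  }
}
```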

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 15 additions & 45 deletions
```diff
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.yarn.api.records._
@@ -66,7 +66,8 @@ private[yarn] class YarnAllocator(
     appAttemptId: ApplicationAttemptId,
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
-    resolver: SparkRackResolver)
+    resolver: SparkRackResolver,
+    clock: Clock = new SystemClock)
   extends Logging {
 
   import YarnAllocator._
@@ -102,18 +103,14 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
 
-  // Queue to store the timestamp of failed executors
-  private val failedExecutorsTimeStamps = new Queue[Long]()
+  private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
 
-  private var clock: Clock = new SystemClock
-
-  private val executorFailuresValidityInterval =
-    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+  private val allocatorBlacklistTracker =
+    new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)
 
   @volatile private var targetNumExecutors =
     SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
 
-  private var currentNodeBlacklist = Set.empty[String]
 
   // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
   // list of requesters that should be responded to once we find out why the given executor
@@ -149,7 +146,6 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-
   // A map to store preferred hostname and possible task numbers running on it.
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
 
@@ -160,26 +156,11 @@ private[yarn] class YarnAllocator(
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
 
-  /**
-   * Use a different clock for YarnAllocator. This is mainly used for testing.
-   */
-  def setClock(newClock: Clock): Unit = {
-    clock = newClock
-  }
-
   def getNumExecutorsRunning: Int = runningExecutors.size()
 
-  def getNumExecutorsFailed: Int = synchronized {
-    val endTime = clock.getTimeMillis()
+  def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
 
-    while (executorFailuresValidityInterval > 0
-      && failedExecutorsTimeStamps.nonEmpty
-      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
-      failedExecutorsTimeStamps.dequeue()
-    }
-
-    failedExecutorsTimeStamps.size
-  }
+  def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
@@ -204,9 +185,8 @@ private[yarn] class YarnAllocator(
    * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
    * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
    *                             container placement hint.
-   * @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
-   *                      containers on them. It will be used to update the application master's
-   *                      blacklist.
+   * @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers
+   *                      on them. It will be used to update the application master's blacklist.
    * @return Whether the new requested total is different than the old value.
    */
   def requestTotalExecutorsWithPreferredLocalities(
@@ -220,19 +200,7 @@ private[yarn] class YarnAllocator(
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
       targetNumExecutors = requestedTotal
-
-      // Update blacklist infomation to YARN ResouceManager for this application,
-      // in order to avoid allocating new Containers on the problematic nodes.
-      val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
-      val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
-      if (blacklistAdditions.nonEmpty) {
-        logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
-      }
-      if (blacklistRemovals.nonEmpty) {
-        logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
-      }
-      amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
-      currentNodeBlacklist = nodeBlacklist
+      allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
       true
     } else {
       false
@@ -268,6 +236,7 @@ private[yarn] class YarnAllocator(
     val allocateResponse = amClient.allocate(progressIndicator)
 
     val allocatedContainers = allocateResponse.getAllocatedContainers()
+    allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
 
     if (allocatedContainers.size > 0) {
       logDebug(("Allocated containers: %d. Current executor count: %d. " +
@@ -602,8 +571,9 @@ private[yarn] class YarnAllocator(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case _ =>
-            // Enqueue the timestamp of failed executor
-            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
+            // all the failures which not covered above, like:
+            // disk failure, kill by app master or resource manager, ...
+            allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
```
