
Commit 52e9711

dhruve authored and tgravescs committed
[SPARK-22148][SPARK-15815][SCHEDULER] Acquire new executors to avoid hang because of blacklisting
## What changes were proposed in this pull request?

Every time a task is unschedulable because of the condition where the number of task failures < the number of available executors, we currently abort the taskSet, failing the job. This change tries to acquire new executors so that we can complete the job successfully. We try to acquire a new executor only when we can kill an existing idle executor. We fall back to the older implementation, where we abort the job, if we cannot find an idle executor.

## How was this patch tested?

I performed some manual tests to check and validate the behavior.

```scala
val rdd = sc.parallelize(Seq(1 to 10), 3)

import org.apache.spark.TaskContext
val mapped = rdd.mapPartitionsWithIndex ( (index, iterator) => {
  if (index == 2) {
    Thread.sleep(30 * 1000)
    val attemptNum = TaskContext.get.attemptNumber
    if (attemptNum < 3) throw new Exception("Fail for blacklisting")
  }
  iterator.toList.map (x => x + " -> " + index).iterator
} )

mapped.collect
```

Closes apache#22288 from dhruve/bug/SPARK-22148.

Lead-authored-by: Dhruve Ashar <[email protected]>
Co-authored-by: Dhruve Ashar <[email protected]>
Co-authored-by: Tom Graves <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit fdd3bac)
Signed-off-by: Thomas Graves <[email protected]>
1 parent f98c0ad commit 52e9711
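For orientation, here is a minimal sketch of how an application might exercise the new behavior. It assumes a standard SparkConf setup; the application name and values are illustrative, and only `spark.scheduler.blacklist.unschedulableTaskSetTimeout` comes from this change.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative configuration only; adjust the values for your cluster.
val conf = new SparkConf()
  .setAppName("blacklist-unschedulable-demo")   // hypothetical app name
  .set("spark.blacklist.enabled", "true")       // turn on task/executor blacklisting
  .set("spark.task.maxFailures", "3")           // running fewer executors than this can trigger the hang
  // New in this change: how long to wait to acquire a replacement executor before aborting
  // a TaskSet that is unschedulable because it is completely blacklisted (default: 120s).
  .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "120s")

val sc = new SparkContext(conf)
```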

File tree

7 files changed: +318 −36 lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 0 deletions
```diff
@@ -592,6 +592,14 @@ package object config {
       .checkValue(v => v > 0, "The value should be a positive time value.")
       .createWithDefaultString("365d")
 
+  private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
+    ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
+      .doc("The timeout in seconds to wait to acquire a new executor and schedule a task " +
+        "before aborting a TaskSet which is unschedulable because of being completely blacklisted.")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(v => v >= 0, "The value should be a non negative time value.")
+      .createWithDefault(120)
+
   private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
     ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
       .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
```

core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 20 additions & 10 deletions
```diff
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killExecutor(exec: String, msg: String): Unit = {
+    allocationClient match {
+      case Some(a) =>
+        logInfo(msg)
+        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+          force = true)
+      case None =>
+        logInfo(s"Not attempting to kill blacklisted executor id $exec " +
+          s"since allocation client is not defined.")
+    }
+  }
+
   private def killBlacklistedExecutor(exec: String): Unit = {
     if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-      allocationClient match {
-        case Some(a) =>
-          logInfo(s"Killing blacklisted executor id $exec " +
-            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
-            force = true)
-        case None =>
-          logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-            s"since allocation client is not defined.")
-      }
+      killExecutor(exec,
+        s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
     }
   }
 
+  private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
+    killExecutor(exec,
+      s"Killing blacklisted idle executor id $exec because of task unschedulability and trying " +
+        "to acquire a new executor.")
+  }
+
   private def killExecutorsOnBlacklistedNode(node: String): Unit = {
     if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
       allocationClient match {
```

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 69 additions & 2 deletions
```diff
@@ -34,7 +34,7 @@ import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -116,6 +116,11 @@ private[spark] class TaskSchedulerImpl(
 
   protected val executorIdToHost = new HashMap[String, String]
 
+  private val abortTimer = new Timer(true)
+  private val clock = new SystemClock
+  // Exposed for testing
+  val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
+
   // Listener object to pass upcalls into
   var dagScheduler: DAGScheduler = null
 
@@ -414,9 +419,53 @@ private[spark] class TaskSchedulerImpl(
         launchedAnyTask |= launchedTaskAtCurrentMaxLocality
       } while (launchedTaskAtCurrentMaxLocality)
     }
+
     if (!launchedAnyTask) {
-      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+      taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
+          // If the taskSet is unschedulable we try to find an existing idle blacklisted
+          // executor. If we cannot find one, we abort immediately. Else we kill the idle
+          // executor and kick off an abortTimer which if it doesn't schedule a task within the
+          // the timeout will abort the taskSet if we were unable to schedule any task from the
+          // taskSet.
+          // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+          // task basis.
+          // Note 2: The taskSet can still be aborted when there are more than one idle
+          // blacklisted executors and dynamic allocation is on. This can happen when a killed
+          // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+          // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+          // timer to expire and abort the taskSet.
+          executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+            case Some ((executorId, _)) =>
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+                val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+                logInfo(s"Waiting for $timeout ms for completely "
+                  + s"blacklisted task to be schedulable again before aborting $taskSet.")
+                abortTimer.schedule(
+                  createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+              }
+            case None => // Abort Immediately
+              logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+                s" executors can be found to kill. Aborting $taskSet." )
+              taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+          }
+      }
+    } else {
+      // We want to defer killing any taskSets as long as we have a non blacklisted executor
+      // which can be used to schedule a task from any active taskSets. This ensures that the
+      // job can make progress.
+      // Note: It is theoretically possible that a taskSet never gets scheduled on a
+      // non-blacklisted executor and the abort timer doesn't kick in because of a constant
+      // submission of new TaskSets. See the PR for more details.
+      if (unschedulableTaskSetToExpiryTime.nonEmpty) {
+        logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
+          "recently scheduled.")
+        unschedulableTaskSetToExpiryTime.clear()
+      }
     }
+
     if (launchedAnyTask && taskSet.isBarrier) {
       // Check whether the barrier tasks are partially launched.
       // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
@@ -452,6 +501,23 @@ private[spark] class TaskSchedulerImpl(
     return tasks
   }
 
+  private def createUnschedulableTaskSetAbortTimer(
+      taskSet: TaskSetManager,
+      taskIndex: Int): TimerTask = {
+    new TimerTask() {
+      override def run() {
+        if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+            unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
+          logInfo("Cannot schedule any task because of complete blacklisting. " +
+            s"Wait time for scheduling expired. Aborting $taskSet.")
+          taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+        } else {
+          this.cancel()
+        }
+      }
+    }
+  }
+
   /**
    * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
    * overriding in tests, so it can be deterministic.
@@ -587,6 +653,7 @@ private[spark] class TaskSchedulerImpl(
       barrierCoordinator.stop()
     }
     starvationTimer.cancel()
+    abortTimer.cancel()
   }
 
   override def defaultParallelism(): Int = backend.defaultParallelism()
```
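To make the abort-timer mechanism in the diff above easier to follow, here is a self-contained sketch of the same Timer/TimerTask pattern: record an expiry time when a task set becomes unschedulable, abort when the timer fires if the deadline is still registered and has passed, and cancel if a task was scheduled in the meantime (which clears all deadlines). The object name, `TaskSetId`, and the `println` are stand-ins for Spark's scheduler state, not Spark APIs.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable.HashMap

// Standalone sketch (not Spark code) of the pattern used in TaskSchedulerImpl.
object AbortTimerSketch {
  type TaskSetId = String

  private val abortTimer = new Timer(true)              // daemon timer, as in the patch
  private val expiryTime = new HashMap[TaskSetId, Long] // task set -> abort deadline (ms)

  def onUnschedulable(taskSet: TaskSetId, timeoutMs: Long): Unit = {
    if (!expiryTime.contains(taskSet)) {
      expiryTime(taskSet) = System.currentTimeMillis() + timeoutMs
      abortTimer.schedule(new TimerTask {
        override def run(): Unit = {
          // Abort only if the deadline is still registered and has expired;
          // otherwise a task was scheduled in the meantime, so just cancel.
          if (expiryTime.get(taskSet).exists(_ <= System.currentTimeMillis())) {
            println(s"Aborting $taskSet: still unschedulable after $timeoutMs ms")
          } else {
            this.cancel()
          }
        }
      }, timeoutMs)
    }
  }

  // Mirrors clearing unschedulableTaskSetToExpiryTime once any task is scheduled.
  def onTaskScheduled(): Unit = expiryTime.clear()

  def main(args: Array[String]): Unit = {
    onUnschedulable("taskSet-0", 1000L)
    Thread.sleep(1500) // let the timer fire for the demo
  }
}
```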

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 23 additions & 18 deletions
```diff
@@ -623,8 +623,8 @@ private[spark] class TaskSetManager(
    *
    * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    * blacklist. The most common scenario would be if there are fewer executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
+   * We try to acquire new executor/s by killing an existing idle blacklisted executor.
    *
    * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
    * would add extra time to each iteration of the scheduling loop. Here, we take the approach of
@@ -635,9 +635,9 @@ private[spark] class TaskSetManager(
    * failures (this is because the method picks one unscheduled task, and then iterates through each
    * executor until it finds one that the task isn't blacklisted on).
    */
-  private[scheduler] def abortIfCompletelyBlacklisted(
-      hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
-    taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+  private[scheduler] def getCompletelyBlacklistedTaskIfAny(
+      hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
+    taskSetBlacklistHelperOpt.flatMap { taskSetBlacklist =>
       val appBlacklist = blacklistTracker.get
       // Only look for unschedulable tasks when at least one executor has registered. Otherwise,
       // task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
@@ -658,11 +658,11 @@ private[spark] class TaskSetManager(
           }
         }
 
-        pendingTask.foreach { indexInTaskSet =>
+        pendingTask.find { indexInTaskSet =>
           // try to find some executor this task can run on. Its possible that some *other*
           // task isn't schedulable anywhere, but we will discover that in some later call,
           // when that unschedulable task is the last task remaining.
-          val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
+          hostToExecutors.forall { case (host, execsOnHost) =>
             // Check if the task can run on the node
             val nodeBlacklisted =
               appBlacklist.isNodeBlacklisted(host) ||
@@ -679,22 +679,27 @@ private[spark] class TaskSetManager(
             }
           }
         }
-          if (blacklistedEverywhere) {
-            val partition = tasks(indexInTaskSet).partitionId
-            abort(s"""
-              |Aborting $taskSet because task $indexInTaskSet (partition $partition)
-              |cannot run anywhere due to node and executor blacklist.
-              |Most recent failure:
-              |${taskSetBlacklist.getLatestFailureReason}
-              |
-              |Blacklisting behavior can be configured via spark.blacklist.*.
-              |""".stripMargin)
-          }
       }
+      } else {
+        None
       }
     }
   }
 
+  private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): Unit = {
+    taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+      val partition = tasks(indexInTaskSet).partitionId
+      abort(s"""
+        |Aborting $taskSet because task $indexInTaskSet (partition $partition)
+        |cannot run anywhere due to node and executor blacklist.
+        |Most recent failure:
+        |${taskSetBlacklist.getLatestFailureReason}
+        |
+        |Blacklisting behavior can be configured via spark.blacklist.*.
+        |""".stripMargin)
+    }
+  }
+
   /**
    * Marks the task as getting result and notifies the DAG Scheduler
    */
```
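A compressed sketch of the shape of the TaskSetManager refactor above: the check no longer aborts as a side effect, but returns the index of the first pending task that is blacklisted on every executor, leaving the kill-or-abort decision to the caller. The names and predicate below are illustrative placeholders, not the Spark types.

```scala
// Illustrative only: "canRun(taskIndex, executor)" stands in for the real blacklist checks.
def firstCompletelyBlacklistedTask(
    pendingTasks: Seq[Int],
    executors: Seq[String],
    canRun: (Int, String) => Boolean): Option[Int] = {
  // find returns the first task index for which the predicate holds, or None,
  // mirroring the foreach -> find change in getCompletelyBlacklistedTaskIfAny.
  pendingTasks.find { taskIndex =>
    executors.forall(exec => !canRun(taskIndex, exec)) // blacklisted everywhere
  }
}

// Example: task 1 can run nowhere, so the caller gets Some(1) and decides what to do next.
val result = firstCompletelyBlacklistedTask(
  pendingTasks = Seq(0, 1),
  executors = Seq("exec-a", "exec-b"),
  canRun = (task, exec) => task != 1)
// result == Some(1)
```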

core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -96,15 +96,16 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     assertDataStructuresEmpty(noFailure = true)
   }
 
-  // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
-  // doesn't hang
+  // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, we try
+  // to acquire a new executor and if we aren't able to get one, the job doesn't hang and we abort
   testScheduler(
     "SPARK-15865 Progress with fewer executors than maxTaskFailures",
     extraConfs = Seq(
       config.BLACKLIST_ENABLED.key -> "true",
       "spark.testing.nHosts" -> "2",
       "spark.testing.nExecutorsPerHost" -> "1",
-      "spark.testing.nCoresPerExecutor" -> "1"
+      "spark.testing.nCoresPerExecutor" -> "1",
+      "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
     )
   ) {
     def runBackend(): Unit = {
```
