Skip to content

Commit 559b899

Browse files
pgandhi authored and tgravescs committed
[SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl
Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after some time, and eventually, after hitting the maximum number of executor failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the lock of the TaskSchedulerImpl object, which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in the TaskSchedulerImpl class, we found that, instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations on the global variables inside the code block atomic. The block of code in that method uses only one global HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we ensure atomicity of its operations and speed up the heartbeat receiver thread.

## How was this patch tested?

Screenshots of the thread dump have been attached below:

**heartbeat-receiver-event-loop-thread:**
<img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png">

**dispatcher-event-loop-thread:**
<img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png">

Closes apache#22221 from pgandhi999/SPARK-25231.

Authored-by: pgandhi <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
1 parent 9254492 commit 559b899

File tree

4 files changed

+12
-11
lines changed

4 files changed

+12
-11
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
1919

2020
import java.nio.ByteBuffer
2121
import java.util.{Locale, Timer, TimerTask}
22-
import java.util.concurrent.TimeUnit
22+
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2323
import java.util.concurrent.atomic.AtomicLong
2424

2525
import scala.collection.Set
@@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl(
9191
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
9292

9393
// Protected by `this`
94-
private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
94+
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
9595
val taskIdToExecutorId = new HashMap[Long, String]
9696

9797
@volatile private var hasReceivedTask = false
@@ -315,7 +315,7 @@ private[spark] class TaskSchedulerImpl(
315315
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
316316
tasks(i) += task
317317
val tid = task.taskId
318-
taskIdToTaskSetManager(tid) = taskSet
318+
taskIdToTaskSetManager.put(tid, taskSet)
319319
taskIdToExecutorId(tid) = execId
320320
executorIdToRunningTaskIds(execId).add(tid)
321321
availableCpus(i) -= CPUS_PER_TASK
@@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
465465
var reason: Option[ExecutorLossReason] = None
466466
synchronized {
467467
try {
468-
taskIdToTaskSetManager.get(tid) match {
468+
Option(taskIdToTaskSetManager.get(tid)) match {
469469
case Some(taskSet) =>
470470
if (state == TaskState.LOST) {
471471
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
517517
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
518518
blockManagerId: BlockManagerId): Boolean = {
519519
// (taskId, stageId, stageAttemptId, accumUpdates)
520-
val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
520+
val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
521521
accumUpdates.flatMap { case (id, updates) =>
522522
val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
523-
taskIdToTaskSetManager.get(id).map { taskSetMgr =>
523+
Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
524524
(id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
525525
}
526526
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
290290
for (task <- tasks.flatten) {
291291
val serializedTask = TaskDescription.encode(task)
292292
if (serializedTask.limit() >= maxRpcMessageSize) {
293-
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
293+
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
294294
try {
295295
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
296296
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +

core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ private[spark] abstract class MockBackend(
400400
// get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
401401
// tests from introducing a race if they need it.
402402
val newTasks = newTaskDescriptions.map { taskDescription =>
403-
val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
403+
val taskSet =
404+
Option(taskScheduler.taskIdToTaskSetManager.get(taskDescription.taskId).taskSet).get
404405
val task = taskSet.tasks(taskDescription.index)
405406
(taskDescription, task)
406407
}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
248248
taskScheduler.submitTasks(attempt2)
249249
val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
250250
assert(1 === taskDescriptions3.length)
251-
val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
251+
val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId)).get
252252
assert(mgr.taskSet.stageAttemptId === 1)
253253
assert(!failedTaskSet)
254254
}
@@ -286,7 +286,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
286286
assert(10 === taskDescriptions3.length)
287287

288288
taskDescriptions3.foreach { task =>
289-
val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
289+
val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(task.taskId)).get
290290
assert(mgr.taskSet.stageAttemptId === 1)
291291
}
292292
assert(!failedTaskSet)
@@ -724,7 +724,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
724724
// only schedule one task because of locality
725725
assert(taskDescs.size === 1)
726726

727-
val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
727+
val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId)).get
728728
assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
729729
// we should know about both executors, even though we only scheduled tasks on one of them
730730
assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))

0 commit comments

Comments
 (0)