
Commit 3d73189

cloud-fan authored and Sumedh Wale committed
[SPARK-27065][CORE] avoid more than one active task set managers for a stage
## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary because, when the DAGScheduler gets the task completion event for the last partition, the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM for a stage and fails.

This fix has a hole. Let's say a stage has 10 partitions and 2 task set managers: TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active because it hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error.

apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed.

However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 has completed. Then TSM2 can't get notified about that task completion, which again leads to the more-than-one-active-TSM error. apache#22806 and apache#23871 were created to fix this hole, but the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix which is easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and to fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250). apache#22806 and apache#23871 are its follow-ups to fix the hole.

## How was this patch tested?

Existing tests.

Closes apache#23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <[email protected]>
1 parent 4dcecd7 commit 3d73189
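To make the approach concrete before reading the diff, here is a minimal, self-contained sketch of the fix's core idea. The `Scheduler` and `TaskSetManager` classes below are simplified stand-ins, not Spark's real classes: submitting a new attempt for a stage first marks every existing manager of that stage as zombie, so at most one manager per stage is ever active.

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's TaskSetManager: only the zombie flag matters here.
class TaskSetManager(val stageId: Int, val attempt: Int) {
  var isZombie: Boolean = false
}

// Simplified stand-in for TaskSchedulerImpl.submitTasks.
class Scheduler {
  private val byStage =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, TaskSetManager]]

  def submit(stageId: Int, attempt: Int): TaskSetManager = {
    val attempts = byStage.getOrElseUpdate(stageId, mutable.HashMap.empty)
    // Any still-active manager for this stage is stale once a new attempt
    // arrives, so retire it instead of throwing an IllegalStateException.
    attempts.values.foreach(_.isZombie = true)
    val manager = new TaskSetManager(stageId, attempt)
    attempts(attempt) = manager
    manager
  }
}

object Demo extends App {
  val s = new Scheduler
  val a1 = s.submit(stageId = 0, attempt = 0)
  val a2 = s.submit(stageId = 0, attempt = 1)
  // Only the newest attempt is active; the older one was retired on submit.
  assert(a1.isZombie && !a2.isZombie)
}
```

This mirrors the invariant the real patch establishes in `TaskSchedulerImpl.submitTasks`, shown in the diff below.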

File tree

2 files changed: +35 −18 lines


core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 13 additions & 7 deletions

```diff
@@ -212,14 +212,20 @@ private[spark] class TaskSchedulerImpl(
     val stage = taskSet.stageId
     val stageTaskSets =
       taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
-    stageTaskSets(taskSet.stageAttemptId) = manager
-    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-      ts.taskSet != taskSet && !ts.isZombie
-    }
-    if (conflictingTaskSet) {
-      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
-        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
+
+    // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
+    // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
+    // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
+    // and it completes. TSM2 finishes tasks for partition 1-9, and thinks he is still active
+    // because partition 10 is not completed yet. However, DAGScheduler gets task completion
+    // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
+    // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
+    // TSM3 for it. As a stage can't have more than one active task set managers, we must mark
+    // TSM2 as zombie (it actually is).
+    stageTaskSets.foreach { case (_, ts) =>
+      ts.isZombie = true
     }
+    stageTaskSets(taskSet.stageAttemptId) = manager
     schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
     if (!isLocal && !hasReceivedTask) {
```
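The comment in the hunk above leans on Spark's zombie contract: a zombie TaskSetManager stops launching new tasks but keeps tracking tasks that are already running until they finish. The toy sketch below illustrates just that contract; `ZombieAwareManager` is a hypothetical class for illustration, not Spark's real `TaskSetManager`.

```scala
// Toy illustration of the zombie contract (hypothetical class, not Spark's
// TaskSetManager): once a manager is a zombie it hands out no further tasks,
// though tasks launched earlier may still complete normally.
class ZombieAwareManager(numTasks: Int) {
  var isZombie: Boolean = false
  private var nextTask = 0

  // Returns the next task id to launch, or None once this manager is a
  // zombie or has no tasks left.
  def offerTask(): Option[Int] =
    if (isZombie || nextTask >= numTasks) None
    else { val t = nextTask; nextTask += 1; Some(t) }
}

object ZombieDemo extends App {
  val m = new ZombieAwareManager(numTasks = 3)
  assert(m.offerTask().contains(0)) // task 0 launched while active
  m.isZombie = true                 // a newer attempt was submitted
  assert(m.offerTask().isEmpty)     // a zombie launches nothing new
}
```

This is why marking TSM2 as zombie is safe: its in-flight work still completes, and the new manager takes over scheduling.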

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 22 additions & 11 deletions

```diff
@@ -165,28 +165,39 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Even if one of the task sets has not-serializable tasks, the other task set should
     // still be processed without error
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
-    taskScheduler.submitTasks(taskSet)
+    val taskSet2 = new TaskSet(
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 1, 0, 0, null)
+    taskScheduler.submitTasks(taskSet2)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
-  test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
+  test("concurrent attempts for the same stage only have one active taskset") {
     val taskScheduler = setupScheduler()
+    def isTasksetZombie(taskset: TaskSet): Boolean = {
+      taskScheduler.taskSetManagerForAttempt(taskset.stageId, taskset.stageAttemptId).get.isZombie
+    }
+
     val attempt1 = FakeTask.createTaskSet(1, 0)
-    val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt1)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }
+    // The first submitted taskset is active
+    assert(!isTasksetZombie(attempt1))
 
-    // OK to submit multiple if previous attempts are all zombie
-    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
-      .get.isZombie = true
+    val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt2)
+    // The first submitted taskset is zombie now
+    assert(isTasksetZombie(attempt1))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt2))
+
     val attempt3 = FakeTask.createTaskSet(1, 2)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
-    taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
-      .get.isZombie = true
     taskScheduler.submitTasks(attempt3)
-    assert(!failedTaskSet)
+    // The first submitted taskset remains zombie
+    assert(isTasksetZombie(attempt1))
+    // The second submitted taskset is zombie now
+    assert(isTasksetZombie(attempt2))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt3))
   }
 
   test("don't schedule more tasks after a taskset is zombie") {
```
