Commit 0bb716b

Revert "[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions"
## What changes were proposed in this pull request?

Our customer has a very complicated job. Sometimes it succeeds and sometimes it fails with

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4 has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException
```

However, with the patch apache#23871, the job hangs forever.

When I investigated it, I found that `DAGScheduler` and `TaskSchedulerImpl` define stage completion differently. `DAGScheduler` considers a stage completed when all of its partitions are marked as completed ([result stage](https://github.com/apache/spark/blob/v2.4.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1362-L1368) and [shuffle stage](https://github.com/apache/spark/blob/v2.4.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1400)). `TaskSchedulerImpl` considers a stage's task set completed when all of its tasks finish (see the [code](https://github.com/apache/spark/blob/v2.4.1/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L779-L784)). Ideally these two definitions should be consistent, but apache#23871 breaks that.

In our customer's Spark log, I found that a stage's task set completes, but the stage itself never does. More specifically, `DAGScheduler` submits a task set for stage 4.1 with 1000 tasks, but the `TaskSetManager` skips running the first 100 tasks. Later on, the `TaskSetManager` finishes 900 tasks and marks the task set as completed. However, `DAGScheduler` doesn't agree and hangs forever, waiting for more task completion events from stage 4.1.

With hindsight, I think `TaskSchedulerImpl.stageIdToFinishedPartitions` is fragile. We need to put in more effort to keep it consistent with `DAGScheduler`'s knowledge: when `DAGScheduler` re-marks finished partitions as unfinished, `TaskSchedulerImpl.stageIdToFinishedPartitions` should be updated as well. This PR reverts apache#23871; let's think of a more robust idea later.

## How was this patch tested?

N/A

Closes apache#24359 from cloud-fan/revert.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
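To make the mismatch concrete, here is a simplified, standalone Scala sketch (not Spark code; names such as `CompletionMismatchSketch`, `stagePending`, and `staleFinished` are invented for illustration) of how a task set can hit its task count while the stage, as `DAGScheduler` sees it, still has pending partitions:

```scala
import scala.collection.mutable

object CompletionMismatchSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 10

    // DAGScheduler-style view: the retried stage is complete only once every
    // partition it resubmitted has reported a completion event.
    val stagePending = mutable.BitSet(0 until numPartitions: _*)

    // Stale scheduler-side record of partitions some earlier attempt finished
    // (the role stageIdToFinishedPartitions played before this revert).
    // DAGScheduler has since decided these partitions must be recomputed,
    // but the record was never updated.
    val staleFinished = mutable.BitSet(0, 1)

    // The new task set pre-marks the "already finished" partitions as successful,
    // so only the remaining 8 tasks are ever launched and reported.
    var tasksSuccessful = staleFinished.size
    (0 until numPartitions).filterNot(staleFinished).foreach { p =>
      tasksSuccessful += 1 // the task really ran and succeeded
      stagePending -= p    // DAGScheduler sees its completion event
    }

    // The task set believes it is finished, but the stage still waits on 0 and 1:
    // this is the disagreement that made the job hang.
    println(s"task set complete: ${tasksSuccessful == numPartitions}") // true
    println(s"stage complete:    ${stagePending.isEmpty}")             // false
  }
}
```

The stale `staleFinished` record plays the role that `stageIdToFinishedPartitions` played before the revert: once `DAGScheduler` rolls partitions back to unfinished without that record being updated, the two views never re-converge.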
1 parent eea3f55 commit 0bb716b

File tree

3 files changed, +20 -79 lines

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 4 additions & 32 deletions
@@ -22,7 +22,7 @@ import java.util.{Locale, Timer, TimerTask}
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -101,9 +101,6 @@ private[spark] class TaskSchedulerImpl(
   // Protected by `this`
   val taskIdToExecutorId = new HashMap[Long, String]
 
-  // Protected by `this`
-  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
-
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
   private val starvationTimer = new Timer(true)
@@ -252,20 +249,7 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    // only create a BitSet once for a certain stage since we only remove
-    // that stage when an active TaskSetManager succeed.
-    stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
-    val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
-    // TaskSet got submitted by DAGScheduler may have some already completed
-    // tasks since DAGScheduler does not always know all the tasks that have
-    // been completed by other tasksets when completing a stage, so we mark
-    // those tasks as finished here to avoid launching duplicate tasks, while
-    // holding the TaskSchedulerImpl lock.
-    // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
-    stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
-      finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
-    }
-    tsm
+    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -886,31 +870,19 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
+   * Marks the task has completed in all TaskSetManagers for the given stage.
    *
    * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
-   * And there is also the possibility that the DAGScheduler submits another taskset at the same
-   * time as we're marking a task completed here -- that taskset would have a task for a partition
-   * that was already completed. We maintain the set of finished partitions in
-   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
-   * is submitted. See SPARK-25250 for more details.
-   *
-   * note: this method must be called with a lock on this.
    */
   private[scheduler] def markPartitionCompletedInAllTaskSets(
       stageId: Int,
       partitionId: Int,
       taskInfo: TaskInfo) = {
-    // if we do not find a BitSet for this stage, which means an active TaskSetManager
-    // has already succeeded and removed the stage.
-    stageIdToFinishedPartitions.get(stageId).foreach{
-      finishedPartitions => finishedPartitions += partitionId
-    }
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, Some(taskInfo))
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 
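The loop that remains after the revert simply fans a completion out to every attempt of the stage. Below is a minimal standalone sketch of that shape, assuming made-up stand-ins `FakeTaskInfo` and `FakeTaskSetManager` rather than the real Spark classes:

```scala
import scala.collection.mutable

final case class FakeTaskInfo(duration: Long)

// Simplified stand-in for a TaskSetManager: tracks which partitions have
// succeeded and flips to zombie once every task in the set is done.
final class FakeTaskSetManager(val numTasks: Int) {
  private val successful = mutable.BitSet.empty
  var isZombie = false

  def markPartitionCompleted(partitionId: Int, info: FakeTaskInfo): Unit = {
    if (!successful(partitionId)) {
      successful += partitionId
      if (successful.size == numTasks) isZombie = true
    }
  }
}

object FanOutSketch {
  // attempt id -> manager, playing the role of taskSetsByStageIdAndAttempt
  // restricted to a single stage.
  private val attempts = mutable.HashMap[Int, FakeTaskSetManager]()

  // Mirror of the loop restored by this revert: every attempt (zombie or not)
  // learns about the completed partition, so none of them relaunches it.
  def markPartitionCompletedInAllTaskSets(partitionId: Int, info: FakeTaskInfo): Unit = {
    attempts.values.foreach(_.markPartitionCompleted(partitionId, info))
  }

  def main(args: Array[String]): Unit = {
    attempts ++= Seq(0 -> new FakeTaskSetManager(3), 1 -> new FakeTaskSetManager(3))
    (0 until 3).foreach(p => markPartitionCompletedInAllTaskSets(p, FakeTaskInfo(100L)))
    println(attempts.values.map(_.isZombie).toList) // List(true, true)
  }
}
```

In the real code the fan-out goes through `taskSetsByStageIdAndAttempt` and a partition-to-index mapping inside each `TaskSetManager`; the sketch keeps only the structure of the loop.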

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 5 additions & 14 deletions
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
 
@@ -800,11 +800,7 @@ private[spark] class TaskSetManager(
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
-        // clean up finished partitions for the stage when the active TaskSetManager succeed
-        if (!isZombie) {
-          sched.stageIdToFinishedPartitions -= stageId
-          isZombie = true
-        }
+        isZombie = true
       }
     } else {
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -823,21 +819,16 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(
-      partitionId: Int,
-      taskInfo: Option[TaskInfo]): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
+          successfulTaskDurations.insert(taskInfo.duration)
        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
-          if (!isZombie) {
-            sched.stageIdToFinishedPartitions -= stageId
-            isZombie = true
-          }
+          isZombie = true
         }
         maybeFinishTaskSet()
       }

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 11 additions & 33 deletions
@@ -1121,7 +1121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -1133,9 +1133,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
-    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
-    // in between each attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three active task sets for one stage. (For this to really happen,
+    // you'd need the previous stage to also get restarted, and then succeed, in between each
+    // attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1152,51 +1152,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
-    // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
-    // attempt exists in taskScheduler by now.
-
-    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
-    // This is possible since the behaviour of submitting new attempt and handling successful task
-    // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
-    // separately.
-    (0 until 2).foreach { i =>
-      completeTaskSuccessfully(zombieAttempts(i), i + 1)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
-    }
 
-    // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
-    // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
-    // already completed tasks. And this time with insufficient resources so not all tasks are
-    // active.
+    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
+    // the stage, but this time with insufficient resources so not all tasks are active.
+
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
-    // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
-    // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
-    // any duplicate tasks later (SPARK-25250).
-    (0 until 2).map(_ + 1).foreach { partitionId =>
-      val index = finalTsm.partitionToIndex(partitionId)
-      assert(finalTsm.successful(index))
-    }
-
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We continually simulate late completions from our zombie tasksets(but this time, there's one
-    // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
-    // final attempt. This means we're only waiting on the tasks we've already launched.
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt. This means we're only waiting on the tasks we've already
+    // launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1244,7 +1223,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
     }
-    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
