
Commit ec3e998

Thomas Graves authored and Marcelo Vanzin committed
[SPARK-24909][CORE] Always unregister pending partition on task completion.
The Spark scheduler can hang when fetch failures, a lost executor, a task still running on the lost executor, and multiple stage attempts all coincide. To fix this, we change the scheduler to always unregister the pending partition on task completion.

## What changes were proposed in this pull request?

This PR effectively reverts the change from SPARK-19263, so that the scheduler always does `shuffleStage.pendingPartitions -= task.partitionId`. The change in SPARK-23433 should fix the issue that SPARK-19263 originally addressed.

## How was this patch tested?

Unit tests. The condition is triggered by a race that I have not been able to reproduce on demand; I only see it occasionally in customer jobs on real clusters. I am also working on adding Spark scheduler integration tests.

Closes apache#21976 from tgravescs/SPARK-24909.

Authored-by: Thomas Graves <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
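To make the race concrete, here is a minimal toy sketch in Scala of the bookkeeping involved. It is not Spark code: names such as `ToyShuffleStage`, `completeOld`, and `completeNew` are invented for illustration, and only the `pendingPartitions` handling mirrors the patch. It contrasts the guarded removal that SPARK-19263 introduced with the unconditional removal this commit restores.

```scala
import scala.collection.mutable

// Toy model of a shuffle map stage's pending-partition bookkeeping.
final class ToyShuffleStage(numPartitions: Int, latestAttempt: Int) {
  val pendingPartitions: mutable.HashSet[Int] = mutable.HashSet(0 until numPartitions: _*)
  val registeredOutputs: mutable.HashSet[Int] = mutable.HashSet.empty[Int]

  // Pre-patch rule (SPARK-19263): unregister only if the completion belongs to
  // the latest stage attempt, or if its map output is usable and registered.
  def completeOld(partition: Int, attempt: Int, outputUsable: Boolean): Unit = {
    if (attempt == latestAttempt) pendingPartitions -= partition
    if (outputUsable) {
      registeredOutputs += partition
      pendingPartitions -= partition
    }
  }

  // Post-patch rule (SPARK-24909): always unregister the pending partition; a
  // separate check later notices missing outputs and resubmits the stage.
  def completeNew(partition: Int, attempt: Int, outputUsable: Boolean): Unit = {
    pendingPartitions -= partition
    if (outputUsable) registeredOutputs += partition
  }
}

object RaceDemo extends App {
  // Scenario from the commit message: partition 1's only remaining copy ran on
  // a lost executor as part of an older stage attempt, so its output is
  // ignored, and no live task set will run partition 1 again.
  val before = new ToyShuffleStage(numPartitions = 2, latestAttempt = 1)
  before.completeOld(partition = 0, attempt = 1, outputUsable = true)
  before.completeOld(partition = 1, attempt = 0, outputUsable = false)
  println(before.pendingPartitions)  // still contains 1 -> stage never completes (hang)

  val after = new ToyShuffleStage(numPartitions = 2, latestAttempt = 1)
  after.completeNew(partition = 0, attempt = 1, outputUsable = true)
  after.completeNew(partition = 1, attempt = 0, outputUsable = false)
  println(after.pendingPartitions)   // empty -> missing output detected, stage resubmitted
}
```

Under the old rule the lost-executor completion from the stale attempt removes nothing, so `pendingPartitions` can stay non-empty forever even though no task set will ever run that partition again; under the new rule the set drains and the missing map output triggers a resubmission instead.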
1 parent: 6b1b10c

2 files changed (+15, -24 lines)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 16 deletions

@@ -1373,18 +1373,10 @@ private[spark] class DAGScheduler(

           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
+            shuffleStage.pendingPartitions -= task.partitionId
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
-              // This task was for the currently running attempt of the stage. Since the task
-              // completed successfully from the perspective of the TaskSetManager, mark it as
-              // no longer pending (the TaskSetManager may consider the task complete even
-              // when the output needs to be ignored because the task's epoch is too small below.
-              // In this case, when pending partitions is empty, there will still be missing
-              // output locations, which will cause the DAGScheduler to resubmit the stage below.)
-              shuffleStage.pendingPartitions -= task.partitionId
-            }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {

@@ -1393,13 +1385,6 @@ private[spark] class DAGScheduler(
               // available.
               mapOutputTracker.registerMapOutput(
                 shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
-              // Remove the task's partition from pending partitions. This may have already been
-              // done above, but will not have been done yet in cases where the task attempt was
-              // from an earlier attempt of the stage (i.e., not the attempt that's currently
-              // running). This allows the DAGScheduler to mark the stage as complete when one
-              // copy of each task has finished successfully, even if the currently active stage
-              // still has tasks running.
-              shuffleStage.pendingPartitions -= task.partitionId
             }

             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
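The last context line above is where the unconditional removal pays off: once `pendingPartitions` drains, the scheduler has to decide between marking the shuffle stage finished and resubmitting it for any partitions whose map output was never registered (for example because the producing executor was lost). The snippet below is a self-contained sketch of that decision; `decide`, `Next`, and `Resubmit` are invented names, not the code that actually follows this check in DAGScheduler.

```scala
// Sketch of the "pending set drained" decision; invented names throughout.
object PendingDrainDecision {
  sealed trait Next
  case object StageFinished extends Next
  final case class Resubmit(missingPartitions: Set[Int]) extends Next

  // None means tasks are still pending; otherwise finish or resubmit depending
  // on whether every partition has a registered map output.
  def decide(numPartitions: Int, registeredOutputs: Set[Int], pending: Set[Int]): Option[Next] =
    if (pending.nonEmpty) None
    else {
      val missing = (0 until numPartitions).toSet -- registeredOutputs
      if (missing.isEmpty) Some(StageFinished) else Some(Resubmit(missing))
    }

  def main(args: Array[String]): Unit = {
    // With the patch, the ignored completion of partition 1 still drained the
    // pending set, so we get a resubmission instead of waiting forever.
    println(decide(numPartitions = 2, registeredOutputs = Set(0), pending = Set.empty))
    // prints: Some(Resubmit(Set(1)))
  }
}
```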

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 14 additions & 8 deletions

@@ -2492,6 +2492,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

+    // task(stageId=1, stageAttemptId=1, partitionId=1) should be marked completed when
+    // task(stageId=1, stageAttemptId=0, partitionId=1) finished
+    // ideally we would verify that but no way to get into task scheduler to verify
+
     // Both tasks in rddB should be resubmitted, because none of them has succeeded truly.
     // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
     // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt

@@ -2501,19 +2505,21 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

-    // There should be no new attempt of stage submitted,
-    // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
-    // the current attempt (and hasn't completed successfully in any earlier attempts).
-    assert(taskSets.size === 4)
+    // At this point there should be no active task set for stageId=1 and we need
+    // to resubmit because the output from (stageId=1, stageAttemptId=0, partitionId=1)
+    // was ignored due to executor failure
+    assert(taskSets.size === 5)
+    assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2
+      && taskSets(4).tasks.size === 1)

-    // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
+    // Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully.
     runEvent(makeCompletionEvent(
-      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+      taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2)))

     // Now the ResultStage should be submitted, because all of the tasks of rddB have
     // completed successfully on alive executors.
-    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
-    complete(taskSets(4), Seq(
+    assert(taskSets.size === 6 && taskSets(5).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(5), Seq(
       (Success, 1),
       (Success, 1)))
   }
