Commit 39a02d8

ankuriitg authored and Marcelo Vanzin committed
[SPARK-24415][CORE] Fixed the aggregated stage metrics by retaining stage objects in liveStages until all tasks are complete
The problem occurs because the stage object is removed from liveStages in AppStatusListener.onStageCompleted. Because of this, any onTaskEnd event received after the onStageCompleted event does not update the stage metrics. The fix is to retain stage objects in liveStages until all of their tasks are complete.

1. Fixed the reproducible example posted in the JIRA
2. Added a unit test

Closes apache#22209 from ankuriitg/ankurgupta/SPARK-24415.

Authored-by: ankurgupta <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
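The heart of the fix is the condition that gates removal of a stage from liveStages: a stage may be dropped only once it has no active tasks left and has already reached a terminal status. Below is a minimal sketch of that check, condensed from the onTaskEnd change in the diff further down (stage, liveStages, update, and maybeUpdate are the listener's existing members; treat this as an illustration of the idea, not the full patch):

    // Sketch: a stage leaves liveStages only after every task has reported back
    // and the stage itself has reached a terminal status. Until then, late
    // onTaskEnd events still find the live stage and update its metrics.
    val removeStage =
      stage.activeTasks == 0 &&
        (v1.StageStatus.COMPLETE.equals(stage.status) ||
          v1.StageStatus.FAILED.equals(stage.status))

    if (removeStage) {
      update(stage, now, last = true)                            // write the final snapshot
      liveStages.remove((event.stageId, event.stageAttemptId))   // then drop the live entry
    } else {
      maybeUpdate(stage, now)                                    // keep the stage live for stragglers
    }

onStageCompleted applies the same idea from the other side: it now removes the stage only when stage.activeTasks == 0; otherwise the entry stays in liveStages until the last onTaskEnd arrives and performs the removal.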
Parent: 8440e30 · Commit: 39a02d8

File tree

3 files changed: 108 additions, 17 deletions

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 46 additions & 15 deletions
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          if (v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+
+            pools.get(stage.schedulingPool).foreach { pool =>
+              pool.stageIds = pool.stageIds - stage.info.stageId
+              update(pool, now)
+            }
+
+            it.remove()
+            update(stage, now, last = true)
+          }
         }
       }

@@ -506,7 +515,16 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
-      maybeUpdate(stage, now)
+      // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
+      val removeStage =
+        stage.activeTasks == 0 &&
+          (v1.StageStatus.COMPLETE.equals(stage.status) ||
+            v1.StageStatus.FAILED.equals(stage.status))
+      if (removeStage) {
+        update(stage, now, last = true)
+      } else {
+        maybeUpdate(stage, now)
+      }

       // Store both stage ID and task index in a single long variable for tracking at job level.
       val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
@@ -521,7 +539,7 @@ private[spark] class AppStatusListener(
         if (killedDelta > 0) {
           job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
         }
-        maybeUpdate(job, now)
+        conditionalLiveUpdate(job, now, removeStage)
       }

       val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -532,14 +550,17 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      maybeUpdate(esummary, now)
+      conditionalLiveUpdate(esummary, now, removeStage)

       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
         kvstore.doAsync {
           cleanupTasks(stage)
         }
       }
+      if (removeStage) {
+        liveStages.remove((event.stageId, event.stageAttemptId))
+      }
     }

     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -564,17 +585,13 @@ private[spark] class AppStatusListener(

       // Force an update on live applications when the number of active tasks reaches 0. This is
       // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      if (exec.activeTasks == 0) {
-        liveUpdate(exec, now)
-      } else {
-        maybeUpdate(exec, now)
-      }
+      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
     }
   }

   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     val maybeStage =
-      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
+      Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -608,14 +625,20 @@ private[spark] class AppStatusListener(
       }

       stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now, last = true)

       val executorIdsForStage = stage.blackListedExecutors
       executorIdsForStage.foreach { executorId =>
         liveExecutors.get(executorId).foreach { exec =>
           removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
         }
       }
+
+      // Remove stage only if there are no active tasks remaining
+      val removeStage = stage.activeTasks == 0
+      update(stage, now, last = removeStage)
+      if (removeStage) {
+        liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
+      }
     }

     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
@@ -882,6 +905,14 @@ private[spark] class AppStatusListener(
     }
   }

+  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
+    if (condition) {
+      liveUpdate(entity, now)
+    } else {
+      maybeUpdate(entity, now)
+    }
+  }
+
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 55 additions & 0 deletions
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
   }

+  test("SPARK-24415: update metrics for tasks that finish late") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
+
+    // Start 2 stages
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+    // Start 2 Tasks
+    val tasks = createTasks(2, Array("1"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Task 1 Finished
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Stage 1 Completed
+    stage1.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Task 2 Killed
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+        TaskKilled(reason = "Killed"), tasks(1), null))
+
+    // Ensure killed task metrics are updated
+    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+    assert(failedStages.size == 1)
+    assert(failedStages.head.numKilledTasks == 1)
+    assert(failedStages.head.numCompleteTasks == 1)
+
+    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    assert(allJobs.size == 1)
+    assert(allJobs.head.numKilledTasks == 1)
+    assert(allJobs.head.numCompletedTasks == 1)
+    assert(allJobs.head.numActiveStages == 1)
+    assert(allJobs.head.numFailedStages == 1)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)

streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala

Lines changed: 7 additions & 2 deletions
@@ -77,7 +77,12 @@ class UISeleniumSuite
     inputStream.foreachRDD { rdd =>
       rdd.foreach(_ => {})
       try {
-        rdd.foreach(_ => throw new RuntimeException("Oops"))
+        rdd.foreach { _ =>
+          // Failing the task with id 15 to ensure only one task fails
+          if (TaskContext.get.taskAttemptId() % 15 == 0) {
+            throw new RuntimeException("Oops")
+          }
+        }
       } catch {
         case e: SparkException if e.getMessage.contains("Oops") =>
       }
@@ -166,7 +171,7 @@ class UISeleniumSuite

     // Check job progress
     findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
-      List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
+      List("4/4", "4/4", "4/4", "3/4 (1 failed)"))

     // Check stacktrace
     val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq
