Skip to content

Commit fbf62b7

Browse files
shahidki31 authored and Marcelo Vanzin committed
[SPARK-25451][SPARK-26100][CORE] Aggregated metrics table doesn't show the right number of the total tasks
Total tasks in the aggregated table and the tasks table sometimes do not match in the Web UI. We need to force-update the executor summary of the particular executorId whenever the last task of that executor has finished. Currently it force-updates based on the last task at the stage end, so for some particular executorId a task might be missed at the stage end. Tests to reproduce: ``` bin/spark-shell --master yarn --conf spark.executor.instances=3 sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect() ``` Before patch: ![screenshot from 2018-11-15 02-24-05](https://user-images.githubusercontent.com/23054875/48511776-b0d36480-e87d-11e8-89a8-ab97216e2c21.png) After patch: ![screenshot from 2018-11-15 02-32-38](https://user-images.githubusercontent.com/23054875/48512141-c39a6900-e87e-11e8-8535-903e1d11d13e.png) Closes apache#23038 from shahidki31/SPARK-25451. Authored-by: Shahid <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 76ef02e commit fbf62b7

File tree

3 files changed

+64
-2
lines changed

3 files changed

+64
-2
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ private[spark] class AppStatusListener(
473473
val locality = event.taskInfo.taskLocality.toString()
474474
val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
475475
stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
476+
stage.activeTasksPerExecutor(event.taskInfo.executorId) += 1
476477
maybeUpdate(stage, now)
477478

478479
stage.jobs.foreach { job =>
@@ -558,6 +559,7 @@ private[spark] class AppStatusListener(
558559
if (killedDelta > 0) {
559560
stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
560561
}
562+
stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
561563
// [SPARK-24415] Wait for all tasks to finish before removing stage from live list
562564
val removeStage =
563565
stage.activeTasks == 0 &&
@@ -582,7 +584,11 @@ private[spark] class AppStatusListener(
582584
if (killedDelta > 0) {
583585
job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
584586
}
585-
conditionalLiveUpdate(job, now, removeStage)
587+
if (removeStage) {
588+
update(job, now)
589+
} else {
590+
maybeUpdate(job, now)
591+
}
586592
}
587593

588594
val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -593,7 +599,16 @@ private[spark] class AppStatusListener(
593599
if (metricsDelta != null) {
594600
esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
595601
}
596-
conditionalLiveUpdate(esummary, now, removeStage)
602+
603+
val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0
604+
605+
// If the last task of the executor finished, then update the esummary
606+
// for both live and history events.
607+
if (isLastTask) {
608+
update(esummary, now)
609+
} else {
610+
maybeUpdate(esummary, now)
611+
}
597612

598613
if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
599614
stage.cleaning = true

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,8 @@ private class LiveStage extends LiveEntity {
376376

377377
val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
378378

379+
val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0)
380+
379381
var blackListedExecutors = new HashSet[String]()
380382

381383
// Used for cleanup of tasks after they reach the configured limit. Not written to the store.

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,51 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
12731273
assert(allJobs.head.numFailedStages == 1)
12741274
}
12751275

1276+
test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
1277+
val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
1278+
1279+
val listener = new AppStatusListener(store, testConf, true)
1280+
1281+
val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
1282+
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
1283+
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
1284+
1285+
val tasks = createTasks(4, Array("1", "2"))
1286+
tasks.foreach { task =>
1287+
listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
1288+
}
1289+
1290+
time += 1
1291+
tasks(0).markFinished(TaskState.FINISHED, time)
1292+
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
1293+
Success, tasks(0), null))
1294+
time += 1
1295+
tasks(1).markFinished(TaskState.FINISHED, time)
1296+
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
1297+
Success, tasks(1), null))
1298+
1299+
stage.failureReason = Some("Failed")
1300+
listener.onStageCompleted(SparkListenerStageCompleted(stage))
1301+
time += 1
1302+
listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
1303+
1304+
time += 1
1305+
tasks(2).markFinished(TaskState.FAILED, time)
1306+
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
1307+
ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
1308+
time += 1
1309+
tasks(3).markFinished(TaskState.FAILED, time)
1310+
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
1311+
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
1312+
1313+
val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
1314+
esummary.foreach { execSummary =>
1315+
assert(execSummary.failedTasks === 1)
1316+
assert(execSummary.succeededTasks === 1)
1317+
assert(execSummary.killedTasks === 0)
1318+
}
1319+
}
1320+
12761321
test("driver logs") {
12771322
val listener = new AppStatusListener(store, conf, true)
12781323

0 commit comments

Comments (0)