Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 4b88393

Browse files
caneGuy authored and
jerryshao committed
[SPARK-21922] Fix duration always updating when task failed but status is still RUNNING
## What changes were proposed in this pull request? When the driver quits abnormally, it causes executor shutdown, and task metrics cannot be sent to the driver for updating. In this case the status will always be 'RUNNING' and the duration on the history UI will be 'CurrentTime - launchTime', which increases infinitely. We can fix this by using the modification time of the event log, since this time has already been obtained when `FsHistoryProvider` fetches the event log from the file system. The resulting picture is uploaded in [SPARK-21922](https://issues.apache.org/jira/browse/SPARK-21922). How to reproduce? (1) Submit a job to Spark on YARN. (2) Mock an OOM (or another case that can make the driver quit abnormally) scenario for the driver. (3) Make sure an executor is running a task when the driver quits. (4) Open the history server and check the result. It is not a corner case, since there are many such jobs in our current cluster. ## How was this patch tested? Deployed the history server and opened a job that has this problem. Author: zhoukang <[email protected]> Closes apache#19132 from caneGuy/zhoukang/fix-duration.
1 parent 4e6fc69 commit 4b88393

File tree

7 files changed

+27
-11
lines changed

7 files changed

+27
-11
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
249249
val appSecManager = new SecurityManager(conf)
250250
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name,
251251
HistoryServer.getAttemptURI(appId, attempt.attemptId),
252-
attempt.startTime)
252+
Some(attempt.lastUpdated), attempt.startTime)
253253
// Do not call ui.bind() to avoid creating a new server for each application
254254
}
255255

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ private[v1] class AllStagesResource(ui: SparkUI) {
4747
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
4848
}
4949
} yield {
50+
stageUiData.lastUpdateTime = ui.lastUpdateTime
5051
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
5152
}
5253
}
@@ -69,7 +70,8 @@ private[v1] object AllStagesResource {
6970
}
7071

7172
val taskData = if (includeDetails) {
72-
Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
73+
Some(stageUiData.taskData.map { case (k, v) =>
74+
k -> convertTaskData(v, stageUiData.lastUpdateTime) })
7375
} else {
7476
None
7577
}
@@ -136,13 +138,13 @@ private[v1] object AllStagesResource {
136138
}
137139
}
138140

139-
def convertTaskData(uiData: TaskUIData): TaskData = {
141+
def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): TaskData = {
140142
new TaskData(
141143
taskId = uiData.taskInfo.taskId,
142144
index = uiData.taskInfo.index,
143145
attempt = uiData.taskInfo.attemptNumber,
144146
launchTime = new Date(uiData.taskInfo.launchTime),
145-
duration = uiData.taskDuration,
147+
duration = uiData.taskDuration(lastUpdateTime),
146148
executorId = uiData.taskInfo.executorId,
147149
host = uiData.taskInfo.host,
148150
status = uiData.taskInfo.status,

core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ private[v1] class OneStageResource(ui: SparkUI) {
3535
def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
3636
withStage(stageId) { stageAttempts =>
3737
stageAttempts.map { stage =>
38+
stage.ui.lastUpdateTime = ui.lastUpdateTime
3839
AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
3940
includeDetails = true)
4041
}
@@ -47,6 +48,7 @@ private[v1] class OneStageResource(ui: SparkUI) {
4748
@PathParam("stageId") stageId: Int,
4849
@PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
4950
withStageAttempt(stageId, stageAttemptId) { stage =>
51+
stage.ui.lastUpdateTime = ui.lastUpdateTime
5052
AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
5153
includeDetails = true)
5254
}
@@ -81,7 +83,8 @@ private[v1] class OneStageResource(ui: SparkUI) {
8183
@DefaultValue("20") @QueryParam("length") length: Int,
8284
@DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
8385
withStageAttempt(stageId, stageAttemptId) { stage =>
84-
val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
86+
val tasks = stage.ui.taskData.values
87+
.map{ AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq
8588
.sorted(OneStageResource.ordering(sortBy))
8689
tasks.slice(offset, offset + length)
8790
}

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ private[spark] class SparkUI private (
5050
val operationGraphListener: RDDOperationGraphListener,
5151
var appName: String,
5252
val basePath: String,
53+
val lastUpdateTime: Option[Long] = None,
5354
val startTime: Long)
5455
extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
5556
conf, basePath, "SparkUI")
@@ -176,9 +177,11 @@ private[spark] object SparkUI {
176177
securityManager: SecurityManager,
177178
appName: String,
178179
basePath: String,
180+
lastUpdateTime: Option[Long],
179181
startTime: Long): SparkUI = {
180182
val sparkUI = create(
181-
None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
183+
None, conf, listenerBus, securityManager, appName, basePath,
184+
lastUpdateTime = lastUpdateTime, startTime = startTime)
182185

183186
val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
184187
Utils.getContextOrSparkClassLoader).asScala
@@ -204,6 +207,7 @@ private[spark] object SparkUI {
204207
appName: String,
205208
basePath: String = "",
206209
jobProgressListener: Option[JobProgressListener] = None,
210+
lastUpdateTime: Option[Long] = None,
207211
startTime: Long): SparkUI = {
208212

209213
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
@@ -226,6 +230,6 @@ private[spark] object SparkUI {
226230

227231
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
228232
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
229-
appName, basePath, startTime)
233+
appName, basePath, lastUpdateTime, startTime)
230234
}
231235
}

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
299299
stageData.hasShuffleRead,
300300
stageData.hasShuffleWrite,
301301
stageData.hasBytesSpilled,
302+
parent.lastUpdateTime,
302303
currentTime,
303304
pageSize = taskPageSize,
304305
sortColumn = taskSortColumn,
@@ -863,6 +864,7 @@ private[ui] class TaskDataSource(
863864
hasShuffleRead: Boolean,
864865
hasShuffleWrite: Boolean,
865866
hasBytesSpilled: Boolean,
867+
lastUpdateTime: Option[Long],
866868
currentTime: Long,
867869
pageSize: Int,
868870
sortColumn: String,
@@ -889,8 +891,9 @@ private[ui] class TaskDataSource(
889891
private def taskRow(taskData: TaskUIData): TaskTableRowData = {
890892
val info = taskData.taskInfo
891893
val metrics = taskData.metrics
892-
val duration = taskData.taskDuration.getOrElse(1L)
893-
val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("")
894+
val duration = taskData.taskDuration(lastUpdateTime).getOrElse(1L)
895+
val formatDuration =
896+
taskData.taskDuration(lastUpdateTime).map(d => UIUtils.formatDuration(d)).getOrElse("")
894897
val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
895898
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
896899
val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
@@ -1154,6 +1157,7 @@ private[ui] class TaskPagedTable(
11541157
hasShuffleRead: Boolean,
11551158
hasShuffleWrite: Boolean,
11561159
hasBytesSpilled: Boolean,
1160+
lastUpdateTime: Option[Long],
11571161
currentTime: Long,
11581162
pageSize: Int,
11591163
sortColumn: String,
@@ -1179,6 +1183,7 @@ private[ui] class TaskPagedTable(
11791183
hasShuffleRead,
11801184
hasShuffleWrite,
11811185
hasBytesSpilled,
1186+
lastUpdateTime,
11821187
currentTime,
11831188
pageSize,
11841189
sortColumn,

core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
3030
val progressListener = parent.jobProgressListener
3131
val operationGraphListener = parent.operationGraphListener
3232
val executorsListener = parent.executorsListener
33+
val lastUpdateTime = parent.lastUpdateTime
3334

3435
attachPage(new AllStagesPage(this))
3536
attachPage(new StagePage(this))

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ private[spark] object UIData {
9797
var memoryBytesSpilled: Long = _
9898
var diskBytesSpilled: Long = _
9999
var isBlacklisted: Int = _
100+
var lastUpdateTime: Option[Long] = None
100101

101102
var schedulingPool: String = ""
102103
var description: Option[String] = None
@@ -133,9 +134,9 @@ private[spark] object UIData {
133134
_metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
134135
}
135136

136-
def taskDuration: Option[Long] = {
137+
def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = {
137138
if (taskInfo.status == "RUNNING") {
138-
Some(_taskInfo.timeRunning(System.currentTimeMillis))
139+
Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis)))
139140
} else {
140141
_metrics.map(_.executorRunTime)
141142
}

0 commit comments

Comments (0)