Skip to content

Commit e9d3ca0

Browse files
patrickbrownsyncMarcelo Vanzin
authored andcommitted
[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when cleaning up stages
## What changes were proposed in this pull request? * Update `AppStatusListener` `cleanupStages` method to remove tasks for those stages in a single pass instead of 1 for each stage. * This fixes an issue where the cleanupStages method would get backed up, causing a backup in the executor in ElementTrackingStore, resulting in stages and jobs not getting cleaned up properly. Tasks seem most susceptible to this as there are a lot of them, however a similar issue could arise in other locations the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView` as it appears this interface and implementation can lead to multiple and inefficient traversals of the stored data. ## How was this patch tested? Using existing tests in AppStatusListenerSuite This is my original work and I license the work to the project under the project’s open source license. Closes apache#22883 from patrickbrownsync/cleanup-stages-fix. Authored-by: Patrick Brown <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent fc82222 commit e9d3ca0

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,16 +1073,6 @@ private[spark] class AppStatusListener(
10731073
kvstore.delete(e.getClass(), e.id)
10741074
}
10751075

1076-
val tasks = kvstore.view(classOf[TaskDataWrapper])
1077-
.index("stage")
1078-
.first(key)
1079-
.last(key)
1080-
.asScala
1081-
1082-
tasks.foreach { t =>
1083-
kvstore.delete(t.getClass(), t.taskId)
1084-
}
1085-
10861076
// Check whether there are remaining attempts for the same stage. If there aren't, then
10871077
// also delete the RDD graph data.
10881078
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -1105,6 +1095,15 @@ private[spark] class AppStatusListener(
11051095

11061096
cleanupCachedQuantiles(key)
11071097
}
1098+
1099+
// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
1100+
val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
1101+
val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
1102+
tasks.foreach { t =>
1103+
if (keys.contains((t.stageId, t.stageAttemptId))) {
1104+
kvstore.delete(t.getClass(), t.taskId)
1105+
}
1106+
}
11081107
}
11091108

11101109
private def cleanupTasks(stage: LiveStage): Unit = {

0 commit comments

Comments
 (0)