@@ -32,7 +32,6 @@ import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
- import org.apache.spark.util.kvstore.KVStore

/**
 * A Spark listener that writes application information to a data store. The types written to the
@@ -42,7 +41,7 @@ import org.apache.spark.util.kvstore.KVStore
 * unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
-    kvstore: KVStore,
+    kvstore: ElementTrackingStore,
    conf: SparkConf,
    live: Boolean,
    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
@@ -51,13 +50,15 @@ private[spark] class AppStatusListener(

  private var sparkVersion = SPARK_VERSION
  private var appInfo: v1.ApplicationInfo = null
+  private var appSummary = new AppSummary(0, 0)
  private var coresPerTask: Int = 1

  // How often to update live entities. -1 means "never update" when replaying applications,
  // meaning only the last write will happen. For live applications, this avoids a few
  // operations that we can live without when rapidly processing incoming task events.
  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

+  private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
  private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

  // Keep track of live entities, so that task metrics can be efficiently updated (without
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
  private val liveTasks = new HashMap[Long, LiveTask]()
  private val liveRDDs = new HashMap[Int, LiveRDD]()
  private val pools = new HashMap[String, SchedulerPool]()
+  // Keep the active executor count as a separate variable to avoid having to do synchronization
+  // around liveExecutors.
+  @volatile private var activeExecutorCount = 0

-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case SparkListenerLogStart(version) => sparkVersion = version
-    case _ =>
+  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
+    { count => cleanupExecutors(count) }
+
+  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
+    cleanupJobs(count)
+  }
+
+  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
+    cleanupStages(count)
+  }
+
+  kvstore.onFlush {
+    if (!live) {
+      flush()
+    }
  }

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
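The constructor hunk above replaces the old onOtherEvent version handler with trigger registrations on the tracking store. As a rough mental model only (not Spark's actual ElementTrackingStore, which wraps a KVStore, batches writes, and runs trigger actions on a separate thread), a count-triggered store with the three hooks used here could be sketched like this:

// Illustrative sketch only; every name here is hypothetical.
import scala.collection.mutable

class TrackingStoreSketch {
  private case class Trigger(threshold: Long, action: Long => Unit)

  private val counts = mutable.Map[Class[_], Long]().withDefaultValue(0L)
  private val triggers = mutable.Map[Class[_], Trigger]()
  private var flushActions: List[() => Unit] = Nil

  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
    triggers(klass) = Trigger(threshold, action)
  }

  def onFlush(action: => Unit): Unit = {
    flushActions = (() => action) :: flushActions
  }

  // Called from write paths: bump the per-type count and fire the registered
  // action once the count crosses its threshold.
  def recordWrite(klass: Class[_]): Unit = {
    val count = counts(klass) + 1
    counts(klass) = count
    triggers.get(klass).filter(t => count > t.threshold).foreach(_.action(count))
  }

  def close(): Unit = flushActions.foreach(_.apply())
}
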
@@ -97,6 +113,7 @@ private[spark] class AppStatusListener(
      Seq(attempt))

    kvstore.write(new ApplicationInfoWrapper(appInfo))
+    kvstore.write(appSummary)
  }

  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
@@ -158,10 +175,11 @@ private[spark] class AppStatusListener(
  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    liveExecutors.remove(event.executorId).foreach { exec =>
      val now = System.nanoTime()
+      activeExecutorCount = math.max(0, activeExecutorCount - 1)
      exec.isActive = false
      exec.removeTime = new Date(event.time)
      exec.removeReason = event.reason
-      update(exec, now)
+      update(exec, now, last = true)

      // Remove all RDD distributions that reference the removed executor, in case there wasn't
      // a corresponding event.
@@ -290,8 +308,11 @@ private[spark] class AppStatusListener(
      }

      job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
-      update(job, now)
+      update(job, now, last = true)
    }
+
+    appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
+    kvstore.write(appSummary)
  }

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -350,6 +371,13 @@ private[spark] class AppStatusListener(
        job.activeTasks += 1
        maybeUpdate(job, now)
      }
+
+      if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
+        stage.cleaning = true
+        kvstore.doAsync {
+          cleanupTasks(stage)
+        }
+      }
    }

    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
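The new block in onTaskStart above (and the matching one in onTaskEnd below) caps how many tasks are retained per stage: once savedTasks passes the limit, exactly one asynchronous cleanup is scheduled, guarded by the cleaning flag. A minimal, self-contained sketch of that guard, with hypothetical names (Counters, recordTask) standing in for LiveStage and the listener:

// Sketch of the "at most one in-flight cleanup" guard; names are made up for the example.
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}

object CleanupGuardSketch {
  private implicit val asyncPool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  class Counters(val limit: Int) {
    val saved = new AtomicInteger(0)   // plays the role of stage.savedTasks
    @volatile var cleaning = false     // plays the role of stage.cleaning
  }

  def recordTask(c: Counters)(cleanup: Counters => Unit): Unit = {
    // Only the event-handling thread sets `cleaning`, so a burst of task events
    // schedules at most one cleanup instead of one per event over the limit.
    if (c.saved.incrementAndGet() > c.limit && !c.cleaning) {
      c.cleaning = true
      Future {
        cleanup(c)
        c.cleaning = false  // re-arm the guard once this cleanup finishes
      }
    }
  }
}
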
@@ -449,6 +477,13 @@ private[spark] class AppStatusListener(
        esummary.metrics.update(metricsDelta)
      }
      maybeUpdate(esummary, now)
+
+      if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
+        stage.cleaning = true
+        kvstore.doAsync {
+          cleanupTasks(stage)
+        }
+      }
    }

    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -516,8 +551,11 @@ private[spark] class AppStatusListener(
      }

      stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now)
+      update(stage, now, last = true)
    }
+
+    appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
+    kvstore.write(appSummary)
  }

  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
@@ -573,7 +611,7 @@ private[spark] class AppStatusListener(
  }

  /** Flush all live entities' data to the underlying store. */
-  def flush(): Unit = {
+  private def flush(): Unit = {
    val now = System.nanoTime()
    liveStages.values.asScala.foreach { stage =>
      update(stage, now)
@@ -708,7 +746,10 @@ private[spark] class AppStatusListener(
  }

  private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
-    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
+    liveExecutors.getOrElseUpdate(executorId, {
+      activeExecutorCount += 1
+      new LiveExecutor(executorId, addTime)
+    })
  }

  private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
@@ -754,8 +795,8 @@ private[spark] class AppStatusListener(
    }
  }

-  private def update(entity: LiveEntity, now: Long): Unit = {
-    entity.write(kvstore, now)
+  private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
+    entity.write(kvstore, now, checkTriggers = last)
  }

  /** Update a live entity only if it hasn't been updated in the last configured period. */
@@ -772,4 +813,127 @@ private[spark] class AppStatusListener(
    }
  }

+  private def cleanupExecutors(count: Long): Unit = {
+    // Because the limit is on the number of *dead* executors, we need to calculate whether
+    // there are actually enough dead executors to be deleted.
+    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+    val dead = count - activeExecutorCount
+
+    if (dead > threshold) {
+      val countToDelete = calculateNumberToRemove(dead, threshold)
+      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false).asScala.toSeq
+      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+    }
+  }
+
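One subtlety in cleanupExecutors above: the trigger reports the total number of stored ExecutorSummaryWrapper rows, while the configured limit only covers dead executors, so the live count (tracked in activeExecutorCount) is subtracted first. A worked example with invented numbers:

// Invented numbers, just to make the arithmetic above concrete.
val totalStored = 120L          // rows reported by the trigger (live + dead)
val activeExecutorCount = 100L  // live executors, tracked separately to avoid locking liveExecutors
val threshold = 10L             // retained-dead-executors limit

val dead = totalStored - activeExecutorCount   // 20 dead executors are stored
val countToDelete =
  if (dead > threshold) math.max(threshold / 10L, dead - threshold) else 0L
assert(countToDelete == 10L)    // the 10 oldest dead executors get dropped
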
+  private def cleanupJobs(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+      countToDelete.toInt) { j =>
+      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+      countToDelete.toInt) { s =>
+      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
+    }
+
+    stages.foreach { s =>
+      val key = s.id
+      kvstore.delete(s.getClass(), key)
+
+      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+        .toSeq
+      execSummaries.foreach { e =>
+        kvstore.delete(e.getClass(), e.id)
+      }
+
+      val tasks = kvstore.view(classOf[TaskDataWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+
+      tasks.foreach { t =>
+        kvstore.delete(t.getClass(), t.info.taskId)
+      }
+
+      // Check whether there are remaining attempts for the same stage. If there aren't, then
+      // also delete the RDD graph data.
+      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+        .index("stageId")
+        .first(s.stageId)
+        .last(s.stageId)
+        .closeableIterator()
+
+      val hasMoreAttempts = try {
+        remainingAttempts.asScala.exists { other =>
+          other.info.attemptId != s.info.attemptId
+        }
+      } finally {
+        remainingAttempts.close()
+      }
+
+      if (!hasMoreAttempts) {
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+      }
+    }
+  }
+
+  private def cleanupTasks(stage: LiveStage): Unit = {
+    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
+    if (countToDelete > 0) {
+      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+        .last(stageKey)
+
+      // Try to delete finished tasks only.
+      val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
+        !live || t.info.status != TaskState.RUNNING.toString()
+      }
+      toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+      stage.savedTasks.addAndGet(-toDelete.size)
+
+      // If there are more running tasks than the configured limit, delete running tasks. This
+      // should be extremely rare since the limit should generally far exceed the number of tasks
+      // that can run in parallel.
+      val remaining = countToDelete - toDelete.size
+      if (remaining > 0) {
+        val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
+        runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+        stage.savedTasks.addAndGet(-remaining)
+      }
+    }
+    stage.cleaning = false
+  }
+
+  /**
+   * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
+   * asynchronously, this method may return 0 in case enough items have been deleted already.
+   */
+  private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
+    if (dataSize > retainedSize) {
+      math.max(retainedSize / 10L, dataSize - retainedSize)
+    } else {
+      0L
+    }
+  }
+
}
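Finally, calculateNumberToRemove deletes in batches of at least a tenth of the retention limit ("to reduce friction", per its doc comment) rather than firing a tiny cleanup on every write past the threshold. A quick worked example (the limit of 1000 is arbitrary, not a default):

// Same formula as above, exercised with made-up numbers.
def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
  if (dataSize > retainedSize) math.max(retainedSize / 10L, dataSize - retainedSize) else 0L
}

assert(calculateNumberToRemove(1005L, 1000L) == 100L)  // small overflow: still delete a 10% batch
assert(calculateNumberToRemove(1500L, 1000L) == 500L)  // large overflow: delete all of it
assert(calculateNumberToRemove(900L, 1000L) == 0L)     // under the limit: nothing to delete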