@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
-import java.util.{Date, ServiceLoader, UUID}
+import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
 
@@ -58,10 +59,10 @@ import org.apache.spark.util.kvstore._
  *
  * == How new and updated attempts are detected ==
  *
- * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
- *   entries in the log dir whose modification time is greater than the last scan time
- *   are considered new or updated. These are replayed to create a new attempt info entry
- *   and update or create a matching application info element in the list of applications.
+ * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any entries in the
+ *   log dir whose size changed since the last scan time are considered new or updated. These are
+ *   replayed to create a new attempt info entry and update or create a matching application info
+ *   element in the list of applications.
  * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
  *   attempt is replaced by another one with a larger log size.
  *
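
The scaladoc change above is the heart of the patch: detection is now driven by file size rather than modification time. A minimal sketch of that check, with a mutable map standing in for the listing KVStore (the map and helper are illustrative, not part of the patch):

```scala
import org.apache.hadoop.fs.FileStatus
import scala.collection.mutable

// Hypothetical stand-in for the listing database.
val recordedSizes = mutable.Map.empty[String, Long]

// An entry is new or updated when it is unknown, or its log has grown.
def isNewOrUpdated(entry: FileStatus): Boolean =
  recordedSizes.get(entry.getPath.toString).forall(_ < entry.getLen)
```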
@@ -125,6 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
+  private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
@@ -402,13 +404,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
-      val newLastScanTime = getNewLastScanTime()
+      val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
       val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
-            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+            // FsHistoryProvider used to generate a hidden file which can't be read. Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
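
Scan timestamps now come from the provider's injected clock instead of a round trip to the file system (see the removed getNewLastScanTime further down). A simplified sketch of the Clock abstraction, assuming the shape of Spark's org.apache.spark.util.Clock:

```scala
// Simplified stand-in for Spark's Clock trait, injected for testability.
trait Clock {
  def getTimeMillis(): Long
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

val clock: Clock = new SystemClock
val newLastScanTime = clock.getTimeMillis()
```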
@@ -417,15 +419,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .filter { entry =>
           try {
             val info = listing.read(classOf[LogInfo], entry.getPath().toString())
-            if (info.fileSize < entry.getLen()) {
-              // Log size has changed, it should be parsed.
-              true
-            } else {
+
+            if (info.appId.isDefined) {
               // If the SHS view has a valid application, update the time the file was last seen so
-              // that the entry is not deleted from the SHS listing.
-              if (info.appId.isDefined) {
-                listing.write(info.copy(lastProcessed = newLastScanTime))
+              // that the entry is not deleted from the SHS listing. Also update the file size, in
+              // case the code below decides we don't need to parse the log.
+              listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
+            }
+
+            if (info.fileSize < entry.getLen()) {
+              if (info.appId.isDefined && fastInProgressParsing) {
+                // When fast in-progress parsing is on, we don't need to re-parse when the
+                // size changes, but we do need to invalidate any existing UIs.
+                invalidateUI(info.appId.get, info.attemptId)
+                false
+              } else {
+                true
               }
+            } else {
               false
             }
           } catch {
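
The branching above distills to a small decision table. A hypothetical helper, not in the patch, that captures it:

```scala
// Hypothetical distillation of the re-parse decision: a grown log normally
// triggers a re-parse, but with fast in-progress parsing a listed app only
// needs its cached UI invalidated.
def shouldReparse(
    knownSize: Long,
    currentSize: Long,
    appListed: Boolean,
    fastParsing: Boolean): Boolean = {
  if (knownSize >= currentSize) {
    false // no new data in the log
  } else if (appListed && fastParsing) {
    false // invalidate the UI instead of re-parsing
  } else {
    true
  }
}
```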
@@ -449,7 +460,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val tasks = updated.map { entry =>
         try {
           replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
           })
         } catch {
           // let the iteration over the updated entries break, since an exception on
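
Merging stays asynchronous; each changed log becomes one task on the replay executor. A sketch of the pattern with a plain fixed pool standing in for replayExecutor:

```scala
import java.util.concurrent.Executors

// Plain thread pool standing in for the provider's replayExecutor.
val pool = Executors.newFixedThreadPool(2)
val task = pool.submit(new Runnable {
  override def run(): Unit = println("merge one application listing")
})
task.get() // block until the merge task finishes
pool.shutdown()
```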
@@ -542,25 +553,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-  private[history] def getNewLastScanTime(): Long = {
-    val fileName = "." + UUID.randomUUID().toString
-    val path = new Path(logDir, fileName)
-    val fos = fs.create(path)
-
-    try {
-      fos.close()
-      fs.getFileStatus(path).getModificationTime
-    } catch {
-      case e: Exception =>
-        logError("Exception encountered when attempting to update last scan time", e)
-        lastScanTime.get()
-    } finally {
-      if (!fs.delete(path, true)) {
-        logWarning(s"Error deleting ${path}")
-      }
-    }
-  }
-
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
@@ -607,7 +599,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the given log file, saving the application in the listing db.
    */
-  protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
+  protected def mergeApplicationListing(
+      fileStatus: FileStatus,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
       eventString.startsWith(APPL_END_EVENT_PREFIX) ||
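
The filter keeps replay cheap by string-matching event prefixes before any JSON parsing. A sketch with illustrative prefixes (the real APPL_START_EVENT_PREFIX and friends are constants in FsHistoryProvider's companion object):

```scala
// Prefix-based event filter; the JSON prefixes below are illustrative.
val prefixes = Seq(
  "{\"Event\":\"SparkListenerApplicationStart\"",
  "{\"Event\":\"SparkListenerApplicationEnd\"")

val eventsFilter: String => Boolean = { line =>
  prefixes.exists(line.startsWith)
}
```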
@@ -616,32 +611,118 @@
     }
 
     val logPath = fileStatus.getPath()
+    val appCompleted = isCompleted(logPath.getName())
+    val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)
+
+    // Enable halt support in listener if:
+    // - app in progress && fast parsing enabled
+    // - skipping to end event is enabled (regardless of in-progress state)
+    val shouldHalt = enableOptimizations &&
+      ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)
+
     val bus = new ReplayListenerBus()
-    val listener = new AppListingListener(fileStatus, clock)
+    val listener = new AppListingListener(fileStatus, clock, shouldHalt)
     bus.addListener(listener)
-    replay(fileStatus, bus, eventsFilter = eventsFilter)
-
-    val (appId, attemptId) = listener.applicationInfo match {
-      case Some(app) =>
-        // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
-        // discussion on the UI lifecycle.
-        synchronized {
-          activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
-            ui.invalidate()
-            ui.ui.store.close()
+
+    logInfo(s"Parsing $logPath for listing data...")
+    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+      bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
+    }
+
+    // If enabled above, the listing listener will halt parsing when there's enough information to
+    // create a listing entry. When the app is completed, or fast parsing is disabled, we still need
+    // to replay until the end of the log file to try to find the app end event. Instead of reading
+    // and parsing line by line, this code skips bytes from the underlying stream so that it is
+    // positioned somewhere close to the end of the log file.
+    //
+    // Because the application end event is written while some Spark subsystems such as the
+    // scheduler are still active, there is no guarantee that the end event will be the last
+    // in the log. So, to be safe, the code uses a configurable chunk to be re-parsed at
+    // the end of the file, and retries parsing the whole log later if the needed data is
+    // still not found.
+    //
+    // Note that skipping bytes in compressed files is still not cheap, but there are still some
+    // minor gains over the normal log parsing done by the replay bus.
+    //
+    // This code re-opens the file so that it knows where it's skipping to. This isn't as cheap as
+    // just skipping from the current position, but there isn't a good way to detect what the
+    // current position is, since the replay listener bus buffers data internally.
+    val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing)
+    if (lookForEndEvent && listener.applicationInfo.isDefined) {
+      Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+        val target = fileStatus.getLen() - reparseChunkSize
+        if (target > 0) {
+          logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
+          var skipped = 0L
+          while (skipped < target) {
+            skipped += in.skip(target - skipped)
           }
         }
+
+        val source = Source.fromInputStream(in).getLines()
+
+        // Because skipping may leave the stream in the middle of a line, read the next line
+        // before replaying.
+        if (target > 0) {
+          source.next()
+        }
+
+        bus.replay(source, logPath.toString, !appCompleted, eventsFilter)
+      }
+    }
+
+    logInfo(s"Finished parsing $logPath")
+
+    listener.applicationInfo match {
+      case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
+        // In this case, we either didn't care about the end event, or we found it. So the
+        // listing data is good.
+        invalidateUI(app.info.id, app.attempts.head.info.attemptId)
         addListing(app)
-        (Some(app.info.id), app.attempts.head.info.attemptId)
+        listing.write(LogInfo(logPath.toString(), scanTime, Some(app.info.id),
+          app.attempts.head.info.attemptId, fileStatus.getLen()))
+
+        // For a finished log, remove the corresponding "in progress" entry from the listing DB if
+        // the file is really gone.
+        if (appCompleted) {
+          val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS
+          try {
+            // Fetch the entry first to avoid an RPC when it's already removed.
+            listing.read(classOf[LogInfo], inProgressLog)
+            if (!fs.isFile(new Path(inProgressLog))) {
+              listing.delete(classOf[LogInfo], inProgressLog)
+            }
+          } catch {
+            case _: NoSuchElementException =>
+          }
+        }
+
+      case Some(_) =>
+        // In this case, the attempt is still not marked as finished but was expected to be. This
+        // can mean the end event is before the configured threshold, so call the method again to
+        // re-parse the whole log.
+        logInfo(s"Reparsing $logPath since end event was not found.")
+        mergeApplicationListing(fileStatus, scanTime, false)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
        // listing db, with an empty ID. This will make the log eligible for deletion if the app
         // does not make progress after the configured max log age.
-        (None, None)
+        listing.write(LogInfo(logPath.toString(), scanTime, None, None, fileStatus.getLen()))
+    }
+  }
+
+  /**
+   * Invalidate an existing UI for a given app attempt. See LoadedAppUI for a discussion on the
+   * UI lifecycle.
+   */
+  private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
+    synchronized {
+      activeUIs.get((appId, attemptId)).foreach { ui =>
+        ui.invalidate()
+        ui.ui.store.close()
+      }
    }
-    listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
  }
 
   /**
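
The skip-and-resync logic in the hunk above is worth isolating. A self-contained sketch using a plain FileInputStream in place of EventLoggingListener.openEventLog (names are illustrative; the caller owns the stream behind the returned iterator):

```scala
import java.io.FileInputStream
import scala.io.Source

// Position near the end of the file, then hand back line-oriented access.
def tailLines(path: String, fileLen: Long, chunkSize: Long): Iterator[String] = {
  val in = new FileInputStream(path)
  val target = fileLen - chunkSize
  var skipped = 0L
  while (skipped < target) {
    skipped += in.skip(target - skipped)
  }
  val lines = Source.fromInputStream(in).getLines()
  // Skipping may land mid-line; drop the first, likely partial, line.
  if (target > 0 && lines.hasNext) {
    lines.next()
  }
  lines
}
```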
@@ -696,29 +777,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  /**
-   * Replays the events in the specified log file on the supplied `ReplayListenerBus`.
-   * `ReplayEventsFilter` determines what events are replayed.
-   */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus,
-      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
-    val logPath = eventLog.getPath()
-    val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
-    logInfo(s"Replaying log path: $logPath")
-    // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
-    // and when we read the file here. That is OK -- it may result in an unnecessary refresh
-    // when there is no update, but will not result in missing an update. We *must* prevent
-    // an error the other way -- if we report a size bigger (ie later) than the file that is
-    // actually read, we may never refresh the app. FileStatus is guaranteed to be static
-    // after it's created, so we get a file size that is no bigger than what is actually read.
-    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
-      bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
-      logInfo(s"Finished parsing $logPath")
-    }
-  }
-
   /**
    * Rebuilds the application state store from its event log.
    */
@@ -741,8 +799,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } replayBus.addListener(listener)
 
     try {
-      replay(eventLog, replayBus)
+      val path = eventLog.getPath()
+      logInfo(s"Parsing $path to re-build UI...")
+      Utils.tryWithResource(EventLoggingListener.openEventLog(path, fs)) { in =>
+        replayBus.replay(in, path.toString(), maybeTruncated = !isCompleted(path.toString()))
+      }
       trackingStore.close(false)
+      logInfo(s"Finished parsing $path")
     } catch {
       case e: Exception =>
         Utils.tryLogNonFatalError {
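
Both replay sites now use Utils.tryWithResource directly. Its loan pattern, sketched as a hypothetical standalone helper:

```scala
// Loan pattern: the resource is closed even if `use` throws.
def withResource[R <: AutoCloseable, T](create: => R)(use: R => T): T = {
  val resource = create
  try {
    use(resource)
  } finally {
    resource.close()
  }
}
```

Usage mirrors the diff: `withResource(openLog(path)) { in => bus.replay(in, ...) }`, with openLog standing in for EventLoggingListener.openEventLog.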
@@ -881,6 +944,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def isCompleted(name: String): Boolean = {
+    !name.endsWith(EventLoggingListener.IN_PROGRESS)
+  }
+
 }
 
 private[history] object FsHistoryProvider {
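
A quick sanity check of the new helper, assuming EventLoggingListener.IN_PROGRESS is the ".inprogress" suffix:

```scala
// Assumes IN_PROGRESS == ".inprogress".
assert(isCompleted("application_1520000000000_0001"))
assert(!isCompleted("application_1520000000000_0001.inprogress"))
```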
@@ -945,11 +1012,17 @@ private[history] class ApplicationInfoWrapper(
 
 }
 
-private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
+private[history] class AppListingListener(
+    log: FileStatus,
+    clock: Clock,
+    haltEnabled: Boolean) extends SparkListener {
 
   private val app = new MutableApplicationInfo()
   private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
 
+  private var gotEnvUpdate = false
+  private var halted = false
+
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     app.id = event.appId.orNull
     app.name = event.appName
@@ -958,6 +1031,8 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
     attempt.startTime = new Date(event.time)
     attempt.lastUpdated = new Date(clock.getTimeMillis())
     attempt.sparkUser = event.sparkUser
+
+    checkProgress()
   }
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
@@ -968,11 +1043,18 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
   }
 
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
-    val allProperties = event.environmentDetails("Spark Properties").toMap
-    attempt.viewAcls = allProperties.get("spark.ui.view.acls")
-    attempt.adminAcls = allProperties.get("spark.admin.acls")
-    attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
-    attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+    // Only parse the first env update, since any future changes don't have any effect on
+    // the ACLs set for the UI.
+    if (!gotEnvUpdate) {
+      val allProperties = event.environmentDetails("Spark Properties").toMap
+      attempt.viewAcls = allProperties.get("spark.ui.view.acls")
+      attempt.adminAcls = allProperties.get("spark.admin.acls")
+      attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
+      attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+
+      gotEnvUpdate = true
+      checkProgress()
+    }
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
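
Only the first environment update matters for ACLs, so the listener latches on gotEnvUpdate. The extraction itself is plain map access; a sketch with a hand-built environment-details map in place of a real event:

```scala
// Hand-built stand-in for event.environmentDetails.
val environmentDetails = Map(
  "Spark Properties" -> Seq(
    "spark.ui.view.acls" -> "alice,bob",
    "spark.admin.acls" -> "admins"))

val allProperties = environmentDetails("Spark Properties").toMap
val viewAcls = allProperties.get("spark.ui.view.acls")             // Some("alice,bob")
val adminAclsGroups = allProperties.get("spark.admin.acls.groups") // None
```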
@@ -989,6 +1071,17 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
     }
   }
 
+  /**
+   * Throws a halt exception to stop replay if enough data to create the app listing has been
+   * read.
+   */
+  private def checkProgress(): Unit = {
+    if (haltEnabled && !halted && app.id != null && gotEnvUpdate) {
+      halted = true
+      throw new HaltReplayException()
+    }
+  }
+
   private class MutableApplicationInfo {
     var id: String = null
     var name: String = null
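
checkProgress is the other half of the halt machinery: once the app ID and the first env update have been seen, replay is aborted via an exception that the bus treats as a normal early exit. The pattern in miniature, assuming HaltReplayException is a simple marker exception defined alongside ReplayListenerBus:

```scala
// Marker exception used purely for control flow.
class HaltReplayException extends RuntimeException

// Replay loop that treats a halt as a successful early exit.
def replayUntilHalt(lines: Iterator[String])(handle: String => Unit): Unit = {
  try {
    lines.foreach(handle)
  } catch {
    case _: HaltReplayException => // listener has enough data; stop early
  }
}
```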