Skip to content

Commit 449e26e

Browse files
gengliangwangcloud-fan
authored andcommitted
[SPARK-22559][CORE] history server: handle exception on opening corrupted listing.ldb
## What changes were proposed in this pull request? Currently history server v2 failed to start if `listing.ldb` is corrupted. This patch get rid of the corrupted `listing.ldb` and re-create it. The exception handling follows [opening disk store for app](https://github.com/apache/spark/blob/0ffa7c488fa8156e2a1aa282e60b7c36b86d8af8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L307) ## How was this patch tested? manual test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Wang Gengliang <[email protected]> Closes #19786 from gengliangwang/listingException.
1 parent 554adc7 commit 449e26e

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ import org.apache.hadoop.fs.permission.FsAction
3434
import org.apache.hadoop.hdfs.DistributedFileSystem
3535
import org.apache.hadoop.hdfs.protocol.HdfsConstants
3636
import org.apache.hadoop.security.AccessControlException
37+
import org.fusesource.leveldbjni.internal.NativeDB
3738

3839
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
3940
import org.apache.spark.deploy.SparkHadoopUtil
40-
import org.apache.spark.deploy.history.config._
4141
import org.apache.spark.internal.Logging
4242
import org.apache.spark.scheduler._
4343
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -132,15 +132,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
132132
AppStatusStore.CURRENT_VERSION, logDir.toString())
133133

134134
try {
135-
open(new File(path, "listing.ldb"), metadata)
135+
open(dbPath, metadata)
136136
} catch {
137137
// If there's an error, remove the listing database and any existing UI database
138138
// from the store directory, since it's extremely likely that they'll all contain
139139
// incompatible information.
140140
case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
141141
logInfo("Detected incompatible DB versions, deleting...")
142142
path.listFiles().foreach(Utils.deleteRecursively)
143-
open(new File(path, "listing.ldb"), metadata)
143+
open(dbPath, metadata)
144+
case dbExc: NativeDB.DBException =>
145+
// Get rid of the corrupted listing.ldb and re-create it.
146+
logWarning(s"Failed to load disk store $dbPath :", dbExc)
147+
Utils.deleteRecursively(dbPath)
148+
open(dbPath, metadata)
144149
}
145150
}.getOrElse(new InMemoryStore())
146151

@@ -568,7 +573,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
568573
}
569574

570575
val logPath = fileStatus.getPath()
571-
logInfo(s"Replaying log path: $logPath")
572576

573577
val bus = new ReplayListenerBus()
574578
val listener = new AppListingListener(fileStatus, clock)

0 commit comments

Comments
 (0)