Skip to content

Commit 7eeca02

Browse files
dongjoon-hyundbtsai
authored andcommitted
[SPARK-28157][CORE] Make SHS clear KVStore LogInfos for the blacklisted entries
## What changes were proposed in this pull request? At Spark 2.4.0/2.3.2/2.2.3, [SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated access permission checks to the file system, and maintains a blacklist for all event log files failed once at reading. The blacklisted log files are released back after `CLEAN_INTERVAL_S` seconds. However, the released files whose sizes don't changes are ignored forever due to `info.fileSize < entry.getLen()` condition (previously [here](apache@3c96937#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454) and now at [shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571)) which returns `false` always when the size is the same with the existing value in `KVStore`. This is recovered only via SHS restart. This PR aims to remove the existing entry from `KVStore` when it goes to the blacklist. ## How was this patch tested? Pass the Jenkins with the updated test case. Closes apache#24966 from dongjoon-hyun/SPARK-28157. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: DB Tsai <[email protected]>
1 parent bd232b9 commit 7eeca02

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
536536
// We don't have read permissions on the log file
537537
logWarning(s"Unable to read log $path", e.getCause)
538538
blacklist(path)
539+
// SPARK-28157 We should remove this blacklisted entry from the KVStore
540+
// to handle permission-only changes with the same file sizes later.
541+
listing.delete(classOf[LogInfo], path.toString)
539542
case e: Exception =>
540543
logError("Exception while merging application listings", e)
541544
} finally {

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,26 +1122,31 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
11221122
writeFile(accessGranted, true, None,
11231123
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
11241124
SparkListenerApplicationEnd(5L))
1125+
var isReadable = false
11251126
val mockedFs = spy(provider.fs)
11261127
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
1127-
argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied"))
1128+
argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied" &&
1129+
!isReadable))
11281130
val mockedProvider = spy(provider)
11291131
when(mockedProvider.fs).thenReturn(mockedFs)
11301132
updateAndCheck(mockedProvider) { list =>
11311133
list.size should be(1)
11321134
}
1133-
writeFile(accessDenied, true, None,
1134-
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
1135-
SparkListenerApplicationEnd(5L))
11361135
// Doing 2 times in order to check the blacklist filter too
11371136
updateAndCheck(mockedProvider) { list =>
11381137
list.size should be(1)
11391138
}
11401139
val accessDeniedPath = new Path(accessDenied.getPath)
11411140
assert(mockedProvider.isBlacklisted(accessDeniedPath))
11421141
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
1142+
isReadable = true
11431143
mockedProvider.cleanLogs()
1144-
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
1144+
updateAndCheck(mockedProvider) { list =>
1145+
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
1146+
assert(list.exists(_.name == "accessDenied"))
1147+
assert(list.exists(_.name == "accessGranted"))
1148+
list.size should be(2)
1149+
}
11451150
}
11461151

11471152
test("check in-progress event logs absolute length") {

0 commit comments

Comments
 (0)