
Commit bbc2be4

[SPARK-28294][CORE] Support spark.history.fs.cleaner.maxNum configuration
## What changes were proposed in this pull request?

Up to now, Apache Spark maintains the given event log directory by a **time** policy, `spark.history.fs.cleaner.maxAge`. However, there are two issues.

1. Some file systems have a limit on the maximum number of files in a single directory. For example, HDFS's `dfs.namenode.fs-limits.max-directory-items` is 1024 * 1024 by default. See https://hadoop.apache.org/docs/r3.2.0/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
2. Spark is sometimes unable to clean up some old log files due to permission issues (mainly, security policy).

To handle both (1) and (2), this PR aims to support an additional policy configuration, `spark.history.fs.cleaner.maxNum`, for the maximum number of files in the event log directory. Spark will try to keep the number of files in the event log directory within this limit.

## How was this patch tested?

Passes Jenkins with a newly added test case.

Closes apache#25072 from dongjoon-hyun/SPARK-28294.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent: 90c64ea · commit: bbc2be4
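As a usage illustration of the commit message above, the two cleaner policies can be combined as in the minimal sketch below. The values are arbitrary examples; in practice these keys are normally set in `spark-defaults.conf` (or via `SPARK_HISTORY_OPTS`) for the History Server rather than built programmatically.

```scala
// Sketch only: combining the existing time-based policy with the new
// count-based policy from this PR. Values are arbitrary examples.
import org.apache.spark.SparkConf

val historyConf = new SparkConf()
  .set("spark.history.fs.cleaner.enabled", "true")   // enable periodic cleaning
  .set("spark.history.fs.cleaner.maxAge", "7d")      // existing time-based policy
  .set("spark.history.fs.cleaner.maxNum", "500000")  // new count-based policy (SPARK-28294)
```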

File tree: 4 files changed (+125 −20 lines)

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 55 additions & 19 deletions
@@ -805,6 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def cleanLogs(): Unit = Utils.tryLog {
     val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+    val maxNum = conf.get(MAX_LOG_NUM)
 
     val expired = listing.view(classOf[ApplicationInfoWrapper])
       .index("oldestAttempt")
@@ -817,23 +818,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val (remaining, toDelete) = app.attempts.partition { attempt =>
         attempt.info.lastUpdated.getTime() >= maxTime
       }
-
-      if (remaining.nonEmpty) {
-        val newApp = new ApplicationInfoWrapper(app.info, remaining)
-        listing.write(newApp)
-      }
-
-      toDelete.foreach { attempt =>
-        logInfo(s"Deleting expired event log for ${attempt.logPath}")
-        val logPath = new Path(logDir, attempt.logPath)
-        listing.delete(classOf[LogInfo], logPath.toString())
-        cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
-        deleteLog(fs, logPath)
-      }
-
-      if (remaining.isEmpty) {
-        listing.delete(app.getClass(), app.id)
-      }
+      deleteAttemptLogs(app, remaining, toDelete)
     }
 
     // Delete log files that don't have a valid application and exceed the configured max age.
@@ -851,10 +836,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+
+    // If the number of files is bigger than MAX_LOG_NUM,
+    // clean up all completed attempts per application one by one.
+    val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size
+    var count = num - maxNum
+    if (count > 0) {
+      logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.")
+      val oldAttempts = listing.view(classOf[ApplicationInfoWrapper])
+        .index("oldestAttempt")
+        .asScala
+      oldAttempts.foreach { app =>
+        if (count > 0) {
+          // Applications may have multiple attempts, some of which may not be completed yet.
+          val (toDelete, remaining) = app.attempts.partition(_.info.completed)
+          count -= deleteAttemptLogs(app, remaining, toDelete)
+        }
+      }
+      if (count > 0) {
+        logWarning(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).")
+      }
+    }
+
     // Clean the blacklist from the expired entries.
     clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  private def deleteAttemptLogs(
+      app: ApplicationInfoWrapper,
+      remaining: List[AttemptInfoWrapper],
+      toDelete: List[AttemptInfoWrapper]): Int = {
+    if (remaining.nonEmpty) {
+      val newApp = new ApplicationInfoWrapper(app.info, remaining)
+      listing.write(newApp)
+    }
+
+    var countDeleted = 0
+    toDelete.foreach { attempt =>
+      logInfo(s"Deleting expired event log for ${attempt.logPath}")
+      val logPath = new Path(logDir, attempt.logPath)
+      listing.delete(classOf[LogInfo], logPath.toString())
+      cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
+      if (deleteLog(fs, logPath)) {
+        countDeleted += 1
+      }
+    }
+
+    if (remaining.isEmpty) {
+      listing.delete(app.getClass(), app.id)
+    }
+
+    countDeleted
+  }
+
   /**
    * Delete driver logs from the configured spark dfs dir that exceed the configured max age
    */
@@ -1066,19 +1100,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
   }
 
-  private def deleteLog(fs: FileSystem, log: Path): Unit = {
+  private def deleteLog(fs: FileSystem, log: Path): Boolean = {
+    var deleted = false
     if (isBlacklisted(log)) {
       logDebug(s"Skipping deleting $log as we don't have permissions on it.")
     } else {
       try {
-        fs.delete(log, true)
+        deleted = fs.delete(log, true)
       } catch {
         case _: AccessControlException =>
          logInfo(s"No permission to delete $log, ignoring.")
         case ioe: IOException =>
          logError(s"IOException in cleaning $log", ioe)
      }
    }
+    deleted
   }
 
   private def isCompleted(name: String): Boolean = {
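To make the new count-based pass easier to follow, here is a simplified, script-style sketch of the policy it implements: completed attempts are dropped, oldest first, until the directory fits under the limit, and in-progress attempts are never touched. `EventLog` and `enforceMaxNum` are illustrative names, not Spark APIs, and the real code above works per application through the listing database rather than on a flat list.

```scala
// Simplified sketch of the MAX_LOG_NUM policy (illustrative names, not Spark APIs).
final case class EventLog(appId: String, attemptId: String, completed: Boolean, lastUpdated: Long)

def enforceMaxNum(logs: List[EventLog], maxNum: Int): List[EventLog] = {
  val excess = logs.size - maxNum
  if (excess <= 0) {
    logs                                            // already under the limit
  } else {
    // Only completed attempts are eligible; in-progress attempts are always kept.
    val (deletable, inProgress) = logs.sortBy(_.lastUpdated).partition(_.completed)
    if (deletable.size < excess) {
      Console.err.println(s"Fail to clean up according to maxNum policy ($maxNum).")
    }
    inProgress ++ deletable.drop(excess)            // drop the oldest completed attempts
  }
}
```

For example, with five completed attempts and one in-progress attempt, `enforceMaxNum(logs, 3)` drops the three oldest completed attempts and keeps the in-progress one, which mirrors the behavior exercised by the new test case further below.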

core/src/main/scala/org/apache/spark/internal/config/History.scala

Lines changed: 5 additions & 0 deletions
@@ -49,6 +49,11 @@ private[spark] object History {
     .timeConf(TimeUnit.SECONDS)
     .createWithDefaultString("7d")
 
+  val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum")
+    .doc("The maximum number of log files in the event log directory.")
+    .intConf
+    .createWithDefault(Int.MaxValue)
+
   val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
     .doc("Local directory where to cache application history information. By default this is " +
       "not set, meaning all history information will be kept in memory.")

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 50 additions & 0 deletions
@@ -1185,6 +1185,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
   }
 
+  test("log cleaner with the maximum number of log files") {
+    val clock = new ManualClock(0)
+    (5 to 0 by -1).foreach { num =>
+      val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+      writeFile(log1_1, true, None,
+        SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+        SparkListenerApplicationEnd(2L)
+      )
+      log1_1.setLastModified(2L)
+
+      val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+      writeFile(log2_1, true, None,
+        SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")),
+        SparkListenerApplicationEnd(4L)
+      )
+      log2_1.setLastModified(4L)
+
+      val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false)
+      writeFile(log3_1, true, None,
+        SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")),
+        SparkListenerApplicationEnd(6L)
+      )
+      log3_1.setLastModified(6L)
+
+      val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false)
+      writeFile(log1_2_incomplete, true, None,
+        SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2"))
+      )
+      log1_2_incomplete.setLastModified(8L)
+
+      val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false)
+      writeFile(log3_2, true, None,
+        SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")),
+        SparkListenerApplicationEnd(10L)
+      )
+      log3_2.setLastModified(10L)
+
+      val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock)
+      updateAndCheck(provider) { list =>
+        assert(log1_1.exists() == (num > 4))
+        assert(log1_2_incomplete.exists())  // Always exists for all configurations
+
+        assert(log2_1.exists() == (num > 3))
+
+        assert(log3_1.exists() == (num > 2))
+        assert(log3_2.exists() == (num > 2))
+      }
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:

docs/monitoring.md

Lines changed: 15 additions & 1 deletion
@@ -190,7 +190,11 @@ Security options for the Spark History Server are covered more detail in the
     <td>1d</td>
     <td>
       How often the filesystem job history cleaner checks for files to delete.
-      Files are only deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>
+      Files are deleted if at least one of two conditions holds.
+      First, they're deleted if they're older than <code>spark.history.fs.cleaner.maxAge</code>.
+      They are also deleted if the number of files is more than
+      <code>spark.history.fs.cleaner.maxNum</code>, Spark tries to clean up the completed attempts
+      from the applications based on the order of their oldest attempt time.
     </td>
   </tr>
   <tr>
@@ -200,6 +204,16 @@ Security options for the Spark History Server are covered more detail in the
       Job history files older than this will be deleted when the filesystem history cleaner runs.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.maxNum</td>
+    <td>Int.MaxValue</td>
+    <td>
+      The maximum number of files in the event log directory.
+      Spark tries to clean up the completed attempt logs to maintain the log directory under this limit.
+      This should be smaller than the underlying file system limit like
+      `dfs.namenode.fs-limits.max-directory-items` in HDFS.
+    </td>
+  </tr>
   <tr>
     <td>spark.history.fs.endEventReparseChunkSize</td>
     <td>1m</td>
