Commit ae5b2a6

HeartSaVioR authored and Marcelo Vanzin committed
[SPARK-26311][CORE] New feature: apply custom log URL pattern for executor log URLs in SHS
## What changes were proposed in this pull request?

This patch proposes adding a new configuration to the SHS: a custom executor log URL pattern. It lets end users point executor log URLs at something other than what the resource manager provides, such as an external log service, which can keep serving executor logs even after a YARN NodeManager becomes unavailable.

End users can build their own custom executor log URLs out of pre-defined patterns, which vary per resource manager. This patch adds patterns for the YARN resource manager. (Other resource managers do not expose executor log URLs at all, so no patterns can be defined for them.) Please refer to the doc change as well as the UTs added in this patch to see how to set up the feature.

## How was this patch tested?

Added UTs, as well as a manual test with a YARN cluster.

Closes apache#23260 from HeartSaVioR/SPARK-26311.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
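For illustration, a minimal sketch of what such a configuration could look like. The log-server host and the `CONTAINER_ID`/`USER` attribute names below are hypothetical placeholders, not an exhaustive list of what this patch actually provides for YARN:

```scala
// Hypothetical example only: the log-server URL and the attribute names are made up.
// Every {{ATTRIBUTE}} in the pattern is substituted from the attributes the executor
// reported at registration time; {{FILE_NAME}} is expanded once per entry of the
// LOG_FILES attribute, producing one link per log file.
import org.apache.spark.SparkConf

val shsConf = new SparkConf()
  .set("spark.history.custom.executor.log.url",
    "http://logserver.example.com/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}")
  // Optional: also rewrite log URLs of applications that are still running (defaults to true).
  .set("spark.history.custom.executor.log.url.applyIncompleteApplication", "true")
```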
1 parent 25b97a4 commit ae5b2a6

File tree

39 files changed: +717 −126 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -2355,7 +2355,8 @@ class SparkContext(config: SparkConf) extends Logging {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
+      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls,
+      schedulerBackend.getDriverAttributes))
     _driverLogger.foreach(_.startSync(_hadoopConfiguration))
   }

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 4 deletions
@@ -359,10 +359,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       return None
     }

-    val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
-      HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
-      attempt.info.startTime.getTime(),
-      attempt.info.appSparkVersion)
+    val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager,
+      app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+      attempt.info.startTime.getTime(), attempt.info.appSparkVersion)
     loadPlugins().foreach(_.setupUI(ui))

     val loadedUI = LoadedAppUI(ui)
core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.util.matching.Regex
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
+import org.apache.spark.util.kvstore.KVStore
+
+private[spark] class HistoryAppStatusStore(
+    conf: SparkConf,
+    store: KVStore)
+  extends AppStatusStore(store, None) with Logging {
+
+  import HistoryAppStatusStore._
+
+  private val logUrlPattern: Option[String] = {
+    val appInfo = super.applicationInfo()
+    val applicationCompleted = appInfo.attempts.nonEmpty && appInfo.attempts.head.completed
+    if (applicationCompleted || conf.get(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP)) {
+      conf.get(CUSTOM_EXECUTOR_LOG_URL)
+    } else {
+      None
+    }
+  }
+
+  private val informedForMissingAttributes = new AtomicBoolean(false)
+
+  override def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
+    val execList = super.executorList(activeOnly)
+    logUrlPattern match {
+      case Some(pattern) => execList.map(replaceLogUrls(_, pattern))
+      case None => execList
+    }
+  }
+
+  override def executorSummary(executorId: String): v1.ExecutorSummary = {
+    val execSummary = super.executorSummary(executorId)
+    logUrlPattern match {
+      case Some(pattern) => replaceLogUrls(execSummary, pattern)
+      case None => execSummary
+    }
+  }
+
+  private def replaceLogUrls(exec: v1.ExecutorSummary, urlPattern: String): v1.ExecutorSummary = {
+    val attributes = exec.attributes
+
+    // Relation between pattern {{FILE_NAME}} and attribute {{LOG_FILES}}:
+    // Given that HistoryAppStatusStore doesn't know which types of log files can be provided
+    // from the resource manager, we require the resource manager to provide the available types
+    // of log files, which are encouraged to be the same as the types of log files provided in
+    // the original log URLs. Once we get the list of log files, we need to expose them to end
+    // users as a pattern so that end users can compose custom log URL(s) including log file
+    // name(s).
+    val allPatterns = CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern).map(_.group(1)).toSet
+    val allPatternsExceptFileName = allPatterns.filter(_ != "FILE_NAME")
+    val allAttributeKeys = attributes.keySet
+    val allAttributeKeysExceptLogFiles = allAttributeKeys.filter(_ != "LOG_FILES")
+
+    if (allPatternsExceptFileName.diff(allAttributeKeysExceptLogFiles).nonEmpty) {
+      logFailToRenewLogUrls("some of required attributes are missing in app's event log.",
+        allPatternsExceptFileName, allAttributeKeys)
+      return exec
+    } else if (allPatterns.contains("FILE_NAME") && !allAttributeKeys.contains("LOG_FILES")) {
+      logFailToRenewLogUrls("'FILE_NAME' parameter is provided, but file information is " +
+        "missing in app's event log.", allPatternsExceptFileName, allAttributeKeys)
+      return exec
+    }
+
+    val updatedUrl = allPatternsExceptFileName.foldLeft(urlPattern) { case (orig, patt) =>
+      // we already checked the existence of attribute when comparing keys
+      orig.replace(s"{{$patt}}", attributes(patt))
+    }
+
+    val newLogUrlMap = if (allPatterns.contains("FILE_NAME")) {
+      // allAttributeKeys should contain "LOG_FILES"
+      attributes("LOG_FILES").split(",").map { file =>
+        file -> updatedUrl.replace("{{FILE_NAME}}", file)
+      }.toMap
+    } else {
+      Map("log" -> updatedUrl)
+    }
+
+    replaceExecutorLogs(exec, newLogUrlMap)
+  }
+
+  private def logFailToRenewLogUrls(
+      reason: String,
+      allPatterns: Set[String],
+      allAttributes: Set[String]): Unit = {
+    if (informedForMissingAttributes.compareAndSet(false, true)) {
+      logInfo(s"Fail to renew executor log urls: $reason. Required: $allPatterns / " +
+        s"available: $allAttributes. Falling back to show app's original log urls.")
+    }
+  }
+
+  private def replaceExecutorLogs(
+      source: v1.ExecutorSummary,
+      newExecutorLogs: Map[String, String]): v1.ExecutorSummary = {
+    new v1.ExecutorSummary(source.id, source.hostPort, source.isActive, source.rddBlocks,
+      source.memoryUsed, source.diskUsed, source.totalCores, source.maxTasks, source.activeTasks,
+      source.failedTasks, source.completedTasks, source.totalTasks, source.totalDuration,
+      source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
+      source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
+      source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
+      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes)
+  }
+
+}
+
+private[spark] object HistoryAppStatusStore {
+  val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
+}
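As a standalone illustration of the substitution rules implemented above, here is a minimal sketch (not part of the patch; the pattern, attribute names, and values are made up):

```scala
// Minimal sketch of the URL-rewriting rules used by HistoryAppStatusStore.
// The pattern, attribute names, and values below are illustrative only.
val pattern = "http://logserver/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}"
val attributes = Map(
  "CONTAINER_ID" -> "container_1",
  "USER" -> "alice",
  "LOG_FILES" -> "stdout,stderr")

val regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
val placeholders = regex.findAllMatchIn(pattern).map(_.group(1)).toSet

// Substitute every placeholder except FILE_NAME directly from the attributes.
val base = (placeholders - "FILE_NAME").foldLeft(pattern) { (url, key) =>
  url.replace(s"{{$key}}", attributes(key))
}

// Expand FILE_NAME once per entry in the comma-separated LOG_FILES attribute.
val logUrls = attributes("LOG_FILES").split(",").map { file =>
  file -> base.replace("{{FILE_NAME}}", file)
}.toMap
// Map(stdout -> http://logserver/container_1/alice/stdout,
//     stderr -> http://logserver/container_1/alice/stderr)
```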

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 8 additions & 1 deletion
@@ -60,7 +60,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
+        extractAttributes))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -76,6 +77,12 @@ private[spark] class CoarseGrainedExecutorBackend(
       .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
   }

+  def extractAttributes: Map[String, String] = {
+    val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
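A quick illustration of what `extractAttributes` produces (a sketch only; the environment variable values are made up, and a cluster manager such as YARN would set them when launching the executor process):

```scala
// Sketch only: suppose the executor process was started with these environment variables.
val env = Map(
  "SPARK_EXECUTOR_ATTRIBUTE_CONTAINER_ID" -> "container_1",
  "SPARK_EXECUTOR_ATTRIBUTE_user" -> "alice",
  "JAVA_HOME" -> "/usr/lib/jvm/default") // ignored: no SPARK_EXECUTOR_ATTRIBUTE_ prefix

val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
val attributes = env.filterKeys(_.startsWith(prefix))
  .map { case (k, v) => k.substring(prefix.length).toUpperCase(java.util.Locale.ROOT) -> v }
  .toMap
// Map(CONTAINER_ID -> container_1, USER -> alice)
// These keys are what a {{CONTAINER_ID}}-style placeholder in the custom URL pattern matches.
```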

core/src/main/scala/org/apache/spark/internal/config/History.scala

Lines changed: 18 additions & 0 deletions
@@ -125,4 +125,22 @@ private[spark] object History {
   val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
     .stringConf
     .createOptional
+
+  val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.history.custom.executor.log.url")
+    .doc("Specifies custom spark executor log url for supporting external log service instead of " +
+      "using cluster managers' application log urls in the history server. Spark will support " +
+      "some path variables via patterns which can vary on cluster manager. Please check the " +
+      "documentation for your cluster manager to see which patterns are supported, if any. " +
+      "This configuration has no effect on a live application, it only affects the history server.")
+    .stringConf
+    .createOptional
+
+  val APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP =
+    ConfigBuilder("spark.history.custom.executor.log.url.applyIncompleteApplication")
+      .doc("Whether to apply custom executor log url, as specified by " +
+        "`spark.history.custom.executor.log.url`, to incomplete application as well. " +
+        "Even if this is true, this still only affects the behavior of the history server, " +
+        "not running spark applications.")
+      .booleanConf
+      .createWithDefault(true)
 }
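To make the interaction between these two settings explicit, a small sketch (not from the patch) that mirrors the gating logic in HistoryAppStatusStore: the custom pattern is only applied to an application that is still running when the applyIncompleteApplication flag is enabled.

```scala
// Sketch of how the two settings combine; names are illustrative, the logic mirrors
// HistoryAppStatusStore.logUrlPattern.
def effectivePattern(
    customUrl: Option[String],    // value of spark.history.custom.executor.log.url
    applyToIncomplete: Boolean,   // ...custom.executor.log.url.applyIncompleteApplication
    applicationCompleted: Boolean): Option[String] = {
  if (applicationCompleted || applyToIncomplete) customUrl else None
}

effectivePattern(Some("http://logserver/{{FILE_NAME}}"),
  applyToIncomplete = false, applicationCompleted = false) // None: original log URLs are kept
```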

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 7 additions & 0 deletions
@@ -69,6 +69,13 @@ private[spark] trait SchedulerBackend {
    */
   def getDriverLogUrls: Option[Map[String, String]] = None

+  /**
+   * Get the attributes on driver. These attributes are used to replace log URLs when
+   * custom log url pattern is specified.
+   * @return Map containing attributes on driver.
+   */
+  def getDriverAttributes: Option[Map[String, String]] = None
+
   /**
    * Get the max number of tasks that can be concurrent launched currently.
    * Note that please don't cache the value returned by this method, because the number can change
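A hypothetical sketch of a cluster-manager-specific backend overriding the new hook (written as if it lived inside Spark's own source tree, since SchedulerBackend is private[spark]; the backend name and attribute sources are made up, while the real YARN backend in this patch derives its attributes from container and environment information):

```scala
// Hypothetical sketch only: publishes driver attributes that the history server can later
// substitute into a custom executor log URL pattern.
package org.apache.spark.scheduler

trait MyClusterSchedulerBackend extends SchedulerBackend {
  override def getDriverAttributes: Option[Map[String, String]] = Some(Map(
    "CONTAINER_ID" -> sys.env.getOrElse("CONTAINER_ID", "unknown"),
    "USER" -> sys.props.getOrElse("user.name", "unknown")))
}
```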

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 2 additions & 1 deletion
@@ -192,7 +192,8 @@ case class SparkListenerApplicationStart(
     time: Long,
     sparkUser: String,
     appAttemptId: Option[String],
-    driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
+    driverLogs: Option[Map[String, String]] = None,
+    driverAttributes: Option[Map[String, String]] = None) extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 1 deletion
@@ -63,7 +63,8 @@ private[spark] object CoarseGrainedClusterMessages {
       executorRef: RpcEndpointRef,
       hostname: String,
       cores: Int,
-      logUrls: Map[String, String])
+      logUrls: Map[String, String],
+      attributes: Map[String, String])
     extends CoarseGrainedClusterMessage

   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -183,7 +183,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

-    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
+    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) =>
       if (executorDataMap.contains(executorId)) {
         executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         context.reply(true)
@@ -207,7 +207,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
         val data = new ExecutorData(executorRef, executorAddress, hostname,
-          cores, cores, logUrls)
+          cores, cores, logUrls, attributes)
         // This must be synchronized because variables mutated
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala

Lines changed: 3 additions & 2 deletions
@@ -34,5 +34,6 @@ private[cluster] class ExecutorData(
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int,
-    override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    override val logUrlMap: Map[String, String],
+    override val attributes: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes)
