Commit d2436a8

attilapiros authored and jerryshao committed
[SPARK-24594][YARN] Introducing metrics for YARN
## What changes were proposed in this pull request?

In this PR metrics are introduced for YARN. As up to now there were no metrics in the YARN module, a new metrics system is created with the name "applicationMaster". To support both client and cluster mode, the metrics system lifecycle is bound to the AM.

## How was this patch tested?

Both client and cluster mode were tested manually. Before the test, spark-core was removed from one of the YARN nodes to cause allocation failures. Spark was started as follows (in case of client mode):

```
spark2-submit \
  --class org.apache.spark.examples.SparkPi \
  --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true" \
  --conf "spark.blacklist.application.maxFailedExecutorsPerNode=2" \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.metrics.conf.*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink" \
  --master yarn \
  --deploy-mode client \
  original-spark-examples_2.11-2.4.0-SNAPSHOT.jar \
  1000
```

In both cases the YARN logs contained the new metrics:

```
$ yarn logs --applicationId application_1529926424933_0015
...
-- Gauges ----------------------------------------------------------------------
application_1531751594108_0046.applicationMaster.numContainersPendingAllocate
             value = 0
application_1531751594108_0046.applicationMaster.numExecutorsFailed
             value = 3
application_1531751594108_0046.applicationMaster.numExecutorsRunning
             value = 9
application_1531751594108_0046.applicationMaster.numLocalityAwareTasks
             value = 0
application_1531751594108_0046.applicationMaster.numReleasedContainers
             value = 0
...
```

Author: “attilapiros” <[email protected]>
Author: Attila Zsolt Piros <[email protected]>

Closes apache#21635 from attilapiros/SPARK-24594.
1 parent cfc3e1a · commit d2436a8

File tree

7 files changed: +90 −3 lines changed

docs/monitoring.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currently supported:
 * `executor`: A Spark executor.
 * `driver`: The Spark driver process (the process in which your SparkContext is created).
 * `shuffleService`: The Spark shuffle service.
+* `applicationMaster`: The Spark ApplicationMaster when running on YARN.
 
 Each instance can report to zero or more _sinks_. Sinks are contained in the
 `org.apache.spark.metrics.sink` package:
```
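With `applicationMaster` registered as a metrics instance, a sink can be scoped to the AM alone instead of using the `*` wildcard from the test command above. A minimal sketch of a `metrics.properties`, assuming the stock `ConsoleSink` that ships with Spark:

```properties
# Sketch: report only ApplicationMaster metrics to the console every 10 seconds,
# using the standard instance.sink.<name>.<option> syntax from monitoring.md.
applicationMaster.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
applicationMaster.sink.console.period=10
applicationMaster.sink.console.unit=seconds
```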

docs/running-on-yarn.md

Lines changed: 8 additions & 1 deletion

```diff
@@ -421,7 +421,14 @@ To use a custom metrics.properties for the application master and executors, update
   <code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
   </td>
 </tr>
-
+<tr>
+  <td><code>spark.yarn.metrics.namespace</code></td>
+  <td>(none)</td>
+  <td>
+  The root namespace for AM metrics reporting.
+  If it is not set then the YARN application ID is used.
+  </td>
+</tr>
 </table>
 
 # Important notes
```
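As a usage sketch (the application name `my_spark_app` is made up for illustration), setting the new property keeps metric names stable across application runs:

```
spark2-submit \
  --conf "spark.yarn.metrics.namespace=my_spark_app" \
  ...
```

With this set, the gauges above are reported as `my_spark_app.applicationMaster.numExecutorsRunning` and so on, instead of being prefixed with the per-run `application_<id>` namespace.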

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 18 additions & 0 deletions

```diff
@@ -43,6 +43,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -67,6 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 
   private val securityMgr = new SecurityManager(sparkConf)
 
+  private var metricsSystem: Option[MetricsSystem] = None
+
   // Set system properties for each config entry. This covers two use cases:
   // - The default configuration stored by the SparkHadoopUtil class
   // - The user application creating a new SparkConf in cluster mode
@@ -309,6 +312,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
           "Uncaught exception: " + StringUtils.stringifyException(e))
+    } finally {
+      try {
+        metricsSystem.foreach { ms =>
+          ms.report()
+          ms.stop()
+        }
+      } catch {
+        case e: Exception =>
+          logWarning("Exception during stopping of the metric system: ", e)
+      }
     }
   }
 
@@ -434,6 +447,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
 
     allocator.allocateResources()
+    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
+    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
+    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
+    ms.start()
+    metricsSystem = Some(ms)
     reporterThread = launchReporterThread()
   }
```
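Two details of these hunks are worth noting. The metrics system is created only after `allocator.allocateResources()` returns, so the `YarnAllocator` handed to the source is fully initialized. And the `finally` block calls `ms.report()` before `ms.stop()`, which flushes one last snapshot to the sinks even when the AM exits before the next scheduled reporting period. Because the lifecycle is tied to the AM rather than the driver, the same code path covers both client and cluster deploy modes, as the PR description states.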

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala

Lines changed: 50 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.source.Source

private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator)
  extends Source {

  override val sourceName: String = prefix + ".applicationMaster"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] {
    override def getValue: Int = yarnAllocator.getNumExecutorsFailed
  })

  metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] {
    override def getValue: Int = yarnAllocator.getNumExecutorsRunning
  })

  metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] {
    override def getValue: Int = yarnAllocator.getNumReleasedContainers
  })

  metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
    override def getValue: Int = yarnAllocator.numLocalityAwareTasks
  })

  metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
    override def getValue: Int = yarnAllocator.numContainersPendingAllocate
  })

}
```
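Each metric is a Codahale `Gauge`, so values are pulled from the `YarnAllocator` lazily at report time rather than pushed on every allocator state change; registration is a one-time, constant-cost operation. The same three-line pattern would extend to any other allocator statistic. A hypothetical example, not part of this commit, assuming some `getNumBlacklistedNodes` accessor existed on `YarnAllocator`:

```scala
// Hypothetical gauge (NOT in this commit): the accessor name is made up
// purely to illustrate the registration pattern used above.
metricRegistry.register(MetricRegistry.name("numBlacklistedNodes"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.getNumBlacklistedNodes
})
```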

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 7 additions & 1 deletion

```diff
@@ -150,14 +150,16 @@ private[yarn] class YarnAllocator(
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
 
   // Number of tasks that have locality preferences in active stages
-  private var numLocalityAwareTasks: Int = 0
+  private[yarn] var numLocalityAwareTasks: Int = 0
 
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
 
   def getNumExecutorsRunning: Int = runningExecutors.size()
 
+  def getNumReleasedContainers: Int = releasedContainers.size()
+
   def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
 
   def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
@@ -167,6 +169,10 @@ private[yarn] class YarnAllocator(
    */
   def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
 
+  def numContainersPendingAllocate: Int = synchronized {
+    getPendingAllocate.size
+  }
+
   /**
    * A sequence of pending container requests at the given location that have not yet been
    * fulfilled.
```
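The new `numContainersPendingAllocate` takes the allocator lock because `getPendingAllocate` walks the outstanding container requests, which the AM reporter thread mutates concurrently; wrapping the read in `synchronized` gives the gauge a consistent snapshot. The other getters used by the metrics source read single concurrent collections or counters (for example `runningExecutors.size()`), which is presumably why they skip the lock.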

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.BlacklistTracker
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{Clock, SystemClock}
 
 /**
  * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes
```

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 5 additions & 0 deletions

```diff
@@ -152,6 +152,11 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("100s")
 
+  private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace")
+    .doc("The root namespace for AM metrics reporting.")
+    .stringConf
+    .createOptional
+
   private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
     .doc("Node label expression for the AM.")
     .stringConf
```
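Because the entry is built with `createOptional`, reading it yields an `Option[String]` rather than a default value, which is what lets the AM fall back to the YARN application ID (see the `ApplicationMaster.scala` hunk above):

```scala
// Sketch of the consumption pattern: the namespace if configured, the app ID otherwise.
val prefix: String = sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
```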
