Skip to content

Commit a802c69

Browse files
mareksimunekash211
authored andcommitted
[SPARK-18364][YARN] Expose metrics for YarnShuffleService
## What changes were proposed in this pull request? This PR is follow-up of closed apache#17401 which only ended due to of inactivity, but its still nice feature to have. Given review by jerryshao taken in consideration and edited: - VisibleForTesting deleted because of dependency conflicts - removed unnecessary reflection for `MetricsSystemImpl` - added more available types for gauge ## How was this patch tested? Manual deploy of new yarn-shuffle jar into a Node Manager and verifying that the metrics appear in the Node Manager-standard location. This is JMX with an query endpoint running on `hostname:port` Resulting metrics look like this: ``` curl -sk -XGET hostname:port | grep -v '#' | grep 'shuffleService' hadoop_nodemanager_openblockrequestlatencymillis_rate15{name="shuffleService",} 0.31428910657834713 hadoop_nodemanager_blocktransferratebytes_rate15{name="shuffleService",} 566144.9983653595 hadoop_nodemanager_blocktransferratebytes_ratemean{name="shuffleService",} 2464409.9678099006 hadoop_nodemanager_openblockrequestlatencymillis_rate1{name="shuffleService",} 1.2893844732240272 hadoop_nodemanager_registeredexecutorssize{name="shuffleService",} 2.0 hadoop_nodemanager_openblockrequestlatencymillis_ratemean{name="shuffleService",} 1.255574678369966 hadoop_nodemanager_openblockrequestlatencymillis_count{name="shuffleService",} 315.0 hadoop_nodemanager_openblockrequestlatencymillis_rate5{name="shuffleService",} 0.7661929192569739 hadoop_nodemanager_registerexecutorrequestlatencymillis_ratemean{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_count{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate1{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate5{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_count{name="shuffleService",} 6.18271213E8 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate15{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_rate5{name="shuffleService",} 1154114.4881816586 hadoop_nodemanager_blocktransferratebytes_rate1{name="shuffleService",} 574745.0749848988 ``` Closes apache#22485 from mareksimunek/SPARK-18364. Lead-authored-by: marek.simunek <[email protected]> Co-authored-by: Andrew Ash <[email protected]> Signed-off-by: Thomas Graves <[email protected]>
1 parent b96fd44 commit a802c69

File tree

3 files changed

+221
-0
lines changed

3 files changed

+221
-0
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.hadoop.fs.FileSystem;
3636
import org.apache.hadoop.fs.Path;
3737
import org.apache.hadoop.fs.permission.FsPermission;
38+
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
39+
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
3840
import org.apache.hadoop.yarn.api.records.ContainerId;
3941
import org.apache.hadoop.yarn.server.api.*;
4042
import org.apache.spark.network.util.LevelDBProvider;
@@ -168,6 +170,15 @@ protected void serviceInit(Configuration conf) throws Exception {
168170
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
169171
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
170172

173+
// register metrics on the block handler into the Node Manager's metrics system.
174+
YarnShuffleServiceMetrics serviceMetrics =
175+
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
176+
177+
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
178+
metricsSystem.register(
179+
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
180+
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
181+
171182
// If authentication is enabled, set up the shuffle server to use a
172183
// special RPC handler that filters out unauthenticated fetch requests
173184
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.yarn;
19+
20+
import java.util.Map;
21+
22+
import com.codahale.metrics.*;
23+
import org.apache.hadoop.metrics2.MetricsCollector;
24+
import org.apache.hadoop.metrics2.MetricsInfo;
25+
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
26+
import org.apache.hadoop.metrics2.MetricsSource;
27+
28+
/**
29+
* Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}
30+
* to hadoop metrics system.
31+
* NodeManager by default exposes JMX endpoint where can be collected.
32+
*/
33+
class YarnShuffleServiceMetrics implements MetricsSource {
34+
35+
private final MetricSet metricSet;
36+
37+
YarnShuffleServiceMetrics(MetricSet metricSet) {
38+
this.metricSet = metricSet;
39+
}
40+
41+
/**
42+
* Get metrics from the source
43+
*
44+
* @param collector to contain the resulting metrics snapshot
45+
* @param all if true, return all metrics even if unchanged.
46+
*/
47+
@Override
48+
public void getMetrics(MetricsCollector collector, boolean all) {
49+
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
50+
51+
for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
52+
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
53+
}
54+
}
55+
56+
/**
57+
* The metric types used in
58+
* {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}.
59+
* Visible for testing.
60+
*/
61+
public static void collectMetric(
62+
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
63+
64+
if (metric instanceof Timer) {
65+
Timer t = (Timer) metric;
66+
metricsRecordBuilder
67+
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
68+
t.getCount())
69+
.addGauge(
70+
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
71+
t.getFifteenMinuteRate())
72+
.addGauge(
73+
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
74+
t.getFiveMinuteRate())
75+
.addGauge(
76+
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
77+
t.getOneMinuteRate())
78+
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
79+
t.getMeanRate());
80+
} else if (metric instanceof Meter) {
81+
Meter m = (Meter) metric;
82+
metricsRecordBuilder
83+
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
84+
m.getCount())
85+
.addGauge(
86+
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
87+
m.getFifteenMinuteRate())
88+
.addGauge(
89+
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
90+
m.getFiveMinuteRate())
91+
.addGauge(
92+
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
93+
m.getOneMinuteRate())
94+
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
95+
m.getMeanRate());
96+
} else if (metric instanceof Gauge) {
97+
final Object gaugeValue = ((Gauge) metric).getValue();
98+
if (gaugeValue instanceof Integer) {
99+
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue);
100+
} else if (gaugeValue instanceof Long) {
101+
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue);
102+
} else if (gaugeValue instanceof Float) {
103+
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue);
104+
} else if (gaugeValue instanceof Double) {
105+
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue);
106+
} else {
107+
throw new IllegalStateException(
108+
"Not supported class type of metric[" + name + "] for value " + gaugeValue);
109+
}
110+
}
111+
}
112+
113+
private static MetricsInfo getShuffleServiceMetricsInfo(String name) {
114+
return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
115+
}
116+
117+
private static class ShuffleServiceMetricsInfo implements MetricsInfo {
118+
119+
private final String name;
120+
private final String description;
121+
122+
ShuffleServiceMetricsInfo(String name, String description) {
123+
this.name = name;
124+
this.description = description;
125+
}
126+
127+
@Override
128+
public String name() {
129+
return name;
130+
}
131+
132+
@Override
133+
public String description() {
134+
return description;
135+
}
136+
}
137+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.network.yarn
18+
19+
import scala.collection.JavaConverters._
20+
21+
import org.apache.hadoop.metrics2.MetricsRecordBuilder
22+
import org.mockito.Matchers._
23+
import org.mockito.Mockito.{mock, times, verify, when}
24+
import org.scalatest.Matchers
25+
26+
import org.apache.spark.SparkFunSuite
27+
import org.apache.spark.network.server.OneForOneStreamManager
28+
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
29+
30+
class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
31+
32+
val streamManager = mock(classOf[OneForOneStreamManager])
33+
val blockResolver = mock(classOf[ExternalShuffleBlockResolver])
34+
when(blockResolver.getRegisteredExecutorsSize).thenReturn(42)
35+
36+
val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics
37+
38+
test("metrics named as expected") {
39+
val allMetrics = Set(
40+
"openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
41+
"blockTransferRateBytes", "registeredExecutorsSize")
42+
43+
metrics.getMetrics.keySet().asScala should be (allMetrics)
44+
}
45+
46+
// these three metrics have the same effect on the collector
47+
for (testname <- Seq("openBlockRequestLatencyMillis",
48+
"registerExecutorRequestLatencyMillis",
49+
"blockTransferRateBytes")) {
50+
test(s"$testname - collector receives correct types") {
51+
val builder = mock(classOf[MetricsRecordBuilder])
52+
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
53+
when(builder.addGauge(any(), anyDouble())).thenReturn(builder)
54+
55+
YarnShuffleServiceMetrics.collectMetric(builder, testname,
56+
metrics.getMetrics.get(testname))
57+
58+
verify(builder).addCounter(anyObject(), anyLong())
59+
verify(builder, times(4)).addGauge(anyObject(), anyDouble())
60+
}
61+
}
62+
63+
// this metric writes only one gauge to the collector
64+
test("registeredExecutorsSize - collector receives correct types") {
65+
val builder = mock(classOf[MetricsRecordBuilder])
66+
67+
YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize",
68+
metrics.getMetrics.get("registeredExecutorsSize"))
69+
70+
// only one
71+
verify(builder).addGauge(anyObject(), anyInt())
72+
}
73+
}

0 commit comments

Comments
 (0)