Skip to content

Commit 83f2b2d

Browse files
authored
Merge existing registry with default one or configure default metric registry (apache-spark-on-k8s#214)
1 parent 83bde70 commit 83f2b2d

File tree

4 files changed

+103
-46
lines changed

4 files changed

+103
-46
lines changed

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
2222

2323
import scala.collection.mutable
2424

25-
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
25+
import com.codahale.metrics._
2626
import org.eclipse.jetty.servlet.ServletContextHandler
2727

2828
import org.apache.spark.{SecurityManager, SparkConf}
@@ -70,14 +70,15 @@ import org.apache.spark.util.Utils
7070
private[spark] class MetricsSystem private (
7171
val instance: String,
7272
conf: SparkConf,
73-
securityMgr: SecurityManager)
73+
securityMgr: SecurityManager,
74+
registry: MetricRegistry)
7475
extends Logging {
7576

7677
private[this] val metricsConfig = new MetricsConfig(conf)
7778

7879
private val sinks = new mutable.ArrayBuffer[Sink]
79-
private val sources = new mutable.ArrayBuffer[Source]
80-
private val registry = new MetricRegistry()
80+
private val sourceToListeners = new mutable.HashMap[Source, MetricRegistryListener]
81+
private val defaultListener = new MetricsSystemListener("")
8182

8283
private var running: Boolean = false
8384

@@ -99,13 +100,17 @@ private[spark] class MetricsSystem private (
99100
running = true
100101
StaticSources.allSources.foreach(registerSource)
101102
registerSources()
103+
SharedMetricRegistries.getDefault.addListener(defaultListener)
102104
registerSinks()
103-
sinks.foreach(_.start)
105+
sinks.foreach(_.start())
104106
}
105107

106108
def stop() {
107109
if (running) {
108-
sinks.foreach(_.stop)
110+
sinks.foreach(_.stop())
111+
sourceToListeners.keySet.foreach(deregisterSource)
112+
sourceToListeners.clear()
113+
SharedMetricRegistries.getDefault.removeListener(defaultListener)
109114
} else {
110115
logWarning("Stopping a MetricsSystem that is not running")
111116
}
@@ -151,26 +156,36 @@ private[spark] class MetricsSystem private (
151156
}
152157

153158
def getSourcesByName(sourceName: String): Seq[Source] =
154-
sources.filter(_.sourceName == sourceName)
159+
sourceToListeners.keySet.filter(_.sourceName == sourceName).toSeq
155160

156161
def registerSource(source: Source) {
157-
sources += source
158162
try {
159-
val regName = buildRegistryName(source)
160-
registry.register(regName, source.metricRegistry)
163+
val listener = new MetricsSystemListener(buildRegistryName(source))
164+
source.metricRegistry.addListener(listener)
165+
sourceToListeners += source -> listener
161166
} catch {
162167
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
163168
}
164169
}
165170

166-
def removeSource(source: Source) {
167-
sources -= source
171+
def deregisterSource(source: Source) {
168172
val regName = buildRegistryName(source)
169173
registry.removeMatching(new MetricFilter {
170174
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
171175
})
176+
sourceToListeners.get(source).foreach(source.metricRegistry.removeListener)
172177
}
173178

179+
def removeSource(source: Source): Unit = {
180+
deregisterSource(source)
181+
sourceToListeners.remove(source)
182+
}
183+
184+
def getSources: Seq[Source] =
185+
sourceToListeners.keySet.to[collection.immutable.Seq]
186+
187+
def getSinks: Seq[Sink] = sinks.to[collection.immutable.Seq]
188+
174189
private def registerSources() {
175190
val instConfig = metricsConfig.getInstance(instance)
176191
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
@@ -211,6 +226,41 @@ private[spark] class MetricsSystem private (
211226
}
212227
}
213228
}
229+
230+
private[spark] class MetricsSystemListener(prefix: String)
231+
extends MetricRegistryListener {
232+
def metricName(name: String): String = MetricRegistry.name(prefix, name)
233+
234+
override def onHistogramAdded(name: String, histogram: Histogram): Unit =
235+
registry.register(metricName(name), histogram)
236+
237+
override def onCounterAdded(name: String, counter: Counter): Unit =
238+
registry.register(metricName(name), counter)
239+
240+
override def onHistogramRemoved(name: String): Unit =
241+
registry.remove(metricName(name))
242+
243+
override def onGaugeRemoved(name: String): Unit =
244+
registry.remove(metricName(name))
245+
246+
override def onMeterRemoved(name: String): Unit =
247+
registry.remove(metricName(name))
248+
249+
override def onTimerAdded(name: String, timer: Timer): Unit =
250+
registry.register(metricName(name), timer)
251+
252+
override def onCounterRemoved(name: String): Unit =
253+
registry.remove(metricName(name))
254+
255+
override def onGaugeAdded(name: String, gauge: Gauge[_]): Unit =
256+
registry.register(metricName(name), gauge)
257+
258+
override def onTimerRemoved(name: String): Unit =
259+
registry.remove(metricName(name))
260+
261+
override def onMeterAdded(name: String, meter: Meter): Unit =
262+
registry.register(metricName(name), meter)
263+
}
214264
}
215265

216266
private[spark] object MetricsSystem {
@@ -220,6 +270,10 @@ private[spark] object MetricsSystem {
220270
private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
221271
private[this] val MINIMAL_POLL_PERIOD = 1
222272

273+
scala.util.control.Exception.ignoring(classOf[IllegalStateException]) {
274+
SharedMetricRegistries.setDefault("spark", new MetricRegistry())
275+
}
276+
223277
def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
224278
val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
225279
if (period < MINIMAL_POLL_PERIOD) {
@@ -230,6 +284,14 @@ private[spark] object MetricsSystem {
230284

231285
def createMetricsSystem(
232286
instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
233-
new MetricsSystem(instance, conf, securityMgr)
287+
new MetricsSystem(instance, conf, securityMgr, new MetricRegistry())
288+
}
289+
290+
def createMetricsSystem(
291+
instance: String,
292+
conf: SparkConf,
293+
securityMgr: SecurityManager,
294+
registry: MetricRegistry): MetricsSystem = {
295+
new MetricsSystem(instance, conf, securityMgr, registry)
234296
}
235297
}

core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.spark.metrics
1919

20-
import scala.collection.mutable.ArrayBuffer
21-
22-
import com.codahale.metrics.MetricRegistry
20+
import com.codahale.metrics.{Gauge, MetricRegistry}
2321
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
2422

2523
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -41,27 +39,23 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
4139
test("MetricsSystem with default config") {
4240
val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
4341
metricsSystem.start()
44-
val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
45-
val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
4642

47-
assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length)
48-
assert(metricsSystem.invokePrivate(sinks()).length === 0)
43+
assert(metricsSystem.getSources.length === StaticSources.allSources.length)
44+
assert(metricsSystem.getSinks.length === 0)
4945
assert(metricsSystem.getServletHandlers.nonEmpty)
5046
}
5147

5248
test("MetricsSystem with sources add") {
5349
val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
5450
metricsSystem.start()
55-
val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
56-
val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
5751

58-
assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length)
59-
assert(metricsSystem.invokePrivate(sinks()).length === 1)
52+
assert(metricsSystem.getSources.length === StaticSources.allSources.length)
53+
assert(metricsSystem.getSinks.length === 1)
6054
assert(metricsSystem.getServletHandlers.nonEmpty)
6155

6256
val source = new MasterSource(null)
6357
metricsSystem.registerSource(source)
64-
assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length + 1)
58+
assert(metricsSystem.getSources.length === StaticSources.allSources.length + 1)
6559
}
6660

6761
test("MetricsSystem with Driver instance") {
@@ -268,4 +262,24 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
268262
assert(metricName === source.sourceName)
269263
}
270264

265+
test("MetricsSystem registers dynamically added metrics") {
266+
val registry = new MetricRegistry()
267+
val source = new Source {
268+
override val sourceName = "dummySource"
269+
override val metricRegistry = new MetricRegistry()
270+
}
271+
272+
val instanceName = "testInstance"
273+
val metricsSystem = MetricsSystem.createMetricsSystem(
274+
instanceName, conf, securityMgr, registry)
275+
metricsSystem.registerSource(source)
276+
assert(!registry.getNames.contains("dummySource.newMetric"), "Metric shouldn't be registered")
277+
278+
source.metricRegistry.register("newMetric", new Gauge[Integer] {
279+
override def getValue: Integer = 1
280+
})
281+
assert(registry.getNames.contains("dummySource.newMetric"),
282+
"Metric should have been registered")
283+
}
284+
271285
}

pom.xml

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -742,11 +742,6 @@
742742
<artifactId>jackson-core</artifactId>
743743
<version>${fasterxml.jackson.version}</version>
744744
</dependency>
745-
<dependency>
746-
<groupId>com.fasterxml.jackson.core</groupId>
747-
<artifactId>jackson-core</artifactId>
748-
<version>${fasterxml.jackson.version}</version>
749-
</dependency>
750745
<dependency>
751746
<groupId>com.fasterxml.jackson.core</groupId>
752747
<artifactId>jackson-databind</artifactId>
@@ -757,11 +752,6 @@
757752
<artifactId>jackson-annotations</artifactId>
758753
<version>${fasterxml.jackson.version}</version>
759754
</dependency>
760-
<dependency>
761-
<groupId>com.fasterxml.jackson.dataformat</groupId>
762-
<artifactId>jackson-dataformat-yaml</artifactId>
763-
<version>${fasterxml.jackson.version}</version>
764-
</dependency>
765755
<!-- Guava is excluded because of SPARK-6149. The Guava version referenced in this module is
766756
15.0, which causes runtime incompatibility issues. -->
767757
<dependency>
@@ -820,11 +810,6 @@
820810
<artifactId>jackson-datatype-jsr310</artifactId>
821811
<version>${fasterxml.jackson.version}</version>
822812
</dependency>
823-
<dependency>
824-
<groupId>com.fasterxml.jackson.datatype</groupId>
825-
<artifactId>jackson-datatype-guava</artifactId>
826-
<version>${fasterxml.jackson.version}</version>
827-
</dependency>
828813
<dependency>
829814
<groupId>org.glassfish.jersey.core</groupId>
830815
<artifactId>jersey-server</artifactId>

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -377,13 +377,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
377377
addInputStream(ssc).register()
378378
ssc.start()
379379

380-
val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
380+
val sources = ssc.env.metricsSystem.getSources
381381
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
382382
assert(sources.contains(streamingSource))
383383
assert(ssc.getState() === StreamingContextState.ACTIVE)
384384

385385
ssc.stop()
386-
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
386+
val sourcesAfterStop = ssc.env.metricsSystem.getSources
387387
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
388388
assert(ssc.getState() === StreamingContextState.STOPPED)
389389
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
@@ -979,10 +979,6 @@ package object testPackage extends Assertions {
979979
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem
980980
*/
981981
private object StreamingContextSuite extends PrivateMethodTester {
982-
private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
983-
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
984-
metricsSystem.invokePrivate(_sources())
985-
}
986982
private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
987983
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
988984
streamingContext.invokePrivate(_streamingSource())

0 commit comments

Comments
 (0)