|
17 | 17 |
|
18 | 18 | package org.apache.spark.executor
|
19 | 19 |
|
| 20 | +import java.lang.management.ManagementFactory |
20 | 21 | import java.util.concurrent.ThreadPoolExecutor
|
| 22 | +import javax.management.{MBeanServer, ObjectName} |
21 | 23 |
|
22 | 24 | import scala.collection.JavaConverters._
|
| 25 | +import scala.util.control.NonFatal |
23 | 26 |
|
24 | 27 | import com.codahale.metrics.{Gauge, MetricRegistry}
|
25 | 28 | import org.apache.hadoop.fs.FileSystem
|
@@ -73,6 +76,24 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
|
73 | 76 | registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
|
74 | 77 | }
|
75 | 78 |
|
| 79 | + // Dropwizard metrics gauge measuring the executor's process CPU time. |
| 80 | + // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise. |
| 81 | + // The CPU time value is returned in nanoseconds. |
| 82 | + // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or |
| 83 | + // com.ibm.lang.management.OperatingSystemMXBean, if available. |
| 84 | + metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] { |
| 85 | + val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer |
| 86 | + val name = new ObjectName("java.lang", "type", "OperatingSystem") |
| 87 | + override def getValue: Long = { |
| 88 | + try { |
| 89 | + // return JVM process CPU time if the ProcessCpuTime method is available |
| 90 | + mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long] |
| 91 | + } catch { |
| 92 | + case NonFatal(_) => -1L |
| 93 | + } |
| 94 | + } |
| 95 | + }) |
| 96 | + |
76 | 97 | // Expose executor task metrics using the Dropwizard metrics system.
|
77 | 98 | // The list is taken from TaskMetrics.scala
|
78 | 99 | val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
|
|
0 commit comments