Skip to content

Commit f69af44

Browse files
JMX: Add Kafka consumer target script (#25)
1 parent 9f47008 commit f69af44

File tree

9 files changed

+322
-9
lines changed

9 files changed

+322
-9
lines changed

contrib/jmx-metrics/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ The currently available target systems are:
146146
| [`jvm`](./docs/target-systems/jvm.md) |
147147
| [`cassandra`](./docs/target-systems/cassandra.md) |
148148
| [`kafka`](./docs/target-systems/kafka.md) |
149+
| [`kafka-consumer`](./docs/target-systems/kafka-consumer.md) |
149150

150151
### Configuration
151152

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Kafka Consumer Metrics
2+
3+
The JMX Metric Gatherer provides built in Kafka consumer metric gathering capabilities for versions v0.8.2.x and above.
4+
These metrics are sourced from Kafka's exposed Yammer metrics for each instance: https://kafka.apache.org/documentation/#monitoring
5+
6+
## Metrics
7+
8+
### Consumer Metrics
9+
10+
* Name: `kafka.consumer.fetch-rate`
11+
* Description: The number of fetch requests for all topics per second.
12+
* Unit: `1`
13+
* Labels: `client-id`
14+
* Instrument Type: DoubleValueObserver
15+
16+
* Name: `kafka.consumer.records-lag-max`
17+
* Description: Number of messages the consumer lags behind the producer.
18+
* Unit: `1`
19+
* Labels: `client-id`
20+
* Instrument Type: DoubleValueObserver
21+
22+
* Name: `kafka.consumer.total.bytes-consumed-rate`
23+
* Description: The average number of bytes consumed for all topics per second.
24+
* Unit: `by`
25+
* Labels: `client-id`
26+
* Instrument Type: DoubleValueObserver
27+
28+
* Name: `kafka.consumer.total.fetch-size-avg`
29+
* Description: The average number of bytes fetched per request for all topics.
30+
* Unit: `by`
31+
* Labels: `client-id`
32+
* Instrument Type: DoubleValueObserver
33+
34+
* Name: `kafka.consumer.total.records-consumed-rate`
35+
* Description: The average number of records consumed for all topics per second.
36+
* Unit: `1`
37+
* Labels: `client-id`
38+
* Instrument Type: DoubleValueObserver
39+
40+
### Per-Topic Consumer Metrics
41+
42+
* Name: `kafka.consumer.bytes-consumed-rate`
43+
* Description: The average number of bytes consumed per second
44+
* Unit: `by`
45+
* Labels: `client-id`, `topic`
46+
* Instrument Type: DoubleValueObserver
47+
48+
* Name: `kafka.consumer.fetch-size-avg`
49+
* Description: The average number of bytes fetched per request
50+
* Unit: `by`
51+
* Labels: `client-id`, `topic`
52+
* Instrument Type: DoubleValueObserver
53+
54+
* Name: `kafka.consumer.records-consumed-rate`
55+
* Description: The average number of records consumed per second
56+
* Unit: `1`
57+
* Labels: `client-id`, `topic`
58+
* Instrument Type: DoubleValueObserver

contrib/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/InstrumentHelper.groovy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class InstrumentHelper {
108108
def updatedInstrumentName = "${instrumentName}.${key}"
109109
def labels = getLabels(mbean, labelFuncs)
110110
def inst = instrument(updatedInstrumentName, description, unit)
111-
println "InstrumentHelper.update (composite) - ${inst}"
112111
logger.fine("Recording ${updatedInstrumentName} - ${inst} w/ ${val} - ${labels}")
113112
if (!instToUpdates.containsKey(inst)) {
114113
instToUpdates[inst] = []
@@ -118,7 +117,6 @@ class InstrumentHelper {
118117
} else {
119118
def labels = getLabels(mbean, labelFuncs)
120119
def inst = instrument(instrumentName, description, unit)
121-
println "InstrumentHelper.update - ${inst}"
122120
logger.fine("Recording ${instrumentName} - ${inst} w/ ${value} - ${labels}")
123121
if (!instToUpdates.containsKey(inst)) {
124122
instToUpdates[inst] = []

contrib/jmx-metrics/src/main/java/io/opentelemetry/contrib/jmxmetrics/JmxConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class JmxConfig {
3939
static final String JMX_REMOTE_PROFILE = PREFIX + "jmx.remote.profile";
4040
static final String JMX_REALM = PREFIX + "jmx.realm";
4141

42-
static final List<String> AVAILABLE_TARGET_SYSTEMS = Arrays.asList("jvm", "kafka", "cassandra");
42+
static final List<String> AVAILABLE_TARGET_SYSTEMS =
43+
Arrays.asList("cassandra", "jvm", "kafka", "kafka-consumer");
4344

4445
final String serviceUrl;
4546
final String groovyScript;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics")
18+
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate",
19+
"The number of fetch requests for all topics per second", "1",
20+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
21+
"fetch-rate", otel.&doubleValueObserver)
22+
23+
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max",
24+
"Number of messages the consumer lags behind the producer", "1",
25+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
26+
"records-lag-max", otel.&doubleValueObserver)
27+
28+
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate",
29+
"The average number of bytes consumed for all topics per second", "by",
30+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
31+
"bytes-consumed-rate", otel.&doubleValueObserver)
32+
33+
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg",
34+
"The average number of bytes fetched per request for all topics", "by",
35+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
36+
"fetch-size-avg", otel.&doubleValueObserver)
37+
38+
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate",
39+
"The average number of records consumed for all topics per second", "1",
40+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
41+
"records-consumed-rate", otel.&doubleValueObserver)
42+
43+
def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics")
44+
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate",
45+
"The average number of bytes consumed per second", "by",
46+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
47+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
48+
"bytes-consumed-rate", otel.&doubleValueObserver)
49+
50+
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg",
51+
"The average number of bytes fetched per request", "by",
52+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
53+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
54+
"fetch-size-avg", otel.&doubleValueObserver)
55+
56+
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.records-consumed-rate",
57+
"The average number of records consumed per second", "1",
58+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
59+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
60+
"records-consumed-rate", otel.&doubleValueObserver)

contrib/jmx-metrics/src/test/groovy/io/opentelemetry/contrib/jmxmetrics/IntegrationTest.groovy

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.testcontainers.containers.GenericContainer
3131
import org.testcontainers.containers.Network
3232
import org.testcontainers.containers.wait.strategy.Wait
3333
import org.testcontainers.images.builder.ImageFromDockerfile
34+
import org.testcontainers.lifecycle.Startable
3435
import org.testcontainers.utility.MountableFile
3536
import spock.lang.Shared
3637
import spock.lang.Specification
@@ -96,23 +97,47 @@ class IntegrationTest extends Specification{
9697
)
9798
}
9899

99-
if ("kafka" in targets) {
100+
if (["kafka", "kafka-consumer"].any { it in targets }) {
100101
def zookeeper = new GenericContainer<>("zookeeper:3.5")
101102
.withNetwork(network)
102103
.withNetworkAliases("zookeeper")
103104
.withStartupTimeout(Duration.ofSeconds(120))
104105
.waitingFor(Wait.forListeningPort())
105106
targetContainers.add(zookeeper)
106-
targetContainers.add(
107-
new GenericContainer<>("bitnami/kafka:latest")
107+
def kafka = new GenericContainer<>("bitnami/kafka:latest")
108108
.withNetwork(network)
109109
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
110110
.withNetworkAliases("kafka")
111111
.withExposedPorts(7199)
112112
.withStartupTimeout(Duration.ofSeconds(120))
113113
.waitingFor(Wait.forListeningPort())
114114
.dependsOn(zookeeper)
115-
)
115+
targetContainers.add(kafka)
116+
if ("kafka-consumer" in targets) {
117+
def createTopics = new Startable() {
118+
@Override
119+
void start() {
120+
kafka.execInContainer(
121+
"sh", "-c",
122+
"unset JMX_PORT; for i in `seq 3`; do kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic-\$i; done"
123+
)
124+
}
125+
126+
@Override
127+
void stop() { }
128+
}
129+
def kafkaConsumer = new GenericContainer<>("bitnami/kafka:latest")
130+
.withNetwork(network)
131+
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
132+
.withNetworkAliases("kafka-consumer")
133+
.withExposedPorts(7199)
134+
.withCommand("kafka-console-consumer.sh", "--bootstrap-server", "kafka:9092",
135+
"--whitelist", "test-topic-.*", "--max-messages", "100"
136+
).withStartupTimeout(Duration.ofSeconds(120))
137+
.waitingFor(Wait.forListeningPort())
138+
.dependsOn(kafka, createTopics)
139+
targetContainers.add(kafkaConsumer)
140+
}
116141
}
117142

118143
targetContainers.each {

contrib/jmx-metrics/src/test/groovy/io/opentelemetry/contrib/jmxmetrics/JmxConfigTest.groovy

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ class JmxConfigTest extends UnitTest {
2121

2222
def 'static values'() {
2323
expect: 'static values to be expected'
24-
JmxConfig.AVAILABLE_TARGET_SYSTEMS == ["jvm", "kafka", "cassandra"]
24+
JmxConfig.AVAILABLE_TARGET_SYSTEMS == [
25+
"cassandra",
26+
"jvm",
27+
"kafka",
28+
"kafka-consumer"
29+
]
2530
}
2631

2732
def 'default values'() {
@@ -139,6 +144,6 @@ class JmxConfigTest extends UnitTest {
139144

140145
expect: 'config fails to validate'
141146
raised != null
142-
raised.message == "unavailabletargetsystem must be one of [jvm, kafka, cassandra]"
147+
raised.message == "unavailabletargetsystem must be one of [cassandra, jvm, kafka, kafka-consumer]"
143148
}
144149
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.contrib.jmxmetrics
18+
19+
import io.opentelemetry.proto.common.v1.StringKeyValue
20+
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
21+
import io.opentelemetry.proto.metrics.v1.Metric
22+
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
23+
import org.testcontainers.Testcontainers
24+
import spock.lang.Requires
25+
import spock.lang.Timeout
26+
27+
@Requires({
28+
System.getProperty('ojc.integration.tests') == 'true'
29+
})
30+
@Timeout(60)
31+
class KafkaConsumerTargetSystemIntegrationTests extends OtlpIntegrationTest {
32+
33+
def 'end to end'() {
34+
setup: 'we configure JMX metrics gatherer and target server to use Kafka as target system'
35+
targets = ["kafka-consumer"]
36+
Testcontainers.exposeHostPorts(otlpPort)
37+
configureContainers('target-systems/kafka-consumer.properties', otlpPort, 0, false)
38+
39+
expect:
40+
when: 'we receive metrics from the JMX metric gatherer'
41+
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
42+
then: 'they are of the expected size'
43+
receivedMetrics.size() == 1
44+
45+
when: "we examine the received metric's instrumentation library metrics lists"
46+
ResourceMetrics receivedMetric = receivedMetrics.get(0)
47+
List<InstrumentationLibraryMetrics> ilMetrics =
48+
receivedMetric.instrumentationLibraryMetricsList
49+
then: 'they of the expected size'
50+
ilMetrics.size() == 1
51+
52+
when: 'we examine the instrumentation library metric metrics list'
53+
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
54+
ArrayList<Metric> metrics = ilMetric.metricsList as ArrayList
55+
metrics.sort{ a, b -> a.name <=> b.name}
56+
then: 'they are of the expected size and content'
57+
metrics.size() == 8
58+
59+
def expectedTopics = [
60+
'test-topic-1',
61+
'test-topic-2',
62+
'test-topic-3'
63+
]
64+
65+
[
66+
[
67+
'kafka.consumer.bytes-consumed-rate',
68+
'The average number of bytes consumed per second',
69+
'by',
70+
['client-id' : '', 'topic' : expectedTopics.clone() ],
71+
],
72+
[
73+
'kafka.consumer.fetch-rate',
74+
'The number of fetch requests for all topics per second',
75+
'1',
76+
['client-id' : '']
77+
],
78+
[
79+
'kafka.consumer.fetch-size-avg',
80+
'The average number of bytes fetched per request',
81+
'by',
82+
['client-id' : '', 'topic' : expectedTopics.clone() ],
83+
],
84+
[
85+
'kafka.consumer.records-consumed-rate',
86+
'The average number of records consumed per second',
87+
'1',
88+
['client-id' : '', 'topic' : expectedTopics.clone() ],
89+
],
90+
[
91+
'kafka.consumer.records-lag-max',
92+
'Number of messages the consumer lags behind the producer',
93+
'1',
94+
['client-id' : '']
95+
],
96+
[
97+
'kafka.consumer.total.bytes-consumed-rate',
98+
'The average number of bytes consumed for all topics per second',
99+
'by',
100+
['client-id' : '']
101+
],
102+
[
103+
'kafka.consumer.total.fetch-size-avg',
104+
'The average number of bytes fetched per request for all topics',
105+
'by',
106+
['client-id' : '']
107+
],
108+
[
109+
'kafka.consumer.total.records-consumed-rate',
110+
'The average number of records consumed for all topics per second',
111+
'1',
112+
['client-id' : '']
113+
],
114+
].eachWithIndex{ item, index ->
115+
Metric metric = metrics.get(index)
116+
assert metric.name == item[0]
117+
assert metric.description == item[1]
118+
assert metric.unit == item[2]
119+
120+
assert metric.hasDoubleGauge()
121+
def datapoints = metric.doubleGauge
122+
123+
Map<String, String> expectedLabels = item[3]
124+
def expectedLabelCount = expectedLabels.size()
125+
126+
assert datapoints.dataPointsCount == expectedLabelCount == 1 ? 1 : 3
127+
128+
(0..<datapoints.dataPointsCount).each { i ->
129+
def datapoint = datapoints.getDataPoints(i)
130+
131+
List<StringKeyValue> labels = datapoint.labelsList
132+
assert labels.size() == expectedLabelCount
133+
134+
(0..<expectedLabelCount).each { j ->
135+
def key = labels[j].key
136+
assert expectedLabels.containsKey(key)
137+
def value = expectedLabels[key]
138+
if (!value.empty) {
139+
def actual = labels[j].value
140+
assert value.contains(actual)
141+
value.remove(actual)
142+
if (value.empty) {
143+
expectedLabels.remove(key)
144+
}
145+
}
146+
}
147+
}
148+
149+
assert expectedLabels == ['client-id': '']
150+
}
151+
152+
cleanup:
153+
targetContainers.each { it.stop() }
154+
jmxExtensionAppContainer.stop()
155+
}
156+
}

0 commit comments

Comments
 (0)