Skip to content

Commit d73c2d7

Browse files
JMX: Add Kafka producer target script (#27)
1 parent f69af44 commit d73c2d7

File tree

9 files changed

+352
-16
lines changed

9 files changed

+352
-16
lines changed

contrib/jmx-metrics/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ The currently available target systems are:
147147
| [`cassandra`](./docs/target-systems/cassandra.md) |
148148
| [`kafka`](./docs/target-systems/kafka.md) |
149149
| [`kafka-consumer`](./docs/target-systems/kafka-consumer.md) |
150+
| [`kafka-producer`](./docs/target-systems/kafka-producer.md) |
150151

151152
### Configuration
152153

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Kafka Producer Metrics
2+
3+
The JMX Metric Gatherer provides built-in Kafka producer metric gathering capabilities for Kafka versions 0.8.2.x and above.
4+
These metrics are sourced from Kafka's exposed Yammer metrics for each instance: https://kafka.apache.org/documentation/#monitoring
5+
6+
## Metrics
7+
8+
### Producer Metrics
9+
10+
* Name: `kafka.producer.io-wait-time-ns-avg`
11+
* Description: The average length of time the I/O thread spent waiting for a socket ready for reads or writes.
12+
* Unit: `ns`
13+
* Labels: `client-id`
14+
* Instrument Type: DoubleValueObserver
15+
16+
* Name: `kafka.producer.outgoing-byte-rate`
17+
* Description: The average number of outgoing bytes sent per second to all servers.
18+
* Unit: `by`
19+
* Labels: `client-id`
20+
* Instrument Type: DoubleValueObserver
21+
22+
* Name: `kafka.producer.request-latency-avg`
23+
* Description: The average request latency.
24+
* Unit: `ms`
25+
* Labels: `client-id`
26+
* Instrument Type: DoubleValueObserver
27+
28+
* Name: `kafka.producer.request-rate`
29+
* Description: The average number of requests sent per second.
30+
* Unit: `1`
31+
* Labels: `client-id`
32+
* Instrument Type: DoubleValueObserver
33+
34+
* Name: `kafka.producer.response-rate`
35+
* Description: Responses received per second.
36+
* Unit: `1`
37+
* Labels: `client-id`
38+
* Instrument Type: DoubleValueObserver
39+
40+
### Per-Topic Producer Metrics
41+
42+
* Name: `kafka.producer.byte-rate`
43+
* Description: The average number of bytes sent per second for a topic.
44+
* Unit: `by`
45+
* Labels: `client-id`, `topic`
46+
* Instrument Type: DoubleValueObserver
47+
48+
* Name: `kafka.producer.compression-rate`
49+
* Description: The average compression rate of record batches for a topic.
50+
* Unit: `1`
51+
* Labels: `client-id`, `topic`
52+
* Instrument Type: DoubleValueObserver
53+
54+
* Name: `kafka.producer.record-error-rate`
55+
* Description: The average per-second number of record sends that resulted in errors for a topic.
56+
* Unit: `1`
57+
* Labels: `client-id`, `topic`
58+
* Instrument Type: DoubleValueObserver
59+
60+
* Name: `kafka.producer.record-retry-rate`
61+
* Description: The average per-second number of retried record sends for a topic.
62+
* Unit: `1`
63+
* Labels: `client-id`, `topic`
64+
* Instrument Type: DoubleValueObserver
65+
66+
* Name: `kafka.producer.record-send-rate`
67+
* Description: The average number of records sent per second for a topic.
68+
* Unit: `1`
69+
* Labels: `client-id`, `topic`
70+
* Instrument Type: DoubleValueObserver

contrib/jmx-metrics/src/main/java/io/opentelemetry/contrib/jmxmetrics/JmxConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class JmxConfig {
4040
static final String JMX_REALM = PREFIX + "jmx.realm";
4141

4242
static final List<String> AVAILABLE_TARGET_SYSTEMS =
43-
Arrays.asList("cassandra", "jvm", "kafka", "kafka-consumer");
43+
Arrays.asList("cassandra", "jvm", "kafka", "kafka-consumer", "kafka-producer");
4444

4545
final String serviceUrl;
4646
final String groovyScript;
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Target script for the "kafka-producer" target system.
// Client-level metrics: query MBeans of type producer-metrics, wildcarded on client-id.
def producerMetrics = otel.mbeans("kafka.producer:client-id=*,type=producer-metrics")
18+
// Each otel.instrument call below passes: the MBean query, the metric name, its description,
// its unit, a map of label name -> closure deriving the label value from the MBean's object
// name, an attribute name, and the instrument factory (otel.&doubleValueObserver).
// NOTE(review): the attribute-name argument is presumably the MBean attribute that is read
// for the metric value -- confirm against the otel.instrument helper.
otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg",
19+
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes", "ns",
20+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
21+
"io-wait-time-ns-avg", otel.&doubleValueObserver)
22+
otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate",
23+
"The average number of outgoing bytes sent per second to all servers", "by",
24+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
25+
"outgoing-byte-rate", otel.&doubleValueObserver)
26+
otel.instrument(producerMetrics, "kafka.producer.request-latency-avg",
27+
"The average request latency", "ms",
28+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
29+
"request-latency-avg", otel.&doubleValueObserver)
30+
otel.instrument(producerMetrics, "kafka.producer.request-rate",
31+
"The average number of requests sent per second", "1",
32+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
33+
"request-rate", otel.&doubleValueObserver)
34+
otel.instrument(producerMetrics, "kafka.producer.response-rate",
35+
"Responses received per second", "1",
36+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
37+
"response-rate", otel.&doubleValueObserver)
38+
39+
// Per-topic metrics: query MBeans of type producer-topic-metrics, wildcarded on both
// client-id and topic; every instrument below carries both as labels.
def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics")
40+
otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate",
41+
"The average number of bytes sent per second for a topic", "by",
42+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
43+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
44+
"byte-rate", otel.&doubleValueObserver)
45+
otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate",
46+
"The average compression rate of record batches for a topic", "1",
47+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
48+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
49+
"compression-rate", otel.&doubleValueObserver)
50+
otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate",
51+
"The average per-second number of record sends that resulted in errors for a topic", "1",
52+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
53+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
54+
"record-error-rate", otel.&doubleValueObserver)
55+
otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate",
56+
"The average per-second number of retried record sends for a topic", "1",
57+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
58+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
59+
"record-retry-rate", otel.&doubleValueObserver)
60+
otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate",
61+
"The average number of records sent per second for a topic", "1",
62+
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
63+
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
64+
"record-send-rate", otel.&doubleValueObserver)

contrib/jmx-metrics/src/test/groovy/io/opentelemetry/contrib/jmxmetrics/IntegrationTest.groovy

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class IntegrationTest extends Specification{
9797
)
9898
}
9999

100-
if (["kafka", "kafka-consumer"].any { it in targets }) {
100+
if (targets.any { it.contains("kafka") }) {
101101
def zookeeper = new GenericContainer<>("zookeeper:3.5")
102102
.withNetwork(network)
103103
.withNetworkAliases("zookeeper")
@@ -113,7 +113,12 @@ class IntegrationTest extends Specification{
113113
.waitingFor(Wait.forListeningPort())
114114
.dependsOn(zookeeper)
115115
targetContainers.add(kafka)
116-
if ("kafka-consumer" in targets) {
116+
if (targets.any {
117+
it in [
118+
"kafka-consumer",
119+
"kafka-producer"
120+
]
121+
}) {
117122
def createTopics = new Startable() {
118123
@Override
119124
void start() {
@@ -126,17 +131,32 @@ class IntegrationTest extends Specification{
126131
@Override
127132
void stop() { }
128133
}
129-
def kafkaConsumer = new GenericContainer<>("bitnami/kafka:latest")
130-
.withNetwork(network)
131-
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
132-
.withNetworkAliases("kafka-consumer")
133-
.withExposedPorts(7199)
134-
.withCommand("kafka-console-consumer.sh", "--bootstrap-server", "kafka:9092",
135-
"--whitelist", "test-topic-.*", "--max-messages", "100"
136-
).withStartupTimeout(Duration.ofSeconds(120))
137-
.waitingFor(Wait.forListeningPort())
138-
.dependsOn(kafka, createTopics)
139-
targetContainers.add(kafkaConsumer)
134+
if ("kafka-consumer" in targets) {
135+
def kafkaConsumer = new GenericContainer<>("bitnami/kafka:latest")
136+
.withNetwork(network)
137+
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
138+
.withNetworkAliases("kafka-consumer")
139+
.withExposedPorts(7199)
140+
.withCommand("kafka-console-consumer.sh", "--bootstrap-server", "kafka:9092",
141+
"--whitelist", "test-topic-.*", "--max-messages", "100"
142+
).withStartupTimeout(Duration.ofSeconds(120))
143+
.waitingFor(Wait.forListeningPort())
144+
.dependsOn(kafka, createTopics)
145+
targetContainers.add(kafkaConsumer)
146+
} else {
147+
def producerPath = ClassLoader.getSystemClassLoader().getResource("target-systems/kafka-producer.sh").path
148+
def kafkaProducer = new GenericContainer<>("bitnami/kafka:latest")
149+
.withNetwork(network)
150+
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
151+
.withNetworkAliases("kafka-producer")
152+
.withExposedPorts(7199)
153+
.withCopyFileToContainer(MountableFile.forHostPath(producerPath), "/usr/bin/kafka-producer.sh")
154+
.withCommand( "kafka-producer.sh")
155+
.withStartupTimeout(Duration.ofSeconds(120))
156+
.waitingFor(Wait.forListeningPort())
157+
.dependsOn(kafka, createTopics)
158+
targetContainers.add(kafkaProducer)
159+
}
140160
}
141161
}
142162

contrib/jmx-metrics/src/test/groovy/io/opentelemetry/contrib/jmxmetrics/JmxConfigTest.groovy

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class JmxConfigTest extends UnitTest {
2525
"cassandra",
2626
"jvm",
2727
"kafka",
28-
"kafka-consumer"
28+
"kafka-consumer",
29+
"kafka-producer"
2930
]
3031
}
3132

@@ -144,6 +145,6 @@ class JmxConfigTest extends UnitTest {
144145

145146
expect: 'config fails to validate'
146147
raised != null
147-
raised.message == "unavailabletargetsystem must be one of [cassandra, jvm, kafka, kafka-consumer]"
148+
raised.message == "unavailabletargetsystem must be one of [cassandra, jvm, kafka, kafka-consumer, kafka-producer]"
148149
}
149150
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.contrib.jmxmetrics
18+
19+
import io.opentelemetry.proto.common.v1.StringKeyValue
20+
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
21+
import io.opentelemetry.proto.metrics.v1.Metric
22+
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
23+
import org.testcontainers.Testcontainers
24+
import spock.lang.Requires
25+
import spock.lang.Timeout
26+
27+
// End-to-end check of the "kafka-producer" target system.
// The spec is only enabled when the 'ojc.integration.tests' system property equals 'true'.
@Requires({
28+
System.getProperty('ojc.integration.tests') == 'true'
29+
})
30+
@Timeout(60)
31+
class KafkaProducerTargetSystemIntegrationTests extends OtlpIntegrationTest {
32+
33+
// Configures the containers for the kafka-producer target, then verifies that exactly ten
// double-gauge metrics arrive with the expected names, descriptions, units and labels.
def 'end to end'() {
34+
setup: 'we configure JMX metrics gatherer and target server to use Kafka as target system'
35+
targets = ["kafka-producer"]
36+
Testcontainers.exposeHostPorts(otlpPort)
37+
configureContainers('target-systems/kafka-producer.properties', otlpPort, 0, false)
38+
39+
expect:
40+
when: 'we receive metrics from the JMX metric gatherer'
41+
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
42+
then: 'they are of the expected size'
43+
receivedMetrics.size() == 1
44+
45+
when: "we examine the received metric's instrumentation library metrics lists"
46+
ResourceMetrics receivedMetric = receivedMetrics.get(0)
47+
List<InstrumentationLibraryMetrics> ilMetrics =
48+
receivedMetric.instrumentationLibraryMetricsList
49+
then: 'they of the expected size'
50+
ilMetrics.size() == 1
51+
52+
when: 'we examine the instrumentation library metric metrics list'
53+
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
54+
ArrayList<Metric> metrics = ilMetric.metricsList as ArrayList
55+
// Sort by name so the received metrics line up index-for-index with the expected list below.
metrics.sort{ a, b -> a.name <=> b.name}
56+
then: 'they are of the expected size and content'
57+
metrics.size() == 10
58+
59+
// Expected metrics in name order; each entry is [name, description, unit, expected labels].
// In the label maps, 'client-id' maps to '' (its actual value is not checked) and 'topic'
// maps to the list of topic values that must be observed.
[
60+
[
61+
"kafka.producer.byte-rate",
62+
"The average number of bytes sent per second for a topic",
63+
"by",
64+
['client-id' : '', 'topic' : ['test-topic-1']],
65+
],
66+
[
67+
"kafka.producer.compression-rate",
68+
"The average compression rate of record batches for a topic",
69+
"1",
70+
['client-id' : '', 'topic' : ['test-topic-1']],
71+
],
72+
[
73+
"kafka.producer.io-wait-time-ns-avg",
74+
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes",
75+
"ns",
76+
['client-id' : ''],
77+
],
78+
[
79+
"kafka.producer.outgoing-byte-rate",
80+
"The average number of outgoing bytes sent per second to all servers",
81+
"by",
82+
['client-id' : ''],
83+
],
84+
[
85+
"kafka.producer.record-error-rate",
86+
"The average per-second number of record sends that resulted in errors for a topic",
87+
"1",
88+
['client-id' : '', 'topic' : ['test-topic-1']],
89+
],
90+
[
91+
"kafka.producer.record-retry-rate",
92+
"The average per-second number of retried record sends for a topic",
93+
"1",
94+
['client-id' : '', 'topic' : ['test-topic-1']],
95+
],
96+
[
97+
"kafka.producer.record-send-rate",
98+
"The average number of records sent per second for a topic",
99+
"1",
100+
['client-id' : '', 'topic' : ['test-topic-1']],
101+
],
102+
[
103+
"kafka.producer.request-latency-avg",
104+
"The average request latency",
105+
"ms",
106+
['client-id' : ''],
107+
],
108+
[
109+
"kafka.producer.request-rate",
110+
"The average number of requests sent per second",
111+
"1",
112+
['client-id' : ''],
113+
],
114+
[
115+
"kafka.producer.response-rate",
116+
"Responses received per second",
117+
"1",
118+
['client-id' : ''],
119+
],
120+
].eachWithIndex{ item, index ->
121+
Metric metric = metrics.get(index)
122+
assert metric.name == item[0]
123+
assert metric.description == item[1]
124+
assert metric.unit == item[2]
125+
126+
assert metric.hasDoubleGauge()
127+
def datapoints = metric.doubleGauge
128+
129+
Map<String, String> expectedLabels = item[3]
130+
def expectedLabelCount = expectedLabels.size()
131+
132+
assert datapoints.dataPointsCount == 1
133+
134+
def datapoint = datapoints.getDataPoints(0)
135+
136+
List<StringKeyValue> labels = datapoint.labelsList
137+
assert labels.size() == expectedLabelCount
138+
139+
// Walk the received labels: every key must be expected. A non-empty expected value is a
// list of allowed values; each matched value is removed, and the key is dropped once its
// list is exhausted. An empty expected value ('') skips the value check entirely.
(0..<expectedLabelCount).each { j ->
140+
def key = labels[j].key
141+
assert expectedLabels.containsKey(key)
142+
def value = expectedLabels[key]
143+
if (!value.empty) {
144+
def actual = labels[j].value
145+
assert value.contains(actual)
146+
value.remove(actual)
147+
if (value.empty) {
148+
expectedLabels.remove(key)
149+
}
150+
}
151+
}
152+
153+
// After all checked values are consumed, only the unchecked 'client-id' entry remains.
assert expectedLabels == ['client-id': '']
154+
}
155+
156+
cleanup:
157+
targetContainers.each { it.stop() }
158+
// Print the gatherer container's logs before stopping it.
println jmxExtensionAppContainer.getLogs()
159+
jmxExtensionAppContainer.stop()
160+
}
161+
}

0 commit comments

Comments
 (0)