Skip to content

Commit d46eb77

Browse files
committed
slight change to support both apache and confluent
1 parent 6293d68 commit d46eb77

File tree

3 files changed

+154
-47
lines changed

3 files changed

+154
-47
lines changed

instrumentation/jmx-metrics/library/kafka-connect.md

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
11
# Kafka Connect Metrics
22

3-
Here is the list of metrics based on MBeans exposed by Kafka Connect. String-valued JMX
3+
Here is the list of metrics based on MBeans exposed by Apache Kafka Connect. String-valued JMX
44
attributes (class/type/version information) are exported as state metrics with value `1` and
5-
carry the raw string value as metric attributes.
5+
carry the raw string value as metric attributes.
6+
7+
## Compatibility
8+
9+
This rule set targets both Apache Kafka Connect and Confluent Platform. Apache documents several
10+
metrics not surfaced in Confluent docs (worker rebalance protocol, per-connector task counts on
11+
workers, predicate/transform metadata, connector task metadata including converter info, source
12+
transaction size stats, and sink record lag max); all of them are included below. Status metrics use
13+
the superset of values across both variants (connector: running, paused, stopped, failed,
14+
restarting, unassigned, degraded; task: running, paused, failed, restarting, unassigned,
15+
destroyed) and fall back to `unknown` for any new values. Differences in bean placeholder
16+
formatting between the docs are cosmetic; bean names align across both variants.
617

718
## Worker metrics
819

@@ -57,7 +68,7 @@ Attributes: `kafka.connect.connector`, `kafka.connect.connector.class`, `kafka.c
5768
| kafka.connect.connector.class | UpDownCounter | 1 | kafka.connect.connector.class.state | The name of the connector class. |
5869
| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. |
5970
| kafka.connect.connector.version | UpDownCounter | 1 | kafka.connect.connector.version.state | The version of the connector class, as reported by the connector. |
60-
| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value). |
71+
| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. |
6172

6273
## Predicate metrics
6374

@@ -89,7 +100,7 @@ Attributes include `kafka.connect.connector`, `kafka.connect.task.id`, connector
89100
| kafka.connect.task.offset.commit.success.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that succeeded. |
90101
| kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. |
91102
| kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state. |
92-
| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task. |
103+
| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task; supports running, paused, failed, restarting, unassigned, destroyed, or unknown. |
93104
| kafka.connect.task.class | UpDownCounter | 1 | kafka.connect.task.class.state | The class name of the task. |
94105
| kafka.connect.task.version | UpDownCounter | 1 | kafka.connect.task.version.state | The version of the task. |
95106
| kafka.connect.task.value.converter.class | UpDownCounter | 1 | kafka.connect.task.value.converter.class.state | The fully qualified class name from value.converter. |

instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ rules:
223223
status:
224224
metric: status
225225
type: state
226-
desc: Connector lifecycle state indicator (1 when the state matches the attribute value)
226+
desc: Connector lifecycle state indicator (1 when the state matches the attribute value). Supports Apache and Confluent status values.
227227
metricAttribute:
228228
kafka.connect.connector.state:
229229
running: [running, RUNNING]
@@ -392,14 +392,15 @@ rules:
392392
status:
393393
metric: status
394394
type: state
395-
desc: The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or 'restarting'.
395+
desc: The status of the connector task. Supports Apache (unassigned, running, paused, failed, restarting) and Confluent (unassigned, running, paused, failed, destroyed) values.
396396
metricAttribute:
397397
kafka.connect.task.state:
398398
running: [running, RUNNING]
399399
failed: [failed, FAILED]
400400
paused: [paused, PAUSED]
401401
unassigned: [unassigned, UNASSIGNED]
402402
restarting: [restarting, RESTARTING]
403+
destroyed: [destroyed, DESTROYED]
403404
unknown: "*"
404405
# kafka.connect.task.class
405406
task-class:

instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java

Lines changed: 136 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,61 +14,156 @@
1414
import io.opentelemetry.instrumentation.jmx.yaml.RuleParser;
1515
import io.opentelemetry.instrumentation.jmx.yaml.StateMapping;
1616
import java.io.InputStream;
17-
import java.util.Optional;
1817
import org.junit.jupiter.api.Test;
1918

2019
class KafkaConnectRuleTest {
2120

2221
@Test
2322
void kafkaConnectConfigParsesAndBuilds() throws Exception {
24-
try (InputStream input =
25-
getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) {
26-
assertThat(input).isNotNull();
27-
28-
JmxConfig config = RuleParser.get().loadConfig(input);
29-
assertThat(config.getRules()).isNotEmpty();
23+
JmxConfig config = loadKafkaConnectConfig();
24+
assertThat(config.getRules()).isNotEmpty();
3025

31-
// ensure all metric definitions build without throwing
32-
for (JmxRule rule : config.getRules()) {
33-
assertThatCode(rule::buildMetricDef).doesNotThrowAnyException();
34-
}
26+
// ensure all metric definitions build without throwing
27+
for (JmxRule rule : config.getRules()) {
28+
assertThatCode(rule::buildMetricDef).doesNotThrowAnyException();
3529
}
3630
}
3731

3832
@Test
3933
void connectorStatusStateMappingPresent() throws Exception {
34+
JmxConfig config = loadKafkaConnectConfig();
35+
36+
JmxRule connectorRule =
37+
getRuleForBean(config, "kafka.connect:type=connector-metrics,connector=*");
38+
39+
StateMapping stateMapping = getMetric(connectorRule, "status").getStateMapping();
40+
assertThat(stateMapping.isEmpty()).isFalse();
41+
assertThat(stateMapping.getStateKeys())
42+
.contains(
43+
"running",
44+
"failed",
45+
"paused",
46+
"unassigned",
47+
"restarting",
48+
"degraded",
49+
"stopped",
50+
"unknown");
51+
assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown");
52+
assertThat(stateMapping.getStateValue("RUNNING")).isEqualTo("running");
53+
assertThat(stateMapping.getStateValue("FAILED")).isEqualTo("failed");
54+
assertThat(stateMapping.getStateValue("PAUSED")).isEqualTo("paused");
55+
assertThat(stateMapping.getStateValue("UNKNOWN")).isEqualTo("unknown");
56+
}
57+
58+
@Test
59+
void taskStatusStateMappingSuperset() throws Exception {
60+
JmxConfig config = loadKafkaConnectConfig();
61+
62+
JmxRule connectorTaskRule =
63+
getRuleForBean(
64+
config, "kafka.connect:type=connector-task-metrics,connector=*,task=*");
65+
66+
StateMapping stateMapping = getMetric(connectorTaskRule, "status").getStateMapping();
67+
assertThat(stateMapping.isEmpty()).isFalse();
68+
assertThat(stateMapping.getStateKeys())
69+
.contains(
70+
"running",
71+
"failed",
72+
"paused",
73+
"unassigned",
74+
"restarting",
75+
"destroyed",
76+
"unknown");
77+
assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown");
78+
assertThat(stateMapping.getStateValue("DESTROYED")).isEqualTo("destroyed");
79+
assertThat(stateMapping.getStateValue("RESTARTING")).isEqualTo("restarting");
80+
assertThat(stateMapping.getStateValue("unexpected")).isEqualTo("unknown");
81+
}
82+
83+
@Test
84+
void apacheSpecificMetricsPresent() throws Exception {
85+
JmxConfig config = loadKafkaConnectConfig();
86+
87+
assertMappingContains(
88+
config, "kafka.connect:type=connect-worker-rebalance-metrics", "connect-protocol");
89+
90+
assertMappingContains(
91+
config,
92+
"kafka.connect:type=connect-worker-metrics,connector=*",
93+
"connector-destroyed-task-count",
94+
"connector-failed-task-count",
95+
"connector-paused-task-count",
96+
"connector-restarting-task-count",
97+
"connector-running-task-count",
98+
"connector-total-task-count",
99+
"connector-unassigned-task-count");
100+
101+
assertMappingContains(
102+
config,
103+
"kafka.connect:type=connector-predicate-metrics,connector=*,task=*,predicate=*",
104+
"predicate-class",
105+
"predicate-version");
106+
107+
assertMappingContains(
108+
config,
109+
"kafka.connect:type=connector-transform-metrics,connector=*,task=*,transform=*",
110+
"transform-class",
111+
"transform-version");
112+
113+
assertMappingContains(
114+
config,
115+
"kafka.connect:type=connector-task-metrics,connector=*,task=*",
116+
"connector-class",
117+
"connector-type",
118+
"connector-version",
119+
"header-converter-class",
120+
"header-converter-version",
121+
"key-converter-class",
122+
"key-converter-version",
123+
"task-class",
124+
"task-version",
125+
"value-converter-class",
126+
"value-converter-version");
127+
128+
assertMappingContains(
129+
config,
130+
"kafka.connect:type=source-task-metrics,connector=*,task=*",
131+
"transaction-size-avg",
132+
"transaction-size-max",
133+
"transaction-size-min");
134+
135+
assertMappingContains(
136+
config,
137+
"kafka.connect:type=sink-task-metrics,connector=*,task=*",
138+
"sink-record-lag-max");
139+
}
140+
141+
private JmxConfig loadKafkaConnectConfig() throws Exception {
40142
try (InputStream input =
41143
getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) {
42-
JmxConfig config = RuleParser.get().loadConfig(input);
43-
44-
Optional<JmxRule> connectorRule =
45-
config.getRules().stream()
46-
.filter(
47-
rule ->
48-
rule.getBeans().contains("kafka.connect:type=connector-metrics,connector=*"))
49-
.findFirst();
50-
assertThat(connectorRule).isPresent();
51-
52-
Metric statusMetric = connectorRule.get().getMapping().get("status");
53-
assertThat(statusMetric).isNotNull();
54-
55-
StateMapping stateMapping = statusMetric.getStateMapping();
56-
assertThat(stateMapping.isEmpty()).isFalse();
57-
assertThat(stateMapping.getStateKeys())
58-
.contains(
59-
"running",
60-
"failed",
61-
"paused",
62-
"unassigned",
63-
"restarting",
64-
"degraded",
65-
"stopped",
66-
"unknown");
67-
assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown");
68-
assertThat(stateMapping.getStateValue("RUNNING")).isEqualTo("running");
69-
assertThat(stateMapping.getStateValue("FAILED")).isEqualTo("failed");
70-
assertThat(stateMapping.getStateValue("PAUSED")).isEqualTo("paused");
71-
assertThat(stateMapping.getStateValue("UNKNOWN")).isEqualTo("unknown");
144+
assertThat(input).isNotNull();
145+
return RuleParser.get().loadConfig(input);
72146
}
73147
}
148+
149+
private static JmxRule getRuleForBean(JmxConfig config, String bean) {
150+
return config.getRules().stream()
151+
.filter(rule -> rule.getBeans().contains(bean))
152+
.findFirst()
153+
.orElseThrow(() -> new AssertionError("Missing rule for bean " + bean));
154+
}
155+
156+
private static Metric getMetric(JmxRule rule, String metricKey) {
157+
Metric metric = rule.getMapping().get(metricKey);
158+
if (metric == null) {
159+
throw new AssertionError("Missing metric " + metricKey + " in rule " + rule.getBeans());
160+
}
161+
return metric;
162+
}
163+
164+
private static void assertMappingContains(
165+
JmxConfig config, String bean, String... metricKeys) {
166+
JmxRule rule = getRuleForBean(config, bean);
167+
assertThat(rule.getMapping().keySet()).contains(metricKeys);
168+
}
74169
}

0 commit comments

Comments
 (0)