Add Kafka Connect as a built‑in JMX metrics target #15561
Conversation
@SylvainJuge could you review this?
SylvainJuge
left a comment
Hi @aaaugustine29, thanks for opening this!
There are quite a lot of metrics added here, which makes it challenging to review them all.
I don't have any expertise in Kafka Connect, so you are probably more knowledgeable here.
I would suggest:
- implementing tests with a real instance of the target system, ideally both the apache/confluent variants
- as a first step, focusing on the "essential" metrics rather than including everything that is available; this is where your knowledge might be useful
- simplifying as much as possible by using metric attributes to provide a breakdown when the metrics represent a partition (for example on state).
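To make the last point concrete, here is a sketch of what an attribute-based breakdown could look like in this library's YAML rule format. The bean name, attribute key, and metric name are illustrative assumptions, not taken from this PR:

```yaml
# Illustrative only: capture one metric with a state attribute
# instead of one metric per state.
rules:
  - bean: kafka.connect:type=connect-worker-metrics,connector=*  # hypothetical bean
    mapping:
      connector-running-task-count:
        metric: connector.task
        metricAttribute:
          state: const(running)
      connector-paused-task-count:
        metric: connector.task
        metricAttribute:
          state: const(paused)
```

A consumer can then sum across the `state` attribute to recover a total, or filter on it to get per-state counts, without needing a separate metric per state.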
```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class KafkaConnectRuleTest {
```
These tests mock the JMX interface of an actual Kafka Connect instance. While this makes testing fast and easy to run without the target Kafka Connect system, it also means the tests and metric definitions can easily drift from the actual implementation.
So the tests here only verify that the metric mapping is what we expect, not that this mapping actually works as expected on a real Kafka Connect instance. To solve this, I would recommend adding a test against a real Kafka Connect, like we have for other systems.
Ok, will do. I was trying to avoid adding overly heavy tests, but I'm glad to hear that's acceptable. I'll look into the lightest way to do so and commit those soon.
@SylvainJuge Thanks for your help and guidance. At this point, the metrics have been reduced to the minimum set without losing any information. That said, that doesn't mean we need to keep everything: your previous comment points to an opportunity to consolidate some of them with metric attributes, though that would lose information for a niche, advanced group of users. What's your guidance on this?
And to clarify your comment about testing: tests that actually instantiate a Kafka Connect cluster would be very heavy. I could instead emulate what the Apache JMX server would produce; would that be sufficient?
```yaml
connector-startup-attempts-total:
  metric: connector.startup.attempts
  type: counter
  unit: "{attempt}"
  desc: The total number of connector startups that this worker has attempted.
# kafka.connect.worker.connector.startup.failure.total
connector-startup-failure-total:
  metric: connector.startup.failures
  type: counter
  unit: "{startup}"
  desc: The total number of connector starts that failed.
# kafka.connect.worker.connector.startup.success.total
connector-startup-success-total:
  metric: connector.startup.successes
  type: counter
  unit: "{startup}"
  desc: The total number of connector starts that succeeded.
```
From the metrics definitions we can assume that connector-startup-attempts-total = connector-startup-failure-total + connector-startup-success-total.
This assumption is confirmed when we look for the probable implementation in https://github.com/apache/kafka/blob/83dc0d7eae2940ea26781276b3dfee5ed65dba15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java#L80
From the implementation we know the following:
- the total startup attempt count is incremented when the startup is known to be a failure/success, so after the startup attempt is completed/failed. There is no state where the attempt is "in progress"
- the sum of failure + success will always be equal to the total, thus we don't need an extra metric
This means that we could capture a single kafka.connect.worker.connector.startup metric with a breakdown of the startup result by kafka.connect.worker.connector.startup.result = failure | success
With such a metric, we have the following:
- the total number of startup attempts is provided by discarding the `kafka.connect.worker.connector.startup.result` attribute and aggregating (sum)
- the total number of startup successes/failures is provided by filtering on the value of the `kafka.connect.worker.connector.startup.result` attribute.
Also, we can use yaml anchors to avoid duplication and ensure this is captured in the same metric. The implementation should look like this:
```diff
-connector-startup-attempts-total:
-  metric: connector.startup.attempts
-  type: counter
-  unit: "{attempt}"
-  desc: The total number of connector startups that this worker has attempted.
-# kafka.connect.worker.connector.startup.failure.total
-connector-startup-failure-total:
-  metric: connector.startup.failures
-  type: counter
-  unit: "{startup}"
-  desc: The total number of connector starts that failed.
-# kafka.connect.worker.connector.startup.success.total
-connector-startup-success-total:
-  metric: connector.startup.successes
-  type: counter
-  unit: "{startup}"
-  desc: The total number of connector starts that succeeded.
+# kafka.connect.worker.connector.startup
+connector-startup-failure-total:
+  metric: &metric connector.startup
+  type: &type counter
+  unit: &unit "{startup}"
+  desc: &desc The total number of connector starts.
+  metricAttribute:
+    kafka.connect.worker.connector.startup.result: const(failure)
+# kafka.connect.worker.connector.startup.success.total
+connector-startup-success-total:
+  metric: *metric
+  type: *type
+  unit: *unit
+  desc: *desc
+  metricAttribute:
+    kafka.connect.worker.connector.startup.result: const(success)
```
```yaml
# kafka.connect.worker.task.startup.attempts
task-startup-attempts-total:
  metric: task.startup.attempts
  type: counter
  unit: "{attempt}"
  desc: The total number of task startups that this worker has attempted.
# kafka.connect.worker.task.startup.failure.total
task-startup-failure-total:
  metric: task.startup.failures
  type: counter
  unit: "{startup}"
  desc: The total number of task starts that failed.
# kafka.connect.worker.task.startup.success.total
task-startup-success-total:
  metric: task.startup.successes
  type: counter
  unit: "{startup}"
  desc: The total number of task starts that succeeded.
```
We can do the same here with a single kafka.connect.worker.task.startup metric.
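A sketch of that consolidation, mirroring the anchor-based pattern suggested for connector startups (metric and attribute names are assumed by analogy, not verified against the PR):

```yaml
# kafka.connect.worker.task.startup
task-startup-failure-total:
  metric: &metric task.startup
  type: &type counter
  unit: &unit "{startup}"
  desc: &desc The total number of task starts.
  metricAttribute:
    kafka.connect.worker.task.startup.result: const(failure)
task-startup-success-total:
  metric: *metric
  type: *type
  unit: *unit
  desc: *desc
  metricAttribute:
    kafka.connect.worker.task.startup.result: const(success)
```

As with the connector variant, the attempts metric becomes redundant: summing over the `result` attribute recovers the attempt total.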
```yaml
# kafka.connect.worker.connector.task.destroyed
connector-destroyed-task-count:
  metric: destroyed
  desc: The number of destroyed tasks of the connector on the worker.
# kafka.connect.worker.connector.task.failed
connector-failed-task-count:
  metric: failed
  desc: The number of failed tasks of the connector on the worker.
# kafka.connect.worker.connector.task.paused
connector-paused-task-count:
  metric: paused
  desc: The number of paused tasks of the connector on the worker.
# kafka.connect.worker.connector.task.restarting
connector-restarting-task-count:
  metric: restarting
  desc: The number of restarting tasks of the connector on the worker.
# kafka.connect.worker.connector.task.running
connector-running-task-count:
  metric: running
  desc: The number of running tasks of the connector on the worker.
# kafka.connect.worker.connector.task.total
connector-total-task-count:
  metric: total
  desc: The number of tasks of the connector on the worker.
# kafka.connect.worker.connector.task.unassigned
connector-unassigned-task-count:
  metric: unassigned
  desc: The number of unassigned tasks of the connector on the worker.
```
Those metrics were added about 6 years ago with this PR: https://github.com/apache/kafka/pull/6843/files
A quick look at the implementation indicates that those metrics are produced by iterating over a list of tasks and counting each state. That means the list of states expressed here is a complete partition over all the possible task states.
So here we can replace those 7 metrics with a single kafka.connect.worker.connector.task metric with a breakdown on a kafka.connect.worker.connector.task.state metric attribute.
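A sketch of such a consolidation, assuming the same const(...) attribute syntax shown earlier in this thread (the "{task}" unit is an assumption, not from the PR):

```yaml
# kafka.connect.worker.connector.task
connector-running-task-count:
  metric: &metric connector.task
  unit: &unit "{task}"
  desc: &desc The number of tasks of the connector on the worker.
  metricAttribute:
    kafka.connect.worker.connector.task.state: const(running)
connector-paused-task-count:
  metric: *metric
  unit: *unit
  desc: *desc
  metricAttribute:
    kafka.connect.worker.connector.task.state: const(paused)
# ...repeat for destroyed, failed, restarting, and unassigned.
```

Since the states form a complete partition, the former `total` metric is then simply the sum across all values of the `state` attribute and does not need its own rule.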
Overview:
This change introduces Kafka Connect as a first‑class JMX target system in the JMX metrics library. It adds a ruleset and documentation that cover both Apache Kafka Connect and Confluent Platform variants from the outset, so users can enable Kafka Connect monitoring without custom YAML.
Details:
Added kafka-connect.yaml JMX rules that map worker, rebalance, connector, task, source/sink task, and task-error MBeans into OpenTelemetry metrics, including Apache‑only metrics (e.g., worker rebalance protocol, per‑connector task counts, predicate/transform metadata, converter metadata, source transaction sizes, sink record lag max).
Defined connector and task status as state metrics using the superset of status values across Apache and Confluent, to avoid vendor‑specific enum mismatches.
Documented the new target in kafka-connect.md, including metric groups, attributes, and the dual‑vendor compatibility model (no renames; Apache list as a superset of Confluent docs).
Added self‑contained tests for the Kafka Connect rules that load the YAML, build metric definitions, and validate key state mappings and metric presence, ensuring the new target is ready to consume from day one.
Testing:
```
./gradlew -Dorg.gradle.configuration-cache.parallel=false instrumentation:jmx-metrics:library:test
```