From 6293d683d6aa8e4ae0767d5c65a06e74b16efc42 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Sat, 6 Dec 2025 13:28:52 -0500 Subject: [PATCH 01/13] initial commit with changes --- .../jmx-metrics/library/kafka-connect.md | 162 ++++ .../resources/jmx/rules/kafka-connect.yaml | 704 ++++++++++++++++++ .../jmx/rules/KafkaConnectRuleTest.java | 74 ++ 3 files changed, 940 insertions(+) create mode 100644 instrumentation/jmx-metrics/library/kafka-connect.md create mode 100644 instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml create mode 100644 instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md new file mode 100644 index 000000000000..55aa4e5624f8 --- /dev/null +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -0,0 +1,162 @@ +# Kafka Connect Metrics + +Here is the list of metrics based on MBeans exposed by Kafka Connect. String-valued JMX +attributes (class/type/version information) are exported as state metrics with value `1` and +carry the raw string value as metric attributes. + +## Worker metrics + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.worker.connector.count | UpDownCounter | {connector} | | The number of connectors run in this worker. | +| kafka.connect.worker.connector.startup.attempts | Counter | {attempt} | | The total number of connector startups that this worker has attempted. | +| kafka.connect.worker.connector.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that failed. | +| kafka.connect.worker.connector.startup.failure.total | Counter | {startup} | | The total number of connector starts that failed. 
| +| kafka.connect.worker.connector.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that succeeded. | +| kafka.connect.worker.connector.startup.success.total | Counter | {startup} | | The total number of connector starts that succeeded. | +| kafka.connect.worker.task.count | UpDownCounter | {task} | | The number of tasks run in this worker. | +| kafka.connect.worker.task.startup.attempts | Counter | {attempt} | | The total number of task startups that this worker has attempted. | +| kafka.connect.worker.task.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that failed. | +| kafka.connect.worker.task.startup.failure.total | Counter | {startup} | | The total number of task starts that failed. | +| kafka.connect.worker.task.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that succeeded. | +| kafka.connect.worker.task.startup.success.total | Counter | {startup} | | The total number of task starts that succeeded. | + +## Worker connector task metrics + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.worker.connector.task.destroyed | UpDownCounter | {task} | kafka.connect.connector | The number of destroyed tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.failed | UpDownCounter | {task} | kafka.connect.connector | The number of failed tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.paused | UpDownCounter | {task} | kafka.connect.connector | The number of paused tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.restarting | UpDownCounter | {task} | kafka.connect.connector | The number of restarting tasks of the connector on the worker. 
| +| kafka.connect.worker.connector.task.running | UpDownCounter | {task} | kafka.connect.connector | The number of running tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.total | UpDownCounter | {task} | kafka.connect.connector | The number of tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.unassigned | UpDownCounter | {task} | kafka.connect.connector | The number of unassigned tasks of the connector on the worker. | + +## Worker rebalance metrics + +All metrics include `kafka.connect.worker.leader`. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. | +| kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. | +| kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. | +| kafka.connect.worker.rebalance.leader | UpDownCounter | 1 | kafka.connect.worker.leader.state | The name of the group leader. | +| kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time spent by this worker to rebalance. | +| kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time spent by this worker to rebalance. | +| kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. | +| kafka.connect.worker.rebalance.since_last | Gauge | s | | The time since this worker completed the most recent rebalance. | + +## Connector metrics + +Attributes: `kafka.connect.connector`, `kafka.connect.connector.class`, `kafka.connect.connector.version`, `kafka.connect.connector.type.raw`.
+ +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.connector.class | UpDownCounter | 1 | kafka.connect.connector.class.state | The name of the connector class. | +| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. | +| kafka.connect.connector.version | UpDownCounter | 1 | kafka.connect.connector.version.state | The version of the connector class, as reported by the connector. | +| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value). | + +## Predicate metrics + +Attributes: `kafka.connect.connector`, `kafka.connect.task.id`, `kafka.connect.predicate`, `kafka.connect.predicate.class`, `kafka.connect.predicate.version`. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.predicate.class | UpDownCounter | 1 | kafka.connect.predicate.class.state | The class name of the predicate class. | +| kafka.connect.predicate.version | UpDownCounter | 1 | kafka.connect.predicate.version.state | The version of the predicate class. | + +## Connector task metrics + +Attributes include `kafka.connect.connector`, `kafka.connect.task.id`, connector class/type/version, converter class/version attributes, and task class/version. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.task.batch.size.avg | Gauge | {record} | | The average number of records in the batches the task has processed so far. | +| kafka.connect.task.batch.size.max | Gauge | {record} | | The number of records in the largest batch the task has processed so far. 
| +| kafka.connect.task.connector.class | UpDownCounter | 1 | kafka.connect.task.connector.class.state | The name of the connector class. | +| kafka.connect.task.connector.type | UpDownCounter | 1 | kafka.connect.task.connector.type | The type of the connector. One of 'source' or 'sink'. | +| kafka.connect.task.connector.version | UpDownCounter | 1 | kafka.connect.task.connector.version.state | The version of the connector class, as reported by the connector. | +| kafka.connect.task.header.converter.class | UpDownCounter | 1 | kafka.connect.task.header.converter.class.state | The fully qualified class name from header.converter. | +| kafka.connect.task.header.converter.version | UpDownCounter | 1 | kafka.connect.task.header.converter.version.state | The version instantiated for header.converter. May be undefined. | +| kafka.connect.task.key.converter.class | UpDownCounter | 1 | kafka.connect.task.key.converter.class.state | The fully qualified class name from key.converter. | +| kafka.connect.task.key.converter.version | UpDownCounter | 1 | kafka.connect.task.key.converter.version.state | The version instantiated for key.converter. May be undefined. | +| kafka.connect.task.offset.commit.avg.time | Gauge | s | | The average time taken by this task to commit offsets. | +| kafka.connect.task.offset.commit.failure.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that failed. | +| kafka.connect.task.offset.commit.max.time | Gauge | s | | The maximum time taken by this task to commit offsets. | +| kafka.connect.task.offset.commit.success.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that succeeded. | +| kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. | +| kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state.
| +| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task. | +| kafka.connect.task.class | UpDownCounter | 1 | kafka.connect.task.class.state | The class name of the task. | +| kafka.connect.task.version | UpDownCounter | 1 | kafka.connect.task.version.state | The version of the task. | +| kafka.connect.task.value.converter.class | UpDownCounter | 1 | kafka.connect.task.value.converter.class.state | The fully qualified class name from value.converter. | +| kafka.connect.task.value.converter.version | UpDownCounter | 1 | kafka.connect.task.value.converter.version.state | The version instantiated for value.converter. May be undefined. | + +## Transform metrics + +Attributes: `kafka.connect.connector`, `kafka.connect.task.id`, `kafka.connect.transform`, `kafka.connect.transform.class`, `kafka.connect.transform.version`. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.transform.class | UpDownCounter | 1 | kafka.connect.transform.class.state | The class name of the transformation class. | +| kafka.connect.transform.version | UpDownCounter | 1 | kafka.connect.transform.version.state | The version of the transformation class. | + +## Sink task metrics + +Attributes: `kafka.connect.connector`, `kafka.connect.task.id`. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.sink.offset.commit.completion.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were completed successfully. | +| kafka.connect.sink.offset.commit.completion.total | Counter | {commit} | | The total number of offset commit completions that were completed successfully. | +| kafka.connect.sink.offset.commit.seq | UpDownCounter | {sequence} | | The current sequence number for offset commits. 
| +| kafka.connect.sink.offset.commit.skip.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were received too late and skipped/ignored. | +| kafka.connect.sink.offset.commit.skip.total | Counter | {commit} | | The total number of offset commit completions that were received too late and skipped/ignored. | +| kafka.connect.sink.partition.count | UpDownCounter | {partition} | | The number of topic partitions assigned to this task. | +| kafka.connect.sink.put.batch.avg.time | Gauge | s | | The average time taken by this task to put a batch of sink records. | +| kafka.connect.sink.put.batch.max.time | Gauge | s | | The maximum time taken by this task to put a batch of sink records. | +| kafka.connect.sink.record.active.count | UpDownCounter | {record} | | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | +| kafka.connect.sink.record.active.count.avg | Gauge | {record} | | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | +| kafka.connect.sink.record.active.count.max | Gauge | {record} | | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | +| kafka.connect.sink.record.lag.max | Gauge | {record} | | The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions. | +| kafka.connect.sink.record.read.rate | Gauge | {record}/s | | The average per-second number of records read from Kafka for this task before transformations are applied. | +| kafka.connect.sink.record.read.total | Counter | {record} | | The total number of records read from Kafka by this task since it was last restarted.
| +| kafka.connect.sink.record.send.rate | Gauge | {record}/s | | The average per-second number of records output from the transformations and sent/put to this task. | +| kafka.connect.sink.record.send.total | Counter | {record} | | The total number of records output from the transformations and sent/put to this task since it was last restarted. | + +## Source task metrics + +Attributes: `kafka.connect.connector`, `kafka.connect.task.id`. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.source.poll.batch.avg.time | Gauge | s | | The average time taken by this task to poll for a batch of source records. | +| kafka.connect.source.poll.batch.max.time | Gauge | s | | The maximum time taken by this task to poll for a batch of source records. | +| kafka.connect.source.record.active.count | UpDownCounter | {record} | | The number of records that have been produced by this task but not yet completely written to Kafka. | +| kafka.connect.source.record.active.count.avg | Gauge | {record} | | The average number of records that have been produced by this task but not yet completely written to Kafka. | +| kafka.connect.source.record.active.count.max | Gauge | {record} | | The maximum number of records that have been produced by this task but not yet completely written to Kafka. | +| kafka.connect.source.record.poll.rate | Gauge | {record}/s | | The average per-second number of records produced/polled (before transformation) by this task. | +| kafka.connect.source.record.poll.total | Counter | {record} | | The total number of records produced/polled (before transformation) by this task. | +| kafka.connect.source.record.write.rate | Gauge | {record}/s | | The average per-second number of records written to Kafka for this task. | +| kafka.connect.source.record.write.total | Counter | {record} | | The total number of records written to Kafka for this task.
| +| kafka.connect.source.transaction.size.avg | Gauge | {record} | | The average number of records in the transactions the task has committed so far. | +| kafka.connect.source.transaction.size.max | Gauge | {record} | | The number of records in the largest transaction the task has committed so far. | +| kafka.connect.source.transaction.size.min | Gauge | {record} | | The number of records in the smallest transaction the task has committed so far. | + +## Task error metrics + +Attributes: `kafka.connect.connector`, `kafka.connect.task.id`. + +| Metric Name | Type | Unit | Attributes | Description | +|-------------|------|------|------------|-------------| +| kafka.connect.task.error.deadletterqueue.produce.failures | Counter | {failure} | | The number of failed writes to the dead letter queue. | +| kafka.connect.task.error.deadletterqueue.produce.requests | Counter | {request} | | The number of attempted writes to the dead letter queue. | +| kafka.connect.task.error.last.error.timestamp | Gauge | s | | The epoch timestamp when this task last encountered an error. | +| kafka.connect.task.error.total.errors.logged | Counter | {error} | | The number of errors that were logged. | +| kafka.connect.task.error.total.record.errors | Counter | {record} | | The number of record processing errors in this task. | +| kafka.connect.task.error.total.record.failures | Counter | {record} | | The number of record processing failures in this task. | +| kafka.connect.task.error.total.records.skipped | Counter | {record} | | The number of records skipped due to errors. | +| kafka.connect.task.error.total.retries | Counter | {retry} | | The number of operations retried. 
| diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml new file mode 100644 index 000000000000..53ea518a9491 --- /dev/null +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -0,0 +1,704 @@ +--- +rules: + + - bean: kafka.connect:type=connect-worker-metrics + prefix: kafka.connect.worker. + mapping: + # kafka.connect.worker.connector.count + connector-count: + metric: connector.count + type: updowncounter + unit: "{connector}" + desc: The number of connectors run in this worker. + # kafka.connect.worker.connector.startup.attempts + connector-startup-attempts-total: + metric: connector.startup.attempts + type: counter + unit: "{attempt}" + desc: The total number of connector startups that this worker has attempted. + # kafka.connect.worker.connector.startup.failure.percentage + connector-startup-failure-percentage: + metric: connector.startup.failure.percentage + type: gauge + sourceUnit: "%" + unit: "1" + desc: The average percentage of this worker's connectors starts that failed. + # kafka.connect.worker.connector.startup.failure.total + connector-startup-failure-total: + metric: connector.startup.failure.total + type: counter + unit: "{startup}" + desc: The total number of connector starts that failed. + # kafka.connect.worker.connector.startup.success.percentage + connector-startup-success-percentage: + metric: connector.startup.success.percentage + type: gauge + sourceUnit: "%" + unit: "1" + desc: The average percentage of this worker's connectors starts that succeeded. + # kafka.connect.worker.connector.startup.success.total + connector-startup-success-total: + metric: connector.startup.success.total + type: counter + unit: "{startup}" + desc: The total number of connector starts that succeeded. 
+ # kafka.connect.worker.task.count + task-count: + metric: task.count + type: updowncounter + unit: "{task}" + desc: The number of tasks run in this worker. + # kafka.connect.worker.task.startup.attempts + task-startup-attempts-total: + metric: task.startup.attempts + type: counter + unit: "{attempt}" + desc: The total number of task startups that this worker has attempted. + # kafka.connect.worker.task.startup.failure.percentage + task-startup-failure-percentage: + metric: task.startup.failure.percentage + type: gauge + sourceUnit: "%" + unit: "1" + desc: The average percentage of this worker's tasks starts that failed. + # kafka.connect.worker.task.startup.failure.total + task-startup-failure-total: + metric: task.startup.failure.total + type: counter + unit: "{startup}" + desc: The total number of task starts that failed. + # kafka.connect.worker.task.startup.success.percentage + task-startup-success-percentage: + metric: task.startup.success.percentage + type: gauge + sourceUnit: "%" + unit: "1" + desc: The average percentage of this worker's tasks starts that succeeded. + # kafka.connect.worker.task.startup.success.total + task-startup-success-total: + metric: task.startup.success.total + type: counter + unit: "{startup}" + desc: The total number of task starts that succeeded. + + - bean: kafka.connect:type=connect-worker-metrics,connector=* + prefix: kafka.connect.worker.connector.task. + metricAttribute: + kafka.connect.connector: param(connector) + type: updowncounter + unit: "{task}" + mapping: + # kafka.connect.worker.connector.task.destroyed + connector-destroyed-task-count: + metric: destroyed + desc: The number of destroyed tasks of the connector on the worker. + # kafka.connect.worker.connector.task.failed + connector-failed-task-count: + metric: failed + desc: The number of failed tasks of the connector on the worker. 
+ # kafka.connect.worker.connector.task.paused + connector-paused-task-count: + metric: paused + desc: The number of paused tasks of the connector on the worker. + # kafka.connect.worker.connector.task.restarting + connector-restarting-task-count: + metric: restarting + desc: The number of restarting tasks of the connector on the worker. + # kafka.connect.worker.connector.task.running + connector-running-task-count: + metric: running + desc: The number of running tasks of the connector on the worker. + # kafka.connect.worker.connector.task.total + connector-total-task-count: + metric: total + desc: The number of tasks of the connector on the worker. + # kafka.connect.worker.connector.task.unassigned + connector-unassigned-task-count: + metric: unassigned + desc: The number of unassigned tasks of the connector on the worker. + + - bean: kafka.connect:type=connect-worker-rebalance-metrics + prefix: kafka.connect.worker.rebalance. + metricAttribute: + kafka.connect.worker.leader: beanattr(leader-name) + mapping: + # kafka.connect.worker.rebalance.completed.total + completed-rebalances-total: + metric: completed.total + type: counter + unit: "{rebalance}" + desc: The total number of rebalances completed by this worker. + # kafka.connect.worker.rebalance.protocol + connect-protocol: + metric: protocol + type: state + desc: The Connect protocol used by this cluster. + metricAttribute: + kafka.connect.protocol.state: + eager: [eager, EAGER] + cooperative: [compatible, COMPATIBLE, cooperative, COOPERATIVE] + unknown: "*" + # kafka.connect.worker.rebalance.epoch + epoch: + metric: epoch + type: updowncounter + unit: "{epoch}" + desc: The epoch or generation number of this worker. + # kafka.connect.worker.rebalance.leader + leader-name: + metric: leader + type: state + desc: The name of the group leader. 
+ metricAttribute: + kafka.connect.worker.leader.state: + leader: "*" + # kafka.connect.worker.rebalance.avg.time + rebalance-avg-time-ms: + metric: avg.time + type: gauge + sourceUnit: ms + unit: s + desc: The average time spent by this worker to rebalance. + # kafka.connect.worker.rebalance.max.time + rebalance-max-time-ms: + metric: max.time + type: gauge + sourceUnit: ms + unit: s + desc: The maximum time spent by this worker to rebalance. + # kafka.connect.worker.rebalance.active + rebalancing: + metric: active + type: state + unit: "1" + desc: Whether this worker is currently rebalancing. + metricAttribute: + kafka.connect.worker.rebalance.state: + rebalancing: ["true", "TRUE"] + idle: ["false", "FALSE"] + unknown: "*" + # kafka.connect.worker.rebalance.since_last + time-since-last-rebalance-ms: + metric: since_last + type: gauge + sourceUnit: ms + unit: s + desc: The time since this worker completed the most recent rebalance. + + - bean: kafka.connect:type=connector-metrics,connector=* + prefix: kafka.connect.connector. + metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.connector.class: beanattr(connector-class) + kafka.connect.connector.version: beanattr(connector-version) + kafka.connect.connector.type.raw: lowercase(beanattr(connector-type)) + mapping: + # kafka.connect.connector.class + connector-class: + metric: class + type: state + desc: The name of the connector class. + metricAttribute: + kafka.connect.connector.class.state: + configured: "*" + # kafka.connect.connector.type + connector-type: + metric: type + type: state + desc: The type of the connector. One of 'source' or 'sink'. + metricAttribute: + kafka.connect.connector.type: + sink: [sink, SINK] + source: [source, SOURCE] + unknown: "*" + # kafka.connect.connector.version + connector-version: + metric: version + type: state + desc: The version of the connector class, as reported by the connector.
+ metricAttribute: + kafka.connect.connector.version.state: + reported: "*" + # kafka.connect.connector.status + status: + metric: status + type: state + desc: Connector lifecycle state indicator (1 when the state matches the attribute value) + metricAttribute: + kafka.connect.connector.state: + running: [running, RUNNING] + failed: [failed, FAILED] + paused: [paused, PAUSED] + unassigned: [unassigned, UNASSIGNED] + restarting: [restarting, RESTARTING] + degraded: [degraded, DEGRADED] + stopped: [stopped, STOPPED] + unknown: "*" + + - bean: kafka.connect:type=connector-predicate-metrics,connector=*,task=*,predicate=* + prefix: kafka.connect.predicate. + metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.task.id: param(task) + kafka.connect.predicate: param(predicate) + kafka.connect.predicate.class: beanattr(predicate-class) + kafka.connect.predicate.version: beanattr(predicate-version) + mapping: + # kafka.connect.predicate.class + predicate-class: + metric: class + type: state + desc: The class name of the predicate class + metricAttribute: + kafka.connect.predicate.class.state: + configured: "*" + # kafka.connect.predicate.version + predicate-version: + metric: version + type: state + desc: The version of the predicate class + metricAttribute: + kafka.connect.predicate.version.state: + reported: "*" + + - bean: kafka.connect:type=connector-task-metrics,connector=*,task=* + prefix: kafka.connect.task. 
+ metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.task.id: param(task) + kafka.connect.connector.class: beanattr(connector-class) + kafka.connect.connector.type.raw: lowercase(beanattr(connector-type)) + kafka.connect.connector.version: beanattr(connector-version) + kafka.connect.converter.header.class: beanattr(header-converter-class) + kafka.connect.converter.header.version: beanattr(header-converter-version) + kafka.connect.converter.key.class: beanattr(key-converter-class) + kafka.connect.converter.key.version: beanattr(key-converter-version) + kafka.connect.converter.value.class: beanattr(value-converter-class) + kafka.connect.converter.value.version: beanattr(value-converter-version) + kafka.connect.task.class: beanattr(task-class) + kafka.connect.task.version: beanattr(task-version) + mapping: + # kafka.connect.task.batch.size.avg + batch-size-avg: + metric: batch.size.avg + type: gauge + unit: "{record}" + desc: The average number of records in the batches the task has processed so far. + # kafka.connect.task.batch.size.max + batch-size-max: + metric: batch.size.max + type: gauge + unit: "{record}" + desc: The number of records in the largest batch the task has processed so far. + # kafka.connect.task.connector.class + connector-class: + metric: connector.class + type: state + desc: The name of the connector class. + metricAttribute: + kafka.connect.task.connector.class.state: + configured: "*" + # kafka.connect.task.connector.type + connector-type: + metric: connector.type + type: state + desc: The type of the connector. One of 'source' or 'sink'. + metricAttribute: + kafka.connect.task.connector.type: + sink: [sink, SINK] + source: [source, SOURCE] + unknown: "*" + # kafka.connect.task.connector.version + connector-version: + metric: connector.version + type: state + desc: The version of the connector class, as reported by the connector. 
+ metricAttribute: + kafka.connect.task.connector.version.state: + reported: "*" + # kafka.connect.task.header.converter.class + header-converter-class: + metric: header.converter.class + type: state + desc: The fully qualified class name from header.converter + metricAttribute: + kafka.connect.task.header.converter.class.state: + configured: "*" + # kafka.connect.task.header.converter.version + header-converter-version: + metric: header.converter.version + type: state + desc: The version instantiated for header.converter. May be undefined + metricAttribute: + kafka.connect.task.header.converter.version.state: + reported: "*" + # kafka.connect.task.key.converter.class + key-converter-class: + metric: key.converter.class + type: state + desc: The fully qualified class name from key.converter + metricAttribute: + kafka.connect.task.key.converter.class.state: + configured: "*" + # kafka.connect.task.key.converter.version + key-converter-version: + metric: key.converter.version + type: state + desc: The version instantiated for key.converter. May be undefined + metricAttribute: + kafka.connect.task.key.converter.version.state: + reported: "*" + # kafka.connect.task.offset.commit.avg.time + offset-commit-avg-time-ms: + metric: offset.commit.avg.time + type: gauge + sourceUnit: ms + unit: s + desc: The average time taken by this task to commit offsets. + # kafka.connect.task.offset.commit.failure.percentage + offset-commit-failure-percentage: + metric: offset.commit.failure.percentage + type: gauge + sourceUnit: "%" + unit: "1" + desc: The average percentage of this task's offset commit attempts that failed. + # kafka.connect.task.offset.commit.max.time + offset-commit-max-time-ms: + metric: offset.commit.max.time + type: gauge + sourceUnit: ms + unit: s + desc: The maximum time taken by this task to commit offsets.
+ # kafka.connect.task.offset.commit.success.percentage + offset-commit-success-percentage: + metric: offset.commit.success.percentage + type: gauge + sourceUnit: "%" + unit: "1" + desc: The average percentage of this task's offset commit attempts that succeeded. + # kafka.connect.task.pause.ratio + pause-ratio: + metric: pause.ratio + type: gauge + unit: "1" + desc: The fraction of time this task has spent in the pause state. + # kafka.connect.task.running.ratio + running-ratio: + metric: running.ratio + type: gauge + unit: "1" + desc: The fraction of time this task has spent in the running state. + # kafka.connect.task.status + status: + metric: status + type: state + desc: The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or 'restarting'. + metricAttribute: + kafka.connect.task.state: + running: [running, RUNNING] + failed: [failed, FAILED] + paused: [paused, PAUSED] + unassigned: [unassigned, UNASSIGNED] + restarting: [restarting, RESTARTING] + unknown: "*" + # kafka.connect.task.class + task-class: + metric: task.class + type: state + desc: The class name of the task. + metricAttribute: + kafka.connect.task.class.state: + configured: "*" + # kafka.connect.task.version + task-version: + metric: task.version + type: state + desc: The version of the task. + metricAttribute: + kafka.connect.task.version.state: + reported: "*" + # kafka.connect.task.value.converter.class + value-converter-class: + metric: value.converter.class + type: state + desc: The fully qualified class name from value.converter + metricAttribute: + kafka.connect.task.value.converter.class.state: + configured: "*" + # kafka.connect.task.value.converter.version + value-converter-version: + metric: value.converter.version + type: state + desc: The version instantiated for value.converter. 
May be undefined + metricAttribute: + kafka.connect.task.value.converter.version.state: + reported: "*" + + - bean: kafka.connect:type=connector-transform-metrics,connector=*,task=*,transform=* + prefix: kafka.connect.transform. + metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.task.id: param(task) + kafka.connect.transform: param(transform) + kafka.connect.transform.class: beanattr(transform-class) + kafka.connect.transform.version: beanattr(transform-version) + mapping: + # kafka.connect.transform.class + transform-class: + metric: class + type: state + desc: The class name of the transformation class + metricAttribute: + kafka.connect.transform.class.state: + configured: "*" + # kafka.connect.transform.version + transform-version: + metric: version + type: state + desc: The version of the transformation class + metricAttribute: + kafka.connect.transform.version.state: + reported: "*" + + - bean: kafka.connect:type=sink-task-metrics,connector=*,task=* + prefix: kafka.connect.sink. + metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.task.id: param(task) + mapping: + # kafka.connect.sink.offset.commit.completion.rate + offset-commit-completion-rate: + metric: offset.commit.completion.rate + type: gauge + unit: "{commit}/s" + desc: The average per-second number of offset commit completions that were completed successfully. + # kafka.connect.sink.offset.commit.completion.total + offset-commit-completion-total: + metric: offset.commit.completion.total + type: counter + unit: "{commit}" + desc: The total number of offset commit completions that were completed successfully. + # kafka.connect.sink.offset.commit.seq + offset-commit-seq-no: + metric: offset.commit.seq + type: updowncounter + unit: "{sequence}" + desc: The current sequence number for offset commits. 
+ # kafka.connect.sink.offset.commit.skip.rate + offset-commit-skip-rate: + metric: offset.commit.skip.rate + type: gauge + unit: "{commit}/s" + desc: The average per-second number of offset commit completions that were received too late and skipped/ignored. + # kafka.connect.sink.offset.commit.skip.total + offset-commit-skip-total: + metric: offset.commit.skip.total + type: counter + unit: "{commit}" + desc: The total number of offset commit completions that were received too late and skipped/ignored. + # kafka.connect.sink.partition.count + partition-count: + metric: partition.count + type: updowncounter + unit: "{partition}" + desc: The number of topic partitions assigned to this task belonging to the named sink connector in this worker. + # kafka.connect.sink.put.batch.avg.time + put-batch-avg-time-ms: + metric: put.batch.avg.time + type: gauge + sourceUnit: ms + unit: s + desc: The average time taken by this task to put a batch of sink records. + # kafka.connect.sink.put.batch.max.time + put-batch-max-time-ms: + metric: put.batch.max.time + type: gauge + sourceUnit: ms + unit: s + desc: The maximum time taken by this task to put a batch of sink records. + # kafka.connect.sink.record.active.count + sink-record-active-count: + metric: record.active.count + type: updowncounter + unit: "{record}" + desc: The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. + # kafka.connect.sink.record.active.count.avg + sink-record-active-count-avg: + metric: record.active.count.avg + type: gauge + unit: "{record}" + desc: The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
+ # kafka.connect.sink.record.active.count.max + sink-record-active-count-max: + metric: record.active.count.max + type: gauge + unit: "{record}" + desc: The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. + # kafka.connect.sink.record.lag.max + sink-record-lag-max: + metric: record.lag.max + type: gauge + unit: "{record}" + desc: The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions. + # kafka.connect.sink.record.read.rate + sink-record-read-rate: + metric: record.read.rate + type: gauge + unit: "{record}/s" + desc: The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied. + # kafka.connect.sink.record.read.total + sink-record-read-total: + metric: record.read.total + type: counter + unit: "{record}" + desc: The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted. + # kafka.connect.sink.record.send.rate + sink-record-send-rate: + metric: record.send.rate + type: gauge + unit: "{record}/s" + desc: The average per-second number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations. + # kafka.connect.sink.record.send.total + sink-record-send-total: + metric: record.send.total + type: counter + unit: "{record}" + desc: The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted. + + - bean: kafka.connect:type=source-task-metrics,connector=*,task=* + prefix: kafka.connect.source. 
+ metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.task.id: param(task) + mapping: + # kafka.connect.source.poll.batch.avg.time + poll-batch-avg-time-ms: + metric: poll.batch.avg.time + type: gauge + sourceUnit: ms + unit: s + desc: The average time taken by this task to poll for a batch of source records. + # kafka.connect.source.poll.batch.max.time + poll-batch-max-time-ms: + metric: poll.batch.max.time + type: gauge + sourceUnit: ms + unit: s + desc: The maximum time taken by this task to poll for a batch of source records. + # kafka.connect.source.record.active.count + source-record-active-count: + metric: record.active.count + type: updowncounter + unit: "{record}" + desc: The number of records that have been produced by this task but not yet completely written to Kafka. + # kafka.connect.source.record.active.count.avg + source-record-active-count-avg: + metric: record.active.count.avg + type: gauge + unit: "{record}" + desc: The average number of records that have been produced by this task but not yet completely written to Kafka. + # kafka.connect.source.record.active.count.max + source-record-active-count-max: + metric: record.active.count.max + type: gauge + unit: "{record}" + desc: The maximum number of records that have been produced by this task but not yet completely written to Kafka. + # kafka.connect.source.record.poll.rate + source-record-poll-rate: + metric: record.poll.rate + type: gauge + unit: "{record}/s" + desc: The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker. + # kafka.connect.source.record.poll.total + source-record-poll-total: + metric: record.poll.total + type: counter + unit: "{record}" + desc: The total number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
+ # kafka.connect.source.record.write.rate + source-record-write-rate: + metric: record.write.rate + type: gauge + unit: "{record}/s" + desc: The average per-second number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations. + # kafka.connect.source.record.write.total + source-record-write-total: + metric: record.write.total + type: counter + unit: "{record}" + desc: The total number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations. + # kafka.connect.source.transaction.size.avg + transaction-size-avg: + metric: transaction.size.avg + type: gauge + unit: "{record}" + desc: The average number of records in the transactions the task has committed so far. + # kafka.connect.source.transaction.size.max + transaction-size-max: + metric: transaction.size.max + type: gauge + unit: "{record}" + desc: The number of records in the largest transaction the task has committed so far. + # kafka.connect.source.transaction.size.min + transaction-size-min: + metric: transaction.size.min + type: gauge + unit: "{record}" + desc: The number of records in the smallest transaction the task has committed so far. + + - bean: kafka.connect:type=task-error-metrics,connector=*,task=* + prefix: kafka.connect.task.error. + metricAttribute: + kafka.connect.connector: param(connector) + kafka.connect.task.id: param(task) + mapping: + # kafka.connect.task.error.deadletterqueue.produce.failures + deadletterqueue-produce-failures: + metric: deadletterqueue.produce.failures + type: counter + unit: "{failure}" + desc: The number of failed writes to the dead letter queue.
+ # kafka.connect.task.error.deadletterqueue.produce.requests + deadletterqueue-produce-requests: + metric: deadletterqueue.produce.requests + type: counter + unit: "{request}" + desc: The number of attempted writes to the dead letter queue. + # kafka.connect.task.error.last.error.timestamp + last-error-timestamp: + metric: last.error.timestamp + type: gauge + sourceUnit: ms + unit: s + dropNegativeValues: true + desc: The epoch timestamp when this task last encountered an error. + # kafka.connect.task.error.total.errors.logged + total-errors-logged: + metric: total.errors.logged + type: counter + unit: "{error}" + desc: The number of errors that were logged. + # kafka.connect.task.error.total.record.errors + total-record-errors: + metric: total.record.errors + type: counter + unit: "{record}" + desc: The number of record processing errors in this task. + # kafka.connect.task.error.total.record.failures + total-record-failures: + metric: total.record.failures + type: counter + unit: "{record}" + desc: The number of record processing failures in this task. + # kafka.connect.task.error.total.records.skipped + total-records-skipped: + metric: total.records.skipped + type: counter + unit: "{record}" + desc: The number of records skipped due to errors. + # kafka.connect.task.error.total.retries + total-retries: + metric: total.retries + type: counter + unit: "{retry}" + desc: The number of operations retried. 
diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java new file mode 100644 index 000000000000..6231a663a16a --- /dev/null +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.jmx.rules; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import io.opentelemetry.instrumentation.jmx.yaml.JmxConfig; +import io.opentelemetry.instrumentation.jmx.yaml.JmxRule; +import io.opentelemetry.instrumentation.jmx.yaml.Metric; +import io.opentelemetry.instrumentation.jmx.yaml.RuleParser; +import io.opentelemetry.instrumentation.jmx.yaml.StateMapping; +import java.io.InputStream; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +class KafkaConnectRuleTest { + + @Test + void kafkaConnectConfigParsesAndBuilds() throws Exception { + try (InputStream input = + getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) { + assertThat(input).isNotNull(); + + JmxConfig config = RuleParser.get().loadConfig(input); + assertThat(config.getRules()).isNotEmpty(); + + // ensure all metric definitions build without throwing + for (JmxRule rule : config.getRules()) { + assertThatCode(rule::buildMetricDef).doesNotThrowAnyException(); + } + } + } + + @Test + void connectorStatusStateMappingPresent() throws Exception { + try (InputStream input = + getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) { + JmxConfig config = RuleParser.get().loadConfig(input); + + Optional connectorRule = + config.getRules().stream() + .filter( + rule -> + 
rule.getBeans().contains("kafka.connect:type=connector-metrics,connector=*")) + .findFirst(); + assertThat(connectorRule).isPresent(); + + Metric statusMetric = connectorRule.get().getMapping().get("status"); + assertThat(statusMetric).isNotNull(); + + StateMapping stateMapping = statusMetric.getStateMapping(); + assertThat(stateMapping.isEmpty()).isFalse(); + assertThat(stateMapping.getStateKeys()) + .contains( + "running", + "failed", + "paused", + "unassigned", + "restarting", + "degraded", + "stopped", + "unknown"); + assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); + assertThat(stateMapping.getStateValue("RUNNING")).isEqualTo("running"); + assertThat(stateMapping.getStateValue("FAILED")).isEqualTo("failed"); + assertThat(stateMapping.getStateValue("PAUSED")).isEqualTo("paused"); + assertThat(stateMapping.getStateValue("UNKNOWN")).isEqualTo("unknown"); + } + } +} From d46eb7704e15a41cd5a77dc74b9f69cfdd9be60b Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Fri, 5 Dec 2025 19:39:54 -0500 Subject: [PATCH 02/13] slight change to support both apache and confluent --- .../jmx-metrics/library/kafka-connect.md | 19 +- .../resources/jmx/rules/kafka-connect.yaml | 5 +- .../jmx/rules/KafkaConnectRuleTest.java | 177 ++++++++++++++---- 3 files changed, 154 insertions(+), 47 deletions(-) diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md index 55aa4e5624f8..fc0c9c8cabe4 100644 --- a/instrumentation/jmx-metrics/library/kafka-connect.md +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -1,8 +1,19 @@ # Kafka Connect Metrics -Here is the list of metrics based on MBeans exposed by Kafka Connect. String-valued JMX +Here is the list of metrics based on MBeans exposed by Apache Kafka Connect. String-valued JMX attributes (class/type/version information) are exported as state metrics with value `1` and -carry the raw string value as metric attributes. 
+carry the raw string value as metric attributes. + +## Compatibility + +This rule set targets both Apache Kafka Connect and Confluent Platform. Apache documents several +metrics not surfaced in Confluent docs (worker rebalance protocol, per-connector task counts on +workers, predicate/transform metadata, connector task metadata including converter info, source +transaction size stats, and sink record lag max); all of them are included below. Status metrics use +the superset of values across both variants (connector: running, paused, stopped, failed, +restarting, unassigned, degraded; task: running, paused, failed, restarting, unassigned, +destroyed) and fall back to `unknown` for any new values. Differences in bean placeholder +formatting between the docs are cosmetic; bean names align across both variants. ## Worker metrics @@ -57,7 +68,7 @@ Attributes: `kafka.connect.connector`, `kafka.connect.connector.class`, `kafka.c | kafka.connect.connector.class | UpDownCounter | 1 | kafka.connect.connector.class.state | The name of the connector class. | | kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. | | kafka.connect.connector.version | UpDownCounter | 1 | kafka.connect.connector.version.state | The version of the connector class, as reported by the connector. | -| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value). | +| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. 
| ## Predicate metrics @@ -89,7 +100,7 @@ Attributes include `kafka.connect.connector`, `kafka.connect.task.id`, connector | kafka.connect.task.offset.commit.success.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that succeeded. | | kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. | | kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state. | -| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task. | +| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task; supports running, paused, failed, restarting, unassigned, destroyed, or unknown. | | kafka.connect.task.class | UpDownCounter | 1 | kafka.connect.task.class.state | The class name of the task. | | kafka.connect.task.version | UpDownCounter | 1 | kafka.connect.task.version.state | The version of the task. | | kafka.connect.task.value.converter.class | UpDownCounter | 1 | kafka.connect.task.value.converter.class.state | The fully qualified class name from value.converter. | diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml index 53ea518a9491..bee00568c61a 100644 --- a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -223,7 +223,7 @@ rules: status: metric: status type: state - desc: Connector lifecycle state indicator (1 when the state matches the attribute value) + desc: Connector lifecycle state indicator (1 when the state matches the attribute value). Supports Apache and Confluent status values. 
metricAttribute: kafka.connect.connector.state: running: [running, RUNNING] @@ -392,7 +392,7 @@ rules: status: metric: status type: state - desc: The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or 'restarting'. + desc: The status of the connector task. Supports Apache (unassigned, running, paused, failed, restarting) and Confluent (unassigned, running, paused, failed, destroyed) values. metricAttribute: kafka.connect.task.state: running: [running, RUNNING] @@ -400,6 +400,7 @@ rules: paused: [paused, PAUSED] unassigned: [unassigned, UNASSIGNED] restarting: [restarting, RESTARTING] + destroyed: [destroyed, DESTROYED] unknown: "*" # kafka.connect.task.class task-class: diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index 6231a663a16a..8f9e9d238893 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -14,61 +14,156 @@ import io.opentelemetry.instrumentation.jmx.yaml.RuleParser; import io.opentelemetry.instrumentation.jmx.yaml.StateMapping; import java.io.InputStream; -import java.util.Optional; import org.junit.jupiter.api.Test; class KafkaConnectRuleTest { @Test void kafkaConnectConfigParsesAndBuilds() throws Exception { - try (InputStream input = - getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) { - assertThat(input).isNotNull(); - - JmxConfig config = RuleParser.get().loadConfig(input); - assertThat(config.getRules()).isNotEmpty(); + JmxConfig config = loadKafkaConnectConfig(); + assertThat(config.getRules()).isNotEmpty(); - // ensure all metric definitions build without throwing - for 
(JmxRule rule : config.getRules()) { - assertThatCode(rule::buildMetricDef).doesNotThrowAnyException(); - } + // ensure all metric definitions build without throwing + for (JmxRule rule : config.getRules()) { + assertThatCode(rule::buildMetricDef).doesNotThrowAnyException(); } } @Test void connectorStatusStateMappingPresent() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + JmxRule connectorRule = + getRuleForBean(config, "kafka.connect:type=connector-metrics,connector=*"); + + StateMapping stateMapping = getMetric(connectorRule, "status").getStateMapping(); + assertThat(stateMapping.isEmpty()).isFalse(); + assertThat(stateMapping.getStateKeys()) + .contains( + "running", + "failed", + "paused", + "unassigned", + "restarting", + "degraded", + "stopped", + "unknown"); + assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); + assertThat(stateMapping.getStateValue("RUNNING")).isEqualTo("running"); + assertThat(stateMapping.getStateValue("FAILED")).isEqualTo("failed"); + assertThat(stateMapping.getStateValue("PAUSED")).isEqualTo("paused"); + assertThat(stateMapping.getStateValue("UNKNOWN")).isEqualTo("unknown"); + } + + @Test + void taskStatusStateMappingSuperset() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + JmxRule connectorTaskRule = + getRuleForBean( + config, "kafka.connect:type=connector-task-metrics,connector=*,task=*"); + + StateMapping stateMapping = getMetric(connectorTaskRule, "status").getStateMapping(); + assertThat(stateMapping.isEmpty()).isFalse(); + assertThat(stateMapping.getStateKeys()) + .contains( + "running", + "failed", + "paused", + "unassigned", + "restarting", + "destroyed", + "unknown"); + assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); + assertThat(stateMapping.getStateValue("DESTROYED")).isEqualTo("destroyed"); + assertThat(stateMapping.getStateValue("RESTARTING")).isEqualTo("restarting"); + 
assertThat(stateMapping.getStateValue("unexpected")).isEqualTo("unknown"); + } + + @Test + void apacheSpecificMetricsPresent() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + assertMappingContains( + config, "kafka.connect:type=connect-worker-rebalance-metrics", "connect-protocol"); + + assertMappingContains( + config, + "kafka.connect:type=connect-worker-metrics,connector=*", + "connector-destroyed-task-count", + "connector-failed-task-count", + "connector-paused-task-count", + "connector-restarting-task-count", + "connector-running-task-count", + "connector-total-task-count", + "connector-unassigned-task-count"); + + assertMappingContains( + config, + "kafka.connect:type=connector-predicate-metrics,connector=*,task=*,predicate=*", + "predicate-class", + "predicate-version"); + + assertMappingContains( + config, + "kafka.connect:type=connector-transform-metrics,connector=*,task=*,transform=*", + "transform-class", + "transform-version"); + + assertMappingContains( + config, + "kafka.connect:type=connector-task-metrics,connector=*,task=*", + "connector-class", + "connector-type", + "connector-version", + "header-converter-class", + "header-converter-version", + "key-converter-class", + "key-converter-version", + "task-class", + "task-version", + "value-converter-class", + "value-converter-version"); + + assertMappingContains( + config, + "kafka.connect:type=source-task-metrics,connector=*,task=*", + "transaction-size-avg", + "transaction-size-max", + "transaction-size-min"); + + assertMappingContains( + config, + "kafka.connect:type=sink-task-metrics,connector=*,task=*", + "sink-record-lag-max"); + } + + private JmxConfig loadKafkaConnectConfig() throws Exception { try (InputStream input = getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) { - JmxConfig config = RuleParser.get().loadConfig(input); - - Optional connectorRule = - config.getRules().stream() - .filter( - rule -> - 
rule.getBeans().contains("kafka.connect:type=connector-metrics,connector=*")) - .findFirst(); - assertThat(connectorRule).isPresent(); - - Metric statusMetric = connectorRule.get().getMapping().get("status"); - assertThat(statusMetric).isNotNull(); - - StateMapping stateMapping = statusMetric.getStateMapping(); - assertThat(stateMapping.isEmpty()).isFalse(); - assertThat(stateMapping.getStateKeys()) - .contains( - "running", - "failed", - "paused", - "unassigned", - "restarting", - "degraded", - "stopped", - "unknown"); - assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); - assertThat(stateMapping.getStateValue("RUNNING")).isEqualTo("running"); - assertThat(stateMapping.getStateValue("FAILED")).isEqualTo("failed"); - assertThat(stateMapping.getStateValue("PAUSED")).isEqualTo("paused"); - assertThat(stateMapping.getStateValue("UNKNOWN")).isEqualTo("unknown"); + assertThat(input).isNotNull(); + return RuleParser.get().loadConfig(input); } } + + private static JmxRule getRuleForBean(JmxConfig config, String bean) { + return config.getRules().stream() + .filter(rule -> rule.getBeans().contains(bean)) + .findFirst() + .orElseThrow(() -> new AssertionError("Missing rule for bean " + bean)); + } + + private static Metric getMetric(JmxRule rule, String metricKey) { + Metric metric = rule.getMapping().get(metricKey); + if (metric == null) { + throw new AssertionError("Missing metric " + metricKey + " in rule " + rule.getBeans()); + } + return metric; + } + + private static void assertMappingContains( + JmxConfig config, String bean, String... 
metricKeys) { + JmxRule rule = getRuleForBean(config, bean); + assertThat(rule.getMapping().keySet()).contains(metricKeys); + } } From 31b44c8024260692929f72d17bc628ae143002c8 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Sat, 6 Dec 2025 13:01:34 -0500 Subject: [PATCH 03/13] fix small error and unit tests --- .../resources/jmx/rules/kafka-connect.yaml | 4 +- .../jmx/rules/KafkaConnectRuleTest.java | 444 +++++++++++++++--- 2 files changed, 388 insertions(+), 60 deletions(-) diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml index bee00568c61a..bc3392924f96 100644 --- a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -174,8 +174,8 @@ rules: desc: Whether this worker is currently rebalancing. metricAttribute: kafka.connect.worker.rebalance.state: - rebalancing: [true, TRUE] - idle: [false, FALSE] + rebalancing: ["true", "TRUE"] + idle: ["false", "FALSE"] unknown: "*" # kafka.connect.worker.rebalance.since_last time-since-last-rebalance-ms: diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index 8f9e9d238893..48de2f1a9b09 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -8,16 +8,48 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import io.opentelemetry.api.common.Attributes; +import 
io.opentelemetry.instrumentation.jmx.JmxTelemetry; import io.opentelemetry.instrumentation.jmx.yaml.JmxConfig; import io.opentelemetry.instrumentation.jmx.yaml.JmxRule; import io.opentelemetry.instrumentation.jmx.yaml.Metric; import io.opentelemetry.instrumentation.jmx.yaml.RuleParser; import io.opentelemetry.instrumentation.jmx.yaml.StateMapping; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.DynamicMBean; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; class KafkaConnectRuleTest { + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private static MBeanServer mbeanServer; + @Test void kafkaConnectConfigParsesAndBuilds() throws Exception { JmxConfig config = loadKafkaConnectConfig(); @@ -81,61 +113,201 @@ void taskStatusStateMappingSuperset() throws Exception { } @Test - void apacheSpecificMetricsPresent() throws Exception { - JmxConfig config = loadKafkaConnectConfig(); 
+ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Exception { + registerMBean( + "kafka.connect:type=connect-worker-metrics", + mapOf("connector-count", 1L, "task-count", 2L)); + + registerMBean( + "kafka.connect:type=connector-metrics,connector=confluent-connector", + mapOf( + "connector-class", "io.test.MyConnector", + "connector-type", "source", + "connector-version", "1.2.3", + "status", "RUNNING")); + + registerMBean( + "kafka.connect:type=connector-task-metrics,connector=confluent-connector,task=0", + mapOf( + "status", "DESTROYED", + "connector-class", "io.test.MyConnector", + "connector-type", "sink", + "connector-version", "9.9", + "task-class", "io.test.Task", + "task-version", "1.0", + "offset-commit-avg-time-ms", 5L)); + + startKafkaConnectTelemetry(); + + assertLongSum( + "kafka.connect.worker.connector.count", Attributes.empty(), 1); + assertLongSum("kafka.connect.worker.task.count", Attributes.empty(), 2); + + Attributes connectorStatusAttributes = + Attributes.builder() + .put("kafka.connect.connector", "confluent-connector") + .put("kafka.connect.connector.state", "running") + .put("kafka.connect.connector.class", "io.test.MyConnector") + .put("kafka.connect.connector.type.raw", "source") + .put("kafka.connect.connector.version", "1.2.3") + .build(); + assertLongSum( + "kafka.connect.connector.status", connectorStatusAttributes, 1); - assertMappingContains( - config, "kafka.connect:type=connect-worker-rebalance-metrics", "connect-protocol"); - - assertMappingContains( - config, - "kafka.connect:type=connect-worker-metrics,connector=*", - "connector-destroyed-task-count", - "connector-failed-task-count", - "connector-paused-task-count", - "connector-restarting-task-count", - "connector-running-task-count", - "connector-total-task-count", - "connector-unassigned-task-count"); - - assertMappingContains( - config, - "kafka.connect:type=connector-predicate-metrics,connector=*,task=*,predicate=*", - "predicate-class", - 
"predicate-version"); - - assertMappingContains( - config, - "kafka.connect:type=connector-transform-metrics,connector=*,task=*,transform=*", - "transform-class", - "transform-version"); - - assertMappingContains( - config, - "kafka.connect:type=connector-task-metrics,connector=*,task=*", - "connector-class", - "connector-type", - "connector-version", - "header-converter-class", - "header-converter-version", - "key-converter-class", - "key-converter-version", - "task-class", - "task-version", - "value-converter-class", - "value-converter-version"); - - assertMappingContains( - config, - "kafka.connect:type=source-task-metrics,connector=*,task=*", - "transaction-size-avg", - "transaction-size-max", - "transaction-size-min"); - - assertMappingContains( - config, - "kafka.connect:type=sink-task-metrics,connector=*,task=*", - "sink-record-lag-max"); + Attributes taskStatusAttributes = + Attributes.builder() + .put("kafka.connect.connector", "confluent-connector") + .put("kafka.connect.task.id", "0") + .put("kafka.connect.connector.class", "io.test.MyConnector") + .put("kafka.connect.connector.type.raw", "sink") + .put("kafka.connect.connector.version", "9.9") + .put("kafka.connect.task.class", "io.test.Task") + .put("kafka.connect.task.version", "1.0") + .put("kafka.connect.task.state", "destroyed") + .build(); + assertLongSum("kafka.connect.task.status", taskStatusAttributes, 1); + } + + @Test + void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { + registerMBean( + "kafka.connect:type=connect-worker-rebalance-metrics", + mapOf("connect-protocol", "eager")); + + registerMBean( + "kafka.connect:type=connect-worker-metrics,connector=apache-connector", + mapOf( + "connector-running-task-count", 2L, + "connector-unassigned-task-count", 0L)); + + registerMBean( + "kafka.connect:type=connector-predicate-metrics,connector=apache-connector,task=1,predicate=sample", + mapOf("predicate-class", "io.test.Predicate", "predicate-version", "2.1.0")); + + 
registerMBean( + "kafka.connect:type=connector-transform-metrics,connector=apache-connector,task=1,transform=mask", + mapOf("transform-class", "io.test.Transform", "transform-version", "0.9.0")); + + registerMBean( + "kafka.connect:type=connector-task-metrics,connector=apache-connector,task=1", + mapOf( + "connector-class", "io.test.ApacheConnector", + "connector-type", "source", + "connector-version", "3.0", + "task-class", "io.test.Task", + "task-version", "3.1", + "header-converter-class", "io.test.HeaderConverter", + "header-converter-version", "1.0", + "key-converter-class", "io.test.KeyConverter", + "key-converter-version", "1.1", + "value-converter-class", "io.test.ValueConverter", + "value-converter-version", "1.2")); + + registerMBean( + "kafka.connect:type=source-task-metrics,connector=apache-connector,task=1", + mapOf( + "transaction-size-avg", 3L, + "transaction-size-max", 6L, + "transaction-size-min", 1L)); + + registerMBean( + "kafka.connect:type=sink-task-metrics,connector=apache-connector,task=1", + mapOf("sink-record-lag-max", 11L)); + + startKafkaConnectTelemetry(); + + assertLongSum( + "kafka.connect.worker.rebalance.protocol", + Attributes.of( + io.opentelemetry.api.common.AttributeKey.stringKey("kafka.connect.protocol.state"), + "eager"), + 1); + + Attributes connectorTaskAttributes = + Attributes.of(io.opentelemetry.api.common.AttributeKey.stringKey("kafka.connect.connector"), "apache-connector"); + assertLongSum("kafka.connect.worker.connector.task.running", connectorTaskAttributes, 2); + + assertLongSum( + "kafka.connect.predicate.class", + Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .put("kafka.connect.predicate", "sample") + .put("kafka.connect.predicate.class", "io.test.Predicate") + .put("kafka.connect.predicate.class.state", "configured") + .put("kafka.connect.predicate.version", "2.1.0") + .build(), + 1); + + assertLongSum( + "kafka.connect.transform.class", + 
Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .put("kafka.connect.transform", "mask") + .put("kafka.connect.transform.class", "io.test.Transform") + .put("kafka.connect.transform.class.state", "configured") + .put("kafka.connect.transform.version", "0.9.0") + .build(), + 1); + + Attributes connectorTaskMetaAttributes = + Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .put("kafka.connect.connector.class", "io.test.ApacheConnector") + .put("kafka.connect.connector.type.raw", "source") + .put("kafka.connect.connector.version", "3.0") + .put("kafka.connect.task.class", "io.test.Task") + .put("kafka.connect.task.version", "3.1") + .build(); + assertLongSum( + "kafka.connect.task.header.converter.class", + Attributes.builder() + .putAll(connectorTaskMetaAttributes) + .put("kafka.connect.converter.header.class", "io.test.HeaderConverter") + .put("kafka.connect.task.header.converter.class.state", "configured") + .put("kafka.connect.converter.header.version", "1.0") + .build(), + 1); + + assertLongGauge( + "kafka.connect.source.transaction.size.max", + Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .build(), + 6); + + assertLongGauge( + "kafka.connect.sink.record.lag.max", + Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .build(), + 11); + } + + @BeforeAll + static void setUp() { + mbeanServer = MBeanServerFactory.createMBeanServer("kafka.connect"); + } + + @AfterEach + void cleanUp() throws Exception { + for (ObjectName name : mbeanServer.queryNames(new ObjectName("kafka.connect:*"), null)) { + try { + mbeanServer.unregisterMBean(name); + } catch (InstanceNotFoundException | MBeanRegistrationException ignored) { + // best effort cleanup for flaky tests + } + } + testing.clearData(); + } + + @AfterAll + static 
void tearDown() { + MBeanServerFactory.releaseMBeanServer(mbeanServer); } private JmxConfig loadKafkaConnectConfig() throws Exception { @@ -146,6 +318,13 @@ private JmxConfig loadKafkaConnectConfig() throws Exception { } } + private static void startKafkaConnectTelemetry() { + JmxTelemetry.builder(testing.getOpenTelemetry()) + .addClassPathRules("kafka-connect") + .build() + .start(() -> Collections.singletonList(mbeanServer)); + } + private static JmxRule getRuleForBean(JmxConfig config, String bean) { return config.getRules().stream() .filter(rule -> rule.getBeans().contains(bean)) @@ -161,9 +340,158 @@ private static Metric getMetric(JmxRule rule, String metricKey) { return metric; } - private static void assertMappingContains( - JmxConfig config, String bean, String... metricKeys) { - JmxRule rule = getRuleForBean(config, bean); - assertThat(rule.getMapping().keySet()).contains(metricKeys); + private static Map mapOf(Object... kvPairs) { + Map map = new HashMap<>(); + for (int i = 0; i < kvPairs.length; i += 2) { + map.put((String) kvPairs[i], kvPairs[i + 1]); + } + return map; + } + + private static void registerMBean(String objectName, Map attributes) + throws Exception { + mbeanServer.registerMBean(new MapBackedDynamicMBean(attributes), new ObjectName(objectName)); + } + + private static void assertLongSum(String metricName, Attributes attributes, long expectedValue) { + testing.waitAndAssertMetrics( + "io.opentelemetry.jmx", + metricName, + metrics -> + metrics.anySatisfy( + metric -> { + boolean matched = + metric.getLongSumData().getPoints().stream() + .anyMatch( + pointData -> + attributesMatch(pointData.getAttributes(), attributes) + && pointData.getValue() == expectedValue); + assertThat(matched) + .as( + "Expected %s to have a point with attributes %s and value %s", + metricName, attributes, expectedValue) + .isTrue(); + })); + } + + private static void assertLongGauge(String metricName, Attributes attributes, long expected) { + 
testing.waitAndAssertMetrics( + "io.opentelemetry.jmx", + metricName, + metrics -> + metrics.anySatisfy( + metric -> { + boolean matched = + metric.getLongGaugeData().getPoints().stream() + .anyMatch( + pointData -> + attributesMatch(pointData.getAttributes(), attributes) + && pointData.getValue() == expected); + assertThat(matched) + .as( + "Expected %s to have a point with attributes %s and value %s", + metricName, attributes, expected) + .isTrue(); + })); + } + + private static boolean attributesMatch(Attributes actual, Attributes expected) { + for (Map.Entry, Object> entry : + expected.asMap().entrySet()) { + Object actualValue = actual.get(entry.getKey()); + if (!entry.getValue().equals(actualValue)) { + return false; + } + } + return true; + } + + /** + * Minimal DynamicMBean implementation backed by a simple attribute map. This keeps the functional + * Kafka Connect coverage self contained in the test file. + */ + static class MapBackedDynamicMBean extends NotificationBroadcasterSupport implements DynamicMBean { + + private final Map attributes; + private long sequenceNumber = 1; + + MapBackedDynamicMBean(Map attributes) { + this.attributes = new HashMap<>(attributes); + } + + @Override + public Object getAttribute(String attribute) + throws AttributeNotFoundException, MBeanException, ReflectionException { + if (!attributes.containsKey(attribute)) { + throw new AttributeNotFoundException(attribute); + } + return attributes.get(attribute); + } + + @Override + public void setAttribute(Attribute attribute) { + attributes.put(attribute.getName(), attribute.getValue()); + Notification n = + new Notification( + "jmx.attribute.changed", + this, + sequenceNumber++, + System.currentTimeMillis(), + attribute.getName()); + sendNotification(n); + } + + @Override + public AttributeList getAttributes(String[] attributes) { + AttributeList list = new AttributeList(); + for (String attr : attributes) { + Object value = this.attributes.get(attr); + if (value != null) { + 
list.add(new Attribute(attr, value)); + } + } + return list; + } + + @Override + public AttributeList setAttributes(AttributeList attributes) { + AttributeList updated = new AttributeList(); + for (Object attribute : attributes) { + if (attribute instanceof Attribute) { + Attribute attr = (Attribute) attribute; + setAttribute(attr); + updated.add(attr); + } + } + return updated; + } + + @Override + public Object invoke(String actionName, Object[] params, String[] signature) + throws MBeanException, ReflectionException { + return null; + } + + @Override + public MBeanInfo getMBeanInfo() { + List infos = new ArrayList<>(); + attributes.forEach( + (name, value) -> + infos.add( + new javax.management.MBeanAttributeInfo( + name, + value.getClass().getName(), + name + " attribute", + true, + true, + false))); + return new MBeanInfo( + this.getClass().getName(), + "Map backed test MBean", + infos.toArray(new javax.management.MBeanAttributeInfo[0]), + null, + null, + null); + } } } From 2f25096ac9f0a395a0cc1a1230ed5381ef400588 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Sat, 6 Dec 2025 13:14:33 -0500 Subject: [PATCH 04/13] fix linting issue --- .../jmx-metrics/library/kafka-connect.md | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md index fc0c9c8cabe4..f78ea6c14f1a 100644 --- a/instrumentation/jmx-metrics/library/kafka-connect.md +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -2,7 +2,7 @@ Here is the list of metrics based on MBeans exposed by Apache Kafka Connect. String-valued JMX attributes (class/type/version information) are exported as state metrics with value `1` and -carry the raw string value as metric attributes. +carry the raw string value as metric attributes. ## Compatibility @@ -17,47 +17,47 @@ formatting between the docs are cosmetic; bean names align across both variants. 
## Worker metrics -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.worker.connector.count | UpDownCounter | {connector} | | The number of connectors run in this worker. | -| kafka.connect.worker.connector.startup.attempts | Counter | {attempt} | | The total number of connector startups that this worker has attempted. | -| kafka.connect.worker.connector.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that failed. | -| kafka.connect.worker.connector.startup.failure.total | Counter | {startup} | | The total number of connector starts that failed. | -| kafka.connect.worker.connector.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that succeeded. | -| kafka.connect.worker.connector.startup.success.total | Counter | {startup} | | The total number of connector starts that succeeded. | -| kafka.connect.worker.task.count | UpDownCounter | {task} | | The number of tasks run in this worker. | -| kafka.connect.worker.task.startup.attempts | Counter | {attempt} | | The total number of task startups that this worker has attempted. | -| kafka.connect.worker.task.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that failed. | -| kafka.connect.worker.task.startup.failure.total | Counter | {startup} | | The total number of task starts that failed. | -| kafka.connect.worker.task.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that succeeded. | -| kafka.connect.worker.task.startup.success.total | Counter | {startup} | | The total number of task starts that succeeded. 
| +| Metric Name | Type | Unit | Attributes | Description | +|-------------------------------------------------|---------------|-------------|------------|---------------------------------------------------------------------------------------------| +| kafka.connect.worker.connector.count | UpDownCounter | {connector} | | The number of connectors run in this worker. | +| kafka.connect.worker.connector.startup.attempts | Counter | {attempt} | | The total number of connector startups that this worker has attempted. | +| kafka.connect.worker.connector.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that failed. | +| kafka.connect.worker.connector.startup.failure.total | Counter | {startup} | | The total number of connector starts that failed. | +| kafka.connect.worker.connector.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that succeeded. | +| kafka.connect.worker.connector.startup.success.total | Counter | {startup} | | The total number of connector starts that succeeded. | +| kafka.connect.worker.task.count | UpDownCounter | {task} | | The number of tasks run in this worker. | +| kafka.connect.worker.task.startup.attempts | Counter | {attempt} | | The total number of task startups that this worker has attempted. | +| kafka.connect.worker.task.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that failed. | +| kafka.connect.worker.task.startup.failure.total | Counter | {startup} | | The total number of task starts that failed. | +| kafka.connect.worker.task.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that succeeded. | +| kafka.connect.worker.task.startup.success.total | Counter | {startup} | | The total number of task starts that succeeded. 
| ## Worker connector task metrics -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.worker.connector.task.destroyed | UpDownCounter | {task} | kafka.connect.connector | The number of destroyed tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.failed | UpDownCounter | {task} | kafka.connect.connector | The number of failed tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.paused | UpDownCounter | {task} | kafka.connect.connector | The number of paused tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.restarting | UpDownCounter | {task} | kafka.connect.connector | The number of restarting tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.running | UpDownCounter | {task} | kafka.connect.connector | The number of running tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.total | UpDownCounter | {task} | kafka.connect.connector | The number of tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.unassigned | UpDownCounter | {task} | kafka.connect.connector | The number of unassigned tasks of the connector on the worker. | +| Metric Name | Type | Unit | Attributes | Description | +|-------------------------------------------------|---------------|--------|------------------------|--------------------------------------------------------------------| +| kafka.connect.worker.connector.task.destroyed | UpDownCounter | {task} | kafka.connect.connector | The number of destroyed tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.failed | UpDownCounter | {task} | kafka.connect.connector | The number of failed tasks of the connector on the worker. 
| +| kafka.connect.worker.connector.task.paused | UpDownCounter | {task} | kafka.connect.connector | The number of paused tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.restarting | UpDownCounter | {task} | kafka.connect.connector | The number of restarting tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.running | UpDownCounter | {task} | kafka.connect.connector | The number of running tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.total | UpDownCounter | {task} | kafka.connect.connector | The number of tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.unassigned | UpDownCounter | {task} | kafka.connect.connector | The number of unassigned tasks of the connector on the worker. | ## Worker rebalance metrics All metrics include `kafka.connect.worker.leader`. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. | -| kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. | -| kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. | -| kafka.connect.worker.rebalance.leader | UpDownCounter | 1 | kafka.connect.worker.leader.state | The name of the group leader. | -| kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time in milliseconds spent by this worker to rebalance. | -| kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time in milliseconds spent by this worker to rebalance. | -| kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. 
| -| kafka.connect.worker.rebalance.since_last | Gauge | s | | The time in milliseconds since this worker completed the most recent rebalance. | +| Metric Name | Type | Unit | Attributes | Description | +|---------------------------------------------|---------------|------------|-----------------------------------|---------------------------------------------------------------------------------| +| kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. | +| kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. | +| kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. | +| kafka.connect.worker.rebalance.leader | UpDownCounter | 1 | kafka.connect.worker.leader.state | The name of the group leader. | +| kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time in milliseconds spent by this worker to rebalance. | +| kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time in milliseconds spent by this worker to rebalance. | +| kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. | +| kafka.connect.worker.rebalance.since_last | Gauge | s | | The time in milliseconds since this worker completed the most recent rebalance. 
| ## Connector metrics From 81fa539db30fea78cfd692c517b4762b4d8eca5a Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Sat, 6 Dec 2025 13:18:49 -0500 Subject: [PATCH 05/13] fix spotless issues --- .../jmx/rules/KafkaConnectRuleTest.java | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index 48de2f1a9b09..04928f4356f0 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -92,20 +92,13 @@ void taskStatusStateMappingSuperset() throws Exception { JmxConfig config = loadKafkaConnectConfig(); JmxRule connectorTaskRule = - getRuleForBean( - config, "kafka.connect:type=connector-task-metrics,connector=*,task=*"); + getRuleForBean(config, "kafka.connect:type=connector-task-metrics,connector=*,task=*"); StateMapping stateMapping = getMetric(connectorTaskRule, "status").getStateMapping(); assertThat(stateMapping.isEmpty()).isFalse(); assertThat(stateMapping.getStateKeys()) .contains( - "running", - "failed", - "paused", - "unassigned", - "restarting", - "destroyed", - "unknown"); + "running", "failed", "paused", "unassigned", "restarting", "destroyed", "unknown"); assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); assertThat(stateMapping.getStateValue("DESTROYED")).isEqualTo("destroyed"); assertThat(stateMapping.getStateValue("RESTARTING")).isEqualTo("restarting"); @@ -139,8 +132,7 @@ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep startKafkaConnectTelemetry(); - assertLongSum( - "kafka.connect.worker.connector.count", Attributes.empty(), 1); + 
assertLongSum("kafka.connect.worker.connector.count", Attributes.empty(), 1); assertLongSum("kafka.connect.worker.task.count", Attributes.empty(), 2); Attributes connectorStatusAttributes = @@ -151,8 +143,7 @@ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep .put("kafka.connect.connector.type.raw", "source") .put("kafka.connect.connector.version", "1.2.3") .build(); - assertLongSum( - "kafka.connect.connector.status", connectorStatusAttributes, 1); + assertLongSum("kafka.connect.connector.status", connectorStatusAttributes, 1); Attributes taskStatusAttributes = Attributes.builder() @@ -171,8 +162,7 @@ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep @Test void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { registerMBean( - "kafka.connect:type=connect-worker-rebalance-metrics", - mapOf("connect-protocol", "eager")); + "kafka.connect:type=connect-worker-rebalance-metrics", mapOf("connect-protocol", "eager")); registerMBean( "kafka.connect:type=connect-worker-metrics,connector=apache-connector", @@ -224,7 +214,9 @@ void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { 1); Attributes connectorTaskAttributes = - Attributes.of(io.opentelemetry.api.common.AttributeKey.stringKey("kafka.connect.connector"), "apache-connector"); + Attributes.of( + io.opentelemetry.api.common.AttributeKey.stringKey("kafka.connect.connector"), + "apache-connector"); assertLongSum("kafka.connect.worker.connector.task.running", connectorTaskAttributes, 2); assertLongSum( @@ -380,7 +372,7 @@ private static void assertLongGauge(String metricName, Attributes attributes, lo metricName, metrics -> metrics.anySatisfy( - metric -> { + metric -> { boolean matched = metric.getLongGaugeData().getPoints().stream() .anyMatch( @@ -410,7 +402,8 @@ private static boolean attributesMatch(Attributes actual, Attributes expected) { * Minimal DynamicMBean implementation backed by a simple attribute map. 
This keeps the functional * Kafka Connect coverage self contained in the test file. */ - static class MapBackedDynamicMBean extends NotificationBroadcasterSupport implements DynamicMBean { + static class MapBackedDynamicMBean extends NotificationBroadcasterSupport + implements DynamicMBean { private final Map attributes; private long sequenceNumber = 1; @@ -479,12 +472,7 @@ public MBeanInfo getMBeanInfo() { (name, value) -> infos.add( new javax.management.MBeanAttributeInfo( - name, - value.getClass().getName(), - name + " attribute", - true, - true, - false))); + name, value.getClass().getName(), name + " attribute", true, true, false))); return new MBeanInfo( this.getClass().getName(), "Map backed test MBean", From dd176335a97b8c9d0312740cd12d13a6dc35541b Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Sat, 6 Dec 2025 13:22:18 -0500 Subject: [PATCH 06/13] fix readme --- .../jmx-metrics/library/kafka-connect.md | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md index f78ea6c14f1a..ff865b0a7822 100644 --- a/instrumentation/jmx-metrics/library/kafka-connect.md +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -17,20 +17,20 @@ formatting between the docs are cosmetic; bean names align across both variants. ## Worker metrics -| Metric Name | Type | Unit | Attributes | Description | -|-------------------------------------------------|---------------|-------------|------------|---------------------------------------------------------------------------------------------| -| kafka.connect.worker.connector.count | UpDownCounter | {connector} | | The number of connectors run in this worker. | -| kafka.connect.worker.connector.startup.attempts | Counter | {attempt} | | The total number of connector startups that this worker has attempted. 
| -| kafka.connect.worker.connector.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that failed. | -| kafka.connect.worker.connector.startup.failure.total | Counter | {startup} | | The total number of connector starts that failed. | -| kafka.connect.worker.connector.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that succeeded. | -| kafka.connect.worker.connector.startup.success.total | Counter | {startup} | | The total number of connector starts that succeeded. | -| kafka.connect.worker.task.count | UpDownCounter | {task} | | The number of tasks run in this worker. | -| kafka.connect.worker.task.startup.attempts | Counter | {attempt} | | The total number of task startups that this worker has attempted. | -| kafka.connect.worker.task.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that failed. | -| kafka.connect.worker.task.startup.failure.total | Counter | {startup} | | The total number of task starts that failed. | -| kafka.connect.worker.task.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that succeeded. | -| kafka.connect.worker.task.startup.success.total | Counter | {startup} | | The total number of task starts that succeeded. | +| Metric Name | Type | Unit | Attributes | Description | +|-----------------------------------------------------------|---------------|-------------|------------|---------------------------------------------------------------------------| +| kafka.connect.worker.connector.count | UpDownCounter | {connector} | | The number of connectors run in this worker. | +| kafka.connect.worker.connector.startup.attempts | Counter | {attempt} | | The total number of connector startups that this worker has attempted. 
| +| kafka.connect.worker.connector.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that failed. | +| kafka.connect.worker.connector.startup.failure.total | Counter | {startup} | | The total number of connector starts that failed. | +| kafka.connect.worker.connector.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's connectors starts that succeeded. | +| kafka.connect.worker.connector.startup.success.total | Counter | {startup} | | The total number of connector starts that succeeded. | +| kafka.connect.worker.task.count | UpDownCounter | {task} | | The number of tasks run in this worker. | +| kafka.connect.worker.task.startup.attempts | Counter | {attempt} | | The total number of task startups that this worker has attempted. | +| kafka.connect.worker.task.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that failed. | +| kafka.connect.worker.task.startup.failure.total | Counter | {startup} | | The total number of task starts that failed. | +| kafka.connect.worker.task.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's tasks starts that succeeded. | +| kafka.connect.worker.task.startup.success.total | Counter | {startup} | | The total number of task starts that succeeded. 
| ## Worker connector task metrics From c985e8692888939b28c56409c47d3ffb9c0cc0cf Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Sat, 6 Dec 2025 13:44:21 -0500 Subject: [PATCH 07/13] finally fix markdown --- .../jmx-metrics/library/kafka-connect.md | 194 +++++++++--------- 1 file changed, 97 insertions(+), 97 deletions(-) diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md index ff865b0a7822..487b9c937ea5 100644 --- a/instrumentation/jmx-metrics/library/kafka-connect.md +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -34,140 +34,140 @@ formatting between the docs are cosmetic; bean names align across both variants. ## Worker connector task metrics -| Metric Name | Type | Unit | Attributes | Description | -|-------------------------------------------------|---------------|--------|------------------------|--------------------------------------------------------------------| -| kafka.connect.worker.connector.task.destroyed | UpDownCounter | {task} | kafka.connect.connector | The number of destroyed tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.failed | UpDownCounter | {task} | kafka.connect.connector | The number of failed tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.paused | UpDownCounter | {task} | kafka.connect.connector | The number of paused tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.restarting | UpDownCounter | {task} | kafka.connect.connector | The number of restarting tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.running | UpDownCounter | {task} | kafka.connect.connector | The number of running tasks of the connector on the worker. | -| kafka.connect.worker.connector.task.total | UpDownCounter | {task} | kafka.connect.connector | The number of tasks of the connector on the worker. 
| -| kafka.connect.worker.connector.task.unassigned | UpDownCounter | {task} | kafka.connect.connector | The number of unassigned tasks of the connector on the worker. | +| Metric Name | Type | Unit | Attributes | Description | +|------------------------------------------------|---------------|--------|-------------------------|----------------------------------------------------------------| +| kafka.connect.worker.connector.task.destroyed | UpDownCounter | {task} | kafka.connect.connector | The number of destroyed tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.failed | UpDownCounter | {task} | kafka.connect.connector | The number of failed tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.paused | UpDownCounter | {task} | kafka.connect.connector | The number of paused tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.restarting | UpDownCounter | {task} | kafka.connect.connector | The number of restarting tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.running | UpDownCounter | {task} | kafka.connect.connector | The number of running tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.total | UpDownCounter | {task} | kafka.connect.connector | The number of tasks of the connector on the worker. | +| kafka.connect.worker.connector.task.unassigned | UpDownCounter | {task} | kafka.connect.connector | The number of unassigned tasks of the connector on the worker. | ## Worker rebalance metrics All metrics include `kafka.connect.worker.leader`. -| Metric Name | Type | Unit | Attributes | Description | -|---------------------------------------------|---------------|------------|-----------------------------------|---------------------------------------------------------------------------------| -| kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. 
| -| kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. | -| kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. | -| kafka.connect.worker.rebalance.leader | UpDownCounter | 1 | kafka.connect.worker.leader.state | The name of the group leader. | -| kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time in milliseconds spent by this worker to rebalance. | -| kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time in milliseconds spent by this worker to rebalance. | -| kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. | -| kafka.connect.worker.rebalance.since_last | Gauge | s | | The time in milliseconds since this worker completed the most recent rebalance. | +| Metric Name | Type | Unit | Attributes | Description | +|------------------------------------------------|---------------|-------------|--------------------------------------|---------------------------------------------------------------------------------| +| kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. | +| kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. | +| kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. | +| kafka.connect.worker.rebalance.leader | UpDownCounter | 1 | kafka.connect.worker.leader.state | The name of the group leader. | +| kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time in milliseconds spent by this worker to rebalance. | +| kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time in milliseconds spent by this worker to rebalance. 
| +| kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. | +| kafka.connect.worker.rebalance.since_last | Gauge | s | | The time in milliseconds since this worker completed the most recent rebalance. | ## Connector metrics Attributes: `kafka.connect.connector`, `kafka.connect.connector.class`, `kafka.connect.connector.version`, `kafka.connect.connector.type.raw`. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.connector.class | UpDownCounter | 1 | kafka.connect.connector.class.state | The name of the connector class. | -| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. | -| kafka.connect.connector.version | UpDownCounter | 1 | kafka.connect.connector.version.state | The version of the connector class, as reported by the connector. | -| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. | +| Metric Name | Type | Unit | Attributes | Description | +|---------------------------------|---------------|------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| kafka.connect.connector.class | UpDownCounter | 1 | kafka.connect.connector.class.state | The name of the connector class. | +| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. 
| +| kafka.connect.connector.version | UpDownCounter | 1 | kafka.connect.connector.version.state | The version of the connector class, as reported by the connector. | +| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. | ## Predicate metrics Attributes: `kafka.connect.connector`, `kafka.connect.task.id`, `kafka.connect.predicate`, `kafka.connect.predicate.class`, `kafka.connect.predicate.version`. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.predicate.class | UpDownCounter | 1 | kafka.connect.predicate.class.state | The class name of the predicate class. | -| kafka.connect.predicate.version | UpDownCounter | 1 | kafka.connect.predicate.version.state | The version of the predicate class. | +| Metric Name | Type | Unit | Attributes | Description | +|---------------------------------|---------------|------|---------------------------------------|----------------------------------------| +| kafka.connect.predicate.class | UpDownCounter | 1 | kafka.connect.predicate.class.state | The class name of the predicate class. | +| kafka.connect.predicate.version | UpDownCounter | 1 | kafka.connect.predicate.version.state | The version of the predicate class. | ## Connector task metrics Attributes include `kafka.connect.connector`, `kafka.connect.task.id`, connector class/type/version, converter class/version attributes, and task class/version. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.task.batch.size.avg | Gauge | {record} | | The average number of records in the batches the task has processed so far. 
| -| kafka.connect.task.batch.size.max | Gauge | {record} | | The number of records in the largest batch the task has processed so far. | -| kafka.connect.task.connector.class | UpDownCounter | 1 | kafka.connect.task.connector.class.state | The name of the connector class. | -| kafka.connect.task.connector.type | UpDownCounter | 1 | kafka.connect.task.connector.type | The type of the connector. One of 'source' or 'sink'. | -| kafka.connect.task.connector.version | UpDownCounter | 1 | kafka.connect.task.connector.version.state | The version of the connector class, as reported by the connector. | -| kafka.connect.task.header.converter.class | UpDownCounter | 1 | kafka.connect.task.header.converter.class.state | The fully qualified class name from header.converter. | -| kafka.connect.task.header.converter.version | UpDownCounter | 1 | kafka.connect.task.header.converter.version.state | The version instantiated for header.converter. May be undefined. | -| kafka.connect.task.key.converter.class | UpDownCounter | 1 | kafka.connect.task.key.converter.class.state | The fully qualified class name from key.converter. | -| kafka.connect.task.key.converter.version | UpDownCounter | 1 | kafka.connect.task.key.converter.version.state | The version instantiated for key.converter. May be undefined. | -| kafka.connect.task.offset.commit.avg.time | Gauge | s | | The average time in milliseconds taken by this task to commit offsets. | -| kafka.connect.task.offset.commit.failure.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that failed. | -| kafka.connect.task.offset.commit.max.time | Gauge | s | | The maximum time in milliseconds taken by this task to commit offsets. | -| kafka.connect.task.offset.commit.success.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that succeeded. | -| kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. 
| -| kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state. | -| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task; supports running, paused, failed, restarting, unassigned, destroyed, or unknown. | -| kafka.connect.task.class | UpDownCounter | 1 | kafka.connect.task.class.state | The class name of the task. | -| kafka.connect.task.version | UpDownCounter | 1 | kafka.connect.task.version.state | The version of the task. | -| kafka.connect.task.value.converter.class | UpDownCounter | 1 | kafka.connect.task.value.converter.class.state | The fully qualified class name from value.converter. | -| kafka.connect.task.value.converter.version | UpDownCounter | 1 | kafka.connect.task.value.converter.version.state | The version instantiated for value.converter. May be undefined. | +| Metric Name | Type | Unit | Attributes | Description | +|-----------------------------------------------------|---------------|----------|---------------------------------------------------|--------------------------------------------------------------------------------------------------------------------| +| kafka.connect.task.batch.size.avg | Gauge | {record} | | The average number of records in the batches the task has processed so far. | +| kafka.connect.task.batch.size.max | Gauge | {record} | | The number of records in the largest batch the task has processed so far. | +| kafka.connect.task.connector.class | UpDownCounter | 1 | kafka.connect.task.connector.class.state | The name of the connector class. | +| kafka.connect.task.connector.type | UpDownCounter | 1 | kafka.connect.task.connector.type | The type of the connector. One of 'source' or 'sink'. | +| kafka.connect.task.connector.version | UpDownCounter | 1 | kafka.connect.task.connector.version.state | The version of the connector class, as reported by the connector. 
| +| kafka.connect.task.header.converter.class | UpDownCounter | 1 | kafka.connect.task.header.converter.class.state | The fully qualified class name from header.converter. | +| kafka.connect.task.header.converter.version | UpDownCounter | 1 | kafka.connect.task.header.converter.version.state | The version instantiated for header.converter. May be undefined. | +| kafka.connect.task.key.converter.class | UpDownCounter | 1 | kafka.connect.task.key.converter.class.state | The fully qualified class name from key.converter. | +| kafka.connect.task.key.converter.version | UpDownCounter | 1 | kafka.connect.task.key.converter.version.state | The version instantiated for key.converter. May be undefined. | +| kafka.connect.task.offset.commit.avg.time | Gauge | s | | The average time in milliseconds taken by this task to commit offsets. | +| kafka.connect.task.offset.commit.failure.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that failed. | +| kafka.connect.task.offset.commit.max.time | Gauge | s | | The maximum time in milliseconds taken by this task to commit offsets. | +| kafka.connect.task.offset.commit.success.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that succeeded. | +| kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. | +| kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state. | +| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task; supports running, paused, failed, restarting, unassigned, destroyed, or unknown. | +| kafka.connect.task.class | UpDownCounter | 1 | kafka.connect.task.class.state | The class name of the task. | +| kafka.connect.task.version | UpDownCounter | 1 | kafka.connect.task.version.state | The version of the task. 
| +| kafka.connect.task.value.converter.class | UpDownCounter | 1 | kafka.connect.task.value.converter.class.state | The fully qualified class name from value.converter. | +| kafka.connect.task.value.converter.version | UpDownCounter | 1 | kafka.connect.task.value.converter.version.state | The version instantiated for value.converter. May be undefined. | ## Transform metrics Attributes: `kafka.connect.connector`, `kafka.connect.task.id`, `kafka.connect.transform`, `kafka.connect.transform.class`, `kafka.connect.transform.version`. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.transform.class | UpDownCounter | 1 | kafka.connect.transform.class.state | The class name of the transformation class. | -| kafka.connect.transform.version | UpDownCounter | 1 | kafka.connect.transform.version.state | The version of the transformation class. | +| Metric Name | Type | Unit | Attributes | Description | +|---------------------------------|---------------|------|---------------------------------------|---------------------------------------------| +| kafka.connect.transform.class | UpDownCounter | 1 | kafka.connect.transform.class.state | The class name of the transformation class. | +| kafka.connect.transform.version | UpDownCounter | 1 | kafka.connect.transform.version.state | The version of the transformation class. | ## Sink task metrics Attributes: `kafka.connect.connector`, `kafka.connect.task.id`. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.sink.offset.commit.completion.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were completed successfully. | -| kafka.connect.sink.offset.commit.completion.total | Counter | {commit} | | The total number of offset commit completions that were completed successfully. 
| -| kafka.connect.sink.offset.commit.seq | UpDownCounter | {sequence} | | The current sequence number for offset commits. | -| kafka.connect.sink.offset.commit.skip.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were received too late and skipped/ignored. | -| kafka.connect.sink.offset.commit.skip.total | Counter | {commit} | | The total number of offset commit completions that were received too late and skipped/ignored. | -| kafka.connect.sink.partition.count | UpDownCounter | {partition} | | The number of topic partitions assigned to this task. | -| kafka.connect.sink.put.batch.avg.time | Gauge | s | | The average time taken by this task to put a batch of sinks records. | -| kafka.connect.sink.put.batch.max.time | Gauge | s | | The maximum time taken by this task to put a batch of sinks records. | -| kafka.connect.sink.record.active.count | UpDownCounter | {record} | | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | -| kafka.connect.sink.record.active.count.avg | Gauge | {record} | | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | -| kafka.connect.sink.record.active.count.max | Gauge | {record} | | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | -| kafka.connect.sink.record.lag.max | Gauge | {record} | | The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions. | -| kafka.connect.sink.record.read.rate | Gauge | {record}/s | | The average per-second number of records read from Kafka for this task before transformations are applied. | -| kafka.connect.sink.record.read.total | Counter | {record} | | The total number of records read from Kafka by this task since it was last restarted. 
| -| kafka.connect.sink.record.send.rate | Gauge | {record}/s | | The average per-second number of records output from the transformations and sent/put to this task. | -| kafka.connect.sink.record.send.total | Counter | {record} | | The total number of records output from the transformations and sent/put to this task since it was last restarted. | +| Metric Name | Type | Unit | Attributes | Description | +|---------------------------------------------------|---------------|-------------|------------|--------------------------------------------------------------------------------------------------------------------------------------| +| kafka.connect.sink.offset.commit.completion.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were completed successfully. | +| kafka.connect.sink.offset.commit.completion.total | Counter | {commit} | | The total number of offset commit completions that were completed successfully. | +| kafka.connect.sink.offset.commit.seq | UpDownCounter | {sequence} | | The current sequence number for offset commits. | +| kafka.connect.sink.offset.commit.skip.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were received too late and skipped/ignored. | +| kafka.connect.sink.offset.commit.skip.total | Counter | {commit} | | The total number of offset commit completions that were received too late and skipped/ignored. | +| kafka.connect.sink.partition.count | UpDownCounter | {partition} | | The number of topic partitions assigned to this task. | +| kafka.connect.sink.put.batch.avg.time | Gauge | s | | The average time taken by this task to put a batch of sinks records. | +| kafka.connect.sink.put.batch.max.time | Gauge | s | | The maximum time taken by this task to put a batch of sinks records. 
| +| kafka.connect.sink.record.active.count | UpDownCounter | {record} | | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | +| kafka.connect.sink.record.active.count.avg | Gauge | {record} | | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | +| kafka.connect.sink.record.active.count.max | Gauge | {record} | | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | +| kafka.connect.sink.record.lag.max | Gauge | {record} | | The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions. | +| kafka.connect.sink.record.read.rate | Gauge | {record}/s | | The average per-second number of records read from Kafka for this task before transformations are applied. | +| kafka.connect.sink.record.read.total | Counter | {record} | | The total number of records read from Kafka by this task since it was last restarted. | +| kafka.connect.sink.record.send.rate | Gauge | {record}/s | | The average per-second number of records output from the transformations and sent/put to this task. | +| kafka.connect.sink.record.send.total | Counter | {record} | | The total number of records output from the transformations and sent/put to this task since it was last restarted. | ## Source task metrics Attributes: `kafka.connect.connector`, `kafka.connect.task.id`. -| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.source.poll.batch.avg.time | Gauge | s | | The average time in milliseconds taken by this task to poll for a batch of source records. | -| kafka.connect.source.poll.batch.max.time | Gauge | s | | The maximum time in milliseconds taken by this task to poll for a batch of source records. 
| -| kafka.connect.source.record.active.count | UpDownCounter | {record} | | The number of records that have been produced by this task but not yet completely written to Kafka. | -| kafka.connect.source.record.active.count.avg | Gauge | {record} | | The average number of records that have been produced by this task but not yet completely written to Kafka. | -| kafka.connect.source.record.active.count.max | Gauge | {record} | | The maximum number of records that have been produced by this task but not yet completely written to Kafka. | -| kafka.connect.source.record.poll.rate | Gauge | {record}/s | | The average per-second number of records produced/polled (before transformation) by this task. | -| kafka.connect.source.record.poll.total | Counter | {record} | | The total number of records produced/polled (before transformation) by this task. | -| kafka.connect.source.record.write.rate | Gauge | {record}/s | | The average per-second number of records written to Kafka for this task. | -| kafka.connect.source.record.write.total | Counter | {record} | | The number of records output written to Kafka for this task. | -| kafka.connect.source.transaction.size.avg | Gauge | {record} | | The average number of records in the transactions the task has committed so far. | -| kafka.connect.source.transaction.size.max | Gauge | {record} | | The number of records in the largest transaction the task has committed so far. | -| kafka.connect.source.transaction.size.min | Gauge | {record} | | The number of records in the smallest transaction the task has committed so far. | +| Metric Name | Type | Unit | Attributes | Description | +|----------------------------------------------|---------------|------------|------------|-------------------------------------------------------------------------------------------------------------| +| kafka.connect.source.poll.batch.avg.time | Gauge | s | | The average time in milliseconds taken by this task to poll for a batch of source records. 
| +| kafka.connect.source.poll.batch.max.time | Gauge | s | | The maximum time in milliseconds taken by this task to poll for a batch of source records. | +| kafka.connect.source.record.active.count | UpDownCounter | {record} | | The number of records that have been produced by this task but not yet completely written to Kafka. | +| kafka.connect.source.record.active.count.avg | Gauge | {record} | | The average number of records that have been produced by this task but not yet completely written to Kafka. | +| kafka.connect.source.record.active.count.max | Gauge | {record} | | The maximum number of records that have been produced by this task but not yet completely written to Kafka. | +| kafka.connect.source.record.poll.rate | Gauge | {record}/s | | The average per-second number of records produced/polled (before transformation) by this task. | +| kafka.connect.source.record.poll.total | Counter | {record} | | The total number of records produced/polled (before transformation) by this task. | +| kafka.connect.source.record.write.rate | Gauge | {record}/s | | The average per-second number of records written to Kafka for this task. | +| kafka.connect.source.record.write.total | Counter | {record} | | The number of records output written to Kafka for this task. | +| kafka.connect.source.transaction.size.avg | Gauge | {record} | | The average number of records in the transactions the task has committed so far. | +| kafka.connect.source.transaction.size.max | Gauge | {record} | | The number of records in the largest transaction the task has committed so far. | +| kafka.connect.source.transaction.size.min | Gauge | {record} | | The number of records in the smallest transaction the task has committed so far. | ## Task error metrics Attributes: `kafka.connect.connector`, `kafka.connect.task.id`. 
-| Metric Name | Type | Unit | Attributes | Description | -|-------------|------|------|------------|-------------| -| kafka.connect.task.error.deadletterqueue.produce.failures | Counter | {failure} | | The number of failed writes to the dead letter queue. | -| kafka.connect.task.error.deadletterqueue.produce.requests | Counter | {request} | | The number of attempted writes to the dead letter queue. | -| kafka.connect.task.error.last.error.timestamp | Gauge | s | | The epoch timestamp when this task last encountered an error. | -| kafka.connect.task.error.total.errors.logged | Counter | {error} | | The number of errors that were logged. | -| kafka.connect.task.error.total.record.errors | Counter | {record} | | The number of record processing errors in this task. | -| kafka.connect.task.error.total.record.failures | Counter | {record} | | The number of record processing failures in this task. | -| kafka.connect.task.error.total.records.skipped | Counter | {record} | | The number of records skipped due to errors. | -| kafka.connect.task.error.total.retries | Counter | {retry} | | The number of operations retried. | +| Metric Name | Type | Unit | Attributes | Description | +|-----------------------------------------------------------|---------|-----------|------------|---------------------------------------------------------------| +| kafka.connect.task.error.deadletterqueue.produce.failures | Counter | {failure} | | The number of failed writes to the dead letter queue. | +| kafka.connect.task.error.deadletterqueue.produce.requests | Counter | {request} | | The number of attempted writes to the dead letter queue. | +| kafka.connect.task.error.last.error.timestamp | Gauge | s | | The epoch timestamp when this task last encountered an error. | +| kafka.connect.task.error.total.errors.logged | Counter | {error} | | The number of errors that were logged. 
| +| kafka.connect.task.error.total.record.errors | Counter | {record} | | The number of record processing errors in this task. | +| kafka.connect.task.error.total.record.failures | Counter | {record} | | The number of record processing failures in this task. | +| kafka.connect.task.error.total.records.skipped | Counter | {record} | | The number of records skipped due to errors. | +| kafka.connect.task.error.total.retries | Counter | {retry} | | The number of operations retried. | From 9ccdc08398de748bf6d0c2defa9154c453ddb941 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Mon, 8 Dec 2025 10:28:51 -0500 Subject: [PATCH 08/13] change from state to updowncounter for wider compatibility --- .../jmx-metrics/library/kafka-connect.md | 4 +- .../resources/jmx/rules/kafka-connect.yaml | 46 +++++++++---------- .../jmx/rules/KafkaConnectRuleTest.java | 20 ++++++++ 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md index 487b9c937ea5..0e6503cd0544 100644 --- a/instrumentation/jmx-metrics/library/kafka-connect.md +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -1,8 +1,8 @@ # Kafka Connect Metrics Here is the list of metrics based on MBeans exposed by Apache Kafka Connect. String-valued JMX -attributes (class/type/version information) are exported as state metrics with value `1` and -carry the raw string value as metric attributes. +attributes (class/type/version information) are exported as `UpDownCounter` metrics with value `1` +and carry the raw string value as metric attributes. 
## Compatibility diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml index bc3392924f96..67d7913025f0 100644 --- a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -131,7 +131,7 @@ rules: # kafka.connect.worker.rebalance.protocol connect-protocol: metric: protocol - type: state + type: updowncounter desc: The Connect protocol used by this cluster. metricAttribute: kafka.connect.protocol.state: @@ -147,7 +147,7 @@ rules: # kafka.connect.worker.rebalance.leader leader-name: metric: leader - type: state + type: updowncounter desc: The name of the group leader. metricAttribute: kafka.connect.worker.leader.state: @@ -169,7 +169,7 @@ rules: # kafka.connect.worker.rebalance.active rebalancing: metric: active - type: state + type: updowncounter unit: "1" desc: Whether this worker is currently rebalancing. metricAttribute: @@ -196,7 +196,7 @@ rules: # kafka.connect.connector.class connector-class: metric: class - type: state + type: updowncounter desc: The name of the connector class. metricAttribute: kafka.connect.connector.class.state: @@ -204,7 +204,7 @@ rules: # kafka.connect.connector.type connector-type: metric: type - type: state + type: updowncounter desc: The type of the connector. One of 'source' or 'sink'. metricAttribute: kafka.connect.connector.type: @@ -214,7 +214,7 @@ rules: # kafka.connect.connector.version connector-version: metric: version - type: state + type: updowncounter desc: The version of the connector class, as reported by the connector. 
metricAttribute: kafka.connect.connector.version.state: @@ -222,7 +222,7 @@ rules: # kafka.connect.connector.status status: metric: status - type: state + type: updowncounter desc: Connector lifecycle state indicator (1 when the state matches the attribute value). Supports Apache and Confluent status values. metricAttribute: kafka.connect.connector.state: @@ -247,7 +247,7 @@ rules: # kafka.connect.predicate.class predicate-class: metric: class - type: state + type: updowncounter desc: The class name of the predicate class metricAttribute: kafka.connect.predicate.class.state: @@ -255,7 +255,7 @@ rules: # kafka.connect.predicate.version predicate-version: metric: version - type: state + type: updowncounter desc: The version of the predicate class metricAttribute: kafka.connect.predicate.version.state: @@ -293,7 +293,7 @@ rules: # kafka.connect.task.connector.class connector-class: metric: connector.class - type: state + type: updowncounter desc: The name of the connector class. metricAttribute: kafka.connect.task.connector.class.state: @@ -301,7 +301,7 @@ rules: # kafka.connect.task.connector.type connector-type: metric: connector.type - type: state + type: updowncounter desc: The type of the connector. One of 'source' or 'sink'. metricAttribute: kafka.connect.task.connector.type: @@ -311,7 +311,7 @@ rules: # kafka.connect.task.connector.version connector-version: metric: connector.version - type: state + type: updowncounter desc: The version of the connector class, as reported by the connector. 
metricAttribute: kafka.connect.task.connector.version.state: @@ -319,7 +319,7 @@ rules: # kafka.connect.task.header.converter.class header-converter-class: metric: header.converter.class - type: state + type: updowncounter desc: The fully qualified class name from header.converter metricAttribute: kafka.connect.task.header.converter.class.state: @@ -327,7 +327,7 @@ rules: # kafka.connect.task.header.converter.version header-converter-version: metric: header.converter.version - type: state + type: updowncounter desc: The version instantiated for header.converter. May be undefined metricAttribute: kafka.connect.task.header.converter.version.state: @@ -335,7 +335,7 @@ rules: # kafka.connect.task.key.converter.class key-converter-class: metric: key.converter.class - type: state + type: updowncounter desc: The fully qualified class name from key.converter metricAttribute: kafka.connect.task.key.converter.class.state: @@ -343,7 +343,7 @@ rules: # kafka.connect.task.key.converter.version key-converter-version: metric: key.converter.version - type: state + type: updowncounter desc: The version instantiated for key.converter. May be undefined metricAttribute: kafka.connect.task.key.converter.version.state: @@ -391,7 +391,7 @@ rules: # kafka.connect.task.status status: metric: status - type: state + type: updowncounter desc: The status of the connector task. Supports Apache (unassigned, running, paused, failed, restarting) and Confluent (unassigned, running, paused, failed, destroyed) values. metricAttribute: kafka.connect.task.state: @@ -405,7 +405,7 @@ rules: # kafka.connect.task.class task-class: metric: task.class - type: state + type: updowncounter desc: The class name of the task. metricAttribute: kafka.connect.task.class.state: @@ -413,7 +413,7 @@ rules: # kafka.connect.task.version task-version: metric: task.version - type: state + type: updowncounter desc: The version of the task. 
metricAttribute: kafka.connect.task.version.state: @@ -421,7 +421,7 @@ rules: # kafka.connect.task.value.converter.class value-converter-class: metric: value.converter.class - type: state + type: updowncounter desc: The fully qualified class name from value.converter metricAttribute: kafka.connect.task.value.converter.class.state: @@ -429,7 +429,7 @@ rules: # kafka.connect.task.value.converter.version value-converter-version: metric: value.converter.version - type: state + type: updowncounter desc: The version instantiated for value.converter. May be undefined metricAttribute: kafka.connect.task.value.converter.version.state: @@ -447,7 +447,7 @@ rules: # kafka.connect.transform.class transform-class: metric: class - type: state + type: updowncounter desc: The class name of the transformation class metricAttribute: kafka.connect.transform.class.state: @@ -455,7 +455,7 @@ rules: # kafka.connect.transform.version transform-version: metric: version - type: state + type: updowncounter desc: The version of the transformation class metricAttribute: kafka.connect.transform.version.state: diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index 04928f4356f0..10a222cfbc10 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -10,6 +10,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.instrumentation.jmx.JmxTelemetry; +import io.opentelemetry.instrumentation.jmx.engine.MetricInfo; import io.opentelemetry.instrumentation.jmx.yaml.JmxConfig; import io.opentelemetry.instrumentation.jmx.yaml.JmxRule; import 
io.opentelemetry.instrumentation.jmx.yaml.Metric; @@ -61,6 +62,21 @@ void kafkaConnectConfigParsesAndBuilds() throws Exception { } } + @Test + void kafkaConnectRulesUseBasicMetricTypes() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + assertThat(config.getRules()) + .allSatisfy( + rule -> { + assertThat(rule.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE); + rule + .getMapping() + .values() + .forEach(metric -> assertThat(metric.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE)); + }); + } + @Test void connectorStatusStateMappingPresent() throws Exception { JmxConfig config = loadKafkaConnectConfig(); @@ -69,6 +85,8 @@ void connectorStatusStateMappingPresent() throws Exception { getRuleForBean(config, "kafka.connect:type=connector-metrics,connector=*"); StateMapping stateMapping = getMetric(connectorRule, "status").getStateMapping(); + assertThat(getMetric(connectorRule, "status").getMetricType()) + .isEqualTo(MetricInfo.Type.UPDOWNCOUNTER); assertThat(stateMapping.isEmpty()).isFalse(); assertThat(stateMapping.getStateKeys()) .contains( @@ -95,6 +113,8 @@ void taskStatusStateMappingSuperset() throws Exception { getRuleForBean(config, "kafka.connect:type=connector-task-metrics,connector=*,task=*"); StateMapping stateMapping = getMetric(connectorTaskRule, "status").getStateMapping(); + assertThat(getMetric(connectorTaskRule, "status").getMetricType()) + .isEqualTo(MetricInfo.Type.UPDOWNCOUNTER); assertThat(stateMapping.isEmpty()).isFalse(); assertThat(stateMapping.getStateKeys()) .contains( From 471c9a7ce7e54b347754dbb2f6b283dcbef94d74 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Mon, 8 Dec 2025 10:33:24 -0500 Subject: [PATCH 09/13] Apply spotless --- .../instrumentation/jmx/rules/KafkaConnectRuleTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java 
b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index 10a222cfbc10..f9eb89774d7b 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -70,10 +70,11 @@ void kafkaConnectRulesUseBasicMetricTypes() throws Exception { .allSatisfy( rule -> { assertThat(rule.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE); - rule - .getMapping() + rule.getMapping() .values() - .forEach(metric -> assertThat(metric.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE)); + .forEach( + metric -> + assertThat(metric.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE)); }); } From 7ceb694ac213518b8d811f44b0cab0c4c5502233 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Mon, 8 Dec 2025 11:42:07 -0500 Subject: [PATCH 10/13] update for simplicity and further compatibility --- .../jmx-metrics/library/kafka-connect.md | 52 +---- .../resources/jmx/rules/kafka-connect.yaml | 204 +----------------- .../jmx/rules/KafkaConnectRuleTest.java | 94 ++------ 3 files changed, 35 insertions(+), 315 deletions(-) diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md index 0e6503cd0544..4440ee99df04 100644 --- a/instrumentation/jmx-metrics/library/kafka-connect.md +++ b/instrumentation/jmx-metrics/library/kafka-connect.md @@ -1,15 +1,14 @@ # Kafka Connect Metrics Here is the list of metrics based on MBeans exposed by Apache Kafka Connect. String-valued JMX -attributes (class/type/version information) are exported as `UpDownCounter` metrics with value `1` -and carry the raw string value as metric attributes. +attributes are exported as `UpDownCounter` metrics with value `1` and only include connector/task +identifiers alongside any state-mapping attributes. 
## Compatibility This rule set targets both Apache Kafka Connect and Confluent Platform. Apache documents several metrics not surfaced in Confluent docs (worker rebalance protocol, per-connector task counts on -workers, predicate/transform metadata, connector task metadata including converter info, source -transaction size stats, and sink record lag max); all of them are included below. Status metrics use +workers, source transaction size stats, and sink record lag max); all of them are included below. Status metrics use the superset of values across both variants (connector: running, paused, stopped, failed, restarting, unassigned, degraded; task: running, paused, failed, restarting, unassigned, destroyed) and fall back to `unknown` for any new values. Differences in bean placeholder @@ -46,14 +45,11 @@ formatting between the docs are cosmetic; bean names align across both variants. ## Worker rebalance metrics -All metrics include `kafka.connect.worker.leader`. - | Metric Name | Type | Unit | Attributes | Description | |------------------------------------------------|---------------|-------------|--------------------------------------|---------------------------------------------------------------------------------| | kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. | | kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. | | kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. | -| kafka.connect.worker.rebalance.leader | UpDownCounter | 1 | kafka.connect.worker.leader.state | The name of the group leader. | | kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time in milliseconds spent by this worker to rebalance. 
| | kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time in milliseconds spent by this worker to rebalance. | | kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. | @@ -61,39 +57,22 @@ All metrics include `kafka.connect.worker.leader`. ## Connector metrics -Attributes: `kafka.connect.connector`, `kafka.connect.connector.class`, `kafka.connect.connector.version`, `kafka.connect.connector.type.raw`. - -| Metric Name | Type | Unit | Attributes | Description | -|---------------------------------|---------------|------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| kafka.connect.connector.class | UpDownCounter | 1 | kafka.connect.connector.class.state | The name of the connector class. | -| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. | -| kafka.connect.connector.version | UpDownCounter | 1 | kafka.connect.connector.version.state | The version of the connector class, as reported by the connector. | -| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. | - -## Predicate metrics - -Attributes: `kafka.connect.connector`, `kafka.connect.task.id`, `kafka.connect.predicate`, `kafka.connect.predicate.class`, `kafka.connect.predicate.version`. +Attributes: `kafka.connect.connector` and the state attribute shown. 
-| Metric Name | Type | Unit | Attributes | Description | -|---------------------------------|---------------|------|---------------------------------------|----------------------------------------| -| kafka.connect.predicate.class | UpDownCounter | 1 | kafka.connect.predicate.class.state | The class name of the predicate class. | -| kafka.connect.predicate.version | UpDownCounter | 1 | kafka.connect.predicate.version.state | The version of the predicate class. | +| Metric Name | Type | Unit | Attributes | Description | +|---------------------------------|---------------|------|----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. | +| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. | ## Connector task metrics -Attributes include `kafka.connect.connector`, `kafka.connect.task.id`, connector class/type/version, converter class/version attributes, and task class/version. +All metrics include `kafka.connect.connector` and `kafka.connect.task.id`. The Attributes column lists any additional state attributes. | Metric Name | Type | Unit | Attributes | Description | |-----------------------------------------------------|---------------|----------|---------------------------------------------------|--------------------------------------------------------------------------------------------------------------------| | kafka.connect.task.batch.size.avg | Gauge | {record} | | The average number of records in the batches the task has processed so far.
| | kafka.connect.task.batch.size.max | Gauge | {record} | | The number of records in the largest batch the task has processed so far. | -| kafka.connect.task.connector.class | UpDownCounter | 1 | kafka.connect.task.connector.class.state | The name of the connector class. | | kafka.connect.task.connector.type | UpDownCounter | 1 | kafka.connect.task.connector.type | The type of the connector. One of 'source' or 'sink'. | -| kafka.connect.task.connector.version | UpDownCounter | 1 | kafka.connect.task.connector.version.state | The version of the connector class, as reported by the connector. | -| kafka.connect.task.header.converter.class | UpDownCounter | 1 | kafka.connect.task.header.converter.class.state | The fully qualified class name from header.converter. | -| kafka.connect.task.header.converter.version | UpDownCounter | 1 | kafka.connect.task.header.converter.version.state | The version instantiated for header.converter. May be undefined. | -| kafka.connect.task.key.converter.class | UpDownCounter | 1 | kafka.connect.task.key.converter.class.state | The fully qualified class name from key.converter. | -| kafka.connect.task.key.converter.version | UpDownCounter | 1 | kafka.connect.task.key.converter.version.state | The version instantiated for key.converter. May be undefined. | | kafka.connect.task.offset.commit.avg.time | Gauge | s | | The average time in milliseconds taken by this task to commit offsets. | | kafka.connect.task.offset.commit.failure.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that failed. | | kafka.connect.task.offset.commit.max.time | Gauge | s | | The maximum time in milliseconds taken by this task to commit offsets. | @@ -101,19 +80,6 @@ Attributes include `kafka.connect.connector`, `kafka.connect.task.id`, connector | kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. 
| | kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state. | | kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task; supports running, paused, failed, restarting, unassigned, destroyed, or unknown. | -| kafka.connect.task.class | UpDownCounter | 1 | kafka.connect.task.class.state | The class name of the task. | -| kafka.connect.task.version | UpDownCounter | 1 | kafka.connect.task.version.state | The version of the task. | -| kafka.connect.task.value.converter.class | UpDownCounter | 1 | kafka.connect.task.value.converter.class.state | The fully qualified class name from value.converter. | -| kafka.connect.task.value.converter.version | UpDownCounter | 1 | kafka.connect.task.value.converter.version.state | The version instantiated for value.converter. May be undefined. | - -## Transform metrics - -Attributes: `kafka.connect.connector`, `kafka.connect.task.id`, `kafka.connect.transform`, `kafka.connect.transform.class`, `kafka.connect.transform.version`. - -| Metric Name | Type | Unit | Attributes | Description | -|---------------------------------|---------------|------|---------------------------------------|---------------------------------------------| -| kafka.connect.transform.class | UpDownCounter | 1 | kafka.connect.transform.class.state | The class name of the transformation class. | -| kafka.connect.transform.version | UpDownCounter | 1 | kafka.connect.transform.version.state | The version of the transformation class. 
| ## Sink task metrics diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml index 67d7913025f0..fa7941e4a7ea 100644 --- a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -20,8 +20,7 @@ rules: connector-startup-failure-percentage: metric: connector.startup.failure.percentage type: gauge - sourceUnit: "%" - unit: "1" + unit: '1' desc: The average percentage of this worker's connectors starts that failed. # kafka.connect.worker.connector.startup.failure.total connector-startup-failure-total: @@ -33,8 +32,7 @@ rules: connector-startup-success-percentage: metric: connector.startup.success.percentage type: gauge - sourceUnit: "%" - unit: "1" + unit: '1' desc: The average percentage of this worker's connectors starts that succeeded. # kafka.connect.worker.connector.startup.success.total connector-startup-success-total: @@ -58,8 +56,7 @@ rules: task-startup-failure-percentage: metric: task.startup.failure.percentage type: gauge - sourceUnit: "%" - unit: "1" + unit: '1' desc: The average percentage of this worker's tasks starts that failed. # kafka.connect.worker.task.startup.failure.total task-startup-failure-total: @@ -71,8 +68,7 @@ rules: task-startup-success-percentage: metric: task.startup.success.percentage type: gauge - sourceUnit: "%" - unit: "1" + unit: '1' desc: The average percentage of this worker's tasks starts that succeeded. # kafka.connect.worker.task.startup.success.total task-startup-success-total: @@ -119,8 +115,6 @@ rules: - bean: kafka.connect:type=connect-worker-rebalance-metrics prefix: kafka.connect.worker.rebalance. 
- metricAttribute: - kafka.connect.worker.leader: beanattr(leader-name) mapping: # kafka.connect.worker.rebalance.completed.total completed-rebalances-total: @@ -145,32 +139,23 @@ rules: unit: "{epoch}" desc: The epoch or generation number of this worker. # kafka.connect.worker.rebalance.leader - leader-name: - metric: leader - type: updowncounter - desc: The name of the group leader. - metricAttribute: - kafka.connect.worker.leader.state: - leader: "*" # kafka.connect.worker.rebalance.avg.time rebalance-avg-time-ms: metric: avg.time type: gauge - sourceUnit: ms unit: s desc: The average time in milliseconds spent by this worker to rebalance. # kafka.connect.worker.rebalance.max.time rebalance-max-time-ms: metric: max.time type: gauge - sourceUnit: ms unit: s desc: The maximum time in milliseconds spent by this worker to rebalance. # kafka.connect.worker.rebalance.active rebalancing: metric: active type: updowncounter - unit: "1" + unit: '1' desc: Whether this worker is currently rebalancing. metricAttribute: kafka.connect.worker.rebalance.state: @@ -181,7 +166,6 @@ rules: time-since-last-rebalance-ms: metric: since_last type: gauge - sourceUnit: ms unit: s desc: The time in milliseconds since this worker completed the most recent rebalance. @@ -189,18 +173,7 @@ rules: prefix: kafka.connect.connector. metricAttribute: kafka.connect.connector: param(connector) - kafka.connect.connector.class: beanattr(connector-class) - kafka.connect.connector.version: beanattr(connector-version) - kafka.connect.connector.type.raw: lowercase(beanattr(connector-type)) mapping: - # kafka.connect.connector.class - connector-class: - metric: class - type: updowncounter - desc: The name of the connector class. 
- metricAttribute: - kafka.connect.connector.class.state: - configured: "*" # kafka.connect.connector.type connector-type: metric: type @@ -211,14 +184,6 @@ rules: sink: [sink, SINK] source: [source, SOURCE] unknown: "*" - # kafka.connect.connector.version - connector-version: - metric: version - type: updowncounter - desc: The version of the connector class, as reported by the connector. - metricAttribute: - kafka.connect.connector.version.state: - reported: "*" # kafka.connect.connector.status status: metric: status @@ -235,48 +200,11 @@ rules: stopped: [stopped, STOPPED] unknown: "*" - - bean: kafka.connect:type=connector-predicate-metrics,connector=*,task=*,predicate=* - prefix: kafka.connect.predicate. - metricAttribute: - kafka.connect.connector: param(connector) - kafka.connect.task.id: param(task) - kafka.connect.predicate: param(predicate) - kafka.connect.predicate.class: beanattr(predicate-class) - kafka.connect.predicate.version: beanattr(predicate-version) - mapping: - # kafka.connect.predicate.class - predicate-class: - metric: class - type: updowncounter - desc: The class name of the predicate class - metricAttribute: - kafka.connect.predicate.class.state: - configured: "*" - # kafka.connect.predicate.version - predicate-version: - metric: version - type: updowncounter - desc: The version of the predicate class - metricAttribute: - kafka.connect.predicate.version.state: - reported: "*" - - bean: kafka.connect:type=connector-task-metrics,connector=*,task=* prefix: kafka.connect.task. 
metricAttribute: kafka.connect.connector: param(connector) kafka.connect.task.id: param(task) - kafka.connect.connector.class: beanattr(connector-class) - kafka.connect.connector.type.raw: lowercase(beanattr(connector-type)) - kafka.connect.connector.version: beanattr(connector-version) - kafka.connect.converter.header.class: beanattr(header-converter-class) - kafka.connect.converter.header.version: beanattr(header-converter-version) - kafka.connect.converter.key.class: beanattr(key-converter-class) - kafka.connect.converter.key.version: beanattr(key-converter-version) - kafka.connect.converter.value.class: beanattr(value-converter-class) - kafka.connect.converter.value.version: beanattr(value-converter-version) - kafka.connect.task.class: beanattr(task-class) - kafka.connect.task.version: beanattr(task-version) mapping: # kafka.connect.task.batch.size.avg batch-size-avg: @@ -291,14 +219,6 @@ rules: unit: "{record}" desc: The number of records in the largest batch the task has processed so far. # kafka.connect.task.connector.class - connector-class: - metric: connector.class - type: updowncounter - desc: The name of the connector class. - metricAttribute: - kafka.connect.task.connector.class.state: - configured: "*" - # kafka.connect.task.connector.type connector-type: metric: connector.type type: updowncounter @@ -308,85 +228,41 @@ rules: sink: [sink, SINK] source: [source, SOURCE] unknown: "*" - # kafka.connect.task.connector.version - connector-version: - metric: connector.version - type: updowncounter - desc: The version of the connector class, as reported by the connector. 
- metricAttribute: - kafka.connect.task.connector.version.state: - reported: "*" - # kafka.connect.task.header.converter.class - header-converter-class: - metric: header.converter.class - type: updowncounter - desc: The fully qualified class name from header.converter - metricAttribute: - kafka.connect.task.header.converter.class.state: - configured: "*" - # kafka.connect.task.header.converter.version - header-converter-version: - metric: header.converter.version - type: updowncounter - desc: The version instantiated for header.converter. May be undefined - metricAttribute: - kafka.connect.task.header.converter.version.state: - reported: "*" - # kafka.connect.task.key.converter.class - key-converter-class: - metric: key.converter.class - type: updowncounter - desc: The fully qualified class name from key.converter - metricAttribute: - kafka.connect.task.key.converter.class.state: - configured: "*" - # kafka.connect.task.key.converter.version - key-converter-version: - metric: key.converter.version - type: updowncounter - desc: The version instantiated for key.converter. May be undefined - metricAttribute: - kafka.connect.task.key.converter.version.state: - reported: "*" # kafka.connect.task.offset.commit.avg.time offset-commit-avg-time-ms: metric: offset.commit.avg.time type: gauge - sourceUnit: ms unit: s desc: The average time in milliseconds taken by this task to commit offsets. # kafka.connect.task.offset.commit.failure.percentage offset-commit-failure-percentage: metric: offset.commit.failure.percentage type: gauge - sourceUnit: "%" - unit: "1" + unit: '1' desc: The average percentage of this task's offset commit attempts that failed. # kafka.connect.task.offset.commit.max.time offset-commit-max-time-ms: metric: offset.commit.max.time type: gauge - sourceUnit: ms unit: s desc: The maximum time in milliseconds taken by this task to commit offsets. 
# kafka.connect.task.offset.commit.success.percentage offset-commit-success-percentage: metric: offset.commit.success.percentage type: gauge - sourceUnit: "%" - unit: "1" + unit: '1' desc: The average percentage of this task's offset commit attempts that succeeded. # kafka.connect.task.pause.ratio pause-ratio: metric: pause.ratio type: gauge - unit: "1" + unit: '1' desc: The fraction of time this task has spent in the pause state. # kafka.connect.task.running.ratio running-ratio: metric: running.ratio type: gauge - unit: "1" + unit: '1' desc: The fraction of time this task has spent in the running state. # kafka.connect.task.status status: @@ -403,63 +279,6 @@ rules: destroyed: [destroyed, DESTROYED] unknown: "*" # kafka.connect.task.class - task-class: - metric: task.class - type: updowncounter - desc: The class name of the task. - metricAttribute: - kafka.connect.task.class.state: - configured: "*" - # kafka.connect.task.version - task-version: - metric: task.version - type: updowncounter - desc: The version of the task. - metricAttribute: - kafka.connect.task.version.state: - reported: "*" - # kafka.connect.task.value.converter.class - value-converter-class: - metric: value.converter.class - type: updowncounter - desc: The fully qualified class name from value.converter - metricAttribute: - kafka.connect.task.value.converter.class.state: - configured: "*" - # kafka.connect.task.value.converter.version - value-converter-version: - metric: value.converter.version - type: updowncounter - desc: The version instantiated for value.converter. May be undefined - metricAttribute: - kafka.connect.task.value.converter.version.state: - reported: "*" - - - bean: kafka.connect:type=connector-transform-metrics,connector=*,task=*,transform=* - prefix: kafka.connect.transform. 
- metricAttribute: - kafka.connect.connector: param(connector) - kafka.connect.task.id: param(task) - kafka.connect.transform: param(transform) - kafka.connect.transform.class: beanattr(transform-class) - kafka.connect.transform.version: beanattr(transform-version) - mapping: - # kafka.connect.transform.class - transform-class: - metric: class - type: updowncounter - desc: The class name of the transformation class - metricAttribute: - kafka.connect.transform.class.state: - configured: "*" - # kafka.connect.transform.version - transform-version: - metric: version - type: updowncounter - desc: The version of the transformation class - metricAttribute: - kafka.connect.transform.version.state: - reported: "*" - bean: kafka.connect:type=sink-task-metrics,connector=*,task=* prefix: kafka.connect.sink. @@ -507,14 +326,12 @@ rules: put-batch-avg-time-ms: metric: put.batch.avg.time type: gauge - sourceUnit: ms unit: s desc: The average time taken by this task to put a batch of sinks records. # kafka.connect.sink.put.batch.max.time put-batch-max-time-ms: metric: put.batch.max.time type: gauge - sourceUnit: ms unit: s desc: The maximum time taken by this task to put a batch of sinks records. # kafka.connect.sink.record.active.count @@ -576,14 +393,12 @@ rules: poll-batch-avg-time-ms: metric: poll.batch.avg.time type: gauge - sourceUnit: ms unit: s desc: The average time in milliseconds taken by this task to poll for a batch of source records. # kafka.connect.source.poll.batch.max.time poll-batch-max-time-ms: metric: poll.batch.max.time type: gauge - sourceUnit: ms unit: s desc: The maximum time in milliseconds taken by this task to poll for a batch of source records. # kafka.connect.source.record.active.count @@ -669,7 +484,6 @@ rules: last-error-timestamp: metric: last.error.timestamp type: gauge - sourceUnit: ms unit: s dropNegativeValues: true desc: The epoch timestamp when this task last encountered an error. 
diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index f9eb89774d7b..bd136c9fc5ad 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -135,21 +135,22 @@ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep registerMBean( "kafka.connect:type=connector-metrics,connector=confluent-connector", mapOf( - "connector-class", "io.test.MyConnector", "connector-type", "source", - "connector-version", "1.2.3", "status", "RUNNING")); registerMBean( "kafka.connect:type=connector-task-metrics,connector=confluent-connector,task=0", mapOf( "status", "DESTROYED", - "connector-class", "io.test.MyConnector", "connector-type", "sink", - "connector-version", "9.9", - "task-class", "io.test.Task", - "task-version", "1.0", - "offset-commit-avg-time-ms", 5L)); + "batch-size-avg", 1L, + "batch-size-max", 2L, + "offset-commit-avg-time-ms", 5L, + "offset-commit-failure-percentage", 0.0d, + "offset-commit-max-time-ms", 6L, + "offset-commit-success-percentage", 100.0d, + "pause-ratio", 0.0d, + "running-ratio", 1.0d)); startKafkaConnectTelemetry(); @@ -160,9 +161,6 @@ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep Attributes.builder() .put("kafka.connect.connector", "confluent-connector") .put("kafka.connect.connector.state", "running") - .put("kafka.connect.connector.class", "io.test.MyConnector") - .put("kafka.connect.connector.type.raw", "source") - .put("kafka.connect.connector.version", "1.2.3") .build(); assertLongSum("kafka.connect.connector.status", connectorStatusAttributes, 1); @@ -170,11 +168,6 @@ void 
confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep Attributes.builder() .put("kafka.connect.connector", "confluent-connector") .put("kafka.connect.task.id", "0") - .put("kafka.connect.connector.class", "io.test.MyConnector") - .put("kafka.connect.connector.type.raw", "sink") - .put("kafka.connect.connector.version", "9.9") - .put("kafka.connect.task.class", "io.test.Task") - .put("kafka.connect.task.version", "1.0") .put("kafka.connect.task.state", "destroyed") .build(); assertLongSum("kafka.connect.task.status", taskStatusAttributes, 1); @@ -191,28 +184,19 @@ void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { "connector-running-task-count", 2L, "connector-unassigned-task-count", 0L)); - registerMBean( - "kafka.connect:type=connector-predicate-metrics,connector=apache-connector,task=1,predicate=sample", - mapOf("predicate-class", "io.test.Predicate", "predicate-version", "2.1.0")); - - registerMBean( - "kafka.connect:type=connector-transform-metrics,connector=apache-connector,task=1,transform=mask", - mapOf("transform-class", "io.test.Transform", "transform-version", "0.9.0")); - registerMBean( "kafka.connect:type=connector-task-metrics,connector=apache-connector,task=1", mapOf( - "connector-class", "io.test.ApacheConnector", "connector-type", "source", - "connector-version", "3.0", - "task-class", "io.test.Task", - "task-version", "3.1", - "header-converter-class", "io.test.HeaderConverter", - "header-converter-version", "1.0", - "key-converter-class", "io.test.KeyConverter", - "key-converter-version", "1.1", - "value-converter-class", "io.test.ValueConverter", - "value-converter-version", "1.2")); + "status", "RUNNING", + "batch-size-avg", 4L, + "batch-size-max", 5L, + "offset-commit-avg-time-ms", 6L, + "offset-commit-failure-percentage", 0.0d, + "offset-commit-max-time-ms", 7L, + "offset-commit-success-percentage", 100.0d, + "pause-ratio", 0.0d, + "running-ratio", 1.0d)); registerMBean( 
"kafka.connect:type=source-task-metrics,connector=apache-connector,task=1", @@ -240,50 +224,6 @@ void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { "apache-connector"); assertLongSum("kafka.connect.worker.connector.task.running", connectorTaskAttributes, 2); - assertLongSum( - "kafka.connect.predicate.class", - Attributes.builder() - .put("kafka.connect.connector", "apache-connector") - .put("kafka.connect.task.id", "1") - .put("kafka.connect.predicate", "sample") - .put("kafka.connect.predicate.class", "io.test.Predicate") - .put("kafka.connect.predicate.class.state", "configured") - .put("kafka.connect.predicate.version", "2.1.0") - .build(), - 1); - - assertLongSum( - "kafka.connect.transform.class", - Attributes.builder() - .put("kafka.connect.connector", "apache-connector") - .put("kafka.connect.task.id", "1") - .put("kafka.connect.transform", "mask") - .put("kafka.connect.transform.class", "io.test.Transform") - .put("kafka.connect.transform.class.state", "configured") - .put("kafka.connect.transform.version", "0.9.0") - .build(), - 1); - - Attributes connectorTaskMetaAttributes = - Attributes.builder() - .put("kafka.connect.connector", "apache-connector") - .put("kafka.connect.task.id", "1") - .put("kafka.connect.connector.class", "io.test.ApacheConnector") - .put("kafka.connect.connector.type.raw", "source") - .put("kafka.connect.connector.version", "3.0") - .put("kafka.connect.task.class", "io.test.Task") - .put("kafka.connect.task.version", "3.1") - .build(); - assertLongSum( - "kafka.connect.task.header.converter.class", - Attributes.builder() - .putAll(connectorTaskMetaAttributes) - .put("kafka.connect.converter.header.class", "io.test.HeaderConverter") - .put("kafka.connect.task.header.converter.class.state", "configured") - .put("kafka.connect.converter.header.version", "1.0") - .build(), - 1); - assertLongGauge( "kafka.connect.source.transaction.size.max", Attributes.builder() From 6d3af5faf6497b48c6442a1d41fe4548d0f63397 Mon Sep 
17 00:00:00 2001 From: Aaron Augustine Date: Mon, 8 Dec 2025 11:49:29 -0500 Subject: [PATCH 11/13] apply spotless --- .../jmx/rules/KafkaConnectRuleTest.java | 60 ++++++++++++------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java index bd136c9fc5ad..28c39794af03 100644 --- a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -141,16 +141,26 @@ void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Excep registerMBean( "kafka.connect:type=connector-task-metrics,connector=confluent-connector,task=0", mapOf( - "status", "DESTROYED", - "connector-type", "sink", - "batch-size-avg", 1L, - "batch-size-max", 2L, - "offset-commit-avg-time-ms", 5L, - "offset-commit-failure-percentage", 0.0d, - "offset-commit-max-time-ms", 6L, - "offset-commit-success-percentage", 100.0d, - "pause-ratio", 0.0d, - "running-ratio", 1.0d)); + "status", + "DESTROYED", + "connector-type", + "sink", + "batch-size-avg", + 1L, + "batch-size-max", + 2L, + "offset-commit-avg-time-ms", + 5L, + "offset-commit-failure-percentage", + 0.0d, + "offset-commit-max-time-ms", + 6L, + "offset-commit-success-percentage", + 100.0d, + "pause-ratio", + 0.0d, + "running-ratio", + 1.0d)); startKafkaConnectTelemetry(); @@ -187,16 +197,26 @@ void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { registerMBean( "kafka.connect:type=connector-task-metrics,connector=apache-connector,task=1", mapOf( - "connector-type", "source", - "status", "RUNNING", - "batch-size-avg", 4L, - "batch-size-max", 5L, - "offset-commit-avg-time-ms", 6L, - 
"offset-commit-failure-percentage", 0.0d, - "offset-commit-max-time-ms", 7L, - "offset-commit-success-percentage", 100.0d, - "pause-ratio", 0.0d, - "running-ratio", 1.0d)); + "connector-type", + "source", + "status", + "RUNNING", + "batch-size-avg", + 4L, + "batch-size-max", + 5L, + "offset-commit-avg-time-ms", + 6L, + "offset-commit-failure-percentage", + 0.0d, + "offset-commit-max-time-ms", + 7L, + "offset-commit-success-percentage", + 100.0d, + "pause-ratio", + 0.0d, + "running-ratio", + 1.0d)); registerMBean( "kafka.connect:type=source-task-metrics,connector=apache-connector,task=1", From a2fca88efb852888597dc3d66aac70743824e61b Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Mon, 8 Dec 2025 16:53:54 -0500 Subject: [PATCH 12/13] remove worker startup percentages --- .../resources/jmx/rules/kafka-connect.yaml | 32 +++---------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml index fa7941e4a7ea..d0c50dc46121 100644 --- a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -16,27 +16,15 @@ rules: type: counter unit: "{attempt}" desc: The total number of connector startups that this worker has attempted. - # kafka.connect.worker.connector.startup.failure.percentage - connector-startup-failure-percentage: - metric: connector.startup.failure.percentage - type: gauge - unit: '1' - desc: The average percentage of this worker's connectors starts that failed. # kafka.connect.worker.connector.startup.failure.total connector-startup-failure-total: - metric: connector.startup.failure.total + metric: connector.startup.failures type: counter unit: "{startup}" desc: The total number of connector starts that failed. 
- # kafka.connect.worker.connector.startup.success.percentage - connector-startup-success-percentage: - metric: connector.startup.success.percentage - type: gauge - unit: '1' - desc: The average percentage of this worker's connectors starts that succeeded. # kafka.connect.worker.connector.startup.success.total connector-startup-success-total: - metric: connector.startup.success.total + metric: connector.startup.successes type: counter unit: "{startup}" desc: The total number of connector starts that succeeded. @@ -52,27 +40,15 @@ rules: type: counter unit: "{attempt}" desc: The total number of task startups that this worker has attempted. - # kafka.connect.worker.task.startup.failure.percentage - task-startup-failure-percentage: - metric: task.startup.failure.percentage - type: gauge - unit: '1' - desc: The average percentage of this worker's tasks starts that failed. # kafka.connect.worker.task.startup.failure.total task-startup-failure-total: - metric: task.startup.failure.total + metric: task.startup.failures type: counter unit: "{startup}" desc: The total number of task starts that failed. - # kafka.connect.worker.task.startup.success.percentage - task-startup-success-percentage: - metric: task.startup.success.percentage - type: gauge - unit: '1' - desc: The average percentage of this worker's tasks starts that succeeded. # kafka.connect.worker.task.startup.success.total task-startup-success-total: - metric: task.startup.success.total + metric: task.startup.successes type: counter unit: "{startup}" desc: The total number of task starts that succeeded. 
From c08109fcfecceb8ab8876117c5216d444c5a2c28 Mon Sep 17 00:00:00 2001 From: Aaron Augustine Date: Mon, 8 Dec 2025 16:59:06 -0500 Subject: [PATCH 13/13] remove unnecessary metrics --- .../resources/jmx/rules/kafka-connect.yaml | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml index d0c50dc46121..0c758d57aa8f 100644 --- a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml +++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml @@ -150,16 +150,6 @@ rules: metricAttribute: kafka.connect.connector: param(connector) mapping: - # kafka.connect.connector.type - connector-type: - metric: type - type: updowncounter - desc: The type of the connector. One of 'source' or 'sink'. - metricAttribute: - kafka.connect.connector.type: - sink: [sink, SINK] - source: [source, SOURCE] - unknown: "*" # kafka.connect.connector.status status: metric: status @@ -194,16 +184,6 @@ rules: type: gauge unit: "{record}" desc: The number of records in the largest batch the task has processed so far. - # kafka.connect.task.connector.class - connector-type: - metric: connector.type - type: updowncounter - desc: The type of the connector. One of 'source' or 'sink'. - metricAttribute: - kafka.connect.task.connector.type: - sink: [sink, SINK] - source: [source, SOURCE] - unknown: "*" # kafka.connect.task.offset.commit.avg.time offset-commit-avg-time-ms: metric: offset.commit.avg.time