diff --git a/instrumentation/jmx-metrics/library/kafka-connect.md b/instrumentation/jmx-metrics/library/kafka-connect.md
new file mode 100644
index 000000000000..4440ee99df04
--- /dev/null
+++ b/instrumentation/jmx-metrics/library/kafka-connect.md
@@ -0,0 +1,139 @@
+# Kafka Connect Metrics
+
+This document lists the metrics derived from MBeans exposed by Apache Kafka Connect. String-valued
+JMX attributes are exported as `UpDownCounter` metrics with value `1`; they carry only the
+connector/task identifiers plus the state-mapping attribute that encodes the string value.
+
+## Compatibility
+
+This rule set targets both Apache Kafka Connect and Confluent Platform. Apache documents several
+metrics not surfaced in the Confluent docs (worker rebalance protocol, per-connector task counts on
+workers, source transaction size stats, and sink record lag max); all of them are included below.
+Status metrics use the superset of values across both variants (connector: running, paused,
+stopped, failed, restarting, unassigned, degraded; task: running, paused, failed, restarting,
+unassigned, destroyed) and fall back to `unknown` for any new values. Differences in bean
+placeholder formatting between the docs are cosmetic; bean names align across both variants.
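+
+For example, a connector named `my-sink` (an illustrative name, not part of the rule set) whose
+`status` MBean attribute reads `RUNNING` would surface as a single
+`kafka.connect.connector.status` data point:
+
+| Metric Name | Value | Attributes |
+|---|---|---|
+| kafka.connect.connector.status | 1 | kafka.connect.connector=`my-sink`, kafka.connect.connector.state=`running` |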
+
+## Worker metrics
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.worker.connector.count | UpDownCounter | {connector} | | The number of connectors run in this worker. |
+| kafka.connect.worker.connector.startup.attempts | Counter | {attempt} | | The total number of connector startups that this worker has attempted. |
+| kafka.connect.worker.connector.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's connector starts that failed. |
+| kafka.connect.worker.connector.startup.failure.total | Counter | {startup} | | The total number of connector starts that failed. |
+| kafka.connect.worker.connector.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's connector starts that succeeded. |
+| kafka.connect.worker.connector.startup.success.total | Counter | {startup} | | The total number of connector starts that succeeded. |
+| kafka.connect.worker.task.count | UpDownCounter | {task} | | The number of tasks run in this worker. |
+| kafka.connect.worker.task.startup.attempts | Counter | {attempt} | | The total number of task startups that this worker has attempted. |
+| kafka.connect.worker.task.startup.failure.percentage | Gauge | 1 | | The average percentage of this worker's task starts that failed. |
+| kafka.connect.worker.task.startup.failure.total | Counter | {startup} | | The total number of task starts that failed. |
+| kafka.connect.worker.task.startup.success.percentage | Gauge | 1 | | The average percentage of this worker's task starts that succeeded. |
+| kafka.connect.worker.task.startup.success.total | Counter | {startup} | | The total number of task starts that succeeded. |
+
+## Worker connector task metrics
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.worker.connector.task.destroyed | UpDownCounter | {task} | kafka.connect.connector | The number of destroyed tasks of the connector on the worker. |
+| kafka.connect.worker.connector.task.failed | UpDownCounter | {task} | kafka.connect.connector | The number of failed tasks of the connector on the worker. |
+| kafka.connect.worker.connector.task.paused | UpDownCounter | {task} | kafka.connect.connector | The number of paused tasks of the connector on the worker. |
+| kafka.connect.worker.connector.task.restarting | UpDownCounter | {task} | kafka.connect.connector | The number of restarting tasks of the connector on the worker. |
+| kafka.connect.worker.connector.task.running | UpDownCounter | {task} | kafka.connect.connector | The number of running tasks of the connector on the worker. |
+| kafka.connect.worker.connector.task.total | UpDownCounter | {task} | kafka.connect.connector | The number of tasks of the connector on the worker. |
+| kafka.connect.worker.connector.task.unassigned | UpDownCounter | {task} | kafka.connect.connector | The number of unassigned tasks of the connector on the worker. |
+
+## Worker rebalance metrics
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.worker.rebalance.completed.total | Counter | {rebalance} | | The total number of rebalances completed by this worker. |
+| kafka.connect.worker.rebalance.protocol | UpDownCounter | 1 | kafka.connect.protocol.state | The Connect protocol used by this cluster. |
+| kafka.connect.worker.rebalance.epoch | UpDownCounter | {epoch} | | The epoch or generation number of this worker. |
+| kafka.connect.worker.rebalance.avg.time | Gauge | s | | The average time spent by this worker to rebalance. |
+| kafka.connect.worker.rebalance.max.time | Gauge | s | | The maximum time spent by this worker to rebalance. |
+| kafka.connect.worker.rebalance.active | UpDownCounter | 1 | kafka.connect.worker.rebalance.state | Whether this worker is currently rebalancing. |
+| kafka.connect.worker.rebalance.since_last | Gauge | s | | The time since this worker completed the most recent rebalance. |
+
+## Connector metrics
+
+Attributes: `kafka.connect.connector` and the state attribute shown.
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.connector.type | UpDownCounter | 1 | kafka.connect.connector.type | The type of the connector. One of 'source' or 'sink'. |
+| kafka.connect.connector.status | UpDownCounter | 1 | kafka.connect.connector.state | Connector lifecycle state indicator (1 when the state matches the attribute value); accepts running, paused, stopped, failed, restarting, unassigned, degraded, or unknown. |
+
+## Connector task metrics
+
+All metrics include the `kafka.connect.connector` and `kafka.connect.task.id` attributes; the Attributes column lists any additional state attributes.
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.task.batch.size.avg | Gauge | {record} | | The average number of records in the batches the task has processed so far. |
+| kafka.connect.task.batch.size.max | Gauge | {record} | | The number of records in the largest batch the task has processed so far. |
+| kafka.connect.task.connector.type | UpDownCounter | 1 | kafka.connect.task.connector.type | The type of the connector. One of 'source' or 'sink'. |
+| kafka.connect.task.offset.commit.avg.time | Gauge | s | | The average time taken by this task to commit offsets. |
+| kafka.connect.task.offset.commit.failure.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that failed. |
+| kafka.connect.task.offset.commit.max.time | Gauge | s | | The maximum time taken by this task to commit offsets. |
+| kafka.connect.task.offset.commit.success.percentage | Gauge | 1 | | The average percentage of this task's offset commit attempts that succeeded. |
+| kafka.connect.task.pause.ratio | Gauge | 1 | | The fraction of time this task has spent in the pause state. |
+| kafka.connect.task.running.ratio | Gauge | 1 | | The fraction of time this task has spent in the running state. |
+| kafka.connect.task.status | UpDownCounter | 1 | kafka.connect.task.state | The status of the connector task; supports running, paused, failed, restarting, unassigned, destroyed, or unknown. |
+
+## Sink task metrics
+
+Attributes: `kafka.connect.connector`, `kafka.connect.task.id`.
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.sink.offset.commit.completion.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were completed successfully. |
+| kafka.connect.sink.offset.commit.completion.total | Counter | {commit} | | The total number of offset commit completions that were completed successfully. |
+| kafka.connect.sink.offset.commit.seq | UpDownCounter | {sequence} | | The current sequence number for offset commits. |
+| kafka.connect.sink.offset.commit.skip.rate | Gauge | {commit}/s | | The average per-second number of offset commit completions that were received too late and skipped/ignored. |
+| kafka.connect.sink.offset.commit.skip.total | Counter | {commit} | | The total number of offset commit completions that were received too late and skipped/ignored. |
+| kafka.connect.sink.partition.count | UpDownCounter | {partition} | | The number of topic partitions assigned to this task. |
+| kafka.connect.sink.put.batch.avg.time | Gauge | s | | The average time taken by this task to put a batch of sink records. |
+| kafka.connect.sink.put.batch.max.time | Gauge | s | | The maximum time taken by this task to put a batch of sink records. |
+| kafka.connect.sink.record.active.count | UpDownCounter | {record} | | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. |
+| kafka.connect.sink.record.active.count.avg | Gauge | {record} | | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. |
+| kafka.connect.sink.record.active.count.max | Gauge | {record} | | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. |
+| kafka.connect.sink.record.lag.max | Gauge | {record} | | The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions. |
+| kafka.connect.sink.record.read.rate | Gauge | {record}/s | | The average per-second number of records read from Kafka for this task before transformations are applied. |
+| kafka.connect.sink.record.read.total | Counter | {record} | | The total number of records read from Kafka by this task since it was last restarted. |
+| kafka.connect.sink.record.send.rate | Gauge | {record}/s | | The average per-second number of records output from the transformations and sent/put to this task. |
+| kafka.connect.sink.record.send.total | Counter | {record} | | The total number of records output from the transformations and sent/put to this task since it was last restarted. |
+
+## Source task metrics
+
+Attributes: `kafka.connect.connector`, `kafka.connect.task.id`.
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.source.poll.batch.avg.time | Gauge | s | | The average time taken by this task to poll for a batch of source records. |
+| kafka.connect.source.poll.batch.max.time | Gauge | s | | The maximum time taken by this task to poll for a batch of source records. |
+| kafka.connect.source.record.active.count | UpDownCounter | {record} | | The number of records that have been produced by this task but not yet completely written to Kafka. |
+| kafka.connect.source.record.active.count.avg | Gauge | {record} | | The average number of records that have been produced by this task but not yet completely written to Kafka. |
+| kafka.connect.source.record.active.count.max | Gauge | {record} | | The maximum number of records that have been produced by this task but not yet completely written to Kafka. |
+| kafka.connect.source.record.poll.rate | Gauge | {record}/s | | The average per-second number of records produced/polled (before transformation) by this task. |
+| kafka.connect.source.record.poll.total | Counter | {record} | | The total number of records produced/polled (before transformation) by this task. |
+| kafka.connect.source.record.write.rate | Gauge | {record}/s | | The average per-second number of records written to Kafka for this task. |
+| kafka.connect.source.record.write.total | Counter | {record} | | The total number of records written to Kafka for this task. |
+| kafka.connect.source.transaction.size.avg | Gauge | {record} | | The average number of records in the transactions the task has committed so far. |
+| kafka.connect.source.transaction.size.max | Gauge | {record} | | The number of records in the largest transaction the task has committed so far. |
+| kafka.connect.source.transaction.size.min | Gauge | {record} | | The number of records in the smallest transaction the task has committed so far. |
+
+## Task error metrics
+
+Attributes: `kafka.connect.connector`, `kafka.connect.task.id`.
+
+| Metric Name | Type | Unit | Attributes | Description |
+|---|---|---|---|---|
+| kafka.connect.task.error.deadletterqueue.produce.failures | Counter | {failure} | | The number of failed writes to the dead letter queue. |
+| kafka.connect.task.error.deadletterqueue.produce.requests | Counter | {request} | | The number of attempted writes to the dead letter queue. |
+| kafka.connect.task.error.last.error.timestamp | Gauge | s | | The timestamp (seconds since the epoch) when this task last encountered an error. |
+| kafka.connect.task.error.total.errors.logged | Counter | {error} | | The number of errors that were logged. |
+| kafka.connect.task.error.total.record.errors | Counter | {record} | | The number of record processing errors in this task. |
+| kafka.connect.task.error.total.record.failures | Counter | {record} | | The number of record processing failures in this task. |
+| kafka.connect.task.error.total.records.skipped | Counter | {record} | | The number of records skipped due to errors. |
+| kafka.connect.task.error.total.retries | Counter | {retry} | | The number of operations retried. |
diff --git a/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml
new file mode 100644
index 000000000000..0c758d57aa8f
--- /dev/null
+++ b/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/kafka-connect.yaml
@@ -0,0 +1,475 @@
+---
+rules:
+
+  - bean: kafka.connect:type=connect-worker-metrics
+    prefix: kafka.connect.worker.
+    mapping:
+      # kafka.connect.worker.connector.count
+      connector-count:
+        metric: connector.count
+        type: updowncounter
+        unit: "{connector}"
+        desc: The number of connectors run in this worker.
+      # kafka.connect.worker.connector.startup.attempts
+      connector-startup-attempts-total:
+        metric: connector.startup.attempts
+        type: counter
+        unit: "{attempt}"
+        desc: The total number of connector startups that this worker has attempted.
+      # kafka.connect.worker.connector.startup.failure.percentage
+      connector-startup-failure-percentage:
+        metric: connector.startup.failure.percentage
+        type: gauge
+        unit: '1'
+        desc: The average percentage of this worker's connector starts that failed.
+      # kafka.connect.worker.connector.startup.failure.total
+      connector-startup-failure-total:
+        metric: connector.startup.failure.total
+        type: counter
+        unit: "{startup}"
+        desc: The total number of connector starts that failed.
+      # kafka.connect.worker.connector.startup.success.percentage
+      connector-startup-success-percentage:
+        metric: connector.startup.success.percentage
+        type: gauge
+        unit: '1'
+        desc: The average percentage of this worker's connector starts that succeeded.
+      # kafka.connect.worker.connector.startup.success.total
+      connector-startup-success-total:
+        metric: connector.startup.success.total
+        type: counter
+        unit: "{startup}"
+        desc: The total number of connector starts that succeeded.
+      # kafka.connect.worker.task.count
+      task-count:
+        metric: task.count
+        type: updowncounter
+        unit: "{task}"
+        desc: The number of tasks run in this worker.
+      # kafka.connect.worker.task.startup.attempts
+      task-startup-attempts-total:
+        metric: task.startup.attempts
+        type: counter
+        unit: "{attempt}"
+        desc: The total number of task startups that this worker has attempted.
+      # kafka.connect.worker.task.startup.failure.percentage
+      task-startup-failure-percentage:
+        metric: task.startup.failure.percentage
+        type: gauge
+        unit: '1'
+        desc: The average percentage of this worker's task starts that failed.
+      # kafka.connect.worker.task.startup.failure.total
+      task-startup-failure-total:
+        metric: task.startup.failure.total
+        type: counter
+        unit: "{startup}"
+        desc: The total number of task starts that failed.
+      # kafka.connect.worker.task.startup.success.percentage
+      task-startup-success-percentage:
+        metric: task.startup.success.percentage
+        type: gauge
+        unit: '1'
+        desc: The average percentage of this worker's task starts that succeeded.
+      # kafka.connect.worker.task.startup.success.total
+      task-startup-success-total:
+        metric: task.startup.success.total
+        type: counter
+        unit: "{startup}"
+        desc: The total number of task starts that succeeded.
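+
+  # Notes on the rule syntax used below:
+  # - param(<key>) copies the value of the named ObjectName key (for example
+  #   connector=my-sink) onto every metric emitted by the rule as an attribute.
+  # - A metricAttribute entry keyed by state (for example running: [running, RUNNING])
+  #   normalizes the raw string read from JMX into that key; "*" is the fallback
+  #   for unrecognized values.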
+
+  - bean: kafka.connect:type=connect-worker-metrics,connector=*
+    prefix: kafka.connect.worker.connector.task.
+    metricAttribute:
+      kafka.connect.connector: param(connector)
+    type: updowncounter
+    unit: "{task}"
+    mapping:
+      # kafka.connect.worker.connector.task.destroyed
+      connector-destroyed-task-count:
+        metric: destroyed
+        desc: The number of destroyed tasks of the connector on the worker.
+      # kafka.connect.worker.connector.task.failed
+      connector-failed-task-count:
+        metric: failed
+        desc: The number of failed tasks of the connector on the worker.
+      # kafka.connect.worker.connector.task.paused
+      connector-paused-task-count:
+        metric: paused
+        desc: The number of paused tasks of the connector on the worker.
+      # kafka.connect.worker.connector.task.restarting
+      connector-restarting-task-count:
+        metric: restarting
+        desc: The number of restarting tasks of the connector on the worker.
+      # kafka.connect.worker.connector.task.running
+      connector-running-task-count:
+        metric: running
+        desc: The number of running tasks of the connector on the worker.
+      # kafka.connect.worker.connector.task.total
+      connector-total-task-count:
+        metric: total
+        desc: The number of tasks of the connector on the worker.
+      # kafka.connect.worker.connector.task.unassigned
+      connector-unassigned-task-count:
+        metric: unassigned
+        desc: The number of unassigned tasks of the connector on the worker.
+
+  - bean: kafka.connect:type=connect-worker-rebalance-metrics
+    prefix: kafka.connect.worker.rebalance.
+    mapping:
+      # kafka.connect.worker.rebalance.completed.total
+      completed-rebalances-total:
+        metric: completed.total
+        type: counter
+        unit: "{rebalance}"
+        desc: The total number of rebalances completed by this worker.
+      # kafka.connect.worker.rebalance.protocol
+      connect-protocol:
+        metric: protocol
+        type: updowncounter
+        unit: '1'
+        desc: The Connect protocol used by this cluster.
+        metricAttribute:
+          kafka.connect.protocol.state:
+            eager: [eager, EAGER]
+            cooperative: [compatible, COMPATIBLE, cooperative, COOPERATIVE]
+            unknown: "*"
+      # kafka.connect.worker.rebalance.epoch
+      epoch:
+        metric: epoch
+        type: updowncounter
+        unit: "{epoch}"
+        desc: The epoch or generation number of this worker.
+      # The leader-name attribute is a free-form string and is intentionally not mapped.
+      # kafka.connect.worker.rebalance.avg.time
+      rebalance-avg-time-ms:
+        metric: avg.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The average time spent by this worker to rebalance.
+      # kafka.connect.worker.rebalance.max.time
+      rebalance-max-time-ms:
+        metric: max.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The maximum time spent by this worker to rebalance.
+      # kafka.connect.worker.rebalance.active
+      rebalancing:
+        metric: active
+        type: updowncounter
+        unit: '1'
+        desc: Whether this worker is currently rebalancing.
+        metricAttribute:
+          kafka.connect.worker.rebalance.state:
+            rebalancing: ["true", "TRUE"]
+            idle: ["false", "FALSE"]
+            unknown: "*"
+      # kafka.connect.worker.rebalance.since_last
+      time-since-last-rebalance-ms:
+        metric: since_last
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The time since this worker completed the most recent rebalance.
+
+  - bean: kafka.connect:type=connector-metrics,connector=*
+    prefix: kafka.connect.connector.
+    metricAttribute:
+      kafka.connect.connector: param(connector)
+    mapping:
+      # kafka.connect.connector.status
+      status:
+        metric: status
+        type: updowncounter
+        unit: '1'
+        desc: Connector lifecycle state indicator (1 when the state matches the attribute value). Supports Apache and Confluent status values.
+        metricAttribute:
+          kafka.connect.connector.state:
+            running: [running, RUNNING]
+            failed: [failed, FAILED]
+            paused: [paused, PAUSED]
+            unassigned: [unassigned, UNASSIGNED]
+            restarting: [restarting, RESTARTING]
+            degraded: [degraded, DEGRADED]
+            stopped: [stopped, STOPPED]
+            unknown: "*"
+      # kafka.connect.connector.type
+      connector-type:
+        metric: type
+        type: updowncounter
+        unit: '1'
+        desc: The type of the connector. One of 'source' or 'sink'.
+        metricAttribute:
+          kafka.connect.connector.type:
+            source: [source, SOURCE]
+            sink: [sink, SINK]
+            unknown: "*"
+
+  - bean: kafka.connect:type=connector-task-metrics,connector=*,task=*
+    prefix: kafka.connect.task.
+    metricAttribute:
+      kafka.connect.connector: param(connector)
+      kafka.connect.task.id: param(task)
+    mapping:
+      # kafka.connect.task.batch.size.avg
+      batch-size-avg:
+        metric: batch.size.avg
+        type: gauge
+        unit: "{record}"
+        desc: The average number of records in the batches the task has processed so far.
+      # kafka.connect.task.batch.size.max
+      batch-size-max:
+        metric: batch.size.max
+        type: gauge
+        unit: "{record}"
+        desc: The number of records in the largest batch the task has processed so far.
+      # kafka.connect.task.connector.type
+      connector-type:
+        metric: connector.type
+        type: updowncounter
+        unit: '1'
+        desc: The type of the connector. One of 'source' or 'sink'.
+        metricAttribute:
+          kafka.connect.task.connector.type:
+            source: [source, SOURCE]
+            sink: [sink, SINK]
+            unknown: "*"
+      # kafka.connect.task.offset.commit.avg.time
+      offset-commit-avg-time-ms:
+        metric: offset.commit.avg.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The average time taken by this task to commit offsets.
+      # kafka.connect.task.offset.commit.failure.percentage
+      offset-commit-failure-percentage:
+        metric: offset.commit.failure.percentage
+        type: gauge
+        unit: '1'
+        desc: The average percentage of this task's offset commit attempts that failed.
+      # kafka.connect.task.offset.commit.max.time
+      offset-commit-max-time-ms:
+        metric: offset.commit.max.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The maximum time taken by this task to commit offsets.
+      # kafka.connect.task.offset.commit.success.percentage
+      offset-commit-success-percentage:
+        metric: offset.commit.success.percentage
+        type: gauge
+        unit: '1'
+        desc: The average percentage of this task's offset commit attempts that succeeded.
+      # kafka.connect.task.pause.ratio
+      pause-ratio:
+        metric: pause.ratio
+        type: gauge
+        unit: '1'
+        desc: The fraction of time this task has spent in the pause state.
+      # kafka.connect.task.running.ratio
+      running-ratio:
+        metric: running.ratio
+        type: gauge
+        unit: '1'
+        desc: The fraction of time this task has spent in the running state.
+      # kafka.connect.task.status
+      status:
+        metric: status
+        type: updowncounter
+        unit: '1'
+        desc: The status of the connector task. Supports Apache (unassigned, running, paused, failed, restarting) and Confluent (unassigned, running, paused, failed, destroyed) values.
+        metricAttribute:
+          kafka.connect.task.state:
+            running: [running, RUNNING]
+            failed: [failed, FAILED]
+            paused: [paused, PAUSED]
+            unassigned: [unassigned, UNASSIGNED]
+            restarting: [restarting, RESTARTING]
+            destroyed: [destroyed, DESTROYED]
+            unknown: "*"
+
+  - bean: kafka.connect:type=sink-task-metrics,connector=*,task=*
+    prefix: kafka.connect.sink.
+    metricAttribute:
+      kafka.connect.connector: param(connector)
+      kafka.connect.task.id: param(task)
+    mapping:
+      # kafka.connect.sink.offset.commit.completion.rate
+      offset-commit-completion-rate:
+        metric: offset.commit.completion.rate
+        type: gauge
+        unit: "{commit}/s"
+        desc: The average per-second number of offset commit completions that were completed successfully.
+      # kafka.connect.sink.offset.commit.completion.total
+      offset-commit-completion-total:
+        metric: offset.commit.completion.total
+        type: counter
+        unit: "{commit}"
+        desc: The total number of offset commit completions that were completed successfully.
+      # kafka.connect.sink.offset.commit.seq
+      offset-commit-seq-no:
+        metric: offset.commit.seq
+        type: updowncounter
+        unit: "{sequence}"
+        desc: The current sequence number for offset commits.
+      # kafka.connect.sink.offset.commit.skip.rate
+      offset-commit-skip-rate:
+        metric: offset.commit.skip.rate
+        type: gauge
+        unit: "{commit}/s"
+        desc: The average per-second number of offset commit completions that were received too late and skipped/ignored.
+      # kafka.connect.sink.offset.commit.skip.total
+      offset-commit-skip-total:
+        metric: offset.commit.skip.total
+        type: counter
+        unit: "{commit}"
+        desc: The total number of offset commit completions that were received too late and skipped/ignored.
+      # kafka.connect.sink.partition.count
+      partition-count:
+        metric: partition.count
+        type: updowncounter
+        unit: "{partition}"
+        desc: The number of topic partitions assigned to this task belonging to the named sink connector in this worker.
+      # kafka.connect.sink.put.batch.avg.time
+      put-batch-avg-time-ms:
+        metric: put.batch.avg.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The average time taken by this task to put a batch of sink records.
+      # kafka.connect.sink.put.batch.max.time
+      put-batch-max-time-ms:
+        metric: put.batch.max.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The maximum time taken by this task to put a batch of sink records.
+      # kafka.connect.sink.record.active.count
+      sink-record-active-count:
+        metric: record.active.count
+        type: updowncounter
+        unit: "{record}"
+        desc: The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
+      # kafka.connect.sink.record.active.count.avg
+      sink-record-active-count-avg:
+        metric: record.active.count.avg
+        type: gauge
+        unit: "{record}"
+        desc: The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
+      # kafka.connect.sink.record.active.count.max
+      sink-record-active-count-max:
+        metric: record.active.count.max
+        type: gauge
+        unit: "{record}"
+        desc: The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
+      # kafka.connect.sink.record.lag.max
+      sink-record-lag-max:
+        metric: record.lag.max
+        type: gauge
+        unit: "{record}"
+        desc: The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions.
+      # kafka.connect.sink.record.read.rate
+      sink-record-read-rate:
+        metric: record.read.rate
+        type: gauge
+        unit: "{record}/s"
+        desc: The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied.
+      # kafka.connect.sink.record.read.total
+      sink-record-read-total:
+        metric: record.read.total
+        type: counter
+        unit: "{record}"
+        desc: The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted.
+      # kafka.connect.sink.record.send.rate
+      sink-record-send-rate:
+        metric: record.send.rate
+        type: gauge
+        unit: "{record}/s"
+        desc: The average per-second number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.
+      # kafka.connect.sink.record.send.total
+      sink-record-send-total:
+        metric: record.send.total
+        type: counter
+        unit: "{record}"
+        desc: The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted.
+
+  - bean: kafka.connect:type=source-task-metrics,connector=*,task=*
+    prefix: kafka.connect.source.
+    metricAttribute:
+      kafka.connect.connector: param(connector)
+      kafka.connect.task.id: param(task)
+    mapping:
+      # kafka.connect.source.poll.batch.avg.time
+      poll-batch-avg-time-ms:
+        metric: poll.batch.avg.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The average time taken by this task to poll for a batch of source records.
+      # kafka.connect.source.poll.batch.max.time
+      poll-batch-max-time-ms:
+        metric: poll.batch.max.time
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        desc: The maximum time taken by this task to poll for a batch of source records.
+      # kafka.connect.source.record.active.count
+      source-record-active-count:
+        metric: record.active.count
+        type: updowncounter
+        unit: "{record}"
+        desc: The number of records that have been produced by this task but not yet completely written to Kafka.
+      # kafka.connect.source.record.active.count.avg
+      source-record-active-count-avg:
+        metric: record.active.count.avg
+        type: gauge
+        unit: "{record}"
+        desc: The average number of records that have been produced by this task but not yet completely written to Kafka.
+      # kafka.connect.source.record.active.count.max
+      source-record-active-count-max:
+        metric: record.active.count.max
+        type: gauge
+        unit: "{record}"
+        desc: The maximum number of records that have been produced by this task but not yet completely written to Kafka.
+      # kafka.connect.source.record.poll.rate
+      source-record-poll-rate:
+        metric: record.poll.rate
+        type: gauge
+        unit: "{record}/s"
+        desc: The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
+      # kafka.connect.source.record.poll.total
+      source-record-poll-total:
+        metric: record.poll.total
+        type: counter
+        unit: "{record}"
+        desc: The total number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
+      # kafka.connect.source.record.write.rate
+      source-record-write-rate:
+        metric: record.write.rate
+        type: gauge
+        unit: "{record}/s"
+        desc: The average per-second number of records written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied, and excludes any records filtered out by the transformations.
+      # kafka.connect.source.record.write.total
+      source-record-write-total:
+        metric: record.write.total
+        type: counter
+        unit: "{record}"
+        desc: The total number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations.
+      # kafka.connect.source.transaction.size.avg
+      transaction-size-avg:
+        metric: transaction.size.avg
+        type: gauge
+        unit: "{record}"
+        desc: The average number of records in the transactions the task has committed so far.
+      # kafka.connect.source.transaction.size.max
+      transaction-size-max:
+        metric: transaction.size.max
+        type: gauge
+        unit: "{record}"
+        desc: The number of records in the largest transaction the task has committed so far.
+      # kafka.connect.source.transaction.size.min
+      transaction-size-min:
+        metric: transaction.size.min
+        type: gauge
+        unit: "{record}"
+        desc: The number of records in the smallest transaction the task has committed so far.
+
+  - bean: kafka.connect:type=task-error-metrics,connector=*,task=*
+    prefix: kafka.connect.task.error.
+    metricAttribute:
+      kafka.connect.connector: param(connector)
+      kafka.connect.task.id: param(task)
+    mapping:
+      # kafka.connect.task.error.deadletterqueue.produce.failures
+      deadletterqueue-produce-failures:
+        metric: deadletterqueue.produce.failures
+        type: counter
+        unit: "{failure}"
+        desc: The number of failed writes to the dead letter queue.
+      # kafka.connect.task.error.deadletterqueue.produce.requests
+      deadletterqueue-produce-requests:
+        metric: deadletterqueue.produce.requests
+        type: counter
+        unit: "{request}"
+        desc: The number of attempted writes to the dead letter queue.
+      # kafka.connect.task.error.last.error.timestamp
+      last-error-timestamp:
+        metric: last.error.timestamp
+        type: gauge
+        sourceUnit: ms
+        unit: s
+        dropNegativeValues: true
+        desc: The timestamp (seconds since the epoch) when this task last encountered an error.
+      # kafka.connect.task.error.total.errors.logged
+      total-errors-logged:
+        metric: total.errors.logged
+        type: counter
+        unit: "{error}"
+        desc: The number of errors that were logged.
+      # kafka.connect.task.error.total.record.errors
+      total-record-errors:
+        metric: total.record.errors
+        type: counter
+        unit: "{record}"
+        desc: The number of record processing errors in this task.
+      # kafka.connect.task.error.total.record.failures
+      total-record-failures:
+        metric: total.record.failures
+        type: counter
+        unit: "{record}"
+        desc: The number of record processing failures in this task.
+      # kafka.connect.task.error.total.records.skipped
+      total-records-skipped:
+        metric: total.records.skipped
+        type: counter
+        unit: "{record}"
+        desc: The number of records skipped due to errors.
+      # kafka.connect.task.error.total.retries
+      total-retries:
+        metric: total.retries
+        type: counter
+        unit: "{retry}"
+        desc: The number of operations retried.
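The test file below loads this rule set from the classpath and drives it against synthetic MBeans. As a minimal standalone sketch of the same wiring (assuming the `JmxTelemetry` builder API that the test uses; targeting the platform MBean server, where a real Connect worker registers its `kafka.connect:*` MBeans, is an illustrative choice), enabling the bundled rules looks like:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.jmx.JmxTelemetry;
import java.lang.management.ManagementFactory;
import java.util.Collections;

class KafkaConnectJmxSetup {

  // Sketch only: mirrors startKafkaConnectTelemetry() in the test below, but
  // polls the platform MBean server instead of a test-private server.
  static void enableKafkaConnectMetrics(OpenTelemetry openTelemetry) {
    JmxTelemetry.builder(openTelemetry)
        .addClassPathRules("kafka-connect")
        .build()
        .start(() -> Collections.singletonList(ManagementFactory.getPlatformMBeanServer()));
  }
}
```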
diff --git a/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java new file mode 100644 index 000000000000..28c39794af03 --- /dev/null +++ b/instrumentation/jmx-metrics/library/src/test/java/io/opentelemetry/instrumentation/jmx/rules/KafkaConnectRuleTest.java @@ -0,0 +1,466 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.jmx.rules; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.jmx.JmxTelemetry; +import io.opentelemetry.instrumentation.jmx.engine.MetricInfo; +import io.opentelemetry.instrumentation.jmx.yaml.JmxConfig; +import io.opentelemetry.instrumentation.jmx.yaml.JmxRule; +import io.opentelemetry.instrumentation.jmx.yaml.Metric; +import io.opentelemetry.instrumentation.jmx.yaml.RuleParser; +import io.opentelemetry.instrumentation.jmx.yaml.StateMapping; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.DynamicMBean; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class KafkaConnectRuleTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + private static MBeanServer mbeanServer; + + @Test + void kafkaConnectConfigParsesAndBuilds() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + assertThat(config.getRules()).isNotEmpty(); + + // ensure all metric definitions build without throwing + for (JmxRule rule : config.getRules()) { + assertThatCode(rule::buildMetricDef).doesNotThrowAnyException(); + } + } + + @Test + void kafkaConnectRulesUseBasicMetricTypes() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + assertThat(config.getRules()) + .allSatisfy( + rule -> { + assertThat(rule.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE); + rule.getMapping() + .values() + .forEach( + metric -> + assertThat(metric.getMetricType()).isNotEqualTo(MetricInfo.Type.STATE)); + }); + } + + @Test + void connectorStatusStateMappingPresent() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + JmxRule connectorRule = + getRuleForBean(config, "kafka.connect:type=connector-metrics,connector=*"); + + StateMapping 
stateMapping = getMetric(connectorRule, "status").getStateMapping(); + assertThat(getMetric(connectorRule, "status").getMetricType()) + .isEqualTo(MetricInfo.Type.UPDOWNCOUNTER); + assertThat(stateMapping.isEmpty()).isFalse(); + assertThat(stateMapping.getStateKeys()) + .contains( + "running", + "failed", + "paused", + "unassigned", + "restarting", + "degraded", + "stopped", + "unknown"); + assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); + assertThat(stateMapping.getStateValue("RUNNING")).isEqualTo("running"); + assertThat(stateMapping.getStateValue("FAILED")).isEqualTo("failed"); + assertThat(stateMapping.getStateValue("PAUSED")).isEqualTo("paused"); + assertThat(stateMapping.getStateValue("UNKNOWN")).isEqualTo("unknown"); + } + + @Test + void taskStatusStateMappingSuperset() throws Exception { + JmxConfig config = loadKafkaConnectConfig(); + + JmxRule connectorTaskRule = + getRuleForBean(config, "kafka.connect:type=connector-task-metrics,connector=*,task=*"); + + StateMapping stateMapping = getMetric(connectorTaskRule, "status").getStateMapping(); + assertThat(getMetric(connectorTaskRule, "status").getMetricType()) + .isEqualTo(MetricInfo.Type.UPDOWNCOUNTER); + assertThat(stateMapping.isEmpty()).isFalse(); + assertThat(stateMapping.getStateKeys()) + .contains( + "running", "failed", "paused", "unassigned", "restarting", "destroyed", "unknown"); + assertThat(stateMapping.getDefaultStateKey()).isEqualTo("unknown"); + assertThat(stateMapping.getStateValue("DESTROYED")).isEqualTo("destroyed"); + assertThat(stateMapping.getStateValue("RESTARTING")).isEqualTo("restarting"); + assertThat(stateMapping.getStateValue("unexpected")).isEqualTo("unknown"); + } + + @Test + void confluentCompatibleMetricsCollectWithoutApacheOnlyAttributes() throws Exception { + registerMBean( + "kafka.connect:type=connect-worker-metrics", + mapOf("connector-count", 1L, "task-count", 2L)); + + registerMBean( + "kafka.connect:type=connector-metrics,connector=confluent-connector", + mapOf( + "connector-type", "source", + "status", "RUNNING")); + + registerMBean( + "kafka.connect:type=connector-task-metrics,connector=confluent-connector,task=0", + mapOf( + "status", + "DESTROYED", + "connector-type", + "sink", + "batch-size-avg", + 1L, + "batch-size-max", + 2L, + "offset-commit-avg-time-ms", + 5L, + "offset-commit-failure-percentage", + 0.0d, + "offset-commit-max-time-ms", + 6L, + "offset-commit-success-percentage", + 100.0d, + "pause-ratio", + 0.0d, + "running-ratio", + 1.0d)); + + startKafkaConnectTelemetry(); + + assertLongSum("kafka.connect.worker.connector.count", Attributes.empty(), 1); + assertLongSum("kafka.connect.worker.task.count", Attributes.empty(), 2); + + Attributes connectorStatusAttributes = + Attributes.builder() + .put("kafka.connect.connector", "confluent-connector") + .put("kafka.connect.connector.state", "running") + .build(); + assertLongSum("kafka.connect.connector.status", connectorStatusAttributes, 1); + + Attributes taskStatusAttributes = + Attributes.builder() + .put("kafka.connect.connector", "confluent-connector") + .put("kafka.connect.task.id", "0") + .put("kafka.connect.task.state", "destroyed") + .build(); + assertLongSum("kafka.connect.task.status", taskStatusAttributes, 1); + } + + @Test + void apacheSpecificMetricsAreReportedWhenPresent() throws Exception { + registerMBean( + "kafka.connect:type=connect-worker-rebalance-metrics", mapOf("connect-protocol", "eager")); + + registerMBean( + "kafka.connect:type=connect-worker-metrics,connector=apache-connector", + mapOf( + 
"connector-running-task-count", 2L, + "connector-unassigned-task-count", 0L)); + + registerMBean( + "kafka.connect:type=connector-task-metrics,connector=apache-connector,task=1", + mapOf( + "connector-type", + "source", + "status", + "RUNNING", + "batch-size-avg", + 4L, + "batch-size-max", + 5L, + "offset-commit-avg-time-ms", + 6L, + "offset-commit-failure-percentage", + 0.0d, + "offset-commit-max-time-ms", + 7L, + "offset-commit-success-percentage", + 100.0d, + "pause-ratio", + 0.0d, + "running-ratio", + 1.0d)); + + registerMBean( + "kafka.connect:type=source-task-metrics,connector=apache-connector,task=1", + mapOf( + "transaction-size-avg", 3L, + "transaction-size-max", 6L, + "transaction-size-min", 1L)); + + registerMBean( + "kafka.connect:type=sink-task-metrics,connector=apache-connector,task=1", + mapOf("sink-record-lag-max", 11L)); + + startKafkaConnectTelemetry(); + + assertLongSum( + "kafka.connect.worker.rebalance.protocol", + Attributes.of( + io.opentelemetry.api.common.AttributeKey.stringKey("kafka.connect.protocol.state"), + "eager"), + 1); + + Attributes connectorTaskAttributes = + Attributes.of( + io.opentelemetry.api.common.AttributeKey.stringKey("kafka.connect.connector"), + "apache-connector"); + assertLongSum("kafka.connect.worker.connector.task.running", connectorTaskAttributes, 2); + + assertLongGauge( + "kafka.connect.source.transaction.size.max", + Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .build(), + 6); + + assertLongGauge( + "kafka.connect.sink.record.lag.max", + Attributes.builder() + .put("kafka.connect.connector", "apache-connector") + .put("kafka.connect.task.id", "1") + .build(), + 11); + } + + @BeforeAll + static void setUp() { + mbeanServer = MBeanServerFactory.createMBeanServer("kafka.connect"); + } + + @AfterEach + void cleanUp() throws Exception { + for (ObjectName name : mbeanServer.queryNames(new ObjectName("kafka.connect:*"), null)) { + try { + mbeanServer.unregisterMBean(name); + } catch (InstanceNotFoundException | MBeanRegistrationException ignored) { + // best effort cleanup for flaky tests + } + } + testing.clearData(); + } + + @AfterAll + static void tearDown() { + MBeanServerFactory.releaseMBeanServer(mbeanServer); + } + + private JmxConfig loadKafkaConnectConfig() throws Exception { + try (InputStream input = + getClass().getClassLoader().getResourceAsStream("jmx/rules/kafka-connect.yaml")) { + assertThat(input).isNotNull(); + return RuleParser.get().loadConfig(input); + } + } + + private static void startKafkaConnectTelemetry() { + JmxTelemetry.builder(testing.getOpenTelemetry()) + .addClassPathRules("kafka-connect") + .build() + .start(() -> Collections.singletonList(mbeanServer)); + } + + private static JmxRule getRuleForBean(JmxConfig config, String bean) { + return config.getRules().stream() + .filter(rule -> rule.getBeans().contains(bean)) + .findFirst() + .orElseThrow(() -> new AssertionError("Missing rule for bean " + bean)); + } + + private static Metric getMetric(JmxRule rule, String metricKey) { + Metric metric = rule.getMapping().get(metricKey); + if (metric == null) { + throw new AssertionError("Missing metric " + metricKey + " in rule " + rule.getBeans()); + } + return metric; + } + + private static Map mapOf(Object... 
kvPairs) {
+    Map<String, Object> map = new HashMap<>();
+    for (int i = 0; i < kvPairs.length; i += 2) {
+      map.put((String) kvPairs[i], kvPairs[i + 1]);
+    }
+    return map;
+  }
+
+  private static void registerMBean(String objectName, Map<String, Object> attributes)
+      throws Exception {
+    mbeanServer.registerMBean(new MapBackedDynamicMBean(attributes), new ObjectName(objectName));
+  }
+
+  private static void assertLongSum(String metricName, Attributes attributes, long expectedValue) {
+    testing.waitAndAssertMetrics(
+        "io.opentelemetry.jmx",
+        metricName,
+        metrics ->
+            metrics.anySatisfy(
+                metric -> {
+                  boolean matched =
+                      metric.getLongSumData().getPoints().stream()
+                          .anyMatch(
+                              pointData ->
+                                  attributesMatch(pointData.getAttributes(), attributes)
+                                      && pointData.getValue() == expectedValue);
+                  assertThat(matched)
+                      .as(
+                          "Expected %s to have a point with attributes %s and value %s",
+                          metricName, attributes, expectedValue)
+                      .isTrue();
+                }));
+  }
+
+  private static void assertLongGauge(String metricName, Attributes attributes, long expected) {
+    testing.waitAndAssertMetrics(
+        "io.opentelemetry.jmx",
+        metricName,
+        metrics ->
+            metrics.anySatisfy(
+                metric -> {
+                  boolean matched =
+                      metric.getLongGaugeData().getPoints().stream()
+                          .anyMatch(
+                              pointData ->
+                                  attributesMatch(pointData.getAttributes(), attributes)
+                                      && pointData.getValue() == expected);
+                  assertThat(matched)
+                      .as(
+                          "Expected %s to have a point with attributes %s and value %s",
+                          metricName, attributes, expected)
+                      .isTrue();
+                }));
+  }
+
+  private static boolean attributesMatch(Attributes actual, Attributes expected) {
+    for (Map.Entry<io.opentelemetry.api.common.AttributeKey<?>, Object> entry :
+        expected.asMap().entrySet()) {
+      Object actualValue = actual.get(entry.getKey());
+      if (!entry.getValue().equals(actualValue)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Minimal DynamicMBean implementation backed by a simple attribute map. This keeps the
+   * functional Kafka Connect coverage self-contained in the test file.
+ */ + static class MapBackedDynamicMBean extends NotificationBroadcasterSupport + implements DynamicMBean { + + private final Map attributes; + private long sequenceNumber = 1; + + MapBackedDynamicMBean(Map attributes) { + this.attributes = new HashMap<>(attributes); + } + + @Override + public Object getAttribute(String attribute) + throws AttributeNotFoundException, MBeanException, ReflectionException { + if (!attributes.containsKey(attribute)) { + throw new AttributeNotFoundException(attribute); + } + return attributes.get(attribute); + } + + @Override + public void setAttribute(Attribute attribute) { + attributes.put(attribute.getName(), attribute.getValue()); + Notification n = + new Notification( + "jmx.attribute.changed", + this, + sequenceNumber++, + System.currentTimeMillis(), + attribute.getName()); + sendNotification(n); + } + + @Override + public AttributeList getAttributes(String[] attributes) { + AttributeList list = new AttributeList(); + for (String attr : attributes) { + Object value = this.attributes.get(attr); + if (value != null) { + list.add(new Attribute(attr, value)); + } + } + return list; + } + + @Override + public AttributeList setAttributes(AttributeList attributes) { + AttributeList updated = new AttributeList(); + for (Object attribute : attributes) { + if (attribute instanceof Attribute) { + Attribute attr = (Attribute) attribute; + setAttribute(attr); + updated.add(attr); + } + } + return updated; + } + + @Override + public Object invoke(String actionName, Object[] params, String[] signature) + throws MBeanException, ReflectionException { + return null; + } + + @Override + public MBeanInfo getMBeanInfo() { + List infos = new ArrayList<>(); + attributes.forEach( + (name, value) -> + infos.add( + new javax.management.MBeanAttributeInfo( + name, value.getClass().getName(), name + " attribute", true, true, false))); + return new MBeanInfo( + this.getClass().getName(), + "Map backed test MBean", + infos.toArray(new javax.management.MBeanAttributeInfo[0]), + null, + null, + null); + } + } +}