diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index a07e12a518abb..04b4f26caddc1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CloseOptions; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; @@ -1145,6 +1146,8 @@ protected void close(Timer timer, CloseOptions.GroupMembershipOperation membersh log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", client.pendingRequestCount(coordinator)); } + + Utils.closeQuietly(sensors, "coordinator metrics"); } } @@ -1330,14 +1333,6 @@ boolean generationUnchanged() { } } - protected final Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) { - return new Meter(new WindowedCount(), - metrics.metricName(baseName + "-rate", groupName, - String.format("The number of %s per second", descriptiveName)), - metrics.metricName(baseName + "-total", groupName, - String.format("The total number of %s", descriptiveName))); - } - /** * Visible for testing. */ @@ -1345,82 +1340,69 @@ protected BaseHeartbeatThread heartbeatThread() { return heartbeatThread; } - private class GroupCoordinatorMetrics { - public final String metricGrpName; - + private class GroupCoordinatorMetrics extends AbstractConsumerMetricsManager { public final Sensor heartbeatSensor; public final Sensor joinSensor; public final Sensor syncSensor; public final Sensor successfulRebalanceSensor; public final Sensor failedRebalanceSensor; + @SuppressWarnings({"this-escape"}) public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { - this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; + super(metrics, metricGrpPrefix + "-coordinator-metrics"); - this.heartbeatSensor = metrics.sensor("heartbeat-latency"); - this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max", - this.metricGrpName, + this.heartbeatSensor = sensor("heartbeat-latency"); + this.heartbeatSensor.add(metricName("heartbeat-response-time-max", "The max time taken to receive a response to a heartbeat request"), new Max()); - this.heartbeatSensor.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats")); + this.heartbeatSensor.add(createMeter("heartbeat", "heartbeats")); - this.joinSensor = metrics.sensor("join-latency"); - this.joinSensor.add(metrics.metricName("join-time-avg", - this.metricGrpName, + this.joinSensor = sensor("join-latency"); + this.joinSensor.add(metricName("join-time-avg", "The average time taken for a group rejoin"), new Avg()); - this.joinSensor.add(metrics.metricName("join-time-max", - this.metricGrpName, + this.joinSensor.add(metricName("join-time-max", "The max time taken for a group rejoin"), new Max()); - this.joinSensor.add(createMeter(metrics, metricGrpName, "join", "group joins")); + this.joinSensor.add(createMeter("join", "group joins")); - this.syncSensor = metrics.sensor("sync-latency"); - this.syncSensor.add(metrics.metricName("sync-time-avg", - this.metricGrpName, + this.syncSensor = sensor("sync-latency"); + this.syncSensor.add(metricName("sync-time-avg", "The average time taken for a group sync"), new Avg()); - this.syncSensor.add(metrics.metricName("sync-time-max", - this.metricGrpName, + this.syncSensor.add(metricName("sync-time-max", "The max time taken for a group sync"), new Max()); - this.syncSensor.add(createMeter(metrics, metricGrpName, "sync", "group syncs")); + this.syncSensor.add(createMeter("sync", "group syncs")); - this.successfulRebalanceSensor = metrics.sensor("rebalance-latency"); - this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg", - this.metricGrpName, + this.successfulRebalanceSensor = sensor("rebalance-latency"); + this.successfulRebalanceSensor.add(metricName("rebalance-latency-avg", "The average time taken for a group to complete a successful rebalance, which may be composed of " + "several failed re-trials until it succeeded"), new Avg()); - this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-max", - this.metricGrpName, + this.successfulRebalanceSensor.add(metricName("rebalance-latency-max", "The max time taken for a group to complete a successful rebalance, which may be composed of " + "several failed re-trials until it succeeded"), new Max()); - this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-total", - this.metricGrpName, + this.successfulRebalanceSensor.add(metricName("rebalance-latency-total", "The total number of milliseconds this consumer has spent in successful rebalances since creation"), new CumulativeSum()); this.successfulRebalanceSensor.add( - metrics.metricName("rebalance-total", - this.metricGrpName, + metricName("rebalance-total", "The total number of successful rebalance events, each event is composed of " + "several failed re-trials until it succeeded"), new CumulativeCount() ); this.successfulRebalanceSensor.add( - metrics.metricName( + metricName( "rebalance-rate-per-hour", - this.metricGrpName, "The number of successful rebalance events per hour, each event is composed of " + "several failed re-trials until it succeeded"), new Rate(TimeUnit.HOURS, new WindowedCount()) ); - this.failedRebalanceSensor = metrics.sensor("failed-rebalance"); + this.failedRebalanceSensor = sensor("failed-rebalance"); this.failedRebalanceSensor.add( - metrics.metricName("failed-rebalance-total", - this.metricGrpName, + metricName("failed-rebalance-total", "The total number of failed rebalance events"), new CumulativeCount() ); this.failedRebalanceSensor.add( - metrics.metricName( + metricName( "failed-rebalance-rate-per-hour", - this.metricGrpName, "The number of failed rebalance events per hour"), new Rate(TimeUnit.HOURS, new WindowedCount()) ); @@ -1432,8 +1414,7 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { else return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS); }; - metrics.addMetric(metrics.metricName("last-rebalance-seconds-ago", - this.metricGrpName, + addMetric(metricName("last-rebalance-seconds-ago", "The number of seconds since the last successful rebalance event"), lastRebalance); @@ -1444,11 +1425,18 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { else return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); }; - metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", - this.metricGrpName, + addMetric(metricName("last-heartbeat-seconds-ago", "The number of seconds since the last coordinator heartbeat was sent"), lastHeartbeat); } + + protected final Meter createMeter(String baseName, String descriptiveName) { + return new Meter(new WindowedCount(), + metricName(baseName + "-rate", + String.format("The number of %s per second", descriptiveName)), + metricName(baseName + "-total", + String.format("The total number of %s", descriptiveName))); + } } private class HeartbeatThread extends BaseHeartbeatThread { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 938ae909027d0..4c12967fd5580 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -287,6 +287,8 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() { private final ApplicationEventHandler applicationEventHandler; private final Time time; private final AtomicReference> groupMetadata = new AtomicReference<>(Optional.empty()); + private final FetchMetricsManager fetchMetricsManager; + private final RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager; private final AsyncConsumerMetrics asyncConsumerMetrics; private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; @@ -416,7 +418,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, final List addresses = ClientUtils.parseAndValidateAddresses(config); metadata.bootstrap(addresses); - FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics); + this.fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); this.isolationLevel = fetchConfig.isolationLevel; @@ -475,11 +477,12 @@ public AsyncKafkaConsumer(final ConsumerConfig config, requestManagersSupplier, asyncConsumerMetrics ); + this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics) + rebalanceCallbackMetricsManager ); this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s -> new StreamsRebalanceListenerInvoker(logContext, s)); @@ -518,6 +521,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, String clientId, Deserializers deserializers, FetchBuffer fetchBuffer, + FetchMetricsManager fetchMetricsManager, + RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager, FetchCollector fetchCollector, ConsumerInterceptors interceptors, Time time, @@ -537,6 +542,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.subscriptions = subscriptions; this.clientId = clientId; this.fetchBuffer = fetchBuffer; + this.fetchMetricsManager = fetchMetricsManager; + this.rebalanceCallbackMetricsManager = rebalanceCallbackMetricsManager; this.fetchCollector = fetchCollector; this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = Objects.requireNonNull(interceptors); @@ -591,7 +598,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.clientTelemetryReporter = Optional.empty(); ConsumerMetrics metricsRegistry = new ConsumerMetrics(); - FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); + this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); this.fetchCollector = new FetchCollector<>(logContext, metadata, subscriptions, @@ -616,11 +623,12 @@ public AsyncKafkaConsumer(final ConsumerConfig config, time, asyncConsumerMetrics ); + this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics) + rebalanceCallbackMetricsManager ); ApiVersions apiVersions = new ApiVersions(); Supplier networkClientDelegateSupplier = () -> new NetworkClientDelegate( @@ -1463,6 +1471,8 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(asyncConsumerMetrics, "async consumer metrics", firstException); + closeQuietly(fetchMetricsManager, "consumer fetch metrics", firstException); + closeQuietly(rebalanceCallbackMetricsManager, "consumer rebalance callback metrics"); closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "async consumer telemetry reporter", firstException)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 787d710535e0c..f52ec096a85ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -125,6 +125,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { private final Optional groupId; private final ConsumerCoordinator coordinator; private final Deserializers deserializers; + private final FetchMetricsManager fetchMetricsManager; private final Fetcher fetcher; private final OffsetFetcher offsetFetcher; private final TopicMetadataFetcher topicMetadataFetcher; @@ -191,7 +192,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { List addresses = ClientUtils.parseAndValidateAddresses(config); this.metadata.bootstrap(addresses); - FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics); + this.fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); this.isolationLevel = fetchConfig.isolationLevel; @@ -362,7 +363,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { boolean checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG); ConsumerMetrics metricsRegistry = new ConsumerMetrics(); - FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); + this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); ApiVersions apiVersions = new ApiVersions(); FetchConfig fetchConfig = new FetchConfig( minBytes, @@ -381,7 +382,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { subscriptions, fetchConfig, deserializers, - metricsManager, + fetchMetricsManager, time, apiVersions ); @@ -1179,6 +1180,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); + closeQuietly(fetchMetricsManager, "kafka fetch metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(client, "consumer network client", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4956d64228dbb..88dd984be1e7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -55,6 +56,8 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; @@ -108,6 +111,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final List assignors; private final ConsumerMetadata metadata; private final ConsumerCoordinatorMetrics coordinatorMetrics; + private final RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager; private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; @@ -271,11 +275,12 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, protocol = null; } + this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix); this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, - new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix) + rebalanceCallbackMetricsManager ); this.metadata.requestUpdate(true); } @@ -1024,6 +1029,8 @@ public void close(final Timer timer, CloseOptions.GroupMembershipOperation membe } } finally { super.close(timer, membershipOperation); + Utils.closeQuietly(coordinatorMetrics, "consumer coordinator metrics"); + Utils.closeQuietly(rebalanceCallbackMetricsManager, "consumer rebalance callback metrics"); } } @@ -1621,24 +1628,26 @@ public String toString() { } } - private class ConsumerCoordinatorMetrics { + private class ConsumerCoordinatorMetrics extends AbstractConsumerMetricsManager { private final Sensor commitSensor; + @SuppressWarnings({"this-escape"}) private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { - String metricGrpName = metricGrpPrefix + COORDINATOR_METRICS_SUFFIX; + super(metrics, metricGrpPrefix + COORDINATOR_METRICS_SUFFIX); - this.commitSensor = metrics.sensor("commit-latency"); - this.commitSensor.add(metrics.metricName("commit-latency-avg", - metricGrpName, + this.commitSensor = sensor("commit-latency"); + this.commitSensor.add(metricName("commit-latency-avg", "The average time taken for a commit request"), new Avg()); - this.commitSensor.add(metrics.metricName("commit-latency-max", - metricGrpName, + this.commitSensor.add(metricName("commit-latency-max", "The max time taken for a commit request"), new Max()); - this.commitSensor.add(createMeter(metrics, metricGrpName, "commit", "commit calls")); + this.commitSensor.add(new Meter(new WindowedCount(), + metricName("commit-rate", + String.format("The number of %s per second", "commit calls")), + metricName("commit-total", + String.format("The total number of %s", "commit calls")))); Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions(); - metrics.addMetric(metrics.metricName("assigned-partitions", - metricGrpName, + addMetric(metricName("assigned-partitions", "The number of partitions currently assigned to this consumer"), numParts); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java index 98644180e8b0b..b6853e8e5d3ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Gauge; @@ -35,9 +36,8 @@ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it * records matches up with the topic-partitions in use. */ -public class FetchMetricsManager { +public class FetchMetricsManager extends AbstractConsumerMetricsManager { - private final Metrics metrics; private final FetchMetricsRegistry metricsRegistry; private final Sensor throttleTime; private final Sensor bytesFetched; @@ -49,32 +49,33 @@ public class FetchMetricsManager { private int assignmentId = 0; private Set assignedPartitions = Collections.emptySet(); + @SuppressWarnings({"this-escape"}) public FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) { - this.metrics = metrics; + super(metrics, metricsRegistry.groupName()); this.metricsRegistry = metricsRegistry; - this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time") + this.throttleTime = sensorBuilder("fetch-throttle-time") .withAvg(metricsRegistry.fetchThrottleTimeAvg) .withMax(metricsRegistry.fetchThrottleTimeMax) .build(); - this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched") + this.bytesFetched = sensorBuilder("bytes-fetched") .withAvg(metricsRegistry.fetchSizeAvg) .withMax(metricsRegistry.fetchSizeMax) .withMeter(metricsRegistry.bytesConsumedRate, metricsRegistry.bytesConsumedTotal) .build(); - this.recordsFetched = new SensorBuilder(metrics, "records-fetched") + this.recordsFetched = sensorBuilder("records-fetched") .withAvg(metricsRegistry.recordsPerRequestAvg) .withMeter(metricsRegistry.recordsConsumedRate, metricsRegistry.recordsConsumedTotal) .build(); - this.fetchLatency = new SensorBuilder(metrics, "fetch-latency") + this.fetchLatency = sensorBuilder("fetch-latency") .withAvg(metricsRegistry.fetchLatencyAvg) .withMax(metricsRegistry.fetchLatencyMax) .withMeter(new WindowedCount(), metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal) .build(); - this.recordsLag = new SensorBuilder(metrics, "records-lag") + this.recordsLag = sensorBuilder("records-lag") .withMax(metricsRegistry.recordsLagMax) .build(); - this.recordsLead = new SensorBuilder(metrics, "records-lead") + this.recordsLead = sensorBuilder("records-lead") .withMin(metricsRegistry.recordsLeadMin) .build(); } @@ -87,7 +88,7 @@ void recordLatency(String node, long requestLatencyMs) { fetchLatency.record(requestLatencyMs); if (!node.isEmpty()) { String nodeTimeName = "node-" + node + ".latency"; - Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); + Sensor nodeRequestTime = getSensor(nodeTimeName); if (nodeRequestTime != null) nodeRequestTime.record(requestLatencyMs); } @@ -105,7 +106,7 @@ void recordBytesFetched(String topic, int bytes) { String name = topicBytesFetchedMetricName(topic); maybeRecordDeprecatedBytesFetched(name, topic, bytes); - Sensor bytesFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic)) + Sensor bytesFetched = sensorBuilder(name, () -> Map.of("topic", topic)) .withAvg(metricsRegistry.topicFetchSizeAvg) .withMax(metricsRegistry.topicFetchSizeMax) .withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal) @@ -117,7 +118,7 @@ void recordRecordsFetched(String topic, int records) { String name = topicRecordsFetchedMetricName(topic); maybeRecordDeprecatedRecordsFetched(name, topic, records); - Sensor recordsFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic)) + Sensor recordsFetched = sensorBuilder(name, () -> Map.of("topic", topic)) .withAvg(metricsRegistry.topicRecordsPerRequestAvg) .withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal) .build(); @@ -130,7 +131,7 @@ void recordPartitionLag(TopicPartition tp, long lag) { String name = partitionRecordsLagMetricName(tp); maybeRecordDeprecatedPartitionLag(name, tp, lag); - Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) + Sensor recordsLag = sensorBuilder(name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) .withValue(metricsRegistry.partitionRecordsLag) .withMax(metricsRegistry.partitionRecordsLagMax) .withAvg(metricsRegistry.partitionRecordsLagAvg) @@ -145,7 +146,7 @@ void recordPartitionLead(TopicPartition tp, long lead) { String name = partitionRecordsLeadMetricName(tp); maybeRecordDeprecatedPartitionLead(name, tp, lead); - Sensor recordsLead = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) + Sensor recordsLead = sensorBuilder(name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) .withValue(metricsRegistry.partitionRecordsLead) .withMin(metricsRegistry.partitionRecordsLeadMin) .withAvg(metricsRegistry.partitionRecordsLeadAvg) @@ -169,13 +170,13 @@ void maybeUpdateAssignment(SubscriptionState subscription) { for (TopicPartition tp : this.assignedPartitions) { if (!newAssignedPartitions.contains(tp)) { - metrics.removeSensor(partitionRecordsLagMetricName(tp)); - metrics.removeSensor(partitionRecordsLeadMetricName(tp)); - metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp)); + removeSensor(partitionRecordsLagMetricName(tp)); + removeSensor(partitionRecordsLeadMetricName(tp)); + removeMetric(partitionPreferredReadReplicaMetricName(tp)); // Remove deprecated metrics. - metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp))); - metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp))); - metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); + removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp))); + removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp))); + removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); } } @@ -184,7 +185,7 @@ void maybeUpdateAssignment(SubscriptionState subscription) { maybeRecordDeprecatedPreferredReadReplica(tp, subscription); MetricName metricName = partitionPreferredReadReplicaMetricName(tp); - metrics.addMetricIfAbsent( + addMetricIfAbsent( metricName, null, (Gauge) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) @@ -200,7 +201,7 @@ void maybeUpdateAssignment(SubscriptionState subscription) { @Deprecated // To be removed in Kafka 5.0 release. private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) { if (shouldReportDeprecatedMetric(topic)) { - Sensor deprecatedBytesFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic)) + Sensor deprecatedBytesFetched = sensorBuilder(deprecatedMetricName(name), () -> topicTags(topic)) .withAvg(metricsRegistry.topicFetchSizeAvg) .withMax(metricsRegistry.topicFetchSizeMax) .withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal) @@ -212,7 +213,7 @@ private void maybeRecordDeprecatedBytesFetched(String name, String topic, int by @Deprecated // To be removed in Kafka 5.0 release. private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int records) { if (shouldReportDeprecatedMetric(topic)) { - Sensor deprecatedRecordsFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic)) + Sensor deprecatedRecordsFetched = sensorBuilder(deprecatedMetricName(name), () -> topicTags(topic)) .withAvg(metricsRegistry.topicRecordsPerRequestAvg) .withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal) .build(); @@ -223,7 +224,7 @@ private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int @Deprecated // To be removed in Kafka 5.0 release. private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, long lag) { if (shouldReportDeprecatedMetric(tp.topic())) { - Sensor deprecatedRecordsLag = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp)) + Sensor deprecatedRecordsLag = sensorBuilder(deprecatedMetricName(name), () -> topicPartitionTags(tp)) .withValue(metricsRegistry.partitionRecordsLag) .withMax(metricsRegistry.partitionRecordsLagMax) .withAvg(metricsRegistry.partitionRecordsLagAvg) @@ -236,7 +237,7 @@ private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, l @Deprecated // To be removed in Kafka 5.0 release. private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, double lead) { if (shouldReportDeprecatedMetric(tp.topic())) { - Sensor deprecatedRecordsLead = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp)) + Sensor deprecatedRecordsLead = sensorBuilder(deprecatedMetricName(name), () -> topicPartitionTags(tp)) .withValue(metricsRegistry.partitionRecordsLead) .withMin(metricsRegistry.partitionRecordsLeadMin) .withAvg(metricsRegistry.partitionRecordsLeadAvg) @@ -250,7 +251,7 @@ private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, SubscriptionState subscription) { if (shouldReportDeprecatedMetric(tp.topic())) { MetricName metricName = deprecatedPartitionPreferredReadReplicaMetricName(tp); - metrics.addMetricIfAbsent( + addMetricIfAbsent( metricName, null, (Gauge) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) @@ -284,13 +285,13 @@ private static boolean shouldReportDeprecatedMetric(String topic) { private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) { Map metricTags = mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))); - return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); + return metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); } @Deprecated private MetricName deprecatedPartitionPreferredReadReplicaMetricName(TopicPartition tp) { Map metricTags = topicPartitionTags(tp); - return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); + return metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); } @Deprecated diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java index 589cb6736b367..c0a66339c582e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java @@ -29,6 +29,7 @@ public class FetchMetricsRegistry { private static final String DEPRECATED_TOPIC_METRICS_MESSAGE = "Note: For topic names with periods (.), an additional " + "metric with underscores is emitted. However, the periods replaced metric is deprecated. Please use the metric with actual topic name instead."; + private final String groupName; public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; public MetricNameTemplate bytesConsumedRate; @@ -70,7 +71,7 @@ public FetchMetricsRegistry(String metricGrpPrefix) { public FetchMetricsRegistry(Set tags, String metricGrpPrefix) { /* Client level */ - String groupName = metricGrpPrefix + "-fetch-manager-metrics"; + this.groupName = metricGrpPrefix + "-fetch-manager-metrics"; this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, "The average number of bytes fetched per request", tags); @@ -148,6 +149,10 @@ public FetchMetricsRegistry(Set tags, String metricGrpPrefix) { "The current read replica for the partition, or -1 if reading from leader. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags); } + public String groupName() { + return groupName; + } + public List getAllTemplates() { return Arrays.asList( fetchSizeAvg, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java deleted file mode 100644 index a2346a3b376bb..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricNameTemplate; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Meter; -import org.apache.kafka.common.metrics.stats.Min; -import org.apache.kafka.common.metrics.stats.SampledStat; -import org.apache.kafka.common.metrics.stats.Value; - -import java.util.Collections; -import java.util.Map; -import java.util.function.Supplier; - -/** - * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link Sensor sensors} for recording - * {@link Metric metrics}. - */ -public class SensorBuilder { - - private final Metrics metrics; - - private final Sensor sensor; - - private final boolean preexisting; - - private final Map tags; - - public SensorBuilder(Metrics metrics, String name) { - this(metrics, name, Collections::emptyMap); - } - - public SensorBuilder(Metrics metrics, String name, Supplier> tagsSupplier) { - this.metrics = metrics; - Sensor s = metrics.getSensor(name); - - if (s != null) { - sensor = s; - tags = Collections.emptyMap(); - preexisting = true; - } else { - sensor = metrics.sensor(name); - tags = tagsSupplier.get(); - preexisting = false; - } - } - - SensorBuilder withAvg(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Avg()); - - return this; - } - - SensorBuilder withMin(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Min()); - - return this; - } - - SensorBuilder withMax(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Max()); - - return this; - } - - SensorBuilder withValue(MetricNameTemplate name) { - if (!preexisting) - sensor.add(metrics.metricInstance(name, tags), new Value()); - - return this; - } - - SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) { - if (!preexisting) { - sensor.add(new Meter(metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags))); - } - - return this; - } - - SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) { - if (!preexisting) { - sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags))); - } - - return this; - } - - Sensor build() { - return sensor; - } - -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java index d3e60a3dfaaee..4a59884d2a2c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java @@ -16,15 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.WindowedCount; -import java.io.IOException; -import java.util.Arrays; - -public class ShareFetchMetricsManager implements AutoCloseable { - private final Metrics metrics; +public class ShareFetchMetricsManager extends AbstractConsumerMetricsManager { private final Sensor throttleTime; private final Sensor bytesFetched; private final Sensor recordsFetched; @@ -32,35 +29,35 @@ public class ShareFetchMetricsManager implements AutoCloseable { private final Sensor sentAcknowledgements; private final Sensor failedAcknowledgements; + @SuppressWarnings({"this-escape"}) public ShareFetchMetricsManager(Metrics metrics, ShareFetchMetricsRegistry metricsRegistry) { - this.metrics = metrics; - - this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched") + super(metrics, metricsRegistry.groupName()); + this.bytesFetched = sensorBuilder("bytes-fetched") .withAvg(metricsRegistry.fetchSizeAvg) .withMax(metricsRegistry.fetchSizeMax) .withMeter(metricsRegistry.bytesFetchedRate, metricsRegistry.bytesFetchedTotal) .build(); - this.recordsFetched = new SensorBuilder(metrics, "records-fetched") + this.recordsFetched = sensorBuilder("records-fetched") .withAvg(metricsRegistry.recordsPerRequestAvg) .withMax(metricsRegistry.recordsPerRequestMax) .withMeter(metricsRegistry.recordsFetchedRate, metricsRegistry.recordsFetchedTotal) .build(); - this.sentAcknowledgements = new SensorBuilder(metrics, "sent-acknowledgements") + this.sentAcknowledgements = sensorBuilder("sent-acknowledgements") .withMeter(metricsRegistry.acknowledgementSendRate, metricsRegistry.acknowledgementSendTotal) .build(); - this.failedAcknowledgements = new SensorBuilder(metrics, "failed-acknowledgements") + this.failedAcknowledgements = sensorBuilder("failed-acknowledgements") .withMeter(metricsRegistry.acknowledgementErrorRate, metricsRegistry.acknowledgementErrorTotal) .build(); - this.fetchLatency = new SensorBuilder(metrics, "fetch-latency") + this.fetchLatency = sensorBuilder("fetch-latency") .withAvg(metricsRegistry.fetchLatencyAvg) .withMax(metricsRegistry.fetchLatencyMax) .withMeter(new WindowedCount(), metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal) .build(); - this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time") + this.throttleTime = sensorBuilder("fetch-throttle-time") .withAvg(metricsRegistry.fetchThrottleTimeAvg) .withMax(metricsRegistry.fetchThrottleTimeMax) .build(); @@ -74,7 +71,7 @@ void recordLatency(String node, long requestLatencyMs) { fetchLatency.record(requestLatencyMs); if (!node.isEmpty()) { String nodeTimeName = "node-" + node + ".latency"; - Sensor nodeRequestTime = metrics.getSensor(nodeTimeName); + Sensor nodeRequestTime = getSensor(nodeTimeName); if (nodeRequestTime != null) nodeRequestTime.record(requestLatencyMs); } @@ -95,16 +92,4 @@ void recordAcknowledgementSent(int acknowledgements) { void recordFailedAcknowledgements(int acknowledgements) { failedAcknowledgements.record(acknowledgements); } - - @Override - public void close() throws IOException { - Arrays.asList( - throttleTime.name(), - bytesFetched.name(), - recordsFetched.name(), - fetchLatency.name(), - sentAcknowledgements.name(), - failedAcknowledgements.name() - ).forEach(metrics::removeSensor); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java index 2ae7952b6671d..c810d99accf57 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsRegistry.java @@ -23,6 +23,8 @@ public class ShareFetchMetricsRegistry { + private final String groupName; + public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; public MetricNameTemplate bytesFetchedRate; @@ -53,7 +55,7 @@ public ShareFetchMetricsRegistry(String metricGrpPrefix) { public ShareFetchMetricsRegistry(Set tags, String metricGrpPrefix) { /* Client level */ - String groupName = metricGrpPrefix + "-fetch-manager-metrics"; + this.groupName = metricGrpPrefix + "-fetch-manager-metrics"; this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, "The average number of bytes fetched per request", tags); @@ -98,4 +100,8 @@ public ShareFetchMetricsRegistry(Set tags, String metricGrpPrefix) { this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName, "The maximum throttle time in ms", tags); } + + public String groupName() { + return groupName; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java new file mode 100644 index 0000000000000..2de6b882a2c0f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.SampledStat; +import org.apache.kafka.common.metrics.stats.Value; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; + +/** + * Utility class that serves as a common abstraction point + */ +public abstract class AbstractConsumerMetricsManager implements AutoCloseable { + + private final Metrics metrics; + private final String metricGroupName; + private final Set metricNames; + private final Set sensors; + + protected AbstractConsumerMetricsManager(Metrics metrics, String metricGroupName) { + this.metrics = Objects.requireNonNull(metrics); + this.metricGroupName = Objects.requireNonNull(metricGroupName); + this.metricNames = new HashSet<>(); + this.sensors = new HashSet<>(); + } + + protected MetricName metricName(String name, String description) { + MetricName metricName = metrics.metricName(name, metricGroupName, description); + metricNames.add(metricName); + return metricName; + } + + protected MetricName metricInstance(MetricNameTemplate template, Map tags) { + MetricName metricName = metrics.metricInstance(template, tags); + metricNames.add(metricName); + return metricName; + } + + protected void addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider metricValueProvider) { + metrics.addMetricIfAbsent(metricName, config, metricValueProvider); + metricNames.add(metricName); + } + + protected void addMetric(MetricName metricName, Measurable measurable) { + metrics.addMetric(metricName, measurable); + metricNames.add(metricName); + } + + protected void removeMetric(MetricName metricName) { + metrics.removeMetric(metricName); + metricNames.remove(metricName); + } + + protected Sensor sensor(String name) { + Sensor sensor = metrics.sensor(name); + sensors.add(sensor); + return sensor; + } + + protected Sensor getSensor(String name) { + Sensor sensor = metrics.getSensor(name); + + if (sensor != null) + sensors.add(sensor); + + return sensor; + } + + protected void removeSensor(String name) { + Sensor s = getSensor(name); + metrics.removeSensor(name); + sensors.remove(s); + } + + protected SensorBuilder sensorBuilder(String name) { + return new SensorBuilder(name); + } + + protected SensorBuilder sensorBuilder(String name, Supplier> tagsSupplier) { + return new SensorBuilder(name, tagsSupplier); + } + + @Override + public final void close() { + sensors.forEach(s -> { + metrics.removeSensor(s.name()); + }); + + metricNames.forEach(metrics::removeMetric); + } + + /** + * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link Sensor sensors} for recording + * {@link Metric metrics}. + */ + public class SensorBuilder { + + private final Sensor sensor; + + private final boolean preexisting; + + private final Map tags; + + public SensorBuilder(String name) { + this(name, Collections::emptyMap); + } + + public SensorBuilder(String name, Supplier> tagsSupplier) { + Sensor s = getSensor(name); + + if (s != null) { + sensor = s; + tags = Collections.emptyMap(); + preexisting = true; + } else { + sensor = sensor(name); + sensors.add(sensor); + tags = tagsSupplier.get(); + preexisting = false; + } + } + + public SensorBuilder withAvg(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Avg()); + + return this; + } + + public SensorBuilder withMin(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Min()); + + return this; + } + + public SensorBuilder withMax(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Max()); + + return this; + } + + public SensorBuilder withValue(MetricNameTemplate name) { + if (!preexisting) + sensor.add(metricInstance(name, tags), new Value()); + + return this; + } + + public SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) { + if (!preexisting) { + sensor.add(new Meter(metricInstance(rateName, tags), metricInstance(totalName, tags))); + } + + return this; + } + + public SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) { + if (!preexisting) { + sensor.add(new Meter(sampledStat, metricInstance(rateName, tags), metricInstance(totalName, tags))); + } + + return this; + } + + public Sensor build() { + return sensor; + } + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java index 2f90440a66244..271c29c0163df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java @@ -22,10 +22,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Value; -import java.util.Arrays; - -public class AsyncConsumerMetrics implements AutoCloseable { - private final Metrics metrics; +public class AsyncConsumerMetrics extends AbstractConsumerMetricsManager { public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; @@ -48,150 +45,136 @@ public class AsyncConsumerMetrics implements AutoCloseable { private final Sensor unsentRequestsQueueSizeSensor; private final Sensor unsentRequestsQueueTimeSensor; + @SuppressWarnings({"this-escape"}) public AsyncConsumerMetrics(Metrics metrics, String groupName) { - this.metrics = metrics; - this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); + super(metrics, groupName); + + this.timeBetweenNetworkThreadPollSensor = sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME); this.timeBetweenNetworkThreadPollSensor.add( - metrics.metricName( + metricName( "time-between-network-thread-poll-avg", - groupName, "The average time taken, in milliseconds, between each poll in the network thread." ), new Avg() ); this.timeBetweenNetworkThreadPollSensor.add( - metrics.metricName( + metricName( "time-between-network-thread-poll-max", - groupName, "The maximum time taken, in milliseconds, between each poll in the network thread." ), new Max() ); - this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.applicationEventQueueSizeSensor = sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME); this.applicationEventQueueSizeSensor.add( - metrics.metricName( + metricName( APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME, - groupName, "The current number of events in the queue to send from the application thread to the background thread." ), new Value() ); - this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME); + this.applicationEventQueueTimeSensor = sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME); this.applicationEventQueueTimeSensor.add( - metrics.metricName( + metricName( "application-event-queue-time-avg", - groupName, "The average time, in milliseconds, that application events are taking to be dequeued." ), new Avg() ); this.applicationEventQueueTimeSensor.add( - metrics.metricName( + metricName( "application-event-queue-time-max", - groupName, "The maximum time, in milliseconds, that an application event took to be dequeued." ), new Max() ); - this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.applicationEventQueueProcessingTimeSensor = sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); this.applicationEventQueueProcessingTimeSensor.add( - metrics.metricName( + metricName( "application-event-queue-processing-time-avg", - groupName, "The average time, in milliseconds, that the background thread takes to process all available application events." ), new Avg() ); this.applicationEventQueueProcessingTimeSensor.add( - metrics.metricName("application-event-queue-processing-time-max", - groupName, + metricName("application-event-queue-processing-time-max", "The maximum time, in milliseconds, that the background thread took to process all available application events." ), new Max() ); - this.applicationEventExpiredSizeSensor = metrics.sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME); + this.applicationEventExpiredSizeSensor = sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME); this.applicationEventExpiredSizeSensor.add( - metrics.metricName( + metricName( APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME, - groupName, "The current number of expired application events." ), new Value() ); - this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME); + this.unsentRequestsQueueSizeSensor = sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME); this.unsentRequestsQueueSizeSensor.add( - metrics.metricName( + metricName( UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME, - groupName, "The current number of unsent requests in the background thread." ), new Value() ); - this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME); + this.unsentRequestsQueueTimeSensor = sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME); this.unsentRequestsQueueTimeSensor.add( - metrics.metricName( + metricName( "unsent-requests-queue-time-avg", - groupName, "The average time, in milliseconds, that requests are taking to be sent in the background thread." ), new Avg() ); this.unsentRequestsQueueTimeSensor.add( - metrics.metricName( + metricName( "unsent-requests-queue-time-max", - groupName, "The maximum time, in milliseconds, that a request remained unsent in the background thread." ), new Max() ); - this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME); + this.backgroundEventQueueSizeSensor = sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME); this.backgroundEventQueueSizeSensor.add( - metrics.metricName( + metricName( BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, - groupName, "The current number of events in the queue to send from the background thread to the application thread." ), new Value() ); - this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME); + this.backgroundEventQueueTimeSensor = sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME); this.backgroundEventQueueTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-time-avg", - groupName, "The average time, in milliseconds, that background events are taking to be dequeued." ), new Avg() ); this.backgroundEventQueueTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-time-max", - groupName, "The maximum time, in milliseconds, that background events are taking to be dequeued." ), new Max() ); - this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); + this.backgroundEventQueueProcessingTimeSensor = sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME); this.backgroundEventQueueProcessingTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-processing-time-avg", - groupName, "The average time, in milliseconds, that the consumer took to process all available background events." ), new Avg() ); this.backgroundEventQueueProcessingTimeSensor.add( - metrics.metricName( + metricName( "background-event-queue-processing-time-max", - groupName, "The maximum time, in milliseconds, that the consumer took to process all available background events." ), new Max() @@ -237,20 +220,4 @@ public void recordBackgroundEventQueueTime(long time) { public void recordBackgroundEventQueueProcessingTime(long processingTime) { this.backgroundEventQueueProcessingTimeSensor.record(processingTime); } - - @Override - public void close() { - Arrays.asList( - timeBetweenNetworkThreadPollSensor.name(), - applicationEventQueueSizeSensor.name(), - applicationEventQueueTimeSensor.name(), - applicationEventQueueProcessingTimeSensor.name(), - applicationEventExpiredSizeSensor.name(), - backgroundEventQueueSizeSensor.name(), - backgroundEventQueueTimeSensor.name(), - backgroundEventQueueProcessingTimeSensor.name(), - unsentRequestsQueueSizeSensor.name(), - unsentRequestsQueueTimeSensor.name() - ).forEach(metrics::removeSensor); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java index e271dee526172..d8b38725b82ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java @@ -51,38 +51,37 @@ public final class ConsumerRebalanceMetricsManager extends RebalanceMetricsManag public final MetricName assignedPartitionsCount; private long lastRebalanceEndMs = -1L; private long lastRebalanceStartMs = -1L; - private final Metrics metrics; + @SuppressWarnings({"this-escape"}) public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState subscriptions) { - super(CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); - this.metrics = metrics; + super(metrics, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); - rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg", + rebalanceLatencyAvg = metricName("rebalance-latency-avg", "The average time in ms taken for a group to complete a rebalance"); - rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max", + rebalanceLatencyMax = metricName("rebalance-latency-max", "The max time in ms taken for a group to complete a rebalance"); - rebalanceLatencyTotal = createMetric(metrics, "rebalance-latency-total", + rebalanceLatencyTotal = metricName("rebalance-latency-total", "The total number of milliseconds spent in rebalances"); - rebalanceTotal = createMetric(metrics, "rebalance-total", + rebalanceTotal = metricName("rebalance-total", "The total number of rebalance events"); - rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour", + rebalanceRatePerHour = metricName("rebalance-rate-per-hour", "The number of rebalance events per hour"); - failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total", + failedRebalanceTotal = metricName("failed-rebalance-total", "The total number of failed rebalance events"); - failedRebalanceRate = createMetric(metrics, "failed-rebalance-rate-per-hour", + failedRebalanceRate = metricName("failed-rebalance-rate-per-hour", "The number of failed rebalance events per hour"); - assignedPartitionsCount = createMetric(metrics, "assigned-partitions", + assignedPartitionsCount = metricName("assigned-partitions", "The number of partitions currently assigned to this consumer"); registerAssignedPartitionCount(subscriptions); - successfulRebalanceSensor = metrics.sensor("rebalance-latency"); + successfulRebalanceSensor = sensor("rebalance-latency"); successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg()); successfulRebalanceSensor.add(rebalanceLatencyMax, new Max()); successfulRebalanceSensor.add(rebalanceLatencyTotal, new CumulativeSum()); successfulRebalanceSensor.add(rebalanceTotal, new CumulativeCount()); successfulRebalanceSensor.add(rebalanceRatePerHour, new Rate(TimeUnit.HOURS, new WindowedCount())); - failedRebalanceSensor = metrics.sensor("failed-rebalance"); + failedRebalanceSensor = sensor("failed-rebalance"); failedRebalanceSensor.add(failedRebalanceTotal, new CumulativeSum()); failedRebalanceSensor.add(failedRebalanceRate, new Rate(TimeUnit.HOURS, new WindowedCount())); @@ -92,10 +91,10 @@ public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState subscr else return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS); }; - lastRebalanceSecondsAgo = createMetric(metrics, + lastRebalanceSecondsAgo = metricName( "last-rebalance-seconds-ago", "The number of seconds since the last rebalance event"); - metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance); + addMetric(lastRebalanceSecondsAgo, lastRebalance); } public void recordRebalanceStarted(long nowMs) { @@ -125,6 +124,6 @@ public boolean rebalanceStarted() { */ private void registerAssignedPartitionCount(SubscriptionState subscriptions) { Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions(); - metrics.addMetric(assignedPartitionsCount, numParts); + addMetric(assignedPartitionsCount, numParts); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java index 926e267d98907..0dd34bf55eacf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java @@ -29,7 +29,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; -public class HeartbeatMetricsManager { +public class HeartbeatMetricsManager extends AbstractConsumerMetricsManager { // MetricName visible for testing final MetricName heartbeatResponseTimeMax; final MetricName heartbeatRate; @@ -42,17 +42,17 @@ public HeartbeatMetricsManager(Metrics metrics) { this(metrics, CONSUMER_METRIC_GROUP_PREFIX); } + @SuppressWarnings({"this-escape"}) public HeartbeatMetricsManager(Metrics metrics, String metricGroupPrefix) { - final String metricGroupName = metricGroupPrefix + COORDINATOR_METRICS_SUFFIX; - heartbeatSensor = metrics.sensor("heartbeat-latency"); - heartbeatResponseTimeMax = metrics.metricName("heartbeat-response-time-max", - metricGroupName, + super(metrics, metricGroupPrefix + COORDINATOR_METRICS_SUFFIX); + heartbeatSensor = sensor("heartbeat-latency"); + heartbeatResponseTimeMax = metricName("heartbeat-response-time-max", "The max time taken to receive a response to a heartbeat request"); heartbeatSensor.add(heartbeatResponseTimeMax, new Max()); // windowed meters - heartbeatRate = metrics.metricName("heartbeat-rate", metricGroupName, "The number of heartbeats per second"); - heartbeatTotal = metrics.metricName("heartbeat-total", metricGroupName, "The total number of heartbeats"); + heartbeatRate = metricName("heartbeat-rate", "The number of heartbeats per second"); + heartbeatTotal = metricName("heartbeat-total", "The total number of heartbeats"); heartbeatSensor.add(new Meter(new WindowedCount(), heartbeatRate, heartbeatTotal)); @@ -65,10 +65,9 @@ public HeartbeatMetricsManager(Metrics metrics, String metricGroupPrefix) { else return TimeUnit.SECONDS.convert(now - lastHeartbeatSend, TimeUnit.MILLISECONDS); }; - lastHeartbeatSecondsAgo = metrics.metricName("last-heartbeat-seconds-ago", - metricGroupName, + lastHeartbeatSecondsAgo = metricName("last-heartbeat-seconds-ago", "The number of seconds since the last coordinator heartbeat was sent"); - metrics.addMetric(lastHeartbeatSecondsAgo, lastHeartbeat); + addMetric(lastHeartbeatSecondsAgo, lastHeartbeat); } public void recordHeartbeatSentMs(long timeMs) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java index 1b2bb4518f979..3c454d99a127a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java @@ -28,9 +28,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP; -public class KafkaConsumerMetrics implements AutoCloseable { - private final Metrics metrics; - private final MetricName lastPollMetricName; +public class KafkaConsumerMetrics extends AbstractConsumerMetricsManager { private final Sensor timeBetweenPollSensor; private final Sensor pollIdleSensor; private final Sensor committedSensor; @@ -39,9 +37,9 @@ public class KafkaConsumerMetrics implements AutoCloseable { private long pollStartMs; private long timeSinceLastPollMs; + @SuppressWarnings({"this-escape"}) public KafkaConsumerMetrics(Metrics metrics) { - this.metrics = metrics; - final String metricGroupName = CONSUMER_METRIC_GROUP; + super(metrics, CONSUMER_METRIC_GROUP); Measurable lastPoll = (mConfig, now) -> { if (lastPollMs == 0L) // if no poll is ever triggered, just return -1. @@ -49,41 +47,36 @@ public KafkaConsumerMetrics(Metrics metrics) { else return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS); }; - this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago", - metricGroupName, "The number of seconds since the last poll() invocation."); - metrics.addMetric(lastPollMetricName, lastPoll); + MetricName lastPollMetricName = metricName("last-poll-seconds-ago", + "The number of seconds since the last poll() invocation."); + addMetric(lastPollMetricName, lastPoll); - this.timeBetweenPollSensor = metrics.sensor("time-between-poll"); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg", - metricGroupName, + this.timeBetweenPollSensor = sensor("time-between-poll"); + this.timeBetweenPollSensor.add(metricName("time-between-poll-avg", "The average delay between invocations of poll() in milliseconds."), new Avg()); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-max", - metricGroupName, + this.timeBetweenPollSensor.add(metricName("time-between-poll-max", "The max delay between invocations of poll() in milliseconds."), new Max()); - this.pollIdleSensor = metrics.sensor("poll-idle-ratio-avg"); - this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg", - metricGroupName, + this.pollIdleSensor = sensor("poll-idle-ratio-avg"); + this.pollIdleSensor.add(metricName("poll-idle-ratio-avg", "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."), new Avg()); - this.commitSyncSensor = metrics.sensor("commit-sync-time-ns-total"); + this.commitSyncSensor = sensor("commit-sync-time-ns-total"); this.commitSyncSensor.add( - metrics.metricName( + metricName( "commit-sync-time-ns-total", - metricGroupName, "The total time the consumer has spent in commitSync in nanoseconds" ), new CumulativeSum() ); - this.committedSensor = metrics.sensor("committed-time-ns-total"); + this.committedSensor = sensor("committed-time-ns-total"); this.committedSensor.add( - metrics.metricName( + metricName( "committed-time-ns-total", - metricGroupName, "The total time the consumer has spent in committed in nanoseconds" ), new CumulativeSum() @@ -110,13 +103,4 @@ public void recordCommitSync(long duration) { public void recordCommitted(long duration) { this.committedSensor.record(duration); } - - @Override - public void close() { - metrics.removeMetric(lastPollMetricName); - metrics.removeSensor(timeBetweenPollSensor.name()); - metrics.removeSensor(pollIdleSensor.name()); - metrics.removeSensor(commitSyncSensor.name()); - metrics.removeSensor(committedSensor.name()); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java index e154b97da5a80..df99a012cc12f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java @@ -27,18 +27,16 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP; -public class KafkaShareConsumerMetrics implements AutoCloseable { - private final Metrics metrics; - private final MetricName lastPollMetricName; +public class KafkaShareConsumerMetrics extends AbstractConsumerMetricsManager { private final Sensor timeBetweenPollSensor; private final Sensor pollIdleSensor; private long lastPollMs; private long pollStartMs; private long timeSinceLastPollMs; + @SuppressWarnings({"this-escape"}) public KafkaShareConsumerMetrics(Metrics metrics) { - this.metrics = metrics; - final String metricGroupName = CONSUMER_SHARE_METRIC_GROUP; + super(metrics, CONSUMER_SHARE_METRIC_GROUP); Measurable lastPoll = (mConfig, now) -> { if (lastPollMs == 0L) // if no poll is ever triggered, just return -1. @@ -46,23 +44,20 @@ public KafkaShareConsumerMetrics(Metrics metrics) { else return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS); }; - this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago", - metricGroupName, "The number of seconds since the last poll() invocation."); - metrics.addMetric(lastPollMetricName, lastPoll); + MetricName lastPollMetricName = metricName("last-poll-seconds-ago", + "The number of seconds since the last poll() invocation."); + addMetric(lastPollMetricName, lastPoll); - this.timeBetweenPollSensor = metrics.sensor("time-between-poll"); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg", - metricGroupName, + this.timeBetweenPollSensor = sensor("time-between-poll"); + this.timeBetweenPollSensor.add(metricName("time-between-poll-avg", "The average delay between invocations of poll() in milliseconds."), new Avg()); - this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-max", - metricGroupName, + this.timeBetweenPollSensor.add(metricName("time-between-poll-max", "The max delay between invocations of poll() in milliseconds."), new Max()); - this.pollIdleSensor = metrics.sensor("poll-idle-ratio-avg"); - this.pollIdleSensor.add(metrics.metricName("poll-idle-ratio-avg", - metricGroupName, + this.pollIdleSensor = sensor("poll-idle-ratio-avg"); + this.pollIdleSensor.add(metricName("poll-idle-ratio-avg", "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."), new Avg()); } @@ -79,11 +74,4 @@ public void recordPollEnd(long pollEndMs) { double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); this.pollIdleSensor.record(pollIdleRatio); } - - @Override - public void close() { - metrics.removeMetric(lastPollMetricName); - metrics.removeSensor(timeBetweenPollSensor.name()); - metrics.removeSensor(pollIdleSensor.name()); - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java index d700299ef1801..f9c2d7456f834 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java @@ -27,29 +27,26 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; -public class OffsetCommitMetricsManager { +public class OffsetCommitMetricsManager extends AbstractConsumerMetricsManager { final MetricName commitLatencyAvg; final MetricName commitLatencyMax; final MetricName commitRate; final MetricName commitTotal; private final Sensor commitSensor; + @SuppressWarnings({"this-escape"}) public OffsetCommitMetricsManager(Metrics metrics) { - final String metricGroupName = CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX; - commitSensor = metrics.sensor("commit-latency"); - commitLatencyAvg = metrics.metricName("commit-latency-avg", - metricGroupName, + super(metrics, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); + commitSensor = sensor("commit-latency"); + commitLatencyAvg = metricName("commit-latency-avg", "The average time taken for a commit request"); commitSensor.add(commitLatencyAvg, new Avg()); - commitLatencyMax = metrics.metricName("commit-latency-max", - metricGroupName, + commitLatencyMax = metricName("commit-latency-max", "The max time taken for a commit request"); commitSensor.add(commitLatencyMax, new Max()); - commitRate = metrics.metricName("commit-rate", - metricGroupName, + commitRate = metricName("commit-rate", "The number of commit calls per second"); - commitTotal = metrics.metricName("commit-total", - metricGroupName, + commitTotal = metricName("commit-total", "The total number of commit calls"); commitSensor.add(new Meter(new WindowedCount(), commitRate, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java index f70b891864f1c..fafc7c926eccb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java @@ -25,7 +25,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; -public class RebalanceCallbackMetricsManager { +public class RebalanceCallbackMetricsManager extends AbstractConsumerMetricsManager { final MetricName partitionRevokeLatencyAvg; final MetricName partitionAssignLatencyAvg; final MetricName partitionLostLatencyAvg; @@ -40,35 +40,30 @@ public RebalanceCallbackMetricsManager(Metrics metrics) { this(metrics, CONSUMER_METRIC_GROUP_PREFIX); } + @SuppressWarnings({"this-escape"}) public RebalanceCallbackMetricsManager(Metrics metrics, String grpMetricsPrefix) { - final String metricGroupName = grpMetricsPrefix + COORDINATOR_METRICS_SUFFIX; - partitionRevokeCallbackSensor = metrics.sensor("partition-revoked-latency"); - partitionRevokeLatencyAvg = metrics.metricName("partition-revoked-latency-avg", - metricGroupName, + super(metrics, grpMetricsPrefix + COORDINATOR_METRICS_SUFFIX); + partitionRevokeCallbackSensor = sensor("partition-revoked-latency"); + partitionRevokeLatencyAvg = metricName("partition-revoked-latency-avg", "The average time taken for a partition-revoked rebalance listener callback"); partitionRevokeCallbackSensor.add(partitionRevokeLatencyAvg, new Avg()); - partitionRevokeLatencyMax = metrics.metricName("partition-revoked-latency-max", - metricGroupName, + partitionRevokeLatencyMax = metricName("partition-revoked-latency-max", "The max time taken for a partition-revoked rebalance listener callback"); partitionRevokeCallbackSensor.add(partitionRevokeLatencyMax, new Max()); - partitionAssignCallbackSensor = metrics.sensor("partition-assigned-latency"); - partitionAssignLatencyAvg = metrics.metricName("partition-assigned-latency-avg", - metricGroupName, + partitionAssignCallbackSensor = sensor("partition-assigned-latency"); + partitionAssignLatencyAvg = metricName("partition-assigned-latency-avg", "The average time taken for a partition-assigned rebalance listener callback"); partitionAssignCallbackSensor.add(partitionAssignLatencyAvg, new Avg()); - partitionAssignLatencyMax = metrics.metricName("partition-assigned-latency-max", - metricGroupName, + partitionAssignLatencyMax = metricName("partition-assigned-latency-max", "The max time taken for a partition-assigned rebalance listener callback"); partitionAssignCallbackSensor.add(partitionAssignLatencyMax, new Max()); - partitionLostCallbackSensor = metrics.sensor("partition-lost-latency"); - partitionLostLatencyAvg = metrics.metricName("partition-lost-latency-avg", - metricGroupName, + partitionLostCallbackSensor = sensor("partition-lost-latency"); + partitionLostLatencyAvg = metricName("partition-lost-latency-avg", "The average time taken for a partition-lost rebalance listener callback"); partitionLostCallbackSensor.add(partitionLostLatencyAvg, new Avg()); - partitionLostLatencyMax = metrics.metricName("partition-lost-latency-max", - metricGroupName, + partitionLostLatencyMax = metricName("partition-lost-latency-max", "The max time taken for a partition-lost rebalance listener callback"); partitionLostCallbackSensor.add(partitionLostLatencyMax, new Max()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java index 16ad1b39817f5..51ea2e4ff620b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java @@ -16,18 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals.metrics; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; -public abstract class RebalanceMetricsManager { - protected final String metricGroupName; +public abstract class RebalanceMetricsManager extends AbstractConsumerMetricsManager { - RebalanceMetricsManager(String metricGroupName) { - this.metricGroupName = metricGroupName; - } - - protected MetricName createMetric(Metrics metrics, String name, String description) { - return metrics.metricName(name, metricGroupName, description); + RebalanceMetricsManager(Metrics metrics, String metricGroupName) { + super(metrics, metricGroupName); } public abstract void recordRebalanceStarted(long nowMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java index 0760eaa6d5cfd..bac33a2b6bdd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java @@ -35,15 +35,16 @@ public final class ShareRebalanceMetricsManager extends RebalanceMetricsManager private long lastRebalanceEndMs = -1L; private long lastRebalanceStartMs = -1L; + @SuppressWarnings({"this-escape"}) public ShareRebalanceMetricsManager(Metrics metrics) { - super(CONSUMER_SHARE_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); + super(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX); - rebalanceTotal = createMetric(metrics, "rebalance-total", + rebalanceTotal = metricName("rebalance-total", "The total number of rebalance events"); - rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour", + rebalanceRatePerHour = metricName("rebalance-rate-per-hour", "The number of rebalance events per hour"); - rebalanceSensor = metrics.sensor("rebalance-latency"); + rebalanceSensor = sensor("rebalance-latency"); rebalanceSensor.add(rebalanceTotal, new CumulativeCount()); rebalanceSensor.add(rebalanceRatePerHour, new Rate(TimeUnit.HOURS, new WindowedCount())); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f8e..9334fe610250b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -389,6 +389,19 @@ public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol consumer.close(CloseOptions.timeout(Duration.ZERO)); } + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testMetricsRemovedOnClose(GroupProtocol groupProtocol) { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + + assertTrue(consumer.metrics().size() > 1, "The consumer should have created many metrics"); + consumer.close(CloseOptions.timeout(Duration.ZERO)); + assertTrue(consumer.metrics().size() <= 1, "The consumer should have removed all of its metrics"); + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8e44b3fcc25d5..f9d39313c7bf3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -59,6 +59,7 @@ import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.Node; @@ -260,6 +261,8 @@ private AsyncKafkaConsumer newConsumer( "client-id", new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics), fetchBuffer, + mock(FetchMetricsManager.class), + mock(RebalanceCallbackMetricsManager.class), fetchCollector, interceptors, time, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java new file mode 100644 index 0000000000000..1917ddd6df9b0 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Metrics; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public abstract class AbstractConsumerMetricsManagerTest { + + protected abstract AbstractConsumerMetricsManager metricsManager(Metrics metrics, String groupDescription); + + @Test + public void testCleanup() { + try (Metrics metrics = new Metrics()) { + int metricCount = metrics.metrics().size(); + + try (AbstractConsumerMetricsManager metricsManager = metricsManager(metrics, "test")) { + assertTrue(metrics.metrics().size() > metricCount); + } + + assertEquals(metricCount, metrics.metrics().size()); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java index 876bc3ffa12da..bec5ec2785d4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java @@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class AsyncConsumerMetricsTest { +public class AsyncConsumerMetricsTest extends AbstractConsumerMetricsManagerTest { private static final long METRIC_VALUE = 123L; private final Metrics metrics = new Metrics(); @@ -53,6 +53,11 @@ public void tearDown() { metrics.close(); } + @Override + protected AbstractConsumerMetricsManager metricsManager(Metrics metrics, String groupDescription) { + return new AsyncConsumerMetrics(metrics, groupDescription); + } + @ParameterizedTest @MethodSource("groupNameProvider") public void shouldMetricNames(String groupName) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java index a7d9122776750..fcd950f59a21c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java @@ -34,11 +34,17 @@ import static org.mockito.Mockito.mock; -class ConsumerRebalanceMetricsManagerTest { +class ConsumerRebalanceMetricsManagerTest extends AbstractConsumerMetricsManagerTest { private final Time time = new MockTime(); private final Metrics metrics = new Metrics(time); + @Override + protected AbstractConsumerMetricsManager metricsManager(Metrics metrics, String groupDescription) { + SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST); + return new ConsumerRebalanceMetricsManager(metrics, subscriptionState); + } + @Test public void testAssignedPartitionCountMetric() { SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);