Skip to content

Commit 848e3d0

Browse files
authored
KAFKA-19722: Adding missing metric assigned-partitions for new consumer (#20557)
Adding the missing metric to track the number of partitions assigned. This metric should be registered whenever the consumer is using a groupId, and should track the number of partitions from the subscription state, regardless of the subscription type (manual or automatic). This PR registers the missing metric as part of the ConsumerRebalanceMetricsManager setup. This manager is created if there is a group ID, and reused by the consumer membershipMgr and the streamsMemberhipMgr, so we ensure that the metric is registered for the new consumer and streams. Reviewers: Lucas Brutschy <[email protected]>, TengYao Chi <[email protected]>
1 parent cfa0b41 commit 848e3d0

File tree

8 files changed

+141
-5
lines changed

8 files changed

+141
-5
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public ConsumerMembershipManager(String groupId,
163163
logContext,
164164
backgroundEventHandler,
165165
time,
166-
new ConsumerRebalanceMetricsManager(metrics),
166+
new ConsumerRebalanceMetricsManager(metrics, subscriptions),
167167
autoCommitEnabled);
168168
}
169169

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public StreamsMembershipManager(final String groupId,
296296
this.backgroundEventHandler = backgroundEventHandler;
297297
this.streamsRebalanceData = streamsRebalanceData;
298298
this.subscriptionState = subscriptionState;
299-
metricsManager = new ConsumerRebalanceMetricsManager(metrics);
299+
metricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
300300
this.time = time;
301301
}
302302

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ public synchronized List<TopicPartition> assignedPartitionsList() {
477477
* Provides the number of assigned partitions in a thread safe manner.
478478
* @return the number of assigned partitions.
479479
*/
480-
synchronized int numAssignedPartitions() {
480+
public synchronized int numAssignedPartitions() {
481481
return this.assignment.size();
482482
}
483483

clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.kafka.clients.consumer.internals.metrics;
1818

19+
import org.apache.kafka.clients.consumer.KafkaConsumer;
20+
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
1921
import org.apache.kafka.common.MetricName;
2022
import org.apache.kafka.common.metrics.Measurable;
2123
import org.apache.kafka.common.metrics.Metrics;
@@ -27,7 +29,9 @@
2729
import org.apache.kafka.common.metrics.stats.Rate;
2830
import org.apache.kafka.common.metrics.stats.WindowedCount;
2931

32+
import java.util.Collection;
3033
import java.util.concurrent.TimeUnit;
34+
import java.util.regex.Pattern;
3135

3236
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
3337
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
@@ -44,11 +48,14 @@ public final class ConsumerRebalanceMetricsManager extends RebalanceMetricsManag
4448
public final MetricName lastRebalanceSecondsAgo;
4549
public final MetricName failedRebalanceTotal;
4650
public final MetricName failedRebalanceRate;
51+
public final MetricName assignedPartitionsCount;
4752
private long lastRebalanceEndMs = -1L;
4853
private long lastRebalanceStartMs = -1L;
54+
private final Metrics metrics;
4955

50-
public ConsumerRebalanceMetricsManager(Metrics metrics) {
56+
public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState subscriptions) {
5157
super(CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX);
58+
this.metrics = metrics;
5259

5360
rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg",
5461
"The average time in ms taken for a group to complete a rebalance");
@@ -64,6 +71,9 @@ public ConsumerRebalanceMetricsManager(Metrics metrics) {
6471
"The total number of failed rebalance events");
6572
failedRebalanceRate = createMetric(metrics, "failed-rebalance-rate-per-hour",
6673
"The number of failed rebalance events per hour");
74+
assignedPartitionsCount = createMetric(metrics, "assigned-partitions",
75+
"The number of partitions currently assigned to this consumer");
76+
registerAssignedPartitionCount(subscriptions);
6777

6878
successfulRebalanceSensor = metrics.sensor("rebalance-latency");
6979
successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg());
@@ -106,4 +116,15 @@ public void maybeRecordRebalanceFailed() {
106116
public boolean rebalanceStarted() {
107117
return lastRebalanceStartMs > lastRebalanceEndMs;
108118
}
119+
120+
/**
121+
* Register metric to track the number of assigned partitions.
122+
* It will consider partitions assigned to the consumer
123+
* regardless of whether they were assigned via {@link KafkaConsumer#subscribe(Pattern)} or
124+
* {@link KafkaConsumer#assign(Collection)}
125+
*/
126+
private void registerAssignedPartitionCount(SubscriptionState subscriptions) {
127+
Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions();
128+
metrics.addMetric(assignedPartitionsCount, numParts);
129+
}
109130
}

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@
155155

156156
import static java.util.Collections.singletonList;
157157
import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
158+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
159+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
158160
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
159161
import static org.apache.kafka.common.utils.Utils.propsToMap;
160162
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -271,6 +273,35 @@ public void testSubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(
271273
}
272274
}
273275

276+
@ParameterizedTest
277+
@EnumSource(GroupProtocol.class)
278+
public void testAssignedPartitionsMetrics(GroupProtocol groupProtocol) throws InterruptedException {
279+
consumer = newConsumer(groupProtocol, time, mock(KafkaClient.class), subscription,
280+
mock(ConsumerMetadata.class), assignor, false, groupInstanceId);
281+
Metrics metrics = consumer.metricsRegistry();
282+
283+
// This metric is added in the background thread for the AsyncConsumer, so waiting on it to avoid flakiness.
284+
TestUtils.waitForCondition(() -> getMetric(metrics, "assigned-partitions") != null,
285+
"Consumer should register the assigned-partitions metric");
286+
assertNotNull(getMetric(metrics, "assigned-partitions"));
287+
assertEquals(0.0d, getMetric(metrics, "assigned-partitions").metricValue());
288+
289+
subscription.assignFromUser(Set.of(tp0));
290+
assertEquals(1.0d, getMetric(metrics, "assigned-partitions").metricValue());
291+
292+
subscription.assignFromUser(Set.of(tp0, tp1));
293+
assertEquals(2.0d, getMetric(metrics, "assigned-partitions").metricValue());
294+
295+
subscription.unsubscribe();
296+
subscription.subscribe(Set.of(topic), Optional.empty());
297+
subscription.assignFromSubscribed(Set.of(tp0));
298+
assertEquals(1.0d, getMetric(metrics, "assigned-partitions").metricValue());
299+
}
300+
301+
private KafkaMetric getMetric(Metrics metrics, String name) {
302+
return metrics.metrics().get(metrics.metricName(name, CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX));
303+
}
304+
274305
@ParameterizedTest
275306
@EnumSource(GroupProtocol.class)
276307
public void testUnsubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@
7171

7272
import static org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
7373
import static org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks;
74+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
75+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
7476
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
7577
import static org.apache.kafka.common.utils.Utils.mkEntry;
7678
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -125,7 +127,7 @@ public void setup() {
125127
time = new MockTime(0);
126128
backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, mock(AsyncConsumerMetrics.class));
127129
metrics = new Metrics(time);
128-
rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
130+
rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
129131

130132
when(commitRequestManager.maybeAutoCommitSyncBeforeRebalance(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
131133
}
@@ -181,6 +183,15 @@ public void testMembershipManagerRackId() {
181183
assertEquals(Optional.of("rack1"), membershipManager.rackId());
182184
}
183185

186+
@Test
187+
public void testAssignedPartitionCountMetricRegistered() {
188+
MetricName metricName = metrics.metricName(
189+
"assigned-partitions",
190+
CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX
191+
);
192+
assertNotNull(metrics.metric(metricName), "Metric assigned-partitions should have been registered");
193+
}
194+
184195
@Test
185196
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
186197
createMembershipManagerJoiningGroup();

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.junit.jupiter.api.Assertions.assertFalse;
6868
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
6969
import static org.junit.jupiter.api.Assertions.assertNotEquals;
70+
import static org.junit.jupiter.api.Assertions.assertNotNull;
7071
import static org.junit.jupiter.api.Assertions.assertThrows;
7172
import static org.junit.jupiter.api.Assertions.assertTrue;
7273
import static org.mockito.ArgumentMatchers.any;
@@ -131,6 +132,15 @@ public void setup() {
131132
verifyInStateUnsubscribed(membershipManager);
132133
}
133134

135+
@Test
136+
public void testAssignedPartitionCountMetricRegistered() {
137+
MetricName metricName = metrics.metricName(
138+
"assigned-partitions",
139+
CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX
140+
);
141+
assertNotNull(metrics.metric(metricName), "Metric assigned-partitions should have been registered");
142+
}
143+
134144
@Test
135145
public void testUnexpectedErrorInHeartbeatResponse() {
136146
final String errorMessage = "Nobody expects the Spanish Inquisition!";
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer.internals.metrics;
18+
19+
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
20+
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
21+
import org.apache.kafka.common.TopicPartition;
22+
import org.apache.kafka.common.metrics.Metrics;
23+
import org.apache.kafka.common.utils.LogContext;
24+
import org.apache.kafka.common.utils.MockTime;
25+
import org.apache.kafka.common.utils.Time;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.Optional;
30+
import java.util.Set;
31+
32+
import static org.junit.jupiter.api.Assertions.assertEquals;
33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
34+
import static org.mockito.Mockito.mock;
35+
36+
37+
class ConsumerRebalanceMetricsManagerTest {
38+
39+
private final Time time = new MockTime();
40+
private final Metrics metrics = new Metrics(time);
41+
42+
@Test
43+
public void testAssignedPartitionCountMetric() {
44+
SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
45+
ConsumerRebalanceMetricsManager consumerRebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
46+
47+
assertNotNull(metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount), "Metric assigned-partitions has not been registered as expected");
48+
49+
// Check for manually assigned partitions
50+
subscriptionState.assignFromUser(Set.of(new TopicPartition("topic", 0), new TopicPartition("topic", 1)));
51+
assertEquals(2.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
52+
subscriptionState.assignFromUser(Set.of());
53+
assertEquals(0.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
54+
55+
subscriptionState.unsubscribe();
56+
assertEquals(0.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
57+
58+
// Check for automatically assigned partitions
59+
subscriptionState.subscribe(Set.of("topic"), Optional.empty());
60+
subscriptionState.assignFromSubscribed(Set.of(new TopicPartition("topic", 0)));
61+
assertEquals(1.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
62+
}
63+
}

0 commit comments

Comments
 (0)