Skip to content

Commit 2da02d9

Browse files
authored
KAFKA-19723 Adding consumer rebalance metrics test (#20565)
Added test cases for ConsumerRebalanceMetricsManager. Reviewers: Lianet Magrans <[email protected]>, TengYao Chi <[email protected]>, Hong-Yi Chen <[email protected]>
1 parent 162db13 commit 2da02d9

File tree

1 file changed

+288
-10
lines changed

1 file changed

+288
-10
lines changed

clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java

Lines changed: 288 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,45 +19,323 @@
1919
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
2020
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
2121
import org.apache.kafka.common.TopicPartition;
22+
import org.apache.kafka.common.metrics.MetricConfig;
2223
import org.apache.kafka.common.metrics.Metrics;
2324
import org.apache.kafka.common.utils.LogContext;
2425
import org.apache.kafka.common.utils.MockTime;
2526
import org.apache.kafka.common.utils.Time;
2627

28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.BeforeEach;
2730
import org.junit.jupiter.api.Test;
2831

2932
import java.util.Optional;
3033
import java.util.Set;
3134

3235
import static org.junit.jupiter.api.Assertions.assertEquals;
36+
import static org.junit.jupiter.api.Assertions.assertFalse;
3337
import static org.junit.jupiter.api.Assertions.assertNotNull;
38+
import static org.junit.jupiter.api.Assertions.assertTrue;
3439
import static org.mockito.Mockito.mock;
3540

3641

3742
class ConsumerRebalanceMetricsManagerTest {
3843

39-
private final Time time = new MockTime();
40-
private final Metrics metrics = new Metrics(time);
44+
private Time time;
45+
private Metrics metrics;
46+
private SubscriptionState subscriptionState;
47+
private ConsumerRebalanceMetricsManager metricsManager;
48+
private MetricConfig metricConfig;
49+
private long windowSizeMs;
50+
private int numSamples;
51+
52+
@BeforeEach
53+
public void setUp() {
54+
time = new MockTime();
55+
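// Set the window size and sample count explicitly (matching MetricConfig's defaults) so the rate tests can derive the expected window from them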
56+
windowSizeMs = 30000; // 30 seconds - default value
57+
numSamples = 2; // default value
58+
metricConfig = new MetricConfig()
59+
.samples(numSamples)
60+
.timeWindow(windowSizeMs, java.util.concurrent.TimeUnit.MILLISECONDS);
61+
metrics = new Metrics(metricConfig, time);
62+
subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
63+
metricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
64+
}
65+
66+
@AfterEach
67+
public void tearDown() {
68+
metrics.close();
69+
}
4170

4271
@Test
4372
public void testAssignedPartitionCountMetric() {
44-
SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
45-
ConsumerRebalanceMetricsManager consumerRebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
46-
47-
assertNotNull(metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount), "Metric assigned-partitions has not been registered as expected");
73+
assertNotNull(metrics.metric(metricsManager.assignedPartitionsCount), "Metric assigned-partitions has not been registered as expected");
4874

4975
// Check for manually assigned partitions
5076
subscriptionState.assignFromUser(Set.of(new TopicPartition("topic", 0), new TopicPartition("topic", 1)));
51-
assertEquals(2.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
77+
assertEquals(2.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
5278
subscriptionState.assignFromUser(Set.of());
53-
assertEquals(0.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
79+
assertEquals(0.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
5480

5581
subscriptionState.unsubscribe();
56-
assertEquals(0.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
82+
assertEquals(0.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
5783

5884
// Check for automatically assigned partitions
5985
subscriptionState.subscribe(Set.of("topic"), Optional.empty());
6086
subscriptionState.assignFromSubscribed(Set.of(new TopicPartition("topic", 0)));
61-
assertEquals(1.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
87+
assertEquals(1.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
88+
}
89+
90+
@Test
91+
public void testRebalanceTimingMetrics() {
92+
93+
// Verify timing metrics are registered
94+
assertNotNull(metrics.metric(metricsManager.rebalanceLatencyAvg));
95+
assertNotNull(metrics.metric(metricsManager.rebalanceLatencyMax));
96+
assertNotNull(metrics.metric(metricsManager.rebalanceLatencyTotal));
97+
assertNotNull(metrics.metric(metricsManager.rebalanceTotal));
98+
99+
// Record first rebalance (10ms duration)
100+
metricsManager.recordRebalanceStarted(time.milliseconds());
101+
time.sleep(10);
102+
metricsManager.recordRebalanceEnded(time.milliseconds());
103+
104+
// Verify metrics after first rebalance
105+
assertEquals(10.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue());
106+
assertEquals(10.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue());
107+
assertEquals(10.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue());
108+
assertEquals(1.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue());
109+
110+
// Record second rebalance (30ms duration)
111+
metricsManager.recordRebalanceStarted(time.milliseconds());
112+
time.sleep(30);
113+
metricsManager.recordRebalanceEnded(time.milliseconds());
114+
115+
// Verify metrics after second rebalance
116+
assertEquals(20.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
117+
"Average latency should be (10 + 30) / 2 = 20ms");
118+
assertEquals(30.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
119+
"Max latency should be max(10, 30) = 30ms");
120+
assertEquals(40.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
121+
"Total latency should be 10 + 30 = 40ms");
122+
assertEquals(2.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue());
123+
124+
// Record third rebalance (50ms duration)
125+
metricsManager.recordRebalanceStarted(time.milliseconds());
126+
time.sleep(50);
127+
metricsManager.recordRebalanceEnded(time.milliseconds());
128+
129+
// Verify metrics after third rebalance
130+
assertEquals(30.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
131+
"Average latency should be (10 + 30 + 50) / 3 = 30ms");
132+
assertEquals(50.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
133+
"Max latency should be max(10, 30, 50) = 50ms");
134+
assertEquals(90.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
135+
"Total latency should be 10 + 30 + 50 = 90ms");
136+
assertEquals(3.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue());
137+
}
138+
139+
@Test
140+
public void testRebalanceRateMetric() {
141+
142+
// Verify rate metric is registered
143+
assertNotNull(metrics.metric(metricsManager.rebalanceRatePerHour));
144+
145+
// Record 3 rebalances within 30ms total (3 x 10ms)
146+
int rebalanceCount = 3;
147+
long startTime = time.milliseconds();
148+
for (int i = 0; i < rebalanceCount; i++) {
149+
metricsManager.recordRebalanceStarted(time.milliseconds());
150+
time.sleep(10);
151+
metricsManager.recordRebalanceEnded(time.milliseconds());
152+
}
153+
long endTime = time.milliseconds();
154+
long actualElapsedMs = endTime - startTime;
155+
156+
double ratePerHour = (Double) metrics.metric(metricsManager.rebalanceRatePerHour).metricValue();
157+
158+
// The Rate metric calculation:
159+
// - Uses elapsed time from the oldest sample
160+
// - Ensures minimum window size of (numSamples - 1) * windowSizeMs
161+
// - With default config: minWindow = (2-1) * 30000 = 30000ms
162+
long minWindowMs = (numSamples - 1) * windowSizeMs; // (2-1) * 30000 = 30000ms
163+
164+
// Since actualElapsedMs (30ms) is much less than minWindowMs (30000ms),
165+
// the rate calculation will use minWindowMs as the window
166+
// Rate per hour = count / (windowMs / 1000) * 3600
167+
double expectedRatePerHour = (double) rebalanceCount / (minWindowMs / 1000.0) * 3600.0;
168+
169+
assertEquals(expectedRatePerHour, ratePerHour, 1.0,
170+
String.format("With %d rebalances in %dms, min window %dms: expecting %.1f rebalances/hour",
171+
rebalanceCount, actualElapsedMs, minWindowMs, expectedRatePerHour));
172+
}
173+
174+
@Test
175+
public void testFailedRebalanceMetrics() {
176+
177+
// Verify failed rebalance metrics are registered
178+
assertNotNull(metrics.metric(metricsManager.failedRebalanceTotal));
179+
assertNotNull(metrics.metric(metricsManager.failedRebalanceRate));
180+
181+
assertEquals(0.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
182+
"Initially, there should be no failed rebalances");
183+
184+
// Start a rebalance but don't complete it
185+
metricsManager.recordRebalanceStarted(time.milliseconds());
186+
time.sleep(10);
187+
188+
metricsManager.maybeRecordRebalanceFailed();
189+
assertEquals(1.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
190+
"Failed rebalance count should increment to 1 after recording failure");
191+
192+
// Complete a successful rebalance
193+
metricsManager.recordRebalanceStarted(time.milliseconds());
194+
time.sleep(10);
195+
metricsManager.recordRebalanceEnded(time.milliseconds());
196+
197+
metricsManager.maybeRecordRebalanceFailed();
198+
assertEquals(1.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
199+
"Failed count should not increment after successful rebalance completes");
200+
201+
// Start another rebalance, don't complete it, then record failure
202+
time.sleep(10);
203+
metricsManager.recordRebalanceStarted(time.milliseconds());
204+
assertTrue(metricsManager.rebalanceStarted(), "Rebalance should be in progress");
205+
time.sleep(10);
206+
// Don't call recordRebalanceEnded() to simulate an incomplete rebalance
207+
metricsManager.maybeRecordRebalanceFailed();
208+
assertEquals(2.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue());
209+
210+
double failedRate = (Double) metrics.metric(metricsManager.failedRebalanceRate).metricValue();
211+
212+
// Calculate expected failed rate based on Rate metric behavior
213+
// We had 2 failures over ~40ms, but minimum window is (numSamples - 1) * windowSizeMs
214+
long minWindowMs = (numSamples - 1) * windowSizeMs; // (2-1) * 30000 = 30000ms
215+
double expectedFailedRatePerHour = 2.0 / (minWindowMs / 1000.0) * 3600.0;
216+
217+
assertEquals(expectedFailedRatePerHour, failedRate, 1.0,
218+
String.format("With 2 failures, min window %dms: expecting %.1f failures/hour",
219+
minWindowMs, expectedFailedRatePerHour));
220+
}
221+
222+
@Test
223+
public void testLastRebalanceSecondsAgoMetric() {
224+
225+
// Verify metric is registered
226+
assertNotNull(metrics.metric(metricsManager.lastRebalanceSecondsAgo));
227+
228+
assertEquals(-1.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(),
229+
"Should return -1 when no rebalance has occurred");
230+
231+
// Complete a rebalance
232+
metricsManager.recordRebalanceStarted(time.milliseconds());
233+
time.sleep(10);
234+
metricsManager.recordRebalanceEnded(time.milliseconds());
235+
236+
assertEquals(0.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(),
237+
"Should return 0 immediately after rebalance completes");
238+
239+
// Advance time by 5 seconds
240+
time.sleep(5000);
241+
assertEquals(5.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue());
242+
243+
// Advance time by another 10 seconds
244+
time.sleep(10000);
245+
assertEquals(15.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue());
246+
247+
// Complete another rebalance
248+
metricsManager.recordRebalanceStarted(time.milliseconds());
249+
time.sleep(20);
250+
metricsManager.recordRebalanceEnded(time.milliseconds());
251+
252+
assertEquals(0.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(),
253+
"Should reset to 0 after a new rebalance completes");
254+
}
255+
256+
@Test
257+
public void testRebalanceStartedFlag() {
258+
259+
assertFalse(metricsManager.rebalanceStarted(),
260+
"Initially, no rebalance should be in progress");
261+
262+
metricsManager.recordRebalanceStarted(time.milliseconds());
263+
assertTrue(metricsManager.rebalanceStarted(),
264+
"Rebalance should be marked as started after recordRebalanceStarted()");
265+
266+
time.sleep(10);
267+
metricsManager.recordRebalanceEnded(time.milliseconds());
268+
assertFalse(metricsManager.rebalanceStarted(),
269+
"Rebalance should not be in progress after recordRebalanceEnded()");
270+
271+
// Start another rebalance - advance time first
272+
time.sleep(100);
273+
metricsManager.recordRebalanceStarted(time.milliseconds());
274+
assertTrue(metricsManager.rebalanceStarted(),
275+
"New rebalance should be marked as started");
276+
}
277+
278+
@Test
279+
public void testMultipleConsecutiveFailures() {
280+
281+
// Record multiple consecutive failures
282+
for (int i = 0; i < 5; i++) {
283+
metricsManager.recordRebalanceStarted(time.milliseconds());
284+
time.sleep(10);
285+
metricsManager.maybeRecordRebalanceFailed();
286+
}
287+
288+
assertEquals(5.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
289+
"Should have recorded 5 consecutive failed rebalances");
290+
291+
assertEquals(0.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue(),
292+
"Successful rebalance count should remain 0 when only failures occur");
293+
}
294+
295+
@Test
296+
public void testMixedSuccessAndFailureScenarios() {
297+
298+
// Success -> Failure -> Success -> Failure pattern
299+
// First success
300+
metricsManager.recordRebalanceStarted(time.milliseconds());
301+
time.sleep(20);
302+
metricsManager.recordRebalanceEnded(time.milliseconds());
303+
assertEquals(1.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue());
304+
305+
// First failure
306+
time.sleep(10);
307+
metricsManager.recordRebalanceStarted(time.milliseconds());
308+
assertTrue(metricsManager.rebalanceStarted(), "First failure rebalance should be in progress");
309+
time.sleep(30);
310+
metricsManager.maybeRecordRebalanceFailed();
311+
312+
double failedAfterFirst = (Double) metrics.metric(metricsManager.failedRebalanceTotal).metricValue();
313+
assertEquals(1.0d, failedAfterFirst, "Should have recorded one failed rebalance after first failure");
314+
315+
// Second success
316+
time.sleep(10);
317+
metricsManager.recordRebalanceStarted(time.milliseconds());
318+
time.sleep(40);
319+
metricsManager.recordRebalanceEnded(time.milliseconds());
320+
assertEquals(2.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue());
321+
322+
// Second failure
323+
time.sleep(10);
324+
metricsManager.recordRebalanceStarted(time.milliseconds());
325+
assertTrue(metricsManager.rebalanceStarted(), "Second failure rebalance should be in progress");
326+
time.sleep(50);
327+
metricsManager.maybeRecordRebalanceFailed();
328+
329+
assertEquals(2.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue(),
330+
"Should have 2 successful rebalances in mixed scenario");
331+
assertEquals(2.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(),
332+
"Should have 2 failed rebalances in mixed scenario");
333+
334+
assertEquals(30.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
335+
"Average latency should only include successful rebalances: (20 + 40) / 2 = 30ms");
336+
assertEquals(40.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
337+
"Max latency should be 40ms from successful rebalances only");
338+
assertEquals(60.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
339+
"Total latency should only include successful rebalances: 20 + 40 = 60ms");
62340
}
63341
}

0 commit comments

Comments
 (0)