Skip to content

Commit 0721d21

Browse files
authored
KAFKA-18415: Fix for event queue metric and flaky test (apache#18416)
Reviewers: Andrew Schofield <[email protected]>
1 parent 0377e80 commit 0721d21

File tree

3 files changed

+8
-13
lines changed

3 files changed

+8
-13
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,10 @@ public ApplicationEventHandler(final LogContext logContext,
8181
public void add(final ApplicationEvent event) {
8282
Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
8383
event.setEnqueuedMs(time.milliseconds());
84+
// Record the updated queue size before actually adding the event to the queue
85+
// to avoid race conditions (the background thread is continuously removing from this queue)
86+
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1);
8487
applicationEventQueue.add(event);
85-
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
8688
wakeupNetworkThread();
8789
}
8890

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public BackgroundEventHandler(final BlockingQueue<BackgroundEvent> backgroundEve
5353
public void add(BackgroundEvent event) {
5454
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
5555
event.setEnqueuedMs(time.milliseconds());
56+
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size() + 1);
5657
backgroundEventQueue.add(event);
57-
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
5858
}
5959

6060
/**

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@
3232
import java.util.concurrent.BlockingQueue;
3333
import java.util.concurrent.LinkedBlockingQueue;
3434

35-
import static org.junit.jupiter.api.Assertions.assertEquals;
3635
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.spy;
37+
import static org.mockito.Mockito.verify;
3738

3839
public class ApplicationEventHandlerTest {
3940
private final Time time = new MockTime();
@@ -46,7 +47,7 @@ public class ApplicationEventHandlerTest {
4647
@Test
4748
public void testRecordApplicationEventQueueSize() {
4849
try (Metrics metrics = new Metrics();
49-
AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
50+
AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics));
5051
ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
5152
new LogContext(),
5253
time,
@@ -59,15 +60,7 @@ public void testRecordApplicationEventQueueSize() {
5960
)) {
6061
// add event
6162
applicationEventHandler.add(new PollEvent(time.milliseconds()));
62-
assertEquals(
63-
1,
64-
(double) metrics.metric(
65-
metrics.metricName(
66-
AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
67-
ConsumerUtils.CONSUMER_METRIC_GROUP
68-
)
69-
).metricValue()
70-
);
63+
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
7164
}
7265
}
7366
}

0 commit comments

Comments
 (0)