Skip to content

Commit 6c3995b

Browse files
authored
MINOR: Port changes from KAFKA-18569 for ShareConsumers (apache#19402)
ShareConsumers` may wait on an unneeded `FindCoordinator` during `close()`(i.e after the acknowledgements are sent). apache#18590 added the `StopFindCoordinatorOnClose` event and was used by the regular consumers. We are using the same event in `ShareConsumers` as well to prevent sending this event when coordinator is no longer needed. Reviewers: Andrew Schofield <[email protected]>
1 parent cdbb8fa commit 6c3995b

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
4848
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
4949
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
50+
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
5051
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
5152
import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
5253
import org.apache.kafka.common.KafkaException;
@@ -887,6 +888,8 @@ private void close(final Duration timeout, final boolean swallowException) {
887888
// Prepare shutting down the network thread
888889
swallow(log, Level.ERROR, "Failed to release assignment before closing consumer",
889890
() -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException);
891+
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
892+
this::stopFindCoordinatorOnClose, firstException);
890893
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback",
891894
this::handleCompletedAcknowledgements, firstException);
892895
if (applicationEventHandler != null)
@@ -915,6 +918,11 @@ private void close(final Duration timeout, final boolean swallowException) {
915918
}
916919
}
917920

921+
private void stopFindCoordinatorOnClose() {
922+
log.debug("Stop finding coordinator during consumer close");
923+
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
924+
}
925+
918926
private Timer createTimerForCloseRequests(Duration timeout) {
919927
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
920928
final Time time = (this.time == null) ? Time.SYSTEM : this.time;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
3030
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
3131
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
32+
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
3233
import org.apache.kafka.common.KafkaException;
3334
import org.apache.kafka.common.TopicIdPartition;
3435
import org.apache.kafka.common.TopicPartition;
@@ -49,6 +50,7 @@
4950
import org.junit.jupiter.api.AfterEach;
5051
import org.junit.jupiter.api.Test;
5152
import org.mockito.ArgumentMatchers;
53+
import org.mockito.InOrder;
5254
import org.mockito.Mockito;
5355

5456
import java.time.Duration;
@@ -78,6 +80,7 @@
7880
import static org.mockito.ArgumentMatchers.argThat;
7981
import static org.mockito.Mockito.doAnswer;
8082
import static org.mockito.Mockito.doReturn;
83+
import static org.mockito.Mockito.inOrder;
8184
import static org.mockito.Mockito.mock;
8285
import static org.mockito.Mockito.times;
8386
import static org.mockito.Mockito.verify;
@@ -351,6 +354,25 @@ public void testCloseWithTopicAuthorizationException() {
351354
assertDoesNotThrow(() -> consumer.close());
352355
}
353356

357+
@Test
358+
public void testStopFindCoordinatorOnClose() {
359+
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
360+
consumer = newConsumer(subscriptions);
361+
362+
// Setup the expected successful completion of close events
363+
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
364+
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
365+
366+
// Close the consumer
367+
consumer.close();
368+
369+
// Verify events are sent in correct order using InOrder
370+
InOrder inOrder = inOrder(applicationEventHandler);
371+
inOrder.verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
372+
inOrder.verify(applicationEventHandler).add(any(ShareUnsubscribeEvent.class));
373+
inOrder.verify(applicationEventHandler).add(any(StopFindCoordinatorOnCloseEvent.class));
374+
}
375+
354376
@Test
355377
public void testVerifyApplicationEventOnShutdown() {
356378
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);

0 commit comments

Comments
 (0)