Skip to content

Commit 92169b8

Browse files
authored
KAFKA-19357 AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing (#19914)
Problem: When the AsyncConsumer is closing, CoordinatorRequestManager stops looking for the coordinator, because its poll() method returns EMPTY once the closing flag is true. This prevents commitAsync() and other coordinator-dependent operations from completing, causing close() to hang until the timeout expires. Solution: Made the closing check in the poll() method of CommitRequestManager more targeted: - When the coordinator is unknown and the consumer is closing, poll() only returns EMPTY - When this condition is met, all pending commit requests are proactively failed with a CommitFailedException - This allows coordinator lookup to continue when the coordinator is available during shutdown, while preventing indefinite hangs when the coordinator is unreachable Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>, TengYao Chi <[email protected]>, Kirk True <[email protected]>, Jhen-Yung Hsu <[email protected]>, Lan Ding <[email protected]>, TaiJuWu <[email protected]>, Ken Huang <[email protected]>, KuoChe <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 3c08439 commit 92169b8

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.common.test.api.ClusterTestDefaults;
2929
import org.apache.kafka.common.test.api.Type;
3030
import org.apache.kafka.test.MockConsumerInterceptor;
31+
import org.apache.kafka.test.TestUtils;
3132

3233
import org.junit.jupiter.api.BeforeEach;
3334

@@ -452,6 +453,40 @@ private void testPositionAndCommit(GroupProtocol groupProtocol) throws Interrupt
452453
}
453454
}
454455

456+
/**
457+
* Tests closing the consumer while an async commit is still pending and the coordinator cannot be found.
458+
* All brokers are shut down before the commit, so close() must fail the commit instead of hanging.
459+
*/
460+
@ClusterTest
461+
public void testCommitAsyncFailsWhenCoordinatorUnavailableDuringClose() throws InterruptedException {
462+
try (Producer<byte[], byte[]> producer = cluster.producer();
463+
var consumer = createConsumer(GroupProtocol.CONSUMER, false)
464+
) {
465+
sendRecords(producer, tp, 3, System.currentTimeMillis());
466+
consumer.assign(List.of(tp));
467+
468+
var callback = new CountConsumerCommitCallback();
469+
470+
// Shut down all brokers before committing so that the commit cannot find the coordinator.
471+
cluster.brokerIds().forEach(cluster::shutdownBroker);
472+
473+
TestUtils.waitForCondition(() -> cluster.aliveBrokers().isEmpty(), "All brokers should be shut down");
474+
475+
consumer.poll(Duration.ofMillis(500));
476+
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), callback);
477+
478+
long startTime = System.currentTimeMillis();
479+
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
480+
long closeDuration = System.currentTimeMillis() - startTime;
481+
482+
assertTrue(closeDuration < 1000, "The closing process for the consumer was too long: " + closeDuration + " ms");
483+
assertTrue(callback.lastError.isPresent());
484+
assertEquals(CommitFailedException.class, callback.lastError.get().getClass());
485+
assertEquals("Failed to commit offsets: Coordinator unknown and consumer is closing", callback.lastError.get().getMessage());
486+
assertEquals(1, callback.exceptionCount);
487+
}
488+
}
489+
455490
// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
456491
@ClusterTest
457492
public void testCommitAsyncCompletedBeforeConsumerCloses() throws InterruptedException {
@@ -575,13 +610,15 @@ void sendAsyncCommit() {
575610

576611
private static class CountConsumerCommitCallback implements OffsetCommitCallback {
577612
private int successCount = 0;
613+
private int exceptionCount = 0;
578614
private Optional<Exception> lastError = Optional.empty();
579615

580616
/**
 * Records the outcome of a commit: increments {@code successCount} when {@code exception} is null;
 * otherwise increments {@code exceptionCount} and stores the exception in {@code lastError}.
 */
@Override
581617
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
582618
if (exception == null) {
583619
successCount += 1;
584620
} else {
621+
exceptionCount += 1;
585622
lastError = Optional.of(exception);
586623
}
587624
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
181181
// poll when the coordinator node is known and fatal error is not present
182182
if (coordinatorRequestManager.coordinator().isEmpty()) {
183183
pendingRequests.maybeFailOnCoordinatorFatalError();
184+
185+
if (closing && pendingRequests.hasUnsentRequests()) {
186+
CommitFailedException exception = new CommitFailedException(
187+
"Failed to commit offsets: Coordinator unknown and consumer is closing");
188+
pendingRequests.drainPendingCommits()
189+
.forEach(request -> request.future().completeExceptionally(exception));
190+
}
191+
184192
return EMPTY;
185193
}
186194

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,47 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques
14941494
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
14951495
}
14961496

1497+
/**
 * Verifies that when the coordinator is unknown, the manager has been signalled to close, and the
 * coordinator request manager reports a fatal error, poll() returns EMPTY and the pending async
 * commit future fails with that fatal error (a GroupAuthorizationException here).
 */
@Test
1498+
public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
1499+
CommitRequestManager commitRequestManager = create(true, 100);
1500+
1501+
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition("topic", 1),
1502+
new OffsetAndMetadata(0));
1503+
1504+
var commitFuture = commitRequestManager.commitAsync(offsets);
1505+
1506+
commitRequestManager.signalClose();
1507+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
1508+
when(coordinatorRequestManager.fatalError())
1509+
.thenReturn(Optional.of(new GroupAuthorizationException("Fatal error")));
1510+
1511+
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds()));
1512+
1513+
assertTrue(commitFuture.isCompletedExceptionally());
1514+
1515+
TestUtils.assertFutureThrows(GroupAuthorizationException.class, commitFuture, "Fatal error");
1516+
}
1517+
1518+
/**
 * Verifies that when the coordinator is unknown and the manager has been signalled to close,
 * poll() returns EMPTY and the pending async commit future fails with a CommitFailedException
 * carrying the "Coordinator unknown and consumer is closing" message.
 */
@Test
1519+
public void testPollWithClosingAndPendingRequests() {
1520+
CommitRequestManager commitRequestManager = create(true, 100);
1521+
1522+
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition("topic", 1),
1523+
new OffsetAndMetadata(0));
1524+
1525+
var commitFuture = commitRequestManager.commitAsync(offsets);
1526+
1527+
commitRequestManager.signalClose();
1528+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
1529+
1530+
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds()));
1531+
1532+
assertTrue(commitFuture.isCompletedExceptionally());
1533+
1534+
TestUtils.assertFutureThrows(CommitFailedException.class, commitFuture,
1535+
"Failed to commit offsets: Coordinator unknown and consumer is closing");
1536+
}
1537+
14971538
// Supplies (error, isRetriable)
14981539
private static Stream<Arguments> partitionDataErrorSupplier() {
14991540
return Stream.of(

0 commit comments

Comments
 (0)