Skip to content

Commit aee7a37

Browse files
authored
MINOR : Tolerate GroupIdNotFoundException in consumer when leaving a group. (#21239)
*What* - Currently if a consumer/share-consumer calls `close()` before it has joined a group, then the heartbeat on close will be sent with `epoch` = -1 and the broker would return "`GroupIdNotFoundException`". - This was causing couple of tests in `ShareConsumerTest` to be flaky if the heartbeat to join the group was sent with `epoch` = -1. - Since this can occur in real scenarios as well, it would be better to tolerate this exception while we are leaving the group so that the consumer can close cleanly. Reviewers: Andrew Schofield <[email protected]>
1 parent 3e9ae03 commit aee7a37

File tree

3 files changed

+118
-0
lines changed

3 files changed

+118
-0
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,22 @@ private void onErrorResponse(final R response, final long currentTimeMs) {
432432
"subscribe. " + errorMessage));
433433
break;
434434

435+
case GROUP_ID_NOT_FOUND:
436+
// If the group doesn't exist (e.g., member never joined due to InvalidTopicException),
437+
// GROUP_ID_NOT_FOUND should be ignored - the leave is effectively complete.
438+
// When a leave heartbeat (epoch=-1) is sent, the state transitions synchronously
439+
// from LEAVING to UNSUBSCRIBED in onHeartbeatRequestGenerated() before the request is sent.
440+
if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
441+
logger.info("{} received GROUP_ID_NOT_FOUND for group {} while unsubscribed. ",
442+
heartbeatRequestName(), membershipManager().groupId());
443+
membershipManager().onHeartbeatRequestSkipped();
444+
} else {
445+
// Else, this is a fatal error, we should throw it and transition to fatal state.
446+
logger.error("{} failed due to unexpected error {}: {}", heartbeatRequestName(), error, errorMessage);
447+
handleFatalFailure(error.exception(errorMessage));
448+
}
449+
break;
450+
435451
default:
436452
if (!handleSpecificExceptionInResponse(response, currentTimeMs)) {
437453
// If the manager receives an unknown error - there could be a bug in the code or a new error code

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,57 @@ public void testFailureOnFatalException() {
429429
verify(backgroundEventHandler).add(any());
430430
}
431431

432+
/**
433+
* Test that GROUP_ID_NOT_FOUND error while unsubscribed is not a fatal error.
434+
* This can happen when the consumer never successfully joined the group
435+
* (e.g., due to an InvalidTopicException during poll() and close() sends
436+
* a leave heartbeat for a group that was never created.
437+
*/
438+
@Test
439+
public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
440+
// Setup: member is in UNSUBSCRIBED state with epoch -1
441+
when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
442+
when(membershipManager.memberEpoch()).thenReturn(-1);
443+
444+
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
445+
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
446+
assertEquals(1, result.unsentRequests.size());
447+
448+
// Complete the heartbeat with GROUP_ID_NOT_FOUND error
449+
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
450+
result.unsentRequests.get(0).handler().onComplete(response);
451+
452+
// Verify: no fatal error, heartbeat skipped (benign)
453+
verify(membershipManager, never()).transitionToFatal();
454+
verify(membershipManager).onHeartbeatRequestSkipped();
455+
verify(backgroundEventHandler, never()).add(any());
456+
}
457+
458+
/**
459+
* Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal.
460+
* This would indicate the group was unexpectedly deleted while the member
461+
* was actively participating.
462+
*/
463+
@Test
464+
public void testGroupIdNotFoundWhileStableIsFatal() {
465+
// Setup: member is in STABLE state with positive epoch
466+
when(membershipManager.state()).thenReturn(MemberState.STABLE);
467+
when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
468+
469+
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
470+
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
471+
assertEquals(1, result.unsentRequests.size());
472+
473+
// Complete the heartbeat with GROUP_ID_NOT_FOUND error
474+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
475+
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
476+
result.unsentRequests.get(0).handler().onComplete(response);
477+
478+
// Verify: fatal error
479+
verify(membershipManager).transitionToFatal();
480+
verify(backgroundEventHandler).add(any());
481+
}
482+
432483
@Test
433484
public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
434485
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,57 @@ public void testFailureOnFatalException() {
354354
verify(backgroundEventHandler).add(any());
355355
}
356356

357+
/**
358+
* Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as fatal.
359+
* This can happen when the consumer never successfully joined the group
360+
* (e.g., due to an InvalidTopicException during poll() and close() sends
361+
* a leave heartbeat for a group that was never created.
362+
*/
363+
@Test
364+
public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
365+
// Setup: member is in UNSUBSCRIBED state with epoch -1
366+
when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
367+
when(membershipManager.memberEpoch()).thenReturn(-1);
368+
369+
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
370+
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
371+
assertEquals(1, result.unsentRequests.size());
372+
373+
// Complete the heartbeat with GROUP_ID_NOT_FOUND error
374+
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
375+
result.unsentRequests.get(0).handler().onComplete(response);
376+
377+
// Verify: no fatal error, heartbeat skipped (benign)
378+
verify(membershipManager, never()).transitionToFatal();
379+
verify(membershipManager).onHeartbeatRequestSkipped();
380+
verify(backgroundEventHandler, never()).add(any());
381+
}
382+
383+
/**
384+
* Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal.
385+
* This would indicate the group was unexpectedly deleted while the member
386+
* was actively participating.
387+
*/
388+
@Test
389+
public void testGroupIdNotFoundWhileStableIsFatal() {
390+
// Setup: member is in STABLE state with positive epoch
391+
when(membershipManager.state()).thenReturn(MemberState.STABLE);
392+
when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
393+
394+
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
395+
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
396+
assertEquals(1, result.unsentRequests.size());
397+
398+
// Complete the heartbeat with GROUP_ID_NOT_FOUND error
399+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
400+
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
401+
result.unsentRequests.get(0).handler().onComplete(response);
402+
403+
// Verify: fatal error
404+
verify(membershipManager).transitionToFatal();
405+
verify(backgroundEventHandler).add(any());
406+
}
407+
357408
@Test
358409
public void testNoCoordinator() {
359410
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

0 commit comments

Comments
 (0)