Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long currentTimeMs) {
"subscribe. " + errorMessage));
break;

case GROUP_ID_NOT_FOUND:
// If the group doesn't exist (e.g., member never joined due to InvalidTopicException),
// GROUP_ID_NOT_FOUND should be ignored - the leave is effectively complete.
// When a leave heartbeat (epoch=-1) is sent, the state transitions synchronously
// from LEAVING to UNSUBSCRIBED in onHeartbeatRequestGenerated() before the request is sent.
if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
logger.info("{} received GROUP_ID_NOT_FOUND for group {} while unsubscribed. " +
"Not treating as fatal since consumer is leaving group.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove the "Not treating as fatal since consumer is leaving group.". There's no sense in using "fatal" in an information log line. This is nothing to worry about in the slightest.

heartbeatRequestName(), membershipManager().groupId());
membershipManager().onHeartbeatRequestSkipped();
} else {
// Else, this is a fatal error, we should throw it and transition to fatal state.
logger.error("{} failed due to unexpected error {}: {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The following logger.error is identical but on a single line. Please change this to a single line also.

heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
}
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's have a blank line following the break to match the other cases in this switch.

default:
if (!handleSpecificExceptionInResponse(response, currentTimeMs)) {
// If the manager receives an unknown error - there could be a bug in the code or a new error code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,57 @@ public void testFailureOnFatalException() {
verify(backgroundEventHandler).add(any());
}

/**
* Test that GROUP_ID_NOT_FOUND error while unsubscribed is not a fatal error.
* This can happen when the consumer never successfully joined the group
* (e.g., due to an InvalidTopicException during poll() and close() sends
* a leave heartbeat for a group that was never created.
*/
@Test
public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
// Setup: member is in UNSUBSCRIBED state with epoch -1
when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
when(membershipManager.memberEpoch()).thenReturn(-1);

time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

// Complete the heartbeat with GROUP_ID_NOT_FOUND error
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
result.unsentRequests.get(0).handler().onComplete(response);

// Verify: no fatal error, heartbeat skipped (benign)
verify(membershipManager, never()).transitionToFatal();
verify(membershipManager).onHeartbeatRequestSkipped();
verify(backgroundEventHandler, never()).add(any());
}

/**
* Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal.
* This would indicate the group was unexpectedly deleted while the member
* was actively participating.
*/
@Test
public void testGroupIdNotFoundWhileStableIsFatal() {
// Setup: member is in STABLE state with positive epoch
when(membershipManager.state()).thenReturn(MemberState.STABLE);
when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);

time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

// Complete the heartbeat with GROUP_ID_NOT_FOUND error
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
result.unsentRequests.get(0).handler().onComplete(response);

// Verify: fatal error
verify(membershipManager).transitionToFatal();
verify(backgroundEventHandler).add(any());
}

@Test
public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,57 @@ public void testFailureOnFatalException() {
verify(backgroundEventHandler).add(any());
}

/**
* Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as fatal.
* This can happen when the consumer never successfully joined the group
* (e.g., due to an InvalidTopicException during poll() and close() sends
* a leave heartbeat for a group that was never created.
*/
@Test
public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
// Setup: member is in UNSUBSCRIBED state with epoch -1
when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
when(membershipManager.memberEpoch()).thenReturn(-1);

time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

// Complete the heartbeat with GROUP_ID_NOT_FOUND error
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
result.unsentRequests.get(0).handler().onComplete(response);

// Verify: no fatal error, heartbeat skipped (benign)
verify(membershipManager, never()).transitionToFatal();
verify(membershipManager).onHeartbeatRequestSkipped();
verify(backgroundEventHandler, never()).add(any());
}

/**
* Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal.
* This would indicate the group was unexpectedly deleted while the member
* was actively participating.
*/
@Test
public void testGroupIdNotFoundWhileStableIsFatal() {
// Setup: member is in STABLE state with positive epoch
when(membershipManager.state()).thenReturn(MemberState.STABLE);
when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);

time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

// Complete the heartbeat with GROUP_ID_NOT_FOUND error
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
result.unsentRequests.get(0).handler().onComplete(response);

// Verify: fatal error
verify(membershipManager).transitionToFatal();
verify(backgroundEventHandler).add(any());
}

@Test
public void testNoCoordinator() {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
Expand Down