diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 359c1fddd819e..501c048352fd2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; import java.util.Arrays; import java.util.List; @@ -160,6 +161,13 @@ void validateOffsetFetch( */ void createGroupTombstoneRecords(List records); + /** + * Cancel any timers associated with the group. + * + * @param timer The coordinator timer. + */ + default void cancelTimers(CoordinatorTimer timer) {} + /** * @return Whether the group is in Empty state. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 3165d10d8f477..d4c1ba1d3cd9b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -620,7 +620,7 @@ public CoordinatorResult records ) { // At this point, we have already validated the group id, so we know that the group exists and that no exception will be thrown. - createGroupTombstoneRecords(group(groupId), records); + createGroupTombstoneRecordsAndCancelTimers(group(groupId), records); } /** @@ -8266,12 +8266,12 @@ public void createGroupTombstoneRecords( * @param group The group to be deleted. * @param records The record list to populate. */ - public void createGroupTombstoneRecords( + public void createGroupTombstoneRecordsAndCancelTimers( Group group, List records ) { group.createGroupTombstoneRecords(records); - timer.cancel(streamsInitialRebalanceKey(group.groupId())); + group.cancelTimers(timer); } /** @@ -8666,7 +8666,7 @@ void validateDeleteGroup(String groupId) throws ApiException { public void maybeDeleteGroup(String groupId, List records) { Group group = groups.get(groupId); if (group != null && group.isEmpty()) { - createGroupTombstoneRecords(groupId, records); + createGroupTombstoneRecordsAndCancelTimers(groupId, records); } } @@ -8703,7 +8703,7 @@ private boolean maybeDeleteEmptyClassicGroup(Group group, List records) { records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId())); } + @Override + public void cancelTimers(CoordinatorTimer timer) { + timer.cancel("initial-rebalance-timeout-" + groupId); + } + @Override public boolean isEmpty() { return state() == StreamsGroupState.EMPTY; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 145b7613f1fcd..4dc0c8ff78c15 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -306,14 +306,14 @@ public void testDeleteGroups() { List records = invocation.getArgument(1); records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId)); return null; - }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList()); + }).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(), anyList()); CoordinatorResult coordinatorResult = coordinator.deleteGroups(context, groupIds); for (String groupId : groupIds) { verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId)); - verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList()); + verify(groupMetadataManager, times(1)).createGroupTombstoneRecordsAndCancelTimers(ArgumentMatchers.eq(groupId), anyList()); verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList()); } assertEquals(expectedResult, coordinatorResult); @@ -372,7 +372,7 @@ public void testDeleteGroupsInvalidGroupId() { List records = invocation.getArgument(1); records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId)); return null; - }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList()); + }).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(), anyList()); CoordinatorResult coordinatorResult = coordinator.deleteGroups(context, groupIds); @@ -380,7 +380,7 @@ public void testDeleteGroupsInvalidGroupId() { for (String groupId : groupIds) { verify(groupMetadataManager, times(1)).validateDeleteGroup(eq(groupId)); if (!groupId.equals("group-id-2")) { - verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(eq(groupId), anyList()); + verify(groupMetadataManager, times(1)).createGroupTombstoneRecordsAndCancelTimers(eq(groupId), anyList()); verify(offsetMetadataManager, times(1)).deleteAllOffsets(eq(groupId), anyList()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ef90ba0b63122..ef81022dceef4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -10460,7 +10460,7 @@ public void testClassicGroupDelete() { List expectedRecords = List.of(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id")); List records = new ArrayList<>(); - context.groupMetadataManager.createGroupTombstoneRecords("group-id", records); + context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id", records); assertEquals(expectedRecords, records); } @@ -10498,7 +10498,7 @@ public void testConsumerGroupDelete() { GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId) ); List records = new ArrayList<>(); - context.groupMetadataManager.createGroupTombstoneRecords("group-id", records); + context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id", records); assertEquals(expectedRecords, records); } @@ -10540,7 +10540,7 @@ public void testStreamsGroupDeleteCancelsInitialRebalanceTimer() { assertTrue(context.timer.isScheduled(timerKey), "Timer should be scheduled after first member joins"); List records = new ArrayList<>(); - context.groupMetadataManager.createGroupTombstoneRecords(groupId, records); + context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId, records); assertFalse(context.timer.isScheduled(timerKey), "Timer should be cancelled after group deletion"); @@ -15952,7 +15952,7 @@ public void testShareGroupDeleteTombstones() { GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId) ); List records = new ArrayList<>(); - context.groupMetadataManager.createGroupTombstoneRecords("share-group-id", records); + context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("share-group-id", records); assertEquals(expectedRecords, records); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index de1bf2d82c87f..ced7d7ee4e71d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer; import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage; import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder; import org.apache.kafka.coordinator.group.CommitPartitionValidator; @@ -77,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class StreamsGroupTest { @@ -1228,4 +1230,14 @@ public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyE assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId()); assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics()); } + + @Test + public void testCancelTimers() { + StreamsGroup streamsGroup = createStreamsGroup("test-group"); + CoordinatorTimer timer = mock(CoordinatorTimer.class); + + streamsGroup.cancelTimers(timer); + + verify(timer).cancel("initial-rebalance-timeout-test-group"); + } }