Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -160,6 +161,13 @@ void validateOffsetFetch(
*/
void createGroupTombstoneRecords(List<CoordinatorRecord> records);

/**
* Cancel any timers associated with the group.
*
* @param timer The coordinator timer.
*/
default void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {}

/**
* @return Whether the group is in Empty state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
try {
groupMetadataManager.validateDeleteGroup(groupId);
numDeletedOffsets += offsetMetadataManager.deleteAllOffsets(groupId, records);
groupMetadataManager.createGroupTombstoneRecords(groupId, records);
groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId, records);
deletedGroups.add(groupId);

resultCollection.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8251,12 +8251,12 @@ private void removeCurrentMemberFromClassicGroup(
* @param groupId The id of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}.
* @param records The record list to populate.
*/
public void createGroupTombstoneRecords(
public void createGroupTombstoneRecordsAndCancelTimers(
String groupId,
List<CoordinatorRecord> records
) {
// At this point, we have already validated the group id, so we know that the group exists and that no exception will be thrown.
createGroupTombstoneRecords(group(groupId), records);
createGroupTombstoneRecordsAndCancelTimers(group(groupId), records);
}

/**
Expand All @@ -8266,12 +8266,12 @@ public void createGroupTombstoneRecords(
* @param group The group to be deleted.
* @param records The record list to populate.
*/
public void createGroupTombstoneRecords(
public void createGroupTombstoneRecordsAndCancelTimers(
Group group,
List<CoordinatorRecord> records
) {
group.createGroupTombstoneRecords(records);
timer.cancel(streamsInitialRebalanceKey(group.groupId()));
group.cancelTimers(timer);
}

/**
Expand Down Expand Up @@ -8666,7 +8666,7 @@ void validateDeleteGroup(String groupId) throws ApiException {
public void maybeDeleteGroup(String groupId, List<CoordinatorRecord> records) {
Group group = groups.get(groupId);
if (group != null && group.isEmpty()) {
createGroupTombstoneRecords(groupId, records);
createGroupTombstoneRecordsAndCancelTimers(groupId, records);
}
}

Expand Down Expand Up @@ -8703,7 +8703,7 @@ private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord
if (isEmptyClassicGroup(group)) {
// Delete the classic group by adding tombstones.
// There's no need to remove the group as the replay of tombstones removes it.
createGroupTombstoneRecords(group, records);
createGroupTombstoneRecordsAndCancelTimers(group, records);
return true;
}
return false;
Expand All @@ -8722,7 +8722,7 @@ private boolean maybeDeleteEmptyConsumerGroup(String groupId, List<CoordinatorRe
if (isEmptyConsumerGroup(group)) {
// Add tombstones for the previous consumer group. The tombstones won't actually be
// replayed because its coordinator result has a non-null appendFuture.
createGroupTombstoneRecords(group, records);
createGroupTombstoneRecordsAndCancelTimers(group, records);
removeGroup(groupId);
return true;
}
Expand All @@ -8742,7 +8742,7 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
if (isEmptyStreamsGroup(group)) {
// Add tombstones for the previous streams group. The tombstones won't actually be
// replayed because its coordinator result has a non-null appendFuture.
createGroupTombstoneRecords(group, records);
createGroupTombstoneRecordsAndCancelTimers(group, records);
removeGroup(groupId);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
Expand Down Expand Up @@ -823,6 +824,11 @@ public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
}

@Override
public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
timer.cancel("initial-rebalance-timeout-" + groupId);
}

@Override
public boolean isEmpty() {
return state() == StreamsGroupState.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,14 @@ public void testDeleteGroups() {
List<CoordinatorRecord> records = invocation.getArgument(1);
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
return null;
}).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList());
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(), anyList());

CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, CoordinatorRecord> coordinatorResult =
coordinator.deleteGroups(context, groupIds);

for (String groupId : groupIds) {
verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList());
verify(groupMetadataManager, times(1)).createGroupTombstoneRecordsAndCancelTimers(ArgumentMatchers.eq(groupId), anyList());
verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
}
assertEquals(expectedResult, coordinatorResult);
Expand Down Expand Up @@ -372,15 +372,15 @@ public void testDeleteGroupsInvalidGroupId() {
List<CoordinatorRecord> records = invocation.getArgument(1);
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
return null;
}).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList());
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(), anyList());

CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, CoordinatorRecord> coordinatorResult =
coordinator.deleteGroups(context, groupIds);

for (String groupId : groupIds) {
verify(groupMetadataManager, times(1)).validateDeleteGroup(eq(groupId));
if (!groupId.equals("group-id-2")) {
verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(eq(groupId), anyList());
verify(groupMetadataManager, times(1)).createGroupTombstoneRecordsAndCancelTimers(eq(groupId), anyList());
verify(offsetMetadataManager, times(1)).deleteAllOffsets(eq(groupId), anyList());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10460,7 +10460,7 @@ public void testClassicGroupDelete() {

List<CoordinatorRecord> expectedRecords = List.of(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
List<CoordinatorRecord> records = new ArrayList<>();
context.groupMetadataManager.createGroupTombstoneRecords("group-id", records);
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id", records);
assertEquals(expectedRecords, records);
}

Expand Down Expand Up @@ -10498,7 +10498,7 @@ public void testConsumerGroupDelete() {
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
);
List<CoordinatorRecord> records = new ArrayList<>();
context.groupMetadataManager.createGroupTombstoneRecords("group-id", records);
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id", records);
assertEquals(expectedRecords, records);
}

Expand Down Expand Up @@ -10540,7 +10540,7 @@ public void testStreamsGroupDeleteCancelsInitialRebalanceTimer() {
assertTrue(context.timer.isScheduled(timerKey), "Timer should be scheduled after first member joins");

List<CoordinatorRecord> records = new ArrayList<>();
context.groupMetadataManager.createGroupTombstoneRecords(groupId, records);
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId, records);

assertFalse(context.timer.isScheduled(timerKey), "Timer should be cancelled after group deletion");

Expand Down Expand Up @@ -15952,7 +15952,7 @@ public void testShareGroupDeleteTombstones() {
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
);
List<CoordinatorRecord> records = new ArrayList<>();
context.groupMetadataManager.createGroupTombstoneRecords("share-group-id", records);
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("share-group-id", records);
assertEquals(expectedRecords, records);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
Expand Down Expand Up @@ -77,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class StreamsGroupTest {
Expand Down Expand Up @@ -1228,4 +1230,14 @@ public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyE
assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId());
assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics());
}

@Test
public void testCancelTimers() {
StreamsGroup streamsGroup = createStreamsGroup("test-group");
CoordinatorTimer<Void, CoordinatorRecord> timer = mock(CoordinatorTimer.class);

streamsGroup.cancelTimers(timer);

verify(timer).cancel("initial-rebalance-timeout-test-group");
}
}