Skip to content

Commit 437337c

Browse files
authored
MINOR: update method name from createGroupTombstoneRecords to createGroupTombStoneRecordsAndCancelTimers (#20949)
Since now we are canceling timers in the method, we need to update the method name Reviewers: Lucas Brutschy <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
1 parent 40c3f31 commit 437337c

File tree

7 files changed

+54
-18
lines changed

7 files changed

+54
-18
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.common.KafkaException;
2020
import org.apache.kafka.common.message.ListGroupsResponseData;
2121
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
22+
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
2223

2324
import java.util.Arrays;
2425
import java.util.List;
@@ -160,6 +161,13 @@ void validateOffsetFetch(
160161
*/
161162
void createGroupTombstoneRecords(List<CoordinatorRecord> records);
162163

164+
/**
165+
* Cancel any timers associated with the group.
166+
*
167+
* @param timer The coordinator timer.
168+
*/
169+
default void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {}
170+
163171
/**
164172
* @return Whether the group is in Empty state.
165173
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
620620
try {
621621
groupMetadataManager.validateDeleteGroup(groupId);
622622
numDeletedOffsets += offsetMetadataManager.deleteAllOffsets(groupId, records);
623-
groupMetadataManager.createGroupTombstoneRecords(groupId, records);
623+
groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId, records);
624624
deletedGroups.add(groupId);
625625

626626
resultCollection.add(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8251,12 +8251,12 @@ private void removeCurrentMemberFromClassicGroup(
82518251
* @param groupId The id of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}.
82528252
* @param records The record list to populate.
82538253
*/
8254-
public void createGroupTombstoneRecords(
8254+
public void createGroupTombstoneRecordsAndCancelTimers(
82558255
String groupId,
82568256
List<CoordinatorRecord> records
82578257
) {
82588258
// At this point, we have already validated the group id, so we know that the group exists and that no exception will be thrown.
8259-
createGroupTombstoneRecords(group(groupId), records);
8259+
createGroupTombstoneRecordsAndCancelTimers(group(groupId), records);
82608260
}
82618261

82628262
/**
@@ -8266,12 +8266,12 @@ public void createGroupTombstoneRecords(
82668266
* @param group The group to be deleted.
82678267
* @param records The record list to populate.
82688268
*/
8269-
public void createGroupTombstoneRecords(
8269+
public void createGroupTombstoneRecordsAndCancelTimers(
82708270
Group group,
82718271
List<CoordinatorRecord> records
82728272
) {
82738273
group.createGroupTombstoneRecords(records);
8274-
timer.cancel(streamsInitialRebalanceKey(group.groupId()));
8274+
group.cancelTimers(timer);
82758275
}
82768276

82778277
/**
@@ -8666,7 +8666,7 @@ void validateDeleteGroup(String groupId) throws ApiException {
86668666
public void maybeDeleteGroup(String groupId, List<CoordinatorRecord> records) {
86678667
Group group = groups.get(groupId);
86688668
if (group != null && group.isEmpty()) {
8669-
createGroupTombstoneRecords(groupId, records);
8669+
createGroupTombstoneRecordsAndCancelTimers(group, records);
86708670
}
86718671
}
86728672

@@ -8703,7 +8703,7 @@ private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord
87038703
if (isEmptyClassicGroup(group)) {
87048704
// Delete the classic group by adding tombstones.
87058705
// There's no need to remove the group as the replay of tombstones removes it.
8706-
createGroupTombstoneRecords(group, records);
8706+
createGroupTombstoneRecordsAndCancelTimers(group, records);
87078707
return true;
87088708
}
87098709
return false;
@@ -8722,7 +8722,7 @@ private boolean maybeDeleteEmptyConsumerGroup(String groupId, List<CoordinatorRe
87228722
if (isEmptyConsumerGroup(group)) {
87238723
// Add tombstones for the previous consumer group. The tombstones won't actually be
87248724
// replayed because its coordinator result has a non-null appendFuture.
8725-
createGroupTombstoneRecords(group, records);
8725+
createGroupTombstoneRecordsAndCancelTimers(group, records);
87268726
removeGroup(groupId);
87278727
return true;
87288728
}
@@ -8742,7 +8742,7 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
87428742
if (isEmptyStreamsGroup(group)) {
87438743
// Add tombstones for the previous streams group. The tombstones won't actually be
87448744
// replayed because its coordinator result has a non-null appendFuture.
8745-
createGroupTombstoneRecords(group, records);
8745+
createGroupTombstoneRecordsAndCancelTimers(group, records);
87468746
removeGroup(groupId);
87478747
return true;
87488748
}
@@ -8908,7 +8908,7 @@ static String classicGroupSyncKey(String groupId) {
89088908
* @return the initial rebalance key.
89098909
*/
89108910
static String streamsInitialRebalanceKey(String groupId) {
8911-
return "initial-rebalance-timeout-" + groupId;
8911+
return StreamsGroup.initialRebalanceTimeoutKey(groupId);
89128912
}
89138913

89148914
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.common.utils.LogContext;
2828
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
2929
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
30+
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
3031
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
3132
import org.apache.kafka.coordinator.group.Group;
3233
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
@@ -823,6 +824,21 @@ public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
823824
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
824825
}
825826

827+
/**
828+
* Generate an initial rebalance key for the timer.
829+
*
830+
* @param groupId The group id.
831+
* @return The initial rebalance key.
832+
*/
833+
public static String initialRebalanceTimeoutKey(String groupId) {
834+
return "initial-rebalance-timeout-" + groupId;
835+
}
836+
837+
@Override
838+
public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
839+
timer.cancel(initialRebalanceTimeoutKey(groupId));
840+
}
841+
826842
@Override
827843
public boolean isEmpty() {
828844
return state() == StreamsGroupState.EMPTY;

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,14 +306,14 @@ public void testDeleteGroups() {
306306
List<CoordinatorRecord> records = invocation.getArgument(1);
307307
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
308308
return null;
309-
}).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList());
309+
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(), anyList());
310310

311311
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, CoordinatorRecord> coordinatorResult =
312312
coordinator.deleteGroups(context, groupIds);
313313

314314
for (String groupId : groupIds) {
315315
verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
316-
verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList());
316+
verify(groupMetadataManager, times(1)).createGroupTombstoneRecordsAndCancelTimers(ArgumentMatchers.eq(groupId), anyList());
317317
verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
318318
}
319319
assertEquals(expectedResult, coordinatorResult);
@@ -372,15 +372,15 @@ public void testDeleteGroupsInvalidGroupId() {
372372
List<CoordinatorRecord> records = invocation.getArgument(1);
373373
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
374374
return null;
375-
}).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList());
375+
}).when(groupMetadataManager).createGroupTombstoneRecordsAndCancelTimers(anyString(), anyList());
376376

377377
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, CoordinatorRecord> coordinatorResult =
378378
coordinator.deleteGroups(context, groupIds);
379379

380380
for (String groupId : groupIds) {
381381
verify(groupMetadataManager, times(1)).validateDeleteGroup(eq(groupId));
382382
if (!groupId.equals("group-id-2")) {
383-
verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(eq(groupId), anyList());
383+
verify(groupMetadataManager, times(1)).createGroupTombstoneRecordsAndCancelTimers(eq(groupId), anyList());
384384
verify(offsetMetadataManager, times(1)).deleteAllOffsets(eq(groupId), anyList());
385385
}
386386
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10460,7 +10460,7 @@ public void testClassicGroupDelete() {
1046010460

1046110461
List<CoordinatorRecord> expectedRecords = List.of(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
1046210462
List<CoordinatorRecord> records = new ArrayList<>();
10463-
context.groupMetadataManager.createGroupTombstoneRecords("group-id", records);
10463+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id", records);
1046410464
assertEquals(expectedRecords, records);
1046510465
}
1046610466

@@ -10498,7 +10498,7 @@ public void testConsumerGroupDelete() {
1049810498
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
1049910499
);
1050010500
List<CoordinatorRecord> records = new ArrayList<>();
10501-
context.groupMetadataManager.createGroupTombstoneRecords("group-id", records);
10501+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("group-id", records);
1050210502
assertEquals(expectedRecords, records);
1050310503
}
1050410504

@@ -10540,7 +10540,7 @@ public void testStreamsGroupDeleteCancelsInitialRebalanceTimer() {
1054010540
assertTrue(context.timer.isScheduled(timerKey), "Timer should be scheduled after first member joins");
1054110541

1054210542
List<CoordinatorRecord> records = new ArrayList<>();
10543-
context.groupMetadataManager.createGroupTombstoneRecords(groupId, records);
10543+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers(groupId, records);
1054410544

1054510545
assertFalse(context.timer.isScheduled(timerKey), "Timer should be cancelled after group deletion");
1054610546

@@ -15952,7 +15952,7 @@ public void testShareGroupDeleteTombstones() {
1595215952
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
1595315953
);
1595415954
List<CoordinatorRecord> records = new ArrayList<>();
15955-
context.groupMetadataManager.createGroupTombstoneRecords("share-group-id", records);
15955+
context.groupMetadataManager.createGroupTombstoneRecordsAndCancelTimers("share-group-id", records);
1595615956
assertEquals(expectedRecords, records);
1595715957
}
1595815958

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.common.utils.MockTime;
3030
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
3131
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
32+
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
3233
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
3334
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
3435
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
@@ -77,6 +78,7 @@
7778
import static org.junit.jupiter.api.Assertions.assertThrows;
7879
import static org.junit.jupiter.api.Assertions.assertTrue;
7980
import static org.mockito.Mockito.mock;
81+
import static org.mockito.Mockito.verify;
8082
import static org.mockito.Mockito.when;
8183

8284
public class StreamsGroupTest {
@@ -1228,4 +1230,14 @@ public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyE
12281230
assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId());
12291231
assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics());
12301232
}
1233+
1234+
@Test
1235+
public void testCancelTimers() {
1236+
StreamsGroup streamsGroup = createStreamsGroup("test-group");
1237+
CoordinatorTimer<Void, CoordinatorRecord> timer = mock(CoordinatorTimer.class);
1238+
1239+
streamsGroup.cancelTimers(timer);
1240+
1241+
verify(timer).cancel("initial-rebalance-timeout-test-group");
1242+
}
12311243
}

0 commit comments

Comments
 (0)