Skip to content

Commit f188a31

Browse files
authored
KAFKA-19500: kafka-consumer-groups.sh should fail quickly if the partition leader is unavailable (#20168)
1. Add check leader missing logic in method `ConsumerGroupCommand.ConsumerGroupService#prepareOffsetsToReset` in order to fail quickly 2. Add some tests Reviewers: TaiJuWu <[email protected]>, Lan Ding <[email protected]>, Ken Huang <[email protected]>, Andrew Schofield <[email protected]>
1 parent ef07b5f commit f188a31

File tree

3 files changed

+79
-1
lines changed

3 files changed

+79
-1
lines changed

tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.kafka.common.Node;
4242
import org.apache.kafka.common.TopicPartition;
4343
import org.apache.kafka.common.errors.GroupIdNotFoundException;
44+
import org.apache.kafka.common.errors.LeaderNotAvailableException;
45+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4446
import org.apache.kafka.common.protocol.Errors;
4547
import org.apache.kafka.common.utils.Utils;
4648
import org.apache.kafka.server.util.CommandLineUtils;
@@ -1000,6 +1002,9 @@ private Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupI
10001002
}
10011003

10021004
private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
1005+
// ensure all partitions are valid, otherwise throw a runtime exception
1006+
checkAllTopicPartitionsValid(partitionsToReset);
1007+
10031008
if (opts.options.has(opts.resetToOffsetOpt)) {
10041009
return offsetsUtils.resetToOffset(partitionsToReset);
10051010
} else if (opts.options.has(opts.resetToEarliestOpt)) {
@@ -1024,6 +1029,38 @@ private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String grou
10241029
return null;
10251030
}
10261031

1032+
private void checkAllTopicPartitionsValid(Collection<TopicPartition> partitionsToReset) {
1033+
// check the partitions exist
1034+
List<TopicPartition> partitionsNotExistList = filterNonExistentPartitions(partitionsToReset);
1035+
if (!partitionsNotExistList.isEmpty()) {
1036+
String partitionStr = partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
1037+
throw new UnknownTopicOrPartitionException("The partitions \"" + partitionStr + "\" do not exist");
1038+
}
1039+
1040+
// check the partitions have leader
1041+
List<TopicPartition> partitionsWithoutLeader = filterNoneLeaderPartitions(partitionsToReset);
1042+
if (!partitionsWithoutLeader.isEmpty()) {
1043+
String partitionStr = partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
1044+
throw new LeaderNotAvailableException("The partitions \"" + partitionStr + "\" have no leader");
1045+
}
1046+
}
1047+
1048+
private List<TopicPartition> filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
1049+
// collect all topics
1050+
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
1051+
try {
1052+
List<TopicPartition> existPartitions = adminClient.describeTopics(topics).allTopicNames().get().entrySet()
1053+
.stream()
1054+
.flatMap(entry -> entry.getValue().partitions().stream()
1055+
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))
1056+
.toList();
1057+
1058+
return topicPartitions.stream().filter(element -> !existPartitions.contains(element)).toList();
1059+
} catch (InterruptedException | ExecutionException e) {
1060+
throw new RuntimeException(e);
1061+
}
1062+
}
1063+
10271064
String exportOffsetsToCsv(Map<String, Map<TopicPartition, OffsetAndMetadata>> assignments) {
10281065
boolean isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1;
10291066
ObjectWriter csvWriter = isSingleGroupQuery

tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static org.junit.jupiter.api.Assertions.assertEquals;
6363
import static org.junit.jupiter.api.Assertions.assertTrue;
6464
import static org.mockito.ArgumentMatchers.any;
65+
import static org.mockito.ArgumentMatchers.anySet;
6566
import static org.mockito.Mockito.mock;
6667
import static org.mockito.Mockito.times;
6768
import static org.mockito.Mockito.verify;
@@ -234,6 +235,8 @@ public void testAdminRequestsForResetOffsets() {
234235
.thenReturn(describeGroupsResult(GroupState.DEAD));
235236
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified), any()))
236237
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
238+
when(admin.describeTopics(anySet()))
239+
.thenReturn(describeTopicsResult(TOPICS));
237240
when(admin.listOffsets(offsetsArgMatcher(), any()))
238241
.thenReturn(listOffsetsResult());
239242

@@ -317,7 +320,7 @@ private DescribeTopicsResult describeTopicsResult(Collection<String> topics) {
317320

318321
topics.forEach(topic -> {
319322
List<TopicPartitionInfo> partitions = IntStream.range(0, NUM_PARTITIONS)
320-
.mapToObj(i -> new TopicPartitionInfo(i, null, Collections.emptyList(), Collections.emptyList()))
323+
.mapToObj(i -> new TopicPartitionInfo(i, Node.noNode(), Collections.emptyList(), Collections.emptyList()))
321324
.collect(Collectors.toList());
322325
topicDescriptions.put(topic, new TopicDescription(topic, false, partitions));
323326
});

tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.kafka.clients.producer.ProducerRecord;
2828
import org.apache.kafka.common.GroupState;
2929
import org.apache.kafka.common.TopicPartition;
30+
import org.apache.kafka.common.errors.LeaderNotAvailableException;
31+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
3032
import org.apache.kafka.common.serialization.ByteArraySerializer;
3133
import org.apache.kafka.common.serialization.StringDeserializer;
3234
import org.apache.kafka.common.test.ClusterInstance;
@@ -81,6 +83,7 @@
8183
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
8284
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
8385
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
86+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
8487
import static org.junit.jupiter.api.Assertions.assertEquals;
8588
import static org.junit.jupiter.api.Assertions.assertThrows;
8689
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -659,6 +662,41 @@ public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance cluster)
659662
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
660663
}
661664

665+
@ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
666+
public void testResetOffsetsWithPartitionNoneLeader(ClusterInstance cluster) throws Exception {
667+
String group = generateRandomGroupId();
668+
String topic = generateRandomTopic();
669+
String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":0,1,2",
670+
"--to-earliest", "--execute");
671+
672+
try (Admin admin = cluster.admin();
673+
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
674+
675+
admin.createTopics(singleton(new NewTopic(topic, 3, (short) 1))).all().get();
676+
produceConsumeAndShutdown(cluster, topic, group, 2, GroupProtocol.CLASSIC);
677+
assertDoesNotThrow(() -> resetOffsets(service));
678+
// shutdown a broker to make some partitions missing leader
679+
cluster.shutdownBroker(0);
680+
assertThrows(LeaderNotAvailableException.class, () -> resetOffsets(service));
681+
}
682+
}
683+
684+
@ClusterTest
685+
public void testResetOffsetsWithPartitionNotExist(ClusterInstance cluster) throws Exception {
686+
String group = generateRandomGroupId();
687+
String topic = generateRandomTopic();
688+
String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":2,3",
689+
"--to-earliest", "--execute");
690+
691+
try (Admin admin = cluster.admin();
692+
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
693+
694+
admin.createTopics(singleton(new NewTopic(topic, 1, (short) 1))).all().get();
695+
produceConsumeAndShutdown(cluster, topic, group, 2, GroupProtocol.CLASSIC);
696+
assertThrows(UnknownTopicOrPartitionException.class, () -> resetOffsets(service));
697+
}
698+
}
699+
662700
private String generateRandomTopic() {
663701
return TOPIC_PREFIX + TestUtils.randomString(10);
664702
}

0 commit comments

Comments
 (0)