Skip to content

Commit f9fb3ff

Browse files
author
gnuhpc
committed
Add a debug endpoint and support multi topics from one consumer group
1 parent f3e9650 commit f9fb3ff

File tree

5 files changed

+107
-130
lines changed

5 files changed

+107
-130
lines changed

docs/index.pdf

0 Bytes
Binary file not shown.

src/main/java/org/gnuhpc/bigdata/controller/DebugController.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package org.gnuhpc.bigdata.controller;
22

33
import io.swagger.annotations.Api;
4+
import kafka.common.OffsetAndMetadata;
5+
import kafka.coordinator.GroupTopicPartition;
6+
import org.gnuhpc.bigdata.componet.OffsetStorage;
47
import org.springframework.beans.factory.annotation.Autowired;
58
import org.springframework.context.ApplicationContext;
69
import org.springframework.web.bind.annotation.RequestMapping;
@@ -23,6 +26,9 @@ public class DebugController {
2326
@Autowired
2427
ApplicationContext appContext;
2528

29+
@Autowired
30+
OffsetStorage offsetStorage;
31+
2632
@RequestMapping("/beans")
2733
public Map<String, String[]> beans(@RequestParam(required = false) String q) {
2834
Map<String, String[]> retMap = new HashMap<>();
@@ -37,4 +43,9 @@ public Map<String, String[]> beans(@RequestParam(required = false) String q) {
3743
retMap.put("beans", retArray);
3844
return retMap;
3945
}
46+
47+
@RequestMapping("/offsets")
48+
public Map<String, Map<GroupTopicPartition, OffsetAndMetadata>> offsets(){
49+
return offsetStorage.getMap();
50+
}
4051
}

src/main/java/org/gnuhpc/bigdata/controller/KafkaController.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,17 +210,17 @@ public Map<String, Set<String>> listAllConsumerGroups(
210210

211211
@GetMapping(value = "/consumergroups/{consumerGroup}/{type}/topic")
212212
@ApiOperation(value = "Get the topics involved of the specify consumer group")
213-
public List<String> listTopicByCG(@PathVariable String consumerGroup,
213+
public Set<String> listTopicByCG(@PathVariable String consumerGroup,
214214
@PathVariable ConsumerType type){
215215
return kafkaAdminService.listTopicsByCG(consumerGroup,type);
216216

217217
}
218218

219219
@GetMapping(value = "/consumergroups/{consumerGroup}/{type}/topic/{topic}")
220220
@ApiOperation(value = "Describe consumer groups by topic, showing lag and offset")
221-
public List<ConsumerGroupDesc> describeCGByGroup(@ConsumerGroupExistConstraint @PathVariable String consumerGroup,
222-
@PathVariable ConsumerType type,
223-
@PathVariable String topic) {
221+
public List<ConsumerGroupDesc> describeCGByTopic(@ConsumerGroupExistConstraint @PathVariable String consumerGroup,
222+
@PathVariable ConsumerType type,
223+
@PathVariable String topic) {
224224
if (!Strings.isNullOrEmpty(topic)) {
225225
existTopic(topic);
226226
} else {

src/main/java/org/gnuhpc/bigdata/model/ConsumerGroupDescFactory.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010
import org.gnuhpc.bigdata.constant.ConsumerType;
1111
import org.gnuhpc.bigdata.utils.KafkaUtils;
1212

13+
import java.util.Arrays;
1314
import java.util.Map;
15+
import java.util.stream.Collectors;
1416

1517
@AllArgsConstructor
1618
public class ConsumerGroupDescFactory {
1719
private KafkaUtils kafkaUtils;
20+
1821
public ConsumerGroupDesc makeOldConsumerGroupDesc(
1922
Map.Entry<Integer, String> op,
2023
Map<Integer, Long> fetchOffSetFromZKResultList,
@@ -59,6 +62,7 @@ public ConsumerGroupDesc makeNewRunningConsumerGroupDesc(
5962
.setType(ConsumerType.NEW);
6063

6164
long currentOffset = -1L;
65+
6266
org.apache.kafka.clients.consumer.OffsetAndMetadata offset = consumer.committed(new TopicPartition(tp.topic(), tp.partition()));
6367
if (offset != null) {
6468
currentOffset = offset.offset();
@@ -79,16 +83,17 @@ public ConsumerGroupDesc makeNewRunningConsumerGroupDesc(
7983
public ConsumerGroupDesc makeNewPendingConsumerGroupDesc(
8084
String consumerGroup,
8185
Map<Integer, Long> partitionEndOffsetMap,
82-
Map<Integer, Long> partitionCurrentOffsetMap,
83-
Map.Entry<GroupTopicPartition, OffsetAndMetadata> storage,
86+
Map.Entry<GroupTopicPartition, OffsetAndMetadata> topicStorage,
8487
String topic) {
85-
int partitionId = storage.getKey().topicPartition().partition();
88+
Long partitionCurrentOffset = (topicStorage.getValue() == null) ? -1l: topicStorage.getValue().offset();
89+
90+
int partitionId = topicStorage.getKey().topicPartition().partition();
8691
ConsumerGroupDesc.Builder cgdBuilder = ConsumerGroupDesc.newBuilder()
8792
.setGroupName(consumerGroup)
8893
.setTopic(topic)
8994
.setConsumerId("-")
9095
.setPartitionId(partitionId)
91-
.setCurrentOffset(partitionCurrentOffsetMap.get(storage.getKey().topicPartition().partition()))
96+
.setCurrentOffset(partitionCurrentOffset)
9297
.setHost("-")
9398
.setState(ConsumerState.PENDING)
9499
.setType(ConsumerType.NEW);

src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java

Lines changed: 83 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerRecord;
2727
import org.apache.kafka.clients.consumer.ConsumerRecords;
2828
import org.apache.kafka.clients.consumer.KafkaConsumer;
29+
import org.apache.kafka.common.KafkaException;
2930
import org.apache.kafka.common.Node;
3031
import org.apache.kafka.common.PartitionInfo;
3132
import org.apache.kafka.common.TopicPartition;
@@ -461,7 +462,7 @@ private Set<String> listNewConsumerGroupsByTopic(@TopicExistConstraint String to
461462
return result;
462463
}
463464

464-
public List<ConsumerGroupDesc> describeOldCGByTopic(String consumerGroup, String topic) {
465+
public List<ConsumerGroupDesc> describeOldCGByTopic(String consumerGroup, @TopicExistConstraint String topic) {
465466
if (!isOldConsumerGroup(consumerGroup)) {
466467
throw new RuntimeException(consumerGroup + " non-exist");
467468
}
@@ -545,17 +546,10 @@ public List<ConsumerGroupDesc> describeNewCGByTopic(String consumerGroup,
545546
throw new RuntimeException(consumerGroup + " non-exist!");
546547
}
547548

548-
if (isConsumerGroupActive(consumerGroup, ConsumerType.NEW)) {
549-
return setRunningNewCGD(consumerGroup, topic);
550-
} else {
551-
return setPendingNewCGD(consumerGroup, topic);
552-
}
549+
return setNewCGD(consumerGroup, topic);
553550
}
554551

555-
//Set consumergroupdescription for running new consumer group
556-
private List<ConsumerGroupDesc> setRunningNewCGD(
557-
String consumerGroup,
558-
String topic) {
552+
private List<ConsumerGroupDesc> setNewCGD(String consumerGroup, String topic) {
559553
List<ConsumerGroupDesc> cgdList = new ArrayList<>();
560554
AdminClient adminClient = kafkaUtils.createAdminClient();
561555

@@ -564,102 +558,70 @@ private List<ConsumerGroupDesc> setRunningNewCGD(
564558
//Nothing about this consumer group obtained, return an empty map directly
565559
adminClient.close();
566560

567-
if (consumerSummaryList.size() == 0) {
568-
return null;
569-
}
570-
571-
//Get the meta information of the topic
572-
TopicMeta topicMeta = describeTopic(topic);
573-
//Construct <PartitionID, End offset> Map
574-
Map<Integer, Long> partitionEndOffsetMap = topicMeta.getTopicPartitionInfos().stream()
575-
.collect(Collectors.toMap(
576-
tpi -> tpi.getPartitionId(),
577-
tpi -> tpi.getEndOffset()
578-
)
579-
);
580-
581-
582-
ConsumerGroupDescFactory factory = new ConsumerGroupDescFactory(kafkaUtils);
583-
for (AdminClient.ConsumerSummary cs : consumerSummaryList) {
584-
List<TopicPartition> assignment = CollectionConvertor.listConvertJavaList(cs.assignment());
585-
//Second get the current offset of each partition in this topic
586-
587-
cgdList.addAll(assignment.stream().map(tp ->
588-
factory.makeNewRunningConsumerGroupDesc(tp, consumerGroup, partitionEndOffsetMap, cs)
589-
).collect(toList()));
590-
}
591-
return cgdList;
592-
593-
}
594-
595-
//Set consumergroupdescription for pending new consumer group
596-
private List<ConsumerGroupDesc> setPendingNewCGD(
597-
String consumerGroup, String topic) {
598-
Map<GroupTopicPartition, OffsetAndMetadata> storageMap = storage.get(consumerGroup);
599-
List<ConsumerGroupDesc> cgdList;
600-
ConsumerState state = ConsumerState.PENDING;
601-
ConsumerType type = ConsumerType.NEW;
602-
ConsumerGroupDesc cgd;
603-
List<String> topicList;
604-
Map<Integer, Long> partitionCurrentOffsetMap;
561+
List<AdminClient.ConsumerSummary> filteredCSList = consumerSummaryList.parallelStream()
562+
.filter(cs ->
563+
CollectionConvertor.listConvertJavaList(cs.assignment()).parallelStream()
564+
.filter(tp -> tp.topic().equals(topic)).count() != 0)
565+
.collect(toList());
605566

606-
//Nothing about this consumer group obtained, return an empty map directly
607-
if (storageMap == null) {
608-
return null;
609-
}
567+
//Prepare the common metrics no matter the cg is active or not.
610568

611-
//Get the metadata of the topic
569+
//1. Get the meta information of the topic
612570
TopicMeta topicMeta = describeTopic(topic);
613571

614-
//Construct <PartitionID, EndOffset> Map
572+
//2. Get the log end offset for every partition
615573
Map<Integer, Long> partitionEndOffsetMap = topicMeta.getTopicPartitionInfos().stream()
616574
.collect(Collectors.toMap(
617575
tpi -> tpi.getPartitionId(),
618576
tpi -> tpi.getEndOffset()
619577
)
620578
);
579+
if (filteredCSList.size() == 0) {//For Pending consumer group
621580

622-
//First get the information of this topic
623-
Map<GroupTopicPartition, OffsetAndMetadata> topicStorage = new HashMap<>();
624-
cgdList = new ArrayList<>();
625-
//Second get the current offset of each partition in this topic.
626-
//Try to fetch it from in-memory storage map
627-
for (Map.Entry<GroupTopicPartition, OffsetAndMetadata> e : storageMap.entrySet()) {
628-
if (e.getKey().topicPartition().topic().equals(topic)) {
629-
topicStorage.put(e.getKey(), e.getValue());
581+
//Even from the offsetstorage, nothing about this consumer group obtained
582+
// In this case, return an empty map directly.
583+
Map<GroupTopicPartition, OffsetAndMetadata> storageMap = storage.get(consumerGroup);
584+
if (storageMap == null) {
585+
return null;
630586
}
631-
}
632-
633-
//Construct <PartitionID, current offset> Map
634-
partitionCurrentOffsetMap = topicStorage.entrySet().stream()
635-
.filter(e -> e.getKey().topicPartition().topic().equals(topic))
636-
.collect(Collectors.toMap(
637-
e -> e.getKey().topicPartition().partition(),
638-
e -> {
639-
if (e.getValue() == null) {
640-
return -1l;
641-
} else {
642-
return e.getValue().offset();
643-
}
644-
}
645-
));
646587

588+
//Get the current offset of each partition in this topic.
589+
Map<GroupTopicPartition, OffsetAndMetadata> topicStorage = new HashMap<>();
590+
for (Map.Entry<GroupTopicPartition, OffsetAndMetadata> e : storageMap.entrySet()) {
591+
if (e.getKey().topicPartition().topic().equals(topic)) {
592+
topicStorage.put(e.getKey(), e.getValue());
593+
}
594+
}
647595

648-
ConsumerGroupDescFactory factory = new ConsumerGroupDescFactory(kafkaUtils);
649-
cgdList.addAll(
650-
topicStorage.entrySet().stream().map(
651-
storage -> factory.makeNewPendingConsumerGroupDesc(
652-
consumerGroup,
653-
partitionEndOffsetMap,
654-
partitionCurrentOffsetMap,
655-
storage,
656-
topic)
657-
)
658-
.collect(toList()));
596+
//Build consumer group description
597+
ConsumerGroupDescFactory factory = new ConsumerGroupDescFactory(kafkaUtils);
598+
cgdList.addAll(
599+
topicStorage.entrySet().stream().map(
600+
storage -> factory.makeNewPendingConsumerGroupDesc(
601+
consumerGroup,
602+
partitionEndOffsetMap,
603+
storage,
604+
topic)
605+
).collect(toList()));
606+
607+
} else { //For running consumer group
608+
//Build consumer group description
609+
ConsumerGroupDescFactory factory = new ConsumerGroupDescFactory(kafkaUtils);
610+
for (AdminClient.ConsumerSummary cs : filteredCSList) {
611+
List<TopicPartition> assignment = CollectionConvertor.listConvertJavaList(cs.assignment());
612+
//Second get the current offset of each partition in this topic
613+
614+
cgdList.addAll(assignment.parallelStream()
615+
.filter(tp->tp.topic().equals(topic))
616+
.map(tp -> factory.makeNewRunningConsumerGroupDesc(tp, consumerGroup, partitionEndOffsetMap, cs)
617+
).collect(toList()));
618+
}
619+
}
659620

660621
return cgdList;
661622
}
662623

624+
663625
public String getMessage(@TopicExistConstraint String topic, int partition, long offset, String decoder, String avroSchema) {
664626
KafkaConsumer consumer = kafkaUtils.createNewConsumer();
665627
TopicPartition tp = new TopicPartition(topic, partition);
@@ -677,23 +639,20 @@ public String getMessage(@TopicExistConstraint String topic, int partition, long
677639
)
678640
);
679641
}
680-
consumer.assign(Arrays.asList(tp));
642+
consumer.assign(Collections.singletonList(tp));
681643
consumer.seek(tp, offset);
682644

683-
String last;
684-
685-
while (true) {
686-
ConsumerRecords<String, String> crs = consumer.poll(channelRetryBackoffMs);
687-
if (crs.count() != 0) {
688-
Iterator<ConsumerRecord<String, String>> it = crs.iterator();
645+
String last = null;
689646

647+
ConsumerRecords<String, String> crs = consumer.poll(channelRetryBackoffMs);
648+
if (crs.count() != 0) {
649+
Iterator<ConsumerRecord<String, String>> it = crs.iterator();
650+
while (it.hasNext()) {
690651
ConsumerRecord<String, String> initCr = it.next();
691-
last = initCr.value() + String.valueOf(initCr.offset());
692-
while (it.hasNext()) {
693-
ConsumerRecord<String, String> cr = it.next();
694-
last = cr.value() + String.valueOf(cr.offset());
652+
last = "Value: " + initCr.value() + ", Offset: " + String.valueOf(initCr.offset());
653+
if (last != null && initCr.offset() == offset) {
654+
break;
695655
}
696-
break;
697656
}
698657
}
699658

@@ -984,8 +943,8 @@ public boolean isNewConsumerGroup(String consumerGroup) {
984943
}
985944

986945

987-
public List<String> listTopicsByCG(String consumerGroup, ConsumerType type) {
988-
List<String> topicList = null;
946+
public Set<String> listTopicsByCG(String consumerGroup, ConsumerType type) {
947+
Set<String> topicList = new HashSet<>();
989948

990949
if (type == null) {
991950
throw new ApiException("Unknown Type " + type);
@@ -996,39 +955,41 @@ public List<String> listTopicsByCG(String consumerGroup, ConsumerType type) {
996955
throw new RuntimeException(consumerGroup + " non-exist");
997956
}
998957

999-
topicList = CollectionConvertor.seqConvertJavaList(zkUtils.getTopicsByConsumerGroup(consumerGroup));
958+
topicList = new HashSet<>(
959+
CollectionConvertor.seqConvertJavaList(zkUtils.getTopicsByConsumerGroup(consumerGroup))
960+
);
1000961
} else if (type == ConsumerType.NEW) {
1001962
if (!isNewConsumerGroup(consumerGroup)) {
1002963
throw new RuntimeException(consumerGroup + " non-exist!");
1003964
}
1004965

1005-
if (isConsumerGroupActive(consumerGroup, ConsumerType.NEW)) {
1006-
AdminClient adminClient = kafkaUtils.createAdminClient();
966+
AdminClient adminClient = kafkaUtils.createAdminClient();
1007967

1008-
List<AdminClient.ConsumerSummary> consumerSummaryList =
1009-
CollectionConvertor.listConvertJavaList(adminClient.describeConsumerGroup(consumerGroup));
1010-
//Nothing about this consumer group obtained, return an empty map directly
1011-
adminClient.close();
968+
List<AdminClient.ConsumerSummary> consumerSummaryList =
969+
CollectionConvertor.listConvertJavaList(adminClient.describeConsumerGroup(consumerGroup));
970+
//Nothing about this consumer group obtained, return an empty map directly
971+
adminClient.close();
1012972

1013-
if (consumerSummaryList.size() == 0) {
1014-
return null;
1015-
} else {
1016-
//Get topic list and filter if topic is set
1017-
topicList = consumerSummaryList.stream().flatMap(
1018-
cs -> CollectionConvertor.listConvertJavaList(cs.assignment()).stream())
1019-
.map(tp -> tp.topic()).distinct()
1020-
.collect(toList());
1021-
}
1022-
} else {
973+
if (isConsumerGroupActive(consumerGroup, ConsumerType.NEW) &&
974+
consumerSummaryList.size() != 0) {
975+
976+
//Get topic list and filter if topic is set
977+
topicList.addAll(consumerSummaryList.stream().flatMap(
978+
cs -> CollectionConvertor.listConvertJavaList(cs.assignment()).stream())
979+
.map(tp -> tp.topic()).distinct()
980+
.collect(toList()));
981+
}
982+
983+
if (consumerSummaryList.size() == 0) { //PENDING Consumer Group
1023984
Map<GroupTopicPartition, OffsetAndMetadata> storageMap = storage.get(consumerGroup);
1024985
if (storageMap == null) {
1025986
return null;
1026987
}
1027988

1028989
//Fetch the topics involved by consumer. And filter it by topic name
1029-
topicList = storageMap.entrySet().stream()
990+
topicList.addAll(storageMap.entrySet().stream()
1030991
.map(e -> e.getKey().topicPartition().topic()).distinct()
1031-
.collect(toList());
992+
.collect(toList()));
1032993
}
1033994
} else {
1034995
throw new ApiException("Unknown Type " + type);
@@ -1040,7 +1001,7 @@ public List<String> listTopicsByCG(String consumerGroup, ConsumerType type) {
10401001

10411002
public Map<String, List<ConsumerGroupDesc>> describeConsumerGroup(String consumerGroup, ConsumerType type) {
10421003
Map<String, List<ConsumerGroupDesc>> result = new HashMap<>();
1043-
List<String> topicList = listTopicsByCG(consumerGroup, type);
1004+
Set<String> topicList = listTopicsByCG(consumerGroup, type);
10441005
if (topicList == null) {
10451006
//Return empty result
10461007
return result;

0 commit comments

Comments
 (0)