Skip to content

Commit f3e9650

Browse files
author
gnuhpc
committed
Fix the lag caculation bug for pending consumer group
1 parent 49bb957 commit f3e9650

File tree

4 files changed

+28
-31
lines changed

4 files changed

+28
-31
lines changed

docs/index.pdf

0 Bytes
Binary file not shown.

kafka-rest-springboot.iml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,6 @@
109109
<orderEntry type="library" name="Maven: commons-io:commons-io:2.5" level="project" />
110110
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.5" level="project" />
111111
<orderEntry type="library" name="Maven: org.apache.curator:curator-framework:2.11.0" level="project" />
112+
<orderEntry type="library" name="scala-sdk-2.11.8" level="application" />
112113
</component>
113114
</module>

src/main/java/org/gnuhpc/bigdata/model/ConsumerGroupDescFactory.java

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@
44
import kafka.common.OffsetAndMetadata;
55
import kafka.coordinator.GroupTopicPartition;
66
import lombok.AllArgsConstructor;
7-
import lombok.NoArgsConstructor;
87
import org.apache.kafka.clients.consumer.KafkaConsumer;
98
import org.apache.kafka.common.TopicPartition;
109
import org.gnuhpc.bigdata.constant.ConsumerState;
1110
import org.gnuhpc.bigdata.constant.ConsumerType;
1211
import org.gnuhpc.bigdata.utils.KafkaUtils;
13-
import org.springframework.beans.factory.annotation.Autowired;
14-
import org.springframework.stereotype.Component;
1512

1613
import java.util.Map;
1714

@@ -23,27 +20,27 @@ public ConsumerGroupDesc makeOldConsumerGroupDesc(
2320
Map<Integer, Long> fetchOffSetFromZKResultList,
2421
String topic, String consumerGroup, TopicMeta topicMeta) {
2522

26-
ConsumerGroupDesc cgd = ConsumerGroupDesc.newBuilder()
23+
ConsumerGroupDesc.Builder cgdBuilder = ConsumerGroupDesc.newBuilder()
2724
.setGroupName(consumerGroup)
2825
.setTopic(topic)
2926
.setPartitionId(op.getKey())
3027
.setCurrentOffset(fetchOffSetFromZKResultList.get(op.getKey()))
3128
.setLogEndOffset(
3229
topicMeta.getTopicPartitionInfos().stream()
33-
.filter(tpi -> tpi.getPartitionId() == op.getKey()).findFirst().get().getEndOffset())
34-
.build();
30+
.filter(tpi -> tpi.getPartitionId() == op.getKey()).findFirst().get().getEndOffset());
31+
3532

3633
if (op.getValue().equals("none")) {
37-
cgd.setConsumerId("-");
38-
cgd.setHost("-");
39-
cgd.setState(ConsumerState.PENDING);
34+
cgdBuilder.setConsumerId("-");
35+
cgdBuilder.setHost("-");
36+
cgdBuilder.setState(ConsumerState.PENDING);
4037
} else {
41-
cgd.setConsumerId(op.getValue());
42-
cgd.setHost(op.getValue().replace(consumerGroup + "_", ""));
43-
cgd.setState(ConsumerState.RUNNING);
38+
cgdBuilder.setConsumerId(op.getValue());
39+
cgdBuilder.setHost(op.getValue().replace(consumerGroup + "_", ""));
40+
cgdBuilder.setState(ConsumerState.RUNNING);
4441
}
45-
cgd.setType(ConsumerType.OLD);
46-
return cgd;
42+
cgdBuilder.setType(ConsumerType.OLD);
43+
return cgdBuilder.build();
4744
}
4845

4946
public ConsumerGroupDesc makeNewRunningConsumerGroupDesc(
@@ -52,31 +49,31 @@ public ConsumerGroupDesc makeNewRunningConsumerGroupDesc(
5249
Map<Integer, Long> partitionEndOffsetMap,
5350
AdminClient.ConsumerSummary cs) {
5451
KafkaConsumer consumer = kafkaUtils.createNewConsumer(consumerGroup);
55-
ConsumerGroupDesc cgd = ConsumerGroupDesc.newBuilder()
52+
ConsumerGroupDesc.Builder cgdBuilder = ConsumerGroupDesc.newBuilder()
5653
.setGroupName(consumerGroup)
5754
.setTopic(tp.topic())
5855
.setPartitionId(tp.partition())
5956
.setConsumerId(cs.clientId())
6057
.setHost(cs.clientHost())
6158
.setState(ConsumerState.RUNNING)
62-
.setType(ConsumerType.NEW).build();
59+
.setType(ConsumerType.NEW);
6360

6461
long currentOffset = -1L;
6562
org.apache.kafka.clients.consumer.OffsetAndMetadata offset = consumer.committed(new TopicPartition(tp.topic(), tp.partition()));
6663
if (offset != null) {
6764
currentOffset = offset.offset();
6865
}
69-
cgd.setCurrentOffset(currentOffset);
66+
cgdBuilder.setCurrentOffset(currentOffset);
7067

7168
Long endOffset = partitionEndOffsetMap.get(tp.partition());
7269
if (endOffset == null) { //if endOffset is null ,the partition of this topic has no leader replication
73-
cgd.setLogEndOffset(-1l);
70+
cgdBuilder.setLogEndOffset(-1l);
7471
} else {
75-
cgd.setLogEndOffset(endOffset);
72+
cgdBuilder.setLogEndOffset(endOffset);
7673
}
7774
consumer.close();
7875

79-
return cgd;
76+
return cgdBuilder.build();
8077
}
8178

8279
public ConsumerGroupDesc makeNewPendingConsumerGroupDesc(
@@ -85,26 +82,25 @@ public ConsumerGroupDesc makeNewPendingConsumerGroupDesc(
8582
Map<Integer, Long> partitionCurrentOffsetMap,
8683
Map.Entry<GroupTopicPartition, OffsetAndMetadata> storage,
8784
String topic) {
88-
KafkaConsumer consumer = kafkaUtils.createNewConsumer(consumerGroup);
89-
ConsumerGroupDesc cgd = ConsumerGroupDesc.newBuilder()
85+
int partitionId = storage.getKey().topicPartition().partition();
86+
ConsumerGroupDesc.Builder cgdBuilder = ConsumerGroupDesc.newBuilder()
9087
.setGroupName(consumerGroup)
9188
.setTopic(topic)
9289
.setConsumerId("-")
93-
.setPartitionId(storage.getKey().topicPartition().partition())
90+
.setPartitionId(partitionId)
9491
.setCurrentOffset(partitionCurrentOffsetMap.get(storage.getKey().topicPartition().partition()))
9592
.setHost("-")
9693
.setState(ConsumerState.PENDING)
97-
.setType(ConsumerType.NEW)
98-
.build();
94+
.setType(ConsumerType.NEW);
9995

100-
Long endOffset = partitionEndOffsetMap.get(cgd.getPartitionId());
96+
Long endOffset = partitionEndOffsetMap.get(partitionId);
10197

10298
if (endOffset == null) { //if endOffset is null ,the partition of this topic has no leader replication
103-
cgd.setLogEndOffset(-1l);
99+
cgdBuilder.setLogEndOffset(-1l);
104100
} else {
105-
cgd.setLogEndOffset(endOffset);
101+
cgdBuilder.setLogEndOffset(endOffset);
106102
}
107103

108-
return cgd;
104+
return cgdBuilder.build();
109105
}
110106
}

src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,14 +648,14 @@ private List<ConsumerGroupDesc> setPendingNewCGD(
648648
ConsumerGroupDescFactory factory = new ConsumerGroupDescFactory(kafkaUtils);
649649
cgdList.addAll(
650650
topicStorage.entrySet().stream().map(
651-
storage-> factory.makeNewPendingConsumerGroupDesc(
651+
storage -> factory.makeNewPendingConsumerGroupDesc(
652652
consumerGroup,
653653
partitionEndOffsetMap,
654654
partitionCurrentOffsetMap,
655655
storage,
656656
topic)
657657
)
658-
.collect(toList()));
658+
.collect(toList()));
659659

660660
return cgdList;
661661
}

0 commit comments

Comments
 (0)