Skip to content

Commit 1caab7f

Browse files
authored
[ISSUE-624]过滤掉不存在的Topic(#624)
[ISSUE-624]过滤掉不存在的Topic(#624)
2 parents 6ed1d38 + 9d33c72 commit 1caab7f

File tree

1 file changed

+23
-3
lines changed

1 file changed

+23
-3
lines changed

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
1313
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
1414
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
15+
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
1516
import org.apache.kafka.clients.admin.*;
1617
import org.apache.kafka.common.TopicPartition;
1718
import org.springframework.beans.factory.annotation.Autowired;
18-
import org.springframework.transaction.annotation.Transactional;
1919

2020
import java.util.*;
2121
import java.util.stream.Collectors;
@@ -33,6 +33,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
3333
@Autowired
3434
private GroupService groupService;
3535

36+
@Autowired
37+
private TopicService topicService;
38+
3639
@Override
3740
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
3841

@@ -53,6 +56,7 @@ public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnit
5356
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> groupNameList, long triggerTimeUnitMs) {
5457
List<GroupMemberPO> groupMemberPOList = new ArrayList<>();
5558
TaskResult tr = TaskResult.SUCCESS;
59+
5660
for (String groupName : groupNameList) {
5761
try {
5862
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
@@ -62,7 +66,10 @@ private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> gr
6266
tr = TaskResult.FAIL;
6367
}
6468
}
69+
70+
groupMemberPOList = this.filterGroupIfTopicNotExist(clusterPhy.getId(), groupMemberPOList);
6571
groupService.batchReplace(groupMemberPOList);
72+
6673
return tr;
6774
}
6875

@@ -71,7 +78,7 @@ private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName,
7178

7279
// 获取消费组消费过哪些Topic
7380
Map<TopicPartition, Long> offsetMap = groupService.getGroupOffset(clusterPhyId, groupName);
74-
for (TopicPartition topicPartition: offsetMap.keySet()) {
81+
for (TopicPartition topicPartition : offsetMap.keySet()) {
7582
GroupMemberPO po = groupMap.get(topicPartition.topic());
7683
if (po == null) {
7784
po = new GroupMemberPO(clusterPhyId, topicPartition.topic(), groupName, updateTime);
@@ -94,7 +101,7 @@ private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName,
94101
}
95102

96103
Set<String> topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet());
97-
for (String topicName: topicNameSet) {
104+
for (String topicName : topicNameSet) {
98105
groupMap.putIfAbsent(topicName, new GroupMemberPO(clusterPhyId, topicName, groupName, updateTime));
99106

100107
GroupMemberPO po = groupMap.get(topicName);
@@ -112,4 +119,17 @@ private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName,
112119

113120
return new ArrayList<>(groupMap.values());
114121
}
122+
123+
private List<GroupMemberPO> filterGroupIfTopicNotExist(Long clusterPhyId, List<GroupMemberPO> poList) {
124+
if (poList.isEmpty()) {
125+
return poList;
126+
}
127+
128+
// 集群Topic集合
129+
Set<String> dbTopicSet = topicService.listTopicsFromDB(clusterPhyId).stream().map(elem -> elem.getTopicName()).collect(Collectors.toSet());
130+
dbTopicSet.add(""); //兼容没有消费Topic的group
131+
132+
// 过滤Topic不存在的消费组
133+
return poList.stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList());
134+
}
115135
}

0 commit comments

Comments
 (0)