1212import com .xiaojukeji .know .streaming .km .common .exception .NotExistException ;
1313import com .xiaojukeji .know .streaming .km .common .utils .ValidateUtils ;
1414import com .xiaojukeji .know .streaming .km .core .service .group .GroupService ;
15+ import com .xiaojukeji .know .streaming .km .core .service .topic .TopicService ;
1516import org .apache .kafka .clients .admin .*;
1617import org .apache .kafka .common .TopicPartition ;
1718import org .springframework .beans .factory .annotation .Autowired ;
18- import org .springframework .transaction .annotation .Transactional ;
1919
2020import java .util .*;
2121import java .util .stream .Collectors ;
@@ -33,6 +33,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
3333 @ Autowired
3434 private GroupService groupService ;
3535
36+ @ Autowired
37+ private TopicService topicService ;
38+
3639 @ Override
3740 public TaskResult processClusterTask (ClusterPhy clusterPhy , long triggerTimeUnitMs ) throws Exception {
3841
@@ -53,6 +56,7 @@ public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnit
5356 private TaskResult updateGroupMembersTask (ClusterPhy clusterPhy , List <String > groupNameList , long triggerTimeUnitMs ) {
5457 List <GroupMemberPO > groupMemberPOList = new ArrayList <>();
5558 TaskResult tr = TaskResult .SUCCESS ;
59+
5660 for (String groupName : groupNameList ) {
5761 try {
5862 List <GroupMemberPO > poList = this .getGroupMembers (clusterPhy .getId (), groupName , new Date (triggerTimeUnitMs ));
@@ -62,7 +66,10 @@ private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> gr
6266 tr = TaskResult .FAIL ;
6367 }
6468 }
69+
70+ groupMemberPOList = this .filterGroupIfTopicNotExist (clusterPhy .getId (), groupMemberPOList );
6571 groupService .batchReplace (groupMemberPOList );
72+
6673 return tr ;
6774 }
6875
@@ -71,7 +78,7 @@ private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName,
7178
7279 // 获取消费组消费过哪些Topic
7380 Map <TopicPartition , Long > offsetMap = groupService .getGroupOffset (clusterPhyId , groupName );
74- for (TopicPartition topicPartition : offsetMap .keySet ()) {
81+ for (TopicPartition topicPartition : offsetMap .keySet ()) {
7582 GroupMemberPO po = groupMap .get (topicPartition .topic ());
7683 if (po == null ) {
7784 po = new GroupMemberPO (clusterPhyId , topicPartition .topic (), groupName , updateTime );
@@ -94,7 +101,7 @@ private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName,
94101 }
95102
96103 Set <String > topicNameSet = partitionList .stream ().map (elem -> elem .topic ()).collect (Collectors .toSet ());
97- for (String topicName : topicNameSet ) {
104+ for (String topicName : topicNameSet ) {
98105 groupMap .putIfAbsent (topicName , new GroupMemberPO (clusterPhyId , topicName , groupName , updateTime ));
99106
100107 GroupMemberPO po = groupMap .get (topicName );
@@ -112,4 +119,17 @@ private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName,
112119
113120 return new ArrayList <>(groupMap .values ());
114121 }
122+
123+ private List <GroupMemberPO > filterGroupIfTopicNotExist (Long clusterPhyId , List <GroupMemberPO > poList ) {
124+ if (poList .isEmpty ()) {
125+ return poList ;
126+ }
127+
128+ // 集群Topic集合
129+ Set <String > dbTopicSet = topicService .listTopicsFromDB (clusterPhyId ).stream ().map (elem -> elem .getTopicName ()).collect (Collectors .toSet ());
130+ dbTopicSet .add ("" ); //兼容没有消费Topic的group
131+
132+ // 过滤Topic不存在的消费组
133+ return poList .stream ().filter (elem -> dbTopicSet .contains (elem .getTopicName ())).collect (Collectors .toList ());
134+ }
115135}
0 commit comments