Skip to content

Commit 05c52cd

Browse files
zengqiaoZQKC
authored andcommitted
[Feature] 集群Group列表按照Group维度进行展示 (#580)
1 parent 586b37c commit 05c52cd

File tree

22 files changed

+737
-199
lines changed

22 files changed

+737
-199
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package com.xiaojukeji.know.streaming.km.biz.group;
22

3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
34
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
45
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
56
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
67
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
78
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
89
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS;
10+
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
11+
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO;
912
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO;
1013
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
1114
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
@@ -22,6 +25,10 @@ PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhyId,
2225
String searchGroupKeyword,
2326
PaginationBaseDTO dto);
2427

28+
PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto);
29+
30+
PaginationResult<GroupOverviewVO> pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto);
31+
2532
PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetrics(Long clusterPhyId,
2633
String topicName,
2734
String groupName,
@@ -31,4 +38,6 @@ PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetrics(Lon
3138
Result<Set<TopicPartitionKS>> listClusterPhyGroupPartitions(Long clusterPhyId, String groupName, Long startTime, Long endTime);
3239

3340
Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception;
41+
42+
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList (Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
3443
}

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java

Lines changed: 101 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,30 @@
33
import com.didiglobal.logi.log.ILog;
44
import com.didiglobal.logi.log.LogFactory;
55
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
6+
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
67
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
78
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
89
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
910
import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
11+
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
1012
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
13+
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
1114
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
1215
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
1316
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1417
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
1518
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS;
1619
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
1720
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
21+
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO;
1822
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO;
1923
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
2024
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
25+
import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant;
26+
import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter;
2127
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
2228
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
29+
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
2330
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
2431
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
2532
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
@@ -71,30 +78,60 @@ public PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhy
7178
String searchGroupKeyword,
7279
PaginationBaseDTO dto) {
7380
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto);
74-
if (paginationResult.failed()) {
75-
return PaginationResult.buildFailure(paginationResult, dto);
76-
}
7781

7882
if (!paginationResult.hasData()) {
7983
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
8084
}
8185

82-
// 获取指标
83-
Result<List<GroupMetrics>> metricsListResult = groupMetricService.listLatestMetricsAggByGroupTopicFromES(
84-
clusterPhyId,
85-
paginationResult.getData().getBizData().stream().map(elem -> new GroupTopic(elem.getGroupName(), elem.getTopicName())).collect(Collectors.toList()),
86-
Arrays.asList(GroupMetricVersionItems.GROUP_METRIC_LAG),
87-
AggTypeEnum.MAX
88-
);
89-
if (metricsListResult.failed()) {
90-
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
91-
log.error("method=pagingGroupMembers||clusterPhyId={}||topicName={}||groupName={}||result={}||errMsg=search es failed", clusterPhyId, topicName, groupName, metricsListResult);
86+
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
87+
88+
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
89+
}
90+
91+
@Override
92+
public PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) {
93+
Group group = groupService.getGroupFromDB(clusterPhyId, groupName);
94+
95+
//没有topicMember则直接返回
96+
if (group == null || ValidateUtils.isEmptyList(group.getTopicMembers())) {
97+
return PaginationResult.buildSuc(dto);
9298
}
9399

94-
return PaginationResult.buildSuc(
95-
this.convert2GroupTopicOverviewVOList(paginationResult.getData().getBizData(), metricsListResult.getData()),
96-
paginationResult
97-
);
100+
//排序
101+
List<GroupTopicMember> groupTopicMembers = PaginationUtil.pageBySort(group.getTopicMembers(), PaginationConstant.DEFAULT_GROUP_TOPIC_SORTED_FIELD, SortTypeEnum.DESC.getSortType());
102+
103+
//分页
104+
PaginationResult<GroupTopicMember> paginationResult = PaginationUtil.pageBySubData(groupTopicMembers, dto);
105+
106+
List<GroupMemberPO> groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList());
107+
108+
return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult);
109+
}
110+
111+
@Override
112+
public PaginationResult<GroupOverviewVO> pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto) {
113+
List<Group> groupList = groupService.listClusterGroups(clusterPhyId);
114+
115+
// 类型转化
116+
List<GroupOverviewVO> voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList());
117+
118+
// 搜索groupName
119+
voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name"));
120+
121+
//搜索topic
122+
if (!ValidateUtils.isBlank(dto.getSearchTopicName())) {
123+
voList = voList.stream().filter(elem -> {
124+
for (String topicName : elem.getTopicNameList()) {
125+
if (topicName.contains(dto.getSearchTopicName())) {
126+
return true;
127+
}
128+
}
129+
return false;
130+
}).collect(Collectors.toList());
131+
}
132+
133+
// 分页 后 返回
134+
return PaginationUtil.pageBySubData(voList, dto);
98135
}
99136

100137
@Override
@@ -104,7 +141,7 @@ public PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetr
104141
List<String> latestMetricNames,
105142
PaginationSortDTO dto) throws NotExistException, AdminOperateException {
106143
// 获取消费组消费的TopicPartition列表
107-
Map<TopicPartition, Long> consumedOffsetMap = groupService.getGroupOffset(clusterPhyId, groupName);
144+
Map<TopicPartition, Long> consumedOffsetMap = groupService.getGroupOffsetFromKafka(clusterPhyId, groupName);
108145
List<Integer> partitionList = consumedOffsetMap.keySet()
109146
.stream()
110147
.filter(elem -> elem.topic().equals(topicName))
@@ -113,7 +150,7 @@ public PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetr
113150
Collections.sort(partitionList);
114151

115152
// 获取消费组当前运行信息
116-
ConsumerGroupDescription groupDescription = groupService.getGroupDescription(clusterPhyId, groupName);
153+
ConsumerGroupDescription groupDescription = groupService.getGroupDescriptionFromKafka(clusterPhyId, groupName);
117154

118155
// 转换存储格式
119156
Map<TopicPartition, MemberDescription> tpMemberMap = new HashMap<>();
@@ -166,13 +203,13 @@ public Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator)
166203
return rv;
167204
}
168205

169-
ConsumerGroupDescription description = groupService.getGroupDescription(dto.getClusterId(), dto.getGroupName());
206+
ConsumerGroupDescription description = groupService.getGroupDescriptionFromKafka(dto.getClusterId(), dto.getGroupName());
170207
if (ConsumerGroupState.DEAD.equals(description.state()) && !dto.isCreateIfNotExist()) {
171208
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "group不存在, 重置失败");
172209
}
173210

174211
if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) {
175-
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty | Dead 情况可重置)", GroupStateEnum.getByRawState(description.state()).getState()));
212+
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", GroupStateEnum.getByRawState(description.state()).getState()));
176213
}
177214

178215
// 获取offset
@@ -185,6 +222,22 @@ public Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator)
185222
return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator);
186223
}
187224

225+
@Override
226+
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList) {
227+
// 获取指标
228+
Result<List<GroupMetrics>> metricsListResult = groupMetricService.listLatestMetricsAggByGroupTopicFromES(
229+
clusterPhyId,
230+
groupMemberPOList.stream().map(elem -> new GroupTopic(elem.getGroupName(), elem.getTopicName())).collect(Collectors.toList()),
231+
Arrays.asList(GroupMetricVersionItems.GROUP_METRIC_LAG),
232+
AggTypeEnum.MAX
233+
);
234+
if (metricsListResult.failed()) {
235+
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
236+
log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
237+
}
238+
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
239+
}
240+
188241

189242
/**************************************************** private method ****************************************************/
190243

@@ -293,4 +346,31 @@ private PaginationResult<GroupMetrics> pagingGroupTopicPartitionMetrics(Long clu
293346
);
294347
}
295348

349+
private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(String groupName, String state, List<GroupTopicMember> groupTopicList, List<GroupMetrics> metricsList) {
350+
if (metricsList == null) {
351+
metricsList = new ArrayList<>();
352+
}
353+
354+
// <TopicName, GroupMetrics>
355+
Map<String, GroupMetrics> metricsMap = new HashMap<>();
356+
for (GroupMetrics metrics : metricsList) {
357+
if (!groupName.equals(metrics.getGroup())) continue;
358+
metricsMap.put(metrics.getTopic(), metrics);
359+
}
360+
361+
List<GroupTopicOverviewVO> voList = new ArrayList<>();
362+
for (GroupTopicMember po : groupTopicList) {
363+
GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class);
364+
vo.setGroupName(groupName);
365+
vo.setState(state);
366+
GroupMetrics metrics = metricsMap.get(po.getTopicName());
367+
if (metrics != null) {
368+
vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG)));
369+
}
370+
371+
voList.add(vo);
372+
}
373+
return voList;
374+
}
375+
296376
}

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.xiaojukeji.know.streaming.km.biz.topic;
22

3-
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
44
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
5+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
56
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
7+
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
68
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
79
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO;
810
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO;
@@ -23,4 +25,6 @@ public interface TopicStateManager {
2325
Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames);
2426

2527
Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(Long clusterPhyId, String topicName);
28+
29+
PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto);
2630
}

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,22 @@
22

33
import com.didiglobal.logi.log.ILog;
44
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
56
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
7+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
68
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
79
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
810
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
911
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
1012
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
1113
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
14+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
1215
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1316
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
1417
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
18+
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
1519
import com.xiaojukeji.know.streaming.km.common.bean.vo.broker.BrokerReplicaSummaryVO;
20+
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
1621
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
1722
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO;
1823
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO;
@@ -32,6 +37,7 @@
3237
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
3338
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
3439
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
40+
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
3541
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
3642
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
3743
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
@@ -77,6 +83,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
7783
@Autowired
7884
private TopicConfigService topicConfigService;
7985

86+
@Autowired
87+
private GroupService groupService;
88+
89+
@Autowired
90+
private GroupManager groupManager;
91+
8092
@Override
8193
public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException {
8294
Topic topic = topicService.getTopic(clusterPhyId, topicName);
@@ -346,6 +358,19 @@ public Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(
346358
return Result.buildSuc(vo);
347359
}
348360

361+
@Override
362+
public PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) {
363+
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto);
364+
365+
if (!paginationResult.hasData()) {
366+
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
367+
}
368+
369+
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
370+
371+
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
372+
}
373+
349374
/**************************************************** private method ****************************************************/
350375

351376
private boolean checkIfIgnore(ConsumerRecord<String, String> consumerRecord, String filterKey, String filterValue) {
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
4+
import io.swagger.annotations.ApiModelProperty;
5+
import lombok.Data;
6+
7+
/**
8+
* @author wyb
9+
* @date 2022/10/17
10+
*/
11+
@Data
12+
public class ClusterGroupSummaryDTO extends PaginationBaseDTO {
13+
@ApiModelProperty("查找该Topic")
14+
private String searchTopicName;
15+
16+
@ApiModelProperty("查找该Group")
17+
private String searchGroupName;
18+
}

0 commit comments

Comments
 (0)