1414import com .xiaojukeji .know .streaming .km .common .bean .vo .cluster .res .ClusterBrokersOverviewVO ;
1515import com .xiaojukeji .know .streaming .km .common .bean .vo .cluster .res .ClusterBrokersStateVO ;
1616import com .xiaojukeji .know .streaming .km .common .bean .vo .kafkacontroller .KafkaControllerVO ;
17+ import com .xiaojukeji .know .streaming .km .common .constant .KafkaConstant ;
1718import com .xiaojukeji .know .streaming .km .common .enums .SortTypeEnum ;
1819import com .xiaojukeji .know .streaming .km .common .utils .PaginationMetricsUtil ;
1920import com .xiaojukeji .know .streaming .km .common .utils .PaginationUtil ;
@@ -71,14 +72,18 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
7172 Topic groupTopic = topicService .getTopic (clusterPhyId , org .apache .kafka .common .internals .Topic .GROUP_METADATA_TOPIC_NAME );
7273 Topic transactionTopic = topicService .getTopic (clusterPhyId , org .apache .kafka .common .internals .Topic .TRANSACTION_STATE_TOPIC_NAME );
7374
75+ //获取controller信息
76+ KafkaController kafkaController = kafkaControllerService .getKafkaControllerFromDB (clusterPhyId );
77+
7478 // 格式转换
7579 return PaginationResult .buildSuc (
7680 this .convert2ClusterBrokersOverviewVOList (
7781 paginationResult .getData ().getBizData (),
7882 brokerList ,
7983 metricsResult .getData (),
8084 groupTopic ,
81- transactionTopic
85+ transactionTopic ,
86+ kafkaController
8287 ),
8388 paginationResult
8489 );
@@ -159,7 +164,8 @@ private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List
159164 List <Broker > brokerList ,
160165 List <BrokerMetrics > metricsList ,
161166 Topic groupTopic ,
162- Topic transactionTopic ) {
167+ Topic transactionTopic ,
168+ KafkaController kafkaController ) {
163169 Map <Integer , BrokerMetrics > metricsMap = metricsList == null ? new HashMap <>(): metricsList .stream ().collect (Collectors .toMap (BrokerMetrics ::getBrokerId , Function .identity ()));
164170
165171 Map <Integer , Broker > brokerMap = brokerList == null ? new HashMap <>(): brokerList .stream ().collect (Collectors .toMap (Broker ::getBrokerId , Function .identity ()));
@@ -169,12 +175,12 @@ private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List
169175 Broker broker = brokerMap .get (brokerId );
170176 BrokerMetrics brokerMetrics = metricsMap .get (brokerId );
171177
172- voList .add (this .convert2ClusterBrokersOverviewVO (brokerId , broker , brokerMetrics , groupTopic , transactionTopic ));
178+ voList .add (this .convert2ClusterBrokersOverviewVO (brokerId , broker , brokerMetrics , groupTopic , transactionTopic , kafkaController ));
173179 }
174180 return voList ;
175181 }
176182
177- private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO (Integer brokerId , Broker broker , BrokerMetrics brokerMetrics , Topic groupTopic , Topic transactionTopic ) {
183+ private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO (Integer brokerId , Broker broker , BrokerMetrics brokerMetrics , Topic groupTopic , Topic transactionTopic , KafkaController kafkaController ) {
178184 ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO ();
179185 clusterBrokersOverviewVO .setBrokerId (brokerId );
180186 if (broker != null ) {
@@ -192,6 +198,9 @@ private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer broker
192198 if (transactionTopic != null && transactionTopic .getBrokerIdSet ().contains (brokerId )) {
193199 clusterBrokersOverviewVO .getKafkaRoleList ().add (transactionTopic .getTopicName ());
194200 }
201+ if (kafkaController != null && kafkaController .getBrokerId ().equals (brokerId )) {
202+ clusterBrokersOverviewVO .getKafkaRoleList ().add (KafkaConstant .CONTROLLER_ROLE );
203+ }
195204
196205 clusterBrokersOverviewVO .setLatestMetrics (brokerMetrics );
197206 return clusterBrokersOverviewVO ;
0 commit comments