2424import com .xiaojukeji .know .streaming .km .core .service .broker .BrokerService ;
2525import com .xiaojukeji .know .streaming .km .core .service .kafkacontroller .KafkaControllerService ;
2626import com .xiaojukeji .know .streaming .km .core .service .topic .TopicService ;
27+ import com .xiaojukeji .know .streaming .km .persistence .kafka .KafkaJMXClient ;
2728import org .springframework .beans .factory .annotation .Autowired ;
2829import org .springframework .stereotype .Service ;
2930
@@ -51,6 +52,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
5152 @ Autowired
5253 private KafkaControllerService kafkaControllerService ;
5354
55+ @ Autowired
56+ private KafkaJMXClient kafkaJMXClient ;
57+
5458 @ Override
5559 public PaginationResult <ClusterBrokersOverviewVO > getClusterPhyBrokersOverview (Long clusterPhyId , ClusterBrokersOverviewDTO dto ) {
5660 // 获取集群Broker列表
@@ -75,6 +79,10 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
7579 //获取controller信息
7680 KafkaController kafkaController = kafkaControllerService .getKafkaControllerFromDB (clusterPhyId );
7781
82+ //获取jmx状态信息
83+ Map <Integer , Boolean > jmxConnectedMap = new HashMap <>();
84+ brokerList .forEach (elem -> jmxConnectedMap .put (elem .getBrokerId (), kafkaJMXClient .getClientWithCheck (clusterPhyId , elem .getBrokerId ()) != null ));
85+
7886 // 格式转换
7987 return PaginationResult .buildSuc (
8088 this .convert2ClusterBrokersOverviewVOList (
@@ -83,7 +91,8 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
8391 metricsResult .getData (),
8492 groupTopic ,
8593 transactionTopic ,
86- kafkaController
94+ kafkaController ,
95+ jmxConnectedMap
8796 ),
8897 paginationResult
8998 );
@@ -165,22 +174,24 @@ private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List
165174 List <BrokerMetrics > metricsList ,
166175 Topic groupTopic ,
167176 Topic transactionTopic ,
168- KafkaController kafkaController ) {
169- Map <Integer , BrokerMetrics > metricsMap = metricsList == null ? new HashMap <>(): metricsList .stream ().collect (Collectors .toMap (BrokerMetrics ::getBrokerId , Function .identity ()));
177+ KafkaController kafkaController ,
178+ Map <Integer , Boolean > jmxConnectedMap ) {
179+ Map <Integer , BrokerMetrics > metricsMap = metricsList == null ? new HashMap <>() : metricsList .stream ().collect (Collectors .toMap (BrokerMetrics ::getBrokerId , Function .identity ()));
170180
171- Map <Integer , Broker > brokerMap = brokerList == null ? new HashMap <>(): brokerList .stream ().collect (Collectors .toMap (Broker ::getBrokerId , Function .identity ()));
181+ Map <Integer , Broker > brokerMap = brokerList == null ? new HashMap <>() : brokerList .stream ().collect (Collectors .toMap (Broker ::getBrokerId , Function .identity ()));
172182
173183 List <ClusterBrokersOverviewVO > voList = new ArrayList <>(pagedBrokerIdList .size ());
174184 for (Integer brokerId : pagedBrokerIdList ) {
175185 Broker broker = brokerMap .get (brokerId );
176186 BrokerMetrics brokerMetrics = metricsMap .get (brokerId );
187+ Boolean jmxConnected = jmxConnectedMap .get (brokerId );
177188
178- voList .add (this .convert2ClusterBrokersOverviewVO (brokerId , broker , brokerMetrics , groupTopic , transactionTopic , kafkaController ));
189+ voList .add (this .convert2ClusterBrokersOverviewVO (brokerId , broker , brokerMetrics , groupTopic , transactionTopic , kafkaController , jmxConnected ));
179190 }
180191 return voList ;
181192 }
182193
183- private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO (Integer brokerId , Broker broker , BrokerMetrics brokerMetrics , Topic groupTopic , Topic transactionTopic , KafkaController kafkaController ) {
194+ private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO (Integer brokerId , Broker broker , BrokerMetrics brokerMetrics , Topic groupTopic , Topic transactionTopic , KafkaController kafkaController , Boolean jmxConnected ) {
184195 ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO ();
185196 clusterBrokersOverviewVO .setBrokerId (brokerId );
186197 if (broker != null ) {
@@ -203,6 +214,7 @@ private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer broker
203214 }
204215
205216 clusterBrokersOverviewVO .setLatestMetrics (brokerMetrics );
217+ clusterBrokersOverviewVO .setJmxConnected (jmxConnected );
206218 return clusterBrokersOverviewVO ;
207219 }
208220
0 commit comments