66import com .google .common .collect .Table ;
77import com .xiaojukeji .know .streaming .km .common .bean .po .metrice .BrokerMetricPO ;
88import com .xiaojukeji .know .streaming .km .common .bean .vo .metrics .point .MetricPointVO ;
9- import com .xiaojukeji .know .streaming .km .common .utils .FutureWaitUtil ;
109import com .xiaojukeji .know .streaming .km .common .utils .MetricsUtils ;
1110import com .xiaojukeji .know .streaming .km .common .utils .Tuple ;
1211import com .xiaojukeji .know .streaming .km .persistence .es .dsls .DslConstant ;
1312import org .springframework .stereotype .Component ;
14- import org .springframework .util .CollectionUtils ;
1513
1614import javax .annotation .PostConstruct ;
1715import java .util .*;
@@ -29,8 +27,6 @@ public void init() {
2927 register ( this );
3028 }
3129
32- protected FutureWaitUtil <Void > queryFuture = FutureWaitUtil .init ("BrokerMetricESDAO" , 4 ,8 , 500 );
33-
3430 /**
3531 * 获取集群 clusterId 中 brokerId 最新的统计指标
3632 */
@@ -140,7 +136,7 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
140136 aggDsl
141137 );
142138
143- queryFuture . runnableTask (
139+ esTPService . submitSearchTask (
144140 String .format ("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d" , clusterPhyId ),
145141 5000 ,
146142 () -> {
@@ -163,7 +159,7 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
163159 }
164160 }
165161
166- queryFuture .waitExecute ();
162+ esTPService .waitExecute ();
167163
168164 return table ;
169165 }
@@ -220,106 +216,86 @@ public Map<String, List<Long>> getTopNBrokerIds(Long clusterPhyId, List<String>
220216 return metricMap ;
221217 }
222218
223- private Map <String , List <MetricPointVO >> handleListESQueryResponse (ESQueryResponse response , List <String > metrics , String aggType ){
219+ private Map <String , List <MetricPointVO >> handleListESQueryResponse (ESQueryResponse response , List <String > metricNameList , String aggType ){
224220 Map <String , List <MetricPointVO >> metricMap = new HashMap <>();
225221
226- if (null == response || null == response .getAggs ()){
227- return metricMap ;
228- }
229-
230- Map <String , ESAggr > esAggrMap = response .getAggs ().getEsAggrMap ();
231- if (null == esAggrMap || null == esAggrMap .get (HIST )) {
232- return metricMap ;
233- }
234-
235- if (CollectionUtils .isEmpty (esAggrMap .get (HIST ).getBucketList ())){
222+ Map <String , ESAggr > esAggrMap = this .checkBucketsAndHitsOfResponseAggs (response );
223+ if (esAggrMap == null ) {
236224 return metricMap ;
237225 }
238226
239- for (String metric : metrics ){
227+ for (String metricName : metricNameList ){
240228 List <MetricPointVO > metricPoints = new ArrayList <>();
241229
242- esAggrMap .get (HIST ).getBucketList ().forEach ( esBucket -> {
230+ esAggrMap .get (HIST ).getBucketList ().forEach (esBucket -> {
243231 try {
244- if (null != esBucket .getUnusedMap ().get (KEY )) {
245- Long timestamp = Long .valueOf (esBucket .getUnusedMap ().get (KEY ).toString ());
246- Object value = esBucket .getAggrMap ().get (metric ).getUnusedMap ().get (VALUE );
247- if (null == value ){return ;}
248-
249- MetricPointVO metricPoint = new MetricPointVO ();
250- metricPoint .setAggType (aggType );
251- metricPoint .setTimeStamp (timestamp );
252- metricPoint .setValue (value .toString ());
253- metricPoint .setName (metric );
254-
255- metricPoints .add (metricPoint );
256- }else {
257- LOGGER .info ("" );
232+ if (null == esBucket .getUnusedMap ().get (KEY )) {
233+ return ;
258234 }
259- }catch (Exception e ){
260- LOGGER .error ("metric={}||errMsg=exception!" , metric , e );
235+
236+ Long timestamp = Long .valueOf (esBucket .getUnusedMap ().get (KEY ).toString ());
237+ Object value = esBucket .getAggrMap ().get (metricName ).getUnusedMap ().get (VALUE );
238+ if (null == value ) {
239+ return ;
240+ }
241+
242+ metricPoints .add (new MetricPointVO (metricName , timestamp , value .toString (), aggType ));
243+ } catch (Exception e ){
244+ LOGGER .error ("method=handleListESQueryResponse||metricName={}||errMsg=exception!" , metricName , e );
261245 }
262246 } );
263247
264- metricMap .put (metric , optimizeMetricPoints (metricPoints ));
248+ metricMap .put (metricName , optimizeMetricPoints (metricPoints ));
265249 }
266250
267251 return metricMap ;
268252 }
269253
270- private Map <String , List <Long >> handleTopBrokerESQueryResponse (ESQueryResponse response , List <String > metrics , int topN ){
254+ private Map <String , List <Long >> handleTopBrokerESQueryResponse (ESQueryResponse response , List <String > metricNameList , int topN ) {
271255 Map <String , List <Long >> ret = new HashMap <>();
272256
273- if (null == response || null == response .getAggs ()){
274- return ret ;
275- }
276-
277- Map <String , ESAggr > esAggrMap = response .getAggs ().getEsAggrMap ();
278- if (null == esAggrMap || null == esAggrMap .get (HIST )) {
279- return ret ;
280- }
281-
282- if (CollectionUtils .isEmpty (esAggrMap .get (HIST ).getBucketList ())){
257+ Map <String , ESAggr > esAggrMap = this .checkBucketsAndHitsOfResponseAggs (response );
258+ if (esAggrMap == null ) {
283259 return ret ;
284260 }
285261
286- Map <String , List <Tuple <Long , Double >>> metricBrokerValueMap = new HashMap <>();
262+ Map <String , List <Tuple <Long , Double >>> metricNameBrokerValueMap = new HashMap <>();
287263
288264 //1、先获取每个指标对应的所有brokerIds以及指标的值
289- for (String metric : metrics ) {
290- esAggrMap .get (HIST ).getBucketList ().forEach ( esBucket -> {
265+ for (String metricName : metricNameList ) {
266+ esAggrMap .get (HIST ).getBucketList ().forEach (esBucket -> {
291267 try {
292- if (null != esBucket .getUnusedMap ().get (KEY )) {
293- Long brokerId = Long .valueOf (esBucket .getUnusedMap ().get (KEY ).toString ());
294- Object value = esBucket .getAggrMap ().get (HIST ).getBucketList ().get (0 ).getAggrMap ()
295- .get (metric ).getUnusedMap ().get (VALUE );
296- if (null == value ){return ;}
297-
298- List <Tuple <Long , Double >> brokerValue = (null == metricBrokerValueMap .get (metric )) ?
299- new ArrayList <>() : metricBrokerValueMap .get (metric );
268+ if (null == esBucket .getUnusedMap ().get (KEY )) {
269+ return ;
270+ }
300271
301- brokerValue .add (new Tuple <>(brokerId , Double .valueOf (value .toString ())));
302- metricBrokerValueMap .put (metric , brokerValue );
272+ Long brokerId = Long .valueOf (esBucket .getUnusedMap ().get (KEY ).toString ());
273+ Object value = esBucket .getAggrMap ().get (HIST ).getBucketList ().get (0 ).getAggrMap ().get (metricName ).getUnusedMap ().get (VALUE );
274+ if (null == value ) {
275+ return ;
303276 }
304- }catch (Exception e ){
305- LOGGER .error ("metrice={}||errMsg=exception!" , metric , e );
277+
278+ metricNameBrokerValueMap .putIfAbsent (metricName , new ArrayList <>());
279+ metricNameBrokerValueMap .get (metricName ).add (new Tuple <>(brokerId , Double .valueOf (value .toString ())));
280+ } catch (Exception e ) {
281+ LOGGER .error ("method=handleTopBrokerESQueryResponse||metric={}||errMsg=exception!" , metricName , e );
306282 }
307- } );
283+ });
308284 }
309285
310286 //2、对每个指标的broker按照指标值排序,并截取前topN个brokerIds
311- for (String metric : metricBrokerValueMap .keySet ()){
312- List <Tuple <Long , Double >> brokerValue = metricBrokerValueMap .get (metric );
287+ for (Map .Entry <String , List <Tuple <Long , Double >>> entry : metricNameBrokerValueMap .entrySet ()){
288+ entry .getValue ().sort ((o1 , o2 ) -> {
289+ if (null == o1 || null == o2 ){
290+ return 0 ;
291+ }
313292
314- brokerValue .sort ((o1 , o2 ) -> {
315- if (null == o1 || null == o2 ){return 0 ;}
316293 return o2 .getV2 ().compareTo (o1 .getV2 ());
317294 } );
318295
319- List <Tuple <Long , Double >> temp = (brokerValue .size () > topN ) ? brokerValue .subList (0 , topN ) : brokerValue ;
320- List <Long > brokerIds = temp .stream ().map (t -> t .getV1 ()).collect ( Collectors .toList ());
321-
322- ret .put (metric , brokerIds );
296+ // 获取TopN的Broker
297+ List <Long > brokerIdList = entry .getValue ().subList (0 , Math .min (topN , entry .getValue ().size ())).stream ().map (elem -> elem .getV1 ()).collect (Collectors .toList ());
298+ ret .put (entry .getKey (), brokerIdList );
323299 }
324300
325301 return ret ;
0 commit comments