99import com .xiaojukeji .know .streaming .km .common .bean .po .BaseESPO ;
1010import com .xiaojukeji .know .streaming .km .common .bean .po .metrice .BaseMetricESPO ;
1111import com .xiaojukeji .know .streaming .km .common .bean .vo .metrics .point .MetricPointVO ;
12+ import com .xiaojukeji .know .streaming .km .common .utils .CommonUtils ;
1213import com .xiaojukeji .know .streaming .km .common .utils .IndexNameUtils ;
1314import com .xiaojukeji .know .streaming .km .persistence .es .BaseESDAO ;
1415import com .xiaojukeji .know .streaming .km .persistence .es .dsls .DslsConstant ;
@@ -34,6 +35,8 @@ public class BaseMetricESDAO extends BaseESDAO {
3435 protected static final Long ONE_HOUR = 60 * ONE_MIN ;
3536 protected static final Long ONE_DAY = 24 * ONE_HOUR ;
3637
38+ private static final int INDEX_DAYS = 7 ;
39+
3740 /**
3841 * 不同维度 kafka 监控数据
3942 */
@@ -45,12 +48,18 @@ public class BaseMetricESDAO extends BaseESDAO {
4548 */
4649 @ Scheduled (cron = "0 3/5 * * * ?" )
4750 public void checkCurrentDayIndexExist (){
48- String realIndex = IndexNameUtils .genCurrentDailyIndexName (indexName );
51+ try {
52+ esOpClient .createIndexTemplateIfNotExist (indexName , indexTemplate );
4953
50- if (esOpClient .indexExist (realIndex )){return ;}
54+ //检查最近7天索引存在不存
55+ for (int i = 0 ; i < INDEX_DAYS ; i ++){
56+ String realIndex = IndexNameUtils .genDailyIndexName (indexName , i );
57+ if (esOpClient .indexExist (realIndex )){continue ;}
5158
52- if (esOpClient .createIndexTemplateIfNotExist (indexName , indexTemplate )){
53- esOpClient .createIndex (realIndex );
59+ esOpClient .createIndex (realIndex );
60+ }
61+ }catch (Exception e ){
62+ LOGGER .error ("method=checkCurrentDayIndexExist||errMsg=exception!" , e );
5463 }
5564 }
5665
@@ -336,10 +345,17 @@ protected int handleESQueryResponseCount(ESQueryResponse response){
336345 if (null == response || null == response .getHits ()
337346 || null ==response .getHits ().getUnusedMap ()){return -1 ;}
338347
339- // "total" : {"value": 123 , "relation": "XX "}
340- JSONObject jsonObjectTotal = (JSONObject ) response .getHits ().getUnusedMap ().getOrDefault (TOTAL , 0 );
341-
342- return Integer .valueOf (jsonObjectTotal .get (VALUE ).toString ());
348+ try {
349+ String total = response .getHits ().getUnusedMap ().get (TOTAL ).toString ();
350+ if (CommonUtils .isNumeric (total )){
351+ return Integer .valueOf (total );
352+ }else {
353+ return JSON .parseObject (total ).getIntValue (VALUE );
354+ }
355+ }catch (Exception e ){
356+ LOGGER .error ("method=handleESQueryResponseCount||errMsg=exception!" , e );
357+ }
358+ return 0 ;
343359 }
344360
345361 protected <T extends BaseMetricESPO > T filterMetrics (T t , List <String > metricNames ){
@@ -389,6 +405,10 @@ protected Long getLatestMetricTime(String appendQueryDsl) {
389405 * 对 metricPointVOS 进行缺点优化
390406 */
391407 protected List <MetricPointVO > optimizeMetricPoints (List <MetricPointVO > metricPointVOS ){
408+ // // 内部测试环境,不进行优化,直接返回
409+ // return metricPointVOS;
410+
411+ // 开源环境,进行指标点优化
392412 if (CollectionUtils .isEmpty (metricPointVOS )){return metricPointVOS ;}
393413
394414 int size = metricPointVOS .size ();
0 commit comments