Skip to content

Commit afe44a2

Browse files
author
zengqiao
committed
自动创建ES索引 & 主动填补指标历史曲线缺少的点
1 parent 9eadafe commit afe44a2

File tree

15 files changed

+859
-98
lines changed

15 files changed

+859
-98
lines changed

km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
66
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
77
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
8-
import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum;
98
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
109
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
1110
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
@@ -21,6 +20,8 @@
2120
import java.util.concurrent.ThreadPoolExecutor;
2221
import java.util.concurrent.TimeUnit;
2322

23+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
24+
2425
@Component
2526
public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
2627
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@@ -41,37 +42,37 @@ public void init(){
4142
public void onApplicationEvent(BaseMetricEvent event) {
4243
if(event instanceof BrokerMetricEvent) {
4344
BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event;
44-
send2es(KafkaMetricIndexEnum.BROKER_INFO,
45+
send2es(BROKER_INDEX,
4546
ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class)
4647
);
4748

4849
} else if(event instanceof ClusterMetricEvent) {
4950
ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event;
50-
send2es(KafkaMetricIndexEnum.CLUSTER_INFO,
51+
send2es(CLUSTER_INDEX,
5152
ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class)
5253
);
5354

5455
} else if(event instanceof TopicMetricEvent) {
5556
TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event;
56-
send2es(KafkaMetricIndexEnum.TOPIC_INFO,
57+
send2es(TOPIC_INDEX,
5758
ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class)
5859
);
5960

6061
} else if(event instanceof PartitionMetricEvent) {
6162
PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event;
62-
send2es(KafkaMetricIndexEnum.PARTITION_INFO,
63+
send2es(PARTITION_INDEX,
6364
ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class)
6465
);
6566

6667
} else if(event instanceof GroupMetricEvent) {
6768
GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event;
68-
send2es(KafkaMetricIndexEnum.GROUP_INFO,
69+
send2es(GROUP_INDEX,
6970
ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class)
7071
);
7172

7273
} else if(event instanceof ReplicaMetricEvent) {
7374
ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event;
74-
send2es(KafkaMetricIndexEnum.REPLICATION_INFO,
75+
send2es(REPLICATION_INDEX,
7576
ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class)
7677
);
7778
}
@@ -80,19 +81,19 @@ public void onApplicationEvent(BaseMetricEvent event) {
8081
/**
8182
* 根据不同监控维度来发送
8283
*/
83-
private boolean send2es(KafkaMetricIndexEnum stats, List<? extends BaseESPO> statsList){
84+
private boolean send2es(String index, List<? extends BaseESPO> statsList){
8485
if (CollectionUtils.isEmpty(statsList)) {
8586
return true;
8687
}
8788

8889
if (!EnvUtil.isOnline()) {
8990
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
90-
stats.getIndex(), statsList.size());
91+
index, statsList.size());
9192
}
9293

93-
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(stats);
94+
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
9495
if (Objects.isNull( baseMetricESDao )) {
95-
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", stats.getIndex());
96+
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
9697
return false;
9798
}
9899

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public class MetricPointVO implements Comparable<MetricPointVO> {
2929
@Override
3030
public int compareTo(MetricPointVO o) {
3131
if(null == o){return 0;}
32+
if(null == this.getTimeStamp()
33+
|| null == o.getTimeStamp()){
34+
return 0;
35+
}
3236

3337
return this.getTimeStamp().intValue() - o.getTimeStamp().intValue();
3438
}

0 commit comments

Comments
 (0)