Skip to content

Commit 028c3bb

Browse files
author
zengqiao
committed
指标发送ES类按照指标类别拆分
1 parent 5ef5f6e commit 028c3bb

File tree

8 files changed

+243
-121
lines changed

8 files changed

+243
-121
lines changed

km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java

Lines changed: 0 additions & 121 deletions
This file was deleted.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
6+
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
7+
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
8+
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
9+
import org.apache.commons.collections.CollectionUtils;
10+
11+
import java.util.List;
12+
import java.util.Objects;
13+
import java.util.concurrent.LinkedBlockingDeque;
14+
import java.util.concurrent.ThreadPoolExecutor;
15+
import java.util.concurrent.TimeUnit;
16+
17+
public abstract class AbstractMetricESSender {
18+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
19+
20+
private static final int THRESHOLD = 100;
21+
22+
private static final ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(
23+
10,
24+
20,
25+
6000,
26+
TimeUnit.MILLISECONDS,
27+
new LinkedBlockingDeque<>(1000),
28+
new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
29+
(r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount())
30+
);
31+
32+
/**
33+
* 根据不同监控维度来发送
34+
*/
35+
protected boolean send2es(String index, List<? extends BaseESPO> statsList){
36+
if (CollectionUtils.isEmpty(statsList)) {
37+
return true;
38+
}
39+
40+
if (!EnvUtil.isOnline()) {
41+
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
42+
index, statsList.size());
43+
}
44+
45+
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
46+
if (Objects.isNull( baseMetricESDao )) {
47+
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
48+
return false;
49+
}
50+
51+
int size = statsList.size();
52+
int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1);
53+
54+
if (size < THRESHOLD) {
55+
esExecutor.execute(
56+
() -> baseMetricESDao.batchInsertStats(statsList)
57+
);
58+
return true;
59+
}
60+
61+
for (int i = 1; i < num + 1; i++) {
62+
int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
63+
int start = (i - 1) * THRESHOLD;
64+
65+
esExecutor.execute(
66+
() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end))
67+
);
68+
}
69+
70+
return true;
71+
}
72+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8+
import org.springframework.context.ApplicationListener;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.annotation.PostConstruct;
12+
13+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.BROKER_INDEX;
14+
15+
@Component
16+
public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener<BrokerMetricEvent> {
17+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
18+
19+
@PostConstruct
20+
public void init(){
21+
LOGGER.info("class=BrokerMetricESSender||method=init||msg=init finished");
22+
}
23+
24+
@Override
25+
public void onApplicationEvent(BrokerMetricEvent event) {
26+
send2es(BROKER_INDEX, ConvertUtil.list2List(event.getBrokerMetrics(), BrokerMetricPO.class));
27+
}
28+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8+
import org.springframework.context.ApplicationListener;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.annotation.PostConstruct;
12+
13+
14+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.CLUSTER_INDEX;
15+
16+
@Component
17+
public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener<ClusterMetricEvent> {
18+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
19+
20+
@PostConstruct
21+
public void init(){
22+
LOGGER.info("class=ClusterMetricESSender||method=init||msg=init finished");
23+
}
24+
25+
@Override
26+
public void onApplicationEvent(ClusterMetricEvent event) {
27+
send2es(CLUSTER_INDEX, ConvertUtil.list2List(event.getClusterMetrics(), ClusterMetricPO.class));
28+
}
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.GroupMetricPO;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8+
import org.springframework.context.ApplicationListener;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.annotation.PostConstruct;
12+
13+
14+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.GROUP_INDEX;
15+
16+
@Component
17+
public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener<GroupMetricEvent> {
18+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
19+
20+
@PostConstruct
21+
public void init(){
22+
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished");
23+
}
24+
25+
@Override
26+
public void onApplicationEvent(GroupMetricEvent event) {
27+
send2es(GROUP_INDEX, ConvertUtil.list2List(event.getGroupMetrics(), GroupMetricPO.class));
28+
}
29+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.PartitionMetricPO;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8+
import org.springframework.context.ApplicationListener;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.annotation.PostConstruct;
12+
13+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.PARTITION_INDEX;
14+
15+
@Component
16+
public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener<PartitionMetricEvent> {
17+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
18+
19+
@PostConstruct
20+
public void init(){
21+
LOGGER.info("class=PartitionMetricESSender||method=init||msg=init finished");
22+
}
23+
24+
@Override
25+
public void onApplicationEvent(PartitionMetricEvent event) {
26+
send2es(PARTITION_INDEX, ConvertUtil.list2List(event.getPartitionMetrics(), PartitionMetricPO.class));
27+
}
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8+
import org.springframework.context.ApplicationListener;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.annotation.PostConstruct;
12+
13+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.REPLICATION_INDEX;
14+
15+
@Component
16+
public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener<ReplicaMetricEvent> {
17+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
18+
19+
@PostConstruct
20+
public void init(){
21+
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished");
22+
}
23+
24+
@Override
25+
public void onApplicationEvent(ReplicaMetricEvent event) {
26+
send2es(REPLICATION_INDEX, ConvertUtil.list2List(event.getReplicationMetrics(), ReplicationMetricPO.class));
27+
}
28+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.xiaojukeji.know.streaming.km.collector.sink;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8+
import org.springframework.context.ApplicationListener;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.annotation.PostConstruct;
12+
13+
14+
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.TOPIC_INDEX;
15+
16+
@Component
17+
public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener<TopicMetricEvent> {
18+
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
19+
20+
@PostConstruct
21+
public void init(){
22+
LOGGER.info("class=TopicMetricESSender||method=init||msg=init finished");
23+
}
24+
25+
@Override
26+
public void onApplicationEvent(TopicMetricEvent event) {
27+
send2es(TOPIC_INDEX, ConvertUtil.list2List(event.getTopicMetrics(), TopicMetricPO.class));
28+
}
29+
}

0 commit comments

Comments
 (0)