Skip to content

Commit 5ef5f6e

Browse files
author
zengqiao
committed
健康巡检任务按照类型进行拆分
1 parent c199237 commit 5ef5f6e

File tree

4 files changed

+412
-23
lines changed

4 files changed

+412
-23
lines changed

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java renamed to km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,38 @@
99
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
1010
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
1111
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
12-
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
1312
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
1413
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
1514
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
16-
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
1715
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
16+
import com.xiaojukeji.know.streaming.km.core.service.health.checker.broker.HealthCheckBrokerService;
17+
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
1818
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
1919
import lombok.AllArgsConstructor;
2020
import lombok.NoArgsConstructor;
2121
import org.springframework.beans.factory.annotation.Autowired;
2222

23-
import java.util.*;
23+
import java.util.ArrayList;
24+
import java.util.Date;
25+
import java.util.List;
26+
import java.util.Map;
2427

2528
@NoArgsConstructor
2629
@AllArgsConstructor
27-
@Task(name = "HealthCheckTask",
28-
description = "健康检查",
30+
@Task(name = "BrokerHealthCheckTask",
31+
description = "Broker健康检查",
2932
cron = "0 0/1 * * * ? *",
3033
autoRegister = true,
3134
consensual = ConsensualEnum.BROADCAST,
3235
timeout = 2 * 60)
33-
public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
34-
private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
36+
public class BrokerHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
37+
private static final ILog log = LogFactory.getLog(BrokerHealthCheckTask.class);
3538

3639
@Autowired
3740
private HealthCheckResultService healthCheckResultService;
3841

39-
private final List<AbstractHealthCheckService> healthCheckServiceList = new ArrayList<>(
40-
SpringTool.getBeansOfType(AbstractHealthCheckService.class).values()
41-
);
42+
@Autowired
43+
private HealthCheckBrokerService healthCheckBrokerService;
4244

4345
@Override
4446
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
@@ -53,33 +55,30 @@ private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long tri
5355
List<HealthCheckResult> resultList = new ArrayList<>();
5456

5557
// 遍历Check-Service
56-
for (AbstractHealthCheckService healthCheckService: healthCheckServiceList) {
57-
List<ClusterPhyParam> paramList = healthCheckService.getResList(clusterPhy.getId());
58-
if (ValidateUtils.isEmptyList(paramList)) {
59-
// 当前无该维度的资源,则直接设置为
60-
resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckService, healthConfigMap));
61-
continue;
62-
}
58+
List<ClusterPhyParam> paramList = healthCheckBrokerService.getResList(clusterPhy.getId());
59+
if (ValidateUtils.isEmptyList(paramList)) {
60+
// 当前无该维度的资源,则直接设置为
61+
resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckBrokerService, healthConfigMap));
62+
}
6363

64-
// 遍历资源
65-
for (ClusterPhyParam clusterPhyParam: paramList) {
66-
resultList.addAll(this.checkAndGetResult(healthCheckService, clusterPhyParam, healthConfigMap));
67-
}
64+
// 遍历资源
65+
for (ClusterPhyParam clusterPhyParam: paramList) {
66+
resultList.addAll(this.checkAndGetResult(healthCheckBrokerService, clusterPhyParam, healthConfigMap));
6867
}
6968

7069
for (HealthCheckResult checkResult: resultList) {
7170
try {
7271
healthCheckResultService.replace(checkResult);
7372
} catch (Exception e) {
74-
log.error("method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
73+
log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
7574
}
7675
}
7776

7877
// 删除10分钟之前的检查结果
7978
try {
8079
healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
8180
} catch (Exception e) {
82-
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
81+
log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
8382
}
8483

8584
return TaskResult.SUCCESS;
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package com.xiaojukeji.know.streaming.km.task.health;
2+
3+
import com.didiglobal.logi.job.annotation.Task;
4+
import com.didiglobal.logi.job.common.TaskResult;
5+
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
6+
import com.didiglobal.logi.log.ILog;
7+
import com.didiglobal.logi.log.LogFactory;
8+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
9+
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
10+
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
11+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
12+
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
13+
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
14+
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
15+
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
16+
import com.xiaojukeji.know.streaming.km.core.service.health.checker.cluster.HealthCheckClusterService;
17+
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
18+
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
19+
import lombok.AllArgsConstructor;
20+
import lombok.NoArgsConstructor;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
23+
import java.util.ArrayList;
24+
import java.util.Date;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
@NoArgsConstructor
29+
@AllArgsConstructor
30+
@Task(name = "ClusterHealthCheckTask",
31+
description = "Cluster健康检查",
32+
cron = "0 0/1 * * * ? *",
33+
autoRegister = true,
34+
consensual = ConsensualEnum.BROADCAST,
35+
timeout = 2 * 60)
36+
public class ClusterHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
37+
private static final ILog log = LogFactory.getLog(ClusterHealthCheckTask.class);
38+
39+
@Autowired
40+
private HealthCheckResultService healthCheckResultService;
41+
42+
@Autowired
43+
private HealthCheckClusterService healthCheckClusterService;
44+
45+
@Override
46+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
47+
return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
48+
}
49+
50+
private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
51+
// 获取配置,<配置名,配置信息>
52+
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
53+
54+
// 检查结果
55+
List<HealthCheckResult> resultList = new ArrayList<>();
56+
57+
// 遍历Check-Service
58+
List<ClusterPhyParam> paramList = healthCheckClusterService.getResList(clusterPhy.getId());
59+
if (ValidateUtils.isEmptyList(paramList)) {
60+
// 当前无该维度的资源,则直接设置为
61+
resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckClusterService, healthConfigMap));
62+
}
63+
64+
// 遍历资源
65+
for (ClusterPhyParam clusterPhyParam: paramList) {
66+
resultList.addAll(this.checkAndGetResult(healthCheckClusterService, clusterPhyParam, healthConfigMap));
67+
}
68+
69+
for (HealthCheckResult checkResult: resultList) {
70+
try {
71+
healthCheckResultService.replace(checkResult);
72+
} catch (Exception e) {
73+
log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
74+
}
75+
}
76+
77+
// 删除10分钟之前的检查结果
78+
try {
79+
healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
80+
} catch (Exception e) {
81+
log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
82+
}
83+
84+
return TaskResult.SUCCESS;
85+
}
86+
87+
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
88+
List<HealthCheckResult> resultList = new ArrayList<>();
89+
90+
// 进行检查
91+
for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
92+
HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
93+
if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
94+
// 类型不匹配
95+
continue;
96+
}
97+
98+
// 记录
99+
HealthCheckResult checkResult = new HealthCheckResult(
100+
dimensionEnum.getDimension(),
101+
clusterHealthConfig.getCheckNameEnum().getConfigName(),
102+
clusterPhyId,
103+
"-1"
104+
);
105+
checkResult.setPassed(Constant.YES);
106+
resultList.add(checkResult);
107+
}
108+
109+
return resultList;
110+
}
111+
112+
private List<HealthCheckResult> checkAndGetResult(AbstractHealthCheckService healthCheckService,
113+
ClusterPhyParam clusterPhyParam,
114+
Map<String, BaseClusterHealthConfig> healthConfigMap) {
115+
List<HealthCheckResult> resultList = new ArrayList<>();
116+
117+
// 进行检查
118+
for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
119+
HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig);
120+
if (healthCheckResult == null) {
121+
continue;
122+
}
123+
124+
// 记录
125+
resultList.add(healthCheckResult);
126+
}
127+
128+
return resultList;
129+
}
130+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package com.xiaojukeji.know.streaming.km.task.health;
2+
3+
import com.didiglobal.logi.job.annotation.Task;
4+
import com.didiglobal.logi.job.common.TaskResult;
5+
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
6+
import com.didiglobal.logi.log.ILog;
7+
import com.didiglobal.logi.log.LogFactory;
8+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
9+
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
10+
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
11+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
12+
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
13+
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
14+
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
15+
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
16+
import com.xiaojukeji.know.streaming.km.core.service.health.checker.group.HealthCheckGroupService;
17+
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
18+
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
19+
import lombok.AllArgsConstructor;
20+
import lombok.NoArgsConstructor;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
23+
import java.util.ArrayList;
24+
import java.util.Date;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
@NoArgsConstructor
29+
@AllArgsConstructor
30+
@Task(name = "GroupHealthCheckTask",
31+
description = "Group健康检查",
32+
cron = "0 0/1 * * * ? *",
33+
autoRegister = true,
34+
consensual = ConsensualEnum.BROADCAST,
35+
timeout = 2 * 60)
36+
public class GroupHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
37+
private static final ILog log = LogFactory.getLog(GroupHealthCheckTask.class);
38+
39+
@Autowired
40+
private HealthCheckResultService healthCheckResultService;
41+
42+
@Autowired
43+
private HealthCheckGroupService healthCheckGroupService;
44+
45+
@Override
46+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
47+
return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
48+
}
49+
50+
private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
51+
// 获取配置,<配置名,配置信息>
52+
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
53+
54+
// 检查结果
55+
List<HealthCheckResult> resultList = new ArrayList<>();
56+
57+
// 遍历Check-Service
58+
List<ClusterPhyParam> paramList = healthCheckGroupService.getResList(clusterPhy.getId());
59+
if (ValidateUtils.isEmptyList(paramList)) {
60+
// 当前无该维度的资源,则直接设置为
61+
resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckGroupService, healthConfigMap));
62+
}
63+
64+
// 遍历资源
65+
for (ClusterPhyParam clusterPhyParam: paramList) {
66+
resultList.addAll(this.checkAndGetResult(healthCheckGroupService, clusterPhyParam, healthConfigMap));
67+
}
68+
69+
for (HealthCheckResult checkResult: resultList) {
70+
try {
71+
healthCheckResultService.replace(checkResult);
72+
} catch (Exception e) {
73+
log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
74+
}
75+
}
76+
77+
// 删除10分钟之前的检查结果
78+
try {
79+
healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
80+
} catch (Exception e) {
81+
log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
82+
}
83+
84+
return TaskResult.SUCCESS;
85+
}
86+
87+
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
88+
List<HealthCheckResult> resultList = new ArrayList<>();
89+
90+
// 进行检查
91+
for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
92+
HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
93+
if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
94+
// 类型不匹配
95+
continue;
96+
}
97+
98+
// 记录
99+
HealthCheckResult checkResult = new HealthCheckResult(
100+
dimensionEnum.getDimension(),
101+
clusterHealthConfig.getCheckNameEnum().getConfigName(),
102+
clusterPhyId,
103+
"-1"
104+
);
105+
checkResult.setPassed(Constant.YES);
106+
resultList.add(checkResult);
107+
}
108+
109+
return resultList;
110+
}
111+
112+
private List<HealthCheckResult> checkAndGetResult(AbstractHealthCheckService healthCheckService,
113+
ClusterPhyParam clusterPhyParam,
114+
Map<String, BaseClusterHealthConfig> healthConfigMap) {
115+
List<HealthCheckResult> resultList = new ArrayList<>();
116+
117+
// 进行检查
118+
for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
119+
HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig);
120+
if (healthCheckResult == null) {
121+
continue;
122+
}
123+
124+
// 记录
125+
resultList.add(healthCheckResult);
126+
}
127+
128+
return resultList;
129+
}
130+
}

0 commit comments

Comments
 (0)