Skip to content

Commit 9eadafe

Browse files
author
zengqiao
committed
健康巡检交由KS线程进行执行
1 parent dab3eef commit 9eadafe

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
1717
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
1818
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
19+
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
1920
import lombok.AllArgsConstructor;
2021
import lombok.NoArgsConstructor;
2122
import org.springframework.beans.factory.annotation.Autowired;
@@ -24,11 +25,18 @@
2425

2526
@NoArgsConstructor
2627
@AllArgsConstructor
27-
@Task(name = "HealthCheckTask", description = "健康检查", cron = "0 0/1 * * * ? *",
28-
autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60)
28+
@Task(name = "HealthCheckTask",
29+
description = "健康检查",
30+
cron = "0 0/1 * * * ? *",
31+
autoRegister = true,
32+
consensual = ConsensualEnum.BROADCAST,
33+
timeout = 2 * 60)
2934
public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
3035
private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
3136

37+
@Autowired
38+
private TaskThreadPoolService taskThreadPoolService;
39+
3240
@Autowired
3341
private HealthCheckResultService healthCheckResultService;
3442

@@ -38,6 +46,16 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
3846

3947
@Override
4048
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
49+
taskThreadPoolService.submitHeavenTask(
50+
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
51+
100000,
52+
() -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs)
53+
);
54+
55+
return TaskResult.SUCCESS;
56+
}
57+
58+
private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
4159
// 获取配置,<配置名,配置信息>
4260
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
4361

@@ -73,8 +91,6 @@ public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs)
7391
} catch (Exception e) {
7492
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
7593
}
76-
77-
return TaskResult.SUCCESS;
7894
}
7995

8096
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {

0 commit comments

Comments
 (0)