Skip to content

Commit 703d685

Browse files
author
zengqiao
committed
Task任务分为metrics,common,metaddata三类,每一类任务的执行对应一个线程池,减少对Job模块线程池的依赖
1 parent 31a5f17 commit 703d685

25 files changed

+279
-147
lines changed

km-rest/src/main/resources/application.yml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,15 @@ thread-pool:
6060
suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
6161

6262
task: # 任务模块的配置
63-
heaven: # 采集任务配置
64-
thread-num: 20 # 采集任务线程池核心线程数
65-
queue-size: 1000 # 采集任务线程池队列大小
63+
metrics: # metrics采集任务配置
64+
thread-num: 18 # metrics采集任务线程池核心线程数
65+
queue-size: 180 # metrics采集任务线程池队列大小
66+
metadata: # metadata同步任务配置
67+
thread-num: 27 # metadata同步任务线程池核心线程数
68+
queue-size: 270 # metadata同步任务线程池队列大小
69+
common: # 剩余其他任务配置
70+
thread-num: 15 # 剩余其他任务线程池核心线程数
71+
queue-size: 150 # 剩余其他任务线程池队列大小
6672

6773

6874
# 客户端池大小相关配置
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.xiaojukeji.know.streaming.km.task;
2+
3+
import com.didiglobal.logi.job.common.TaskResult;
4+
import com.didiglobal.logi.log.ILog;
5+
import com.didiglobal.logi.log.LogFactory;
6+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
7+
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
10+
/**
11+
* other相关任务
12+
*/
13+
public abstract class AbstractAsyncCommonDispatchTask extends AbstractClusterPhyDispatchTask {
14+
private static final ILog log = LogFactory.getLog(AbstractAsyncCommonDispatchTask.class);
15+
16+
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
17+
18+
@Autowired
19+
private TaskThreadPoolService taskThreadPoolService;
20+
21+
@Override
22+
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
23+
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
24+
}
25+
26+
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
27+
taskThreadPoolService.submitCommonTask(
28+
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
29+
this.timeoutUnitSec.intValue() * 1000,
30+
() -> {
31+
try {
32+
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
33+
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
34+
log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
35+
} else {
36+
log.debug("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
37+
}
38+
} catch (Exception e) {
39+
log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
40+
}
41+
}
42+
);
43+
44+
return TaskResult.SUCCESS;
45+
}
46+
}

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/AbstractDispatchTask.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,12 @@ private List<E> selectTask(List<E> allTaskList, List<String> allWorkCodes, Strin
130130
}
131131
return allTaskList.subList(idx * count, Math.min(idx * count + count, allTaskList.size()));
132132
}
133+
134+
public String getTaskName() {
135+
return taskName;
136+
}
137+
138+
public Long getTimeoutUnitSec() {
139+
return timeoutUnitSec;
140+
}
133141
}

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/client/CheckJmxClientTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.springframework.beans.factory.annotation.Autowired;
1515

1616
@Task(name = "CheckJmxClientTask",
17-
description = "检查Jmx客户端,",
17+
description = "检查Jmx客户端",
1818
cron = "0 0/1 * * * ? *",
1919
autoRegister = true,
2020
timeout = 2 * 60,

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
1616
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
1717
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
18-
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
19-
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
18+
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
2019
import lombok.AllArgsConstructor;
2120
import lombok.NoArgsConstructor;
2221
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,12 +30,9 @@
3130
autoRegister = true,
3231
consensual = ConsensualEnum.BROADCAST,
3332
timeout = 2 * 60)
34-
public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
33+
public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
3534
private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
3635

37-
@Autowired
38-
private TaskThreadPoolService taskThreadPoolService;
39-
4036
@Autowired
4137
private HealthCheckResultService healthCheckResultService;
4238

@@ -45,17 +41,11 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
4541
);
4642

4743
@Override
48-
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
49-
taskThreadPoolService.submitHeavenTask(
50-
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
51-
100000,
52-
() -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs)
53-
);
54-
55-
return TaskResult.SUCCESS;
44+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
45+
return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
5646
}
5747

58-
private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
48+
private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
5949
// 获取配置,<配置名,配置信息>
6050
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
6151

@@ -91,6 +81,8 @@ private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTi
9181
} catch (Exception e) {
9282
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
9383
}
84+
85+
return TaskResult.SUCCESS;
9486
}
9587

9688
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/CommunityReassignJobTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
99
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1010
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
11-
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
11+
import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask;
1212
import org.springframework.beans.factory.annotation.Autowired;
1313

1414
@Task(name = "CommunityReassignJobTask",
@@ -17,14 +17,14 @@
1717
autoRegister = true,
1818
consensual = ConsensualEnum.BROADCAST,
1919
timeout = 6 * 60)
20-
public class CommunityReassignJobTask extends AbstractClusterPhyDispatchTask {
20+
public class CommunityReassignJobTask extends AbstractAsyncCommonDispatchTask {
2121
private static final ILog log = LogFactory.getLog(CommunityReassignJobTask.class);
2222

2323
@Autowired
2424
private ReassignJobService reassignJobService;
2525

2626
@Override
27-
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
27+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
2828
// 获取迁移中的任务
2929
Long jobId = reassignJobService.getOneRunningJobId(clusterPhy.getId());
3030
if (jobId == null) {

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/job/KMJobTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
66
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
77
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
8-
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
8+
import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask;
99
import org.springframework.beans.factory.annotation.Autowired;
1010

1111
@Task(name = "kmJobTask",
@@ -14,13 +14,13 @@
1414
autoRegister = true,
1515
consensual = ConsensualEnum.BROADCAST,
1616
timeout = 6 * 60)
17-
public class KMJobTask extends AbstractClusterPhyDispatchTask {
17+
public class KMJobTask extends AbstractAsyncCommonDispatchTask {
1818

1919
@Autowired
2020
private JobService jobService;
2121

2222
@Override
23-
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
23+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
2424
jobService.scheduleJobByClusterId(clusterPhy.getId());
2525
return TaskResult.SUCCESS;
2626
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.xiaojukeji.know.streaming.km.task.metadata;
2+
3+
import com.didiglobal.logi.job.common.TaskResult;
4+
import com.didiglobal.logi.log.ILog;
5+
import com.didiglobal.logi.log.LogFactory;
6+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
7+
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
8+
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
11+
/**
12+
* 元数据同步相关任务
13+
*/
14+
public abstract class AbstractAsyncMetadataDispatchTask extends AbstractClusterPhyDispatchTask {
15+
private static final ILog log = LogFactory.getLog(AbstractAsyncMetadataDispatchTask.class);
16+
17+
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
18+
19+
@Autowired
20+
private TaskThreadPoolService taskThreadPoolService;
21+
22+
@Override
23+
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
24+
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
25+
}
26+
27+
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
28+
taskThreadPoolService.submitMetadataTask(
29+
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
30+
this.timeoutUnitSec.intValue() * 1000,
31+
() -> {
32+
try {
33+
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
34+
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
35+
log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
36+
} else {
37+
log.debug("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
38+
}
39+
} catch (Exception e) {
40+
log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
41+
}
42+
}
43+
);
44+
45+
return TaskResult.SUCCESS;
46+
}
47+
}

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerConfigDiffTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigDiffTypeEnum;
1414
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
1515
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
16-
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
1716
import org.springframework.beans.factory.annotation.Autowired;
1817

1918
import java.util.*;
@@ -25,12 +24,12 @@
2524
* @date 22/02/25
2625
*/
2726
@Task(name = "SyncBrokerConfigDiffTask",
28-
description = "Broker配置的Diff信息同步到DB,",
27+
description = "Broker配置的Diff信息同步到DB",
2928
cron = "0 0/1 * * * ? *",
3029
autoRegister = true,
3130
consensual = ConsensualEnum.BROADCAST,
3231
timeout = 6 * 60)
33-
public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask {
32+
public class SyncBrokerConfigDiffTask extends AbstractAsyncMetadataDispatchTask {
3433
protected static final ILog log = LogFactory.getLog(SyncBrokerConfigDiffTask.class);
3534

3635
@Autowired
@@ -40,7 +39,7 @@ public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask {
4039
private BrokerConfigService brokerConfigService;
4140

4241
@Override
43-
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
42+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
4443
// <configName, <BrokerId, ConfigValue>>
4544
Map<String, Map<Integer, String>> allConfigMap = new HashMap<>();
4645

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncBrokerTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,24 @@
99
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
1010
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1111
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
12-
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
1312
import org.springframework.beans.factory.annotation.Autowired;
1413

1514
import java.util.List;
1615

1716
@Task(name = "SyncBrokerTask",
18-
description = "Broker信息同步到DB,",
17+
description = "Broker信息同步到DB",
1918
cron = "0 0/1 * * * ? *",
2019
autoRegister = true,
2120
consensual = ConsensualEnum.BROADCAST,
2221
timeout = 2 * 60)
23-
public class SyncBrokerTask extends AbstractClusterPhyDispatchTask {
22+
public class SyncBrokerTask extends AbstractAsyncMetadataDispatchTask {
2423
private static final ILog log = LogFactory.getLog(SyncBrokerTask.class);
2524

2625
@Autowired
2726
private BrokerService brokerService;
2827

2928
@Override
30-
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
29+
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
3130
Result<List<Broker>> brokersResult = brokerService.listBrokersFromKafka(clusterPhy);
3231
if (brokersResult.failed()) {
3332
return new TaskResult(TaskResult.FAIL_CODE, brokersResult.getMessage());

0 commit comments

Comments
 (0)