Skip to content

Commit fcefe7a

Browse files
author
wangyaobo
committed
文档
1 parent 639b1f8 commit fcefe7a

File tree

1 file changed

+250
-0
lines changed

1 file changed

+250
-0
lines changed
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# KnowStreaming Task模块简介
2+
3+
## 1、Task简介
4+
5+
在 KnowStreaming 中(下面简称KS),Task模块主要是用于执行一些周期任务,包括Cluster、Broker、Topic等指标的定时采集,集群元数据定时更新至DB,集群状态的健康巡检等。在KS中,与Task模块相关的代码,我们都统一存放在km-task模块中。
6+
7+
Task模块是基于 LogiCommon 中的Logi-Job组件实现的任务周期执行,Logi-Job 的功能类似 XXX-Job,它是 XXX-Job 在 KnowStreaming 的内嵌实现,主要用于简化 KnowStreaming 的部署。
8+
Logi-Job 的任务总共有两种执行模式,分别是:
9+
10+
+ 广播模式:同一KS集群下,同一任务周期中,所有KS主机都会执行该定时任务。
11+
+ 抢占模式:同一KS集群下,同一任务周期中,仅有某一台KS主机会执行该任务。
12+
13+
KS集群范围定义:连接同一个DB,且application.yml中的spring.logi-job.app-name的名称一样的KS主机为同一KS集群。
14+
15+
## 2、使用指南
16+
17+
Task模块基于Logi-Job的广播模式与抢占模式,分别实现了任务的抢占执行、重复执行以及均衡执行,他们之间的差别是:
18+
19+
+ 抢占执行:同一个KS集群,同一个任务执行周期中,仅有一台KS主机执行该任务;
20+
+ 重复执行:同一个KS集群,同一个任务执行周期中,所有KS主机都执行该任务。比如3台KS主机,3个Kafka集群,此时每台KS主机都会去采集这3个Kafka集群的指标;
21+
+ 均衡执行:同一个KS集群,同一个任务执行周期中,每台KS主机仅执行该任务的一部分,所有的KS主机共同协作完成了任务。比如3台KS主机,3个Kafka集群,稳定运行情况下,每台KS主机将仅采集1个Kafka集群的指标,3台KS主机共同完成3个Kafka集群指标的采集。
22+
23+
下面我们看一下具体例子。
24+
25+
### 2.1、抢占模式——抢占执行
26+
27+
功能说明:
28+
29+
+ 同一个KS集群,同一个任务执行周期中,仅有一台KS主机执行该任务。
30+
31+
代码例子:
32+
33+
```java
34+
// 1、实现Job接口,重写excute方法;
35+
// 2、在类上添加@Task注解,并且配置好信息,指定为随机抢占模式;
36+
// 效果:KS集群中,每5秒,会有一台KS主机输出 "测试定时任务运行中";
37+
@Task(name = "TestJob",
38+
description = "测试定时任务",
39+
cron = "*/5 * * * * ?",
40+
autoRegister = true,
41+
consensual = ConsensualEnum.RANDOM, // 这里一定要设置为RANDOM
42+
timeout = 6 * 60)
43+
public class TestJob implements Job {
44+
@Override
45+
public TaskResult execute(JobContext jobContext) throws Exception {
46+
System.out.println("测试定时任务运行中");
47+
return new TaskResult();
48+
}
49+
}
50+
```
51+
52+
53+
54+
### 2.2、广播模式——重复执行
55+
56+
功能说明:
57+
58+
+ 同一个KS集群,同一个任务执行周期中,所有KS主机都执行该任务。比如3台KS主机,3个Kafka集群,此时每台KS主机都会去重复采集这3个Kafka集群的指标。
59+
60+
代码例子:
61+
62+
```java
63+
// 1、实现Job接口,重写excute方法;
64+
// 2、在类上添加@Task注解,并且配置好信息,指定为广播抢占模式;
65+
// 效果:KS集群中,每5秒,每台KS主机都会输出 "测试定时任务运行中";
66+
@Task(name = "TestJob",
67+
description = "测试定时任务",
68+
cron = "*/5 * * * * ?",
69+
autoRegister = true,
70+
consensual = ConsensualEnum.BROADCAST, // 这里一定要设置为BROADCAST
71+
timeout = 6 * 60)
72+
public class TestJob implements Job {
73+
@Override
74+
public TaskResult execute(JobContext jobContext) throws Exception {
75+
System.out.println("测试定时任务运行中");
76+
return new TaskResult();
77+
}
78+
}
79+
```
80+
81+
82+
83+
### 2.3、广播模式——均衡执行
84+
85+
功能说明:
86+
87+
+ 同一个KS集群,同一个任务执行周期中,每台KS主机仅执行该任务的一部分,所有的KS主机共同协作完成了任务。比如3台KS主机,3个Kafka集群,稳定运行情况下,每台KS主机将仅采集1个Kafka集群的指标,3台KS主机共同完成3个Kafka集群指标的采集。
88+
89+
代码例子:
90+
91+
+ 该模式有点特殊,是KS基于Logi-Job的广播模式,做的一个扩展,以下为一个使用例子:
92+
93+
```java
94+
// 1、继承AbstractClusterPhyDispatchTask,实现processSubTask方法;
95+
// 2、在类上添加@Task注解,并且配置好信息,指定为广播模式;
96+
// 效果:在本样例中,每隔1分钟ks会将所有的kafka集群列表在ks集群主机内均衡拆分,每台主机会将分发到自身的Kafka集群依次执行processSubTask方法,实现KS集群的任务协同处理。
97+
@Task(name = "kmJobTask",
98+
description = "km job 模块调度执行任务",
99+
cron = "0 0/1 * * * ? *",
100+
autoRegister = true,
101+
consensual = ConsensualEnum.BROADCAST,
102+
timeout = 6 * 60)
103+
public class KMJobTask extends AbstractClusterPhyDispatchTask {
104+
105+
@Autowired
106+
private JobService jobService;
107+
108+
@Override
109+
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
110+
jobService.scheduleJobByClusterId(clusterPhy.getId());
111+
return TaskResult.SUCCESS;
112+
}
113+
}
114+
```
115+
116+
117+
118+
## 3、原理简介
119+
120+
### 3.1、Task注解说明
121+
122+
```java
123+
public @interface Task {
124+
String name() default ""; //任务名称
125+
String description() default ""; //任务描述
126+
String owner() default "system"; //拥有者
127+
String cron() default ""; //定时执行的时间策略
128+
int retryTimes() default 0; //失败以后所能重试的最大次数
129+
long timeout() default 0; //在超时时间里重试
130+
//是否自动注册任务到数据库中
131+
//如果设置为false,需要手动去数据库km_task表注册定时任务信息。数据库记录和@Task注解缺一不可
132+
boolean autoRegister() default false;
133+
//执行模式:广播、随机抢占
134+
//广播模式:同一集群下的所有服务器都会执行该定时任务
135+
//随机抢占模式:同一集群下随机一台服务器执行该任务
136+
ConsensualEnum consensual() default ConsensualEnum.RANDOM;
137+
}
138+
```
139+
140+
### 3.2、数据库表介绍
141+
142+
+ logi_task:记录项目中的定时任务信息,一个定时任务对应一条记录。
143+
+ logi_job:具体任务执行信息。
144+
+ logi_job_log:定时任务的执行日志。
145+
+ logi_worker:记录机器信息,实现集群控制。
146+
147+
### 3.3、均衡执行简介
148+
149+
#### 3.3.1、类关系图
150+
151+
这里以KMJobTask为例,简单介绍KM中的定时任务实现逻辑。
152+
153+
![img](https://cooper.didichuxing.com/cooper_gateway/cn/shimo-images/fXd8Bfj4tPEcdOcT/image.png)
154+
155+
+ Job:使用logi组件实现定时任务,必须实现该接口。
156+
+ Comparable & EntufyIdInterface:比较接口,实现任务的排序逻辑。
157+
+ AbstractDispatchTask:实现广播模式下,任务的均衡分发。
158+
+ AbstractClusterPhyDispatchTask:对分发到当前服务器的集群列表进行枚举。
159+
+ KMJobTask:实现对单个集群的定时任务处理。
160+
161+
#### 3.3.2、关键类代码
162+
163+
+ **AbstractDispatchTask类**
164+
165+
```java
166+
// 实现Job接口的抽象类,进行任务的负载均衡执行
167+
public abstract class AbstractDispatchTask<E extends Comparable & EntifyIdInterface> implements Job {
168+
169+
// 罗列所有的任务
170+
protected abstract List<E> listAllTasks();
171+
172+
// 执行被分配给该KS主机的任务
173+
protected abstract TaskResult processTask(List<E> subTaskList, long triggerTimeUnitMs);
174+
175+
// 被Logi-Job触发执行该方法
176+
// 该方法进行任务的分配
177+
@Override
178+
public TaskResult execute(JobContext jobContext) {
179+
try {
180+
181+
long triggerTimeUnitMs = System.currentTimeMillis();
182+
// 获取所有的任务
183+
List<E> allTaskList = this.listAllTasks();
184+
185+
// 计算当前KS机器需要执行的任务
186+
List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
187+
188+
// 进行任务处理
189+
return this.processTask(subTaskList, triggerTimeUnitMs);
190+
} catch (Exception e) {
191+
// ...
192+
}
193+
}
194+
}
195+
```
196+
197+
+ **AbstractClusterPhyDispatchTask类**
198+
199+
```java
200+
// 继承AbstractDispatchTask的抽象类,对Kafka集群进行负载均衡执行
201+
public abstract class AbstractClusterPhyDispatchTask extends AbstractDispatchTask<ClusterPhy> {
202+
// 执行被分配的任务,具体由子类实现
203+
protected abstract TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception; // 返回所有的Kafka集群
204+
@Override
205+
public List<ClusterPhy> listAllTasks() {
206+
return clusterPhyService.listAllClusters();
207+
}
208+
// 执行被分配给该KS主机的Kafka集群任务
209+
@Override
210+
public TaskResult processTask(List<ClusterPhy> subTaskList, long triggerTimeUnitMs) { // ... }
211+
}
212+
```
213+
214+
+ **KMJobTask**
215+
216+
```java
217+
// 加上@Task注解,并配置任务执行信息
218+
@Task(name = "kmJobTask",
219+
description = "km job 模块调度执行任务",
220+
cron = "0 0/1 * * * ? *",
221+
autoRegister = true,
222+
consensual = ConsensualEnum.BROADCAST,
223+
timeout = 6 * 60)
224+
// 继承AbstractClusterPhyDispatchTask类
225+
public class KMJobTask extends AbstractClusterPhyDispatchTask {
226+
227+
@Autowired
228+
private JobService jobService;
229+
230+
// 执行该Kafka集群的Job模块的任务
231+
@Override
232+
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
233+
jobService.scheduleJobByClusterId(clusterPhy.getId());
234+
return TaskResult.SUCCESS;
235+
}
236+
}
237+
```
238+
239+
#### 3.3.3、均衡执行总结
240+
241+
均衡执行的实现原理总结起来就是以下几点:
242+
243+
+ Logi-Job设置为广播模式,触发所有的KS主机执行任务;
244+
+ 每台KS主机,被触发执行后,按照统一的规则,对任务列表,KS集群主机列表进行排序。然后按照顺序将任务列表均衡的分配给排序后的KS集群主机。KS集群稳定运行情况下,这一步保证了每台KS主机之间分配到的任务列表不重复,不丢失。
245+
+ 最后每台KS主机,执行被分配到的任务。
246+
247+
## 4、注意事项
248+
249+
+ 不能100%保证任务在一个周期内,且仅且执行一次,可能出现重复执行或丢失的情况,所以必须严格是且仅且执行一次的任务,不建议基于Logi-Job进行任务控制。
250+
+ 尽量让Logi-Job仅负责任务的触发,后续的执行建议放到自己创建的线程池中进行。

0 commit comments

Comments
 (0)