Skip to content

Commit 0b376bd

Browse files
authored
Merge pull request #552 from didi/master
合并主分支
2 parents 71d4e0f + 8a0c233 commit 0b376bd

File tree

102 files changed

+1889
-362
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+1889
-362
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,7 @@ PS: 提问请尽量把问题一次性描述清楚,并告知环境信息情况
133133
**`2、微信群`**
134134

135135
微信加群:添加`mike_zhangliang``PenceXie`的微信号备注KnowStreaming加群。
136+
137+
## Star History
138+
139+
[![Star History Chart](https://api.star-history.com/svg?repos=didi/KnowStreaming&type=Date)](https://star-history.com/#didi/KnowStreaming&Date)

Releases_Notes.md

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,42 @@
11

2+
## v3.0.0-beta.1
23

3-
## v3.0.0-beta
4+
**文档**
5+
- 新增Task模块说明文档
6+
- FAQ补充 `Specified key was too long; max key length is 767 bytes ` 错误说明
7+
- FAQ补充 `出现ESIndexNotFoundException报错` 错误说明
8+
9+
10+
**Bug修复**
11+
- 修复 Consumer 点击 Stop 未停止检索的问题
12+
- 修复创建/编辑角色权限报错问题
13+
- 修复多集群管理/单集群详情均衡卡片状态错误问题
14+
- 修复版本列表未排序问题
15+
- 修复Raft集群Controller信息不断记录问题
16+
- 修复部分版本消费组描述信息获取失败问题
17+
- 修复分区Offset获取失败的日志中,缺少Topic名称信息问题
18+
- 修复GitHub图地址错误,及图裂问题
19+
- 修复Broker默认使用的地址和注释不一致问题
20+
- 修复 Consumer 列表分页不生效问题
21+
- 修复操作记录表operation_methods字段缺少默认值问题
22+
- 修复集群均衡表中move_broker_list字段无效的问题
23+
- 修复KafkaUser、KafkaACL信息获取时,日志一直重复提示不支持问题
24+
- 修复指标缺失时,曲线出现掉底的问题
25+
26+
27+
**体验优化**
28+
- 优化前端构建时间和打包体积,增加依赖打包的分包策略
29+
- 优化产品样式和文案展示
30+
- 优化ES客户端数为可配置
31+
- 优化日志中大量出现的MySQL Key冲突日志
32+
33+
34+
**能力提升**
35+
- 增加周期任务,用于主动创建缺少的ES模版及索引的能力,减少额外的脚本操作
36+
- 增加JMX连接的Broker地址可选择的能力
37+
38+
39+
## v3.0.0-beta.0
440

541
**1、多集群管理**
642

docs/assets/KnowStreamingLogo.png

-9.49 KB
Binary file not shown.
-183 KB
Binary file not shown.

docs/assets/readme/WeChat.png

-50.2 KB
Binary file not shown.

docs/assets/readme/ZSXQ.jpeg

-59.4 KB
Binary file not shown.

docs/dev_guide/Task模块简介.md

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
# Task模块简介
2+
3+
## 1、Task简介
4+
5+
在 KnowStreaming 中(下面简称KS),Task模块主要是用于执行一些周期任务,包括Cluster、Broker、Topic等指标的定时采集,集群元数据定时更新至DB,集群状态的健康巡检等。在KS中,与Task模块相关的代码,我们都统一存放在km-task模块中。
6+
7+
Task模块是基于 LogiCommon 中的Logi-Job组件实现的任务周期执行,Logi-Job 的功能类似 XXX-Job,它是 XXX-Job 在 KnowStreaming 的内嵌实现,主要用于简化 KnowStreaming 的部署。
8+
Logi-Job 的任务总共有两种执行模式,分别是:
9+
10+
+ 广播模式:同一KS集群下,同一任务周期中,所有KS主机都会执行该定时任务。
11+
+ 抢占模式:同一KS集群下,同一任务周期中,仅有某一台KS主机会执行该任务。
12+
13+
KS集群范围定义:连接同一个DB,且application.yml中的spring.logi-job.app-name的名称一样的KS主机为同一KS集群。
14+
15+
## 2、使用指南
16+
17+
Task模块基于Logi-Job的广播模式与抢占模式,分别实现了任务的抢占执行、重复执行以及均衡执行,他们之间的差别是:
18+
19+
+ 抢占执行:同一个KS集群,同一个任务执行周期中,仅有一台KS主机执行该任务;
20+
+ 重复执行:同一个KS集群,同一个任务执行周期中,所有KS主机都执行该任务。比如3台KS主机,3个Kafka集群,此时每台KS主机都会去采集这3个Kafka集群的指标;
21+
+ 均衡执行:同一个KS集群,同一个任务执行周期中,每台KS主机仅执行该任务的一部分,所有的KS主机共同协作完成了任务。比如3台KS主机,3个Kafka集群,稳定运行情况下,每台KS主机将仅采集1个Kafka集群的指标,3台KS主机共同完成3个Kafka集群指标的采集。
22+
23+
下面我们看一下具体例子。
24+
25+
### 2.1、抢占模式——抢占执行
26+
27+
功能说明:
28+
29+
+ 同一个KS集群,同一个任务执行周期中,仅有一台KS主机执行该任务。
30+
31+
代码例子:
32+
33+
```java
34+
// 1、实现Job接口,重写excute方法;
35+
// 2、在类上添加@Task注解,并且配置好信息,指定为随机抢占模式;
36+
// 效果:KS集群中,每5秒,会有一台KS主机输出 "测试定时任务运行中";
37+
@Task(name = "TestJob",
38+
description = "测试定时任务",
39+
cron = "*/5 * * * * ?",
40+
autoRegister = true,
41+
consensual = ConsensualEnum.RANDOM, // 这里一定要设置为RANDOM
42+
timeout = 6 * 60)
43+
public class TestJob implements Job {
44+
45+
@Override
46+
public TaskResult execute(JobContext jobContext) throws Exception {
47+
48+
System.out.println("测试定时任务运行中");
49+
return new TaskResult();
50+
51+
}
52+
53+
}
54+
```
55+
56+
57+
58+
### 2.2、广播模式——重复执行
59+
60+
功能说明:
61+
62+
+ 同一个KS集群,同一个任务执行周期中,所有KS主机都执行该任务。比如3台KS主机,3个Kafka集群,此时每台KS主机都会去重复采集这3个Kafka集群的指标。
63+
64+
代码例子:
65+
66+
```java
67+
// 1、实现Job接口,重写excute方法;
68+
// 2、在类上添加@Task注解,并且配置好信息,指定为广播抢占模式;
69+
// 效果:KS集群中,每5秒,每台KS主机都会输出 "测试定时任务运行中";
70+
@Task(name = "TestJob",
71+
description = "测试定时任务",
72+
cron = "*/5 * * * * ?",
73+
autoRegister = true,
74+
consensual = ConsensualEnum.BROADCAST, // 这里一定要设置为BROADCAST
75+
timeout = 6 * 60)
76+
public class TestJob implements Job {
77+
78+
@Override
79+
public TaskResult execute(JobContext jobContext) throws Exception {
80+
81+
System.out.println("测试定时任务运行中");
82+
return new TaskResult();
83+
84+
}
85+
86+
}
87+
```
88+
89+
90+
91+
### 2.3、广播模式——均衡执行
92+
93+
功能说明:
94+
95+
+ 同一个KS集群,同一个任务执行周期中,每台KS主机仅执行该任务的一部分,所有的KS主机共同协作完成了任务。比如3台KS主机,3个Kafka集群,稳定运行情况下,每台KS主机将仅采集1个Kafka集群的指标,3台KS主机共同完成3个Kafka集群指标的采集。
96+
97+
代码例子:
98+
99+
+ 该模式有点特殊,是KS基于Logi-Job的广播模式,做的一个扩展,以下为一个使用例子:
100+
101+
```java
102+
// 1、继承AbstractClusterPhyDispatchTask,实现processSubTask方法;
103+
// 2、在类上添加@Task注解,并且配置好信息,指定为广播模式;
104+
// 效果:在本样例中,每隔1分钟ks会将所有的kafka集群列表在ks集群主机内均衡拆分,每台主机会将分发到自身的Kafka集群依次执行processSubTask方法,实现KS集群的任务协同处理。
105+
@Task(name = "kmJobTask",
106+
description = "km job 模块调度执行任务",
107+
cron = "0 0/1 * * * ? *",
108+
autoRegister = true,
109+
consensual = ConsensualEnum.BROADCAST,
110+
timeout = 6 * 60)
111+
public class KMJobTask extends AbstractClusterPhyDispatchTask {
112+
113+
@Autowired
114+
private JobService jobService;
115+
116+
@Override
117+
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
118+
jobService.scheduleJobByClusterId(clusterPhy.getId());
119+
return TaskResult.SUCCESS;
120+
}
121+
}
122+
```
123+
124+
125+
126+
## 3、原理简介
127+
128+
### 3.1、Task注解说明
129+
130+
```java
131+
public @interface Task {
132+
String name() default ""; //任务名称
133+
String description() default ""; //任务描述
134+
String owner() default "system"; //拥有者
135+
String cron() default ""; //定时执行的时间策略
136+
int retryTimes() default 0; //失败以后所能重试的最大次数
137+
long timeout() default 0; //在超时时间里重试
138+
//是否自动注册任务到数据库中
139+
//如果设置为false,需要手动去数据库km_task表注册定时任务信息。数据库记录和@Task注解缺一不可
140+
boolean autoRegister() default false;
141+
//执行模式:广播、随机抢占
142+
//广播模式:同一集群下的所有服务器都会执行该定时任务
143+
//随机抢占模式:同一集群下随机一台服务器执行该任务
144+
ConsensualEnum consensual() default ConsensualEnum.RANDOM;
145+
}
146+
```
147+
148+
### 3.2、数据库表介绍
149+
150+
+ logi_task:记录项目中的定时任务信息,一个定时任务对应一条记录。
151+
+ logi_job:具体任务执行信息。
152+
+ logi_job_log:定时任务的执行日志。
153+
+ logi_worker:记录机器信息,实现集群控制。
154+
155+
### 3.3、均衡执行简介
156+
157+
#### 3.3.1、类关系图
158+
159+
这里以KMJobTask为例,简单介绍KM中的定时任务实现逻辑。
160+
161+
![img](http://img-ys011.didistatic.com/static/dc2img/do1_knC85EtQ8Vbn1BcBzcjz)
162+
163+
+ Job:使用logi组件实现定时任务,必须实现该接口。
164+
+ Comparable & EntufyIdInterface:比较接口,实现任务的排序逻辑。
165+
+ AbstractDispatchTask:实现广播模式下,任务的均衡分发。
166+
+ AbstractClusterPhyDispatchTask:对分发到当前服务器的集群列表进行枚举。
167+
+ KMJobTask:实现对单个集群的定时任务处理。
168+
169+
#### 3.3.2、关键类代码
170+
171+
+ **AbstractDispatchTask类**
172+
173+
```java
174+
// 实现Job接口的抽象类,进行任务的负载均衡执行
175+
public abstract class AbstractDispatchTask<E extends Comparable & EntifyIdInterface> implements Job {
176+
177+
// 罗列所有的任务
178+
protected abstract List<E> listAllTasks();
179+
180+
// 执行被分配给该KS主机的任务
181+
protected abstract TaskResult processTask(List<E> subTaskList, long triggerTimeUnitMs);
182+
183+
// 被Logi-Job触发执行该方法
184+
// 该方法进行任务的分配
185+
@Override
186+
public TaskResult execute(JobContext jobContext) {
187+
try {
188+
189+
long triggerTimeUnitMs = System.currentTimeMillis();
190+
191+
// 获取所有的任务
192+
List<E> allTaskList = this.listAllTasks();
193+
194+
// 计算当前KS机器需要执行的任务
195+
List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
196+
197+
// 进行任务处理
198+
return this.processTask(subTaskList, triggerTimeUnitMs);
199+
} catch (Exception e) {
200+
// ...
201+
}
202+
}
203+
}
204+
```
205+
206+
+ **AbstractClusterPhyDispatchTask类**
207+
208+
```java
209+
// 继承AbstractDispatchTask的抽象类,对Kafka集群进行负载均衡执行
210+
public abstract class AbstractClusterPhyDispatchTask extends AbstractDispatchTask<ClusterPhy> {
211+
212+
// 执行被分配的任务,具体由子类实现
213+
protected abstract TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
214+
215+
// 返回所有的Kafka集群
216+
@Override
217+
public List<ClusterPhy> listAllTasks() {
218+
return clusterPhyService.listAllClusters();
219+
}
220+
221+
// 执行被分配给该KS主机的Kafka集群任务
222+
@Override
223+
public TaskResult processTask(List<ClusterPhy> subTaskList, long triggerTimeUnitMs) { // ... }
224+
225+
}
226+
```
227+
228+
+ **KMJobTask**
229+
230+
```java
231+
// 加上@Task注解,并配置任务执行信息
232+
@Task(name = "kmJobTask",
233+
description = "km job 模块调度执行任务",
234+
cron = "0 0/1 * * * ? *",
235+
autoRegister = true,
236+
consensual = ConsensualEnum.BROADCAST,
237+
timeout = 6 * 60)
238+
// 继承AbstractClusterPhyDispatchTask类
239+
public class KMJobTask extends AbstractClusterPhyDispatchTask {
240+
241+
@Autowired
242+
private JobService jobService;
243+
244+
// 执行该Kafka集群的Job模块的任务
245+
@Override
246+
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
247+
jobService.scheduleJobByClusterId(clusterPhy.getId());
248+
return TaskResult.SUCCESS;
249+
}
250+
}
251+
```
252+
253+
#### 3.3.3、均衡执行总结
254+
255+
均衡执行的实现原理总结起来就是以下几点:
256+
257+
+ Logi-Job设置为广播模式,触发所有的KS主机执行任务;
258+
+ 每台KS主机,被触发执行后,按照统一的规则,对任务列表,KS集群主机列表进行排序。然后按照顺序将任务列表均衡的分配给排序后的KS集群主机。KS集群稳定运行情况下,这一步保证了每台KS主机之间分配到的任务列表不重复,不丢失。
259+
+ 最后每台KS主机,执行被分配到的任务。
260+
261+
## 4、注意事项
262+
263+
+ 不能100%保证任务在一个周期内,且仅且执行一次,可能出现重复执行或丢失的情况,所以必须严格是且仅且执行一次的任务,不建议基于Logi-Job进行任务控制。
264+
+ 尽量让Logi-Job仅负责任务的触发,后续的执行建议放到自己创建的线程池中进行。
Binary file not shown.
-228 KB
Binary file not shown.

docs/dev_guide/多版本兼容方案.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ KS-KM 根据其需要纳管的 kafka 版本,按照上述三个维度构建了
3636

3737
&emsp;&emsp;KS-KM 的每个版本针对需要纳管的 kafka 版本列表,事先分析各个版本的差异性和产品需求,同时 KS-KM 构建了一套专门处理兼容性的服务,来进行兼容性的注册、字典构建、处理器分发等操作,其中版本兼容性处理器是来具体处理不同 kafka 版本差异性的地方。
3838

39-
![registerHandler](./assets/multi_version_compatible/registerHandler.png)
39+
![registerHandler](http://img-ys011.didistatic.com/static/dc2img/do1_WxVTzndYE59ah5DFrMfn)
4040

4141
&emsp;&emsp;如上图所示,KS-KM 的 topic 服务在面对不同 kafka 版本时,其 topic 的创建、删除、扩容由于 kafka 版本自身的差异,导致 KnowStreaming 的处理也不一样,所以需要根据不同的 kafka 版本来实现不同的兼容性处理器,同时向 KnowStreaming 的兼容服务进行兼容性的注册,构建兼容性字典,后续在 KnowStreaming 的运行过程中,针对不同的 kafka 版本即可分发到不同的处理器中执行。
4242

0 commit comments

Comments
 (0)