|
| 1 | +# KnowStreaming Task模块简介 |
| 2 | + |
| 3 | +## 1、Task简介 |
| 4 | + |
| 5 | +在 KnowStreaming 中(下面简称KS),Task模块主要是用于执行一些周期任务,包括Cluster、Broker、Topic等指标的定时采集,集群元数据定时更新至DB,集群状态的健康巡检等。在KS中,与Task模块相关的代码,我们都统一存放在km-task模块中。 |
| 6 | + |
| 7 | +Task模块是基于 LogiCommon 中的Logi-Job组件实现的任务周期执行,Logi-Job 的功能类似 XXX-Job,它是 XXX-Job 在 KnowStreaming 的内嵌实现,主要用于简化 KnowStreaming 的部署。 |
| 8 | +Logi-Job 的任务总共有两种执行模式,分别是: |
| 9 | + |
| 10 | ++ 广播模式:同一KS集群下,同一任务周期中,所有KS主机都会执行该定时任务。 |
| 11 | ++ 抢占模式:同一KS集群下,同一任务周期中,仅有某一台KS主机会执行该任务。 |
| 12 | + |
| 13 | +KS集群范围定义:连接同一个DB,且application.yml中的spring.logi-job.app-name的名称一样的KS主机为同一KS集群。 |
| 14 | + |
| 15 | +## 2、使用指南 |
| 16 | + |
| 17 | +Task模块基于Logi-Job的广播模式与抢占模式,分别实现了任务的抢占执行、重复执行以及均衡执行,他们之间的差别是: |
| 18 | + |
| 19 | ++ 抢占执行:同一个KS集群,同一个任务执行周期中,仅有一台KS主机执行该任务; |
| 20 | ++ 重复执行:同一个KS集群,同一个任务执行周期中,所有KS主机都执行该任务。比如3台KS主机,3个Kafka集群,此时每台KS主机都会去采集这3个Kafka集群的指标; |
| 21 | ++ 均衡执行:同一个KS集群,同一个任务执行周期中,每台KS主机仅执行该任务的一部分,所有的KS主机共同协作完成了任务。比如3台KS主机,3个Kafka集群,稳定运行情况下,每台KS主机将仅采集1个Kafka集群的指标,3台KS主机共同完成3个Kafka集群指标的采集。 |
| 22 | + |
| 23 | +下面我们看一下具体例子。 |
| 24 | + |
| 25 | +### 2.1、抢占模式——抢占执行 |
| 26 | + |
| 27 | +功能说明: |
| 28 | + |
| 29 | ++ 同一个KS集群,同一个任务执行周期中,仅有一台KS主机执行该任务。 |
| 30 | + |
| 31 | +代码例子: |
| 32 | + |
| 33 | +```java |
| 34 | +// 1、实现Job接口,重写excute方法; |
| 35 | +// 2、在类上添加@Task注解,并且配置好信息,指定为随机抢占模式; |
| 36 | +// 效果:KS集群中,每5秒,会有一台KS主机输出 "测试定时任务运行中"; |
| 37 | +@Task(name = "TestJob", |
| 38 | + description = "测试定时任务", |
| 39 | + cron = "*/5 * * * * ?", |
| 40 | + autoRegister = true, |
| 41 | + consensual = ConsensualEnum.RANDOM, // 这里一定要设置为RANDOM |
| 42 | + timeout = 6 * 60) |
| 43 | +public class TestJob implements Job { |
| 44 | + |
| 45 | + @Override |
| 46 | + public TaskResult execute(JobContext jobContext) throws Exception { |
| 47 | + |
| 48 | + System.out.println("测试定时任务运行中"); |
| 49 | + return new TaskResult(); |
| 50 | + |
| 51 | + } |
| 52 | + |
| 53 | +} |
| 54 | +``` |
| 55 | + |
| 56 | + |
| 57 | + |
| 58 | +### 2.2、广播模式——重复执行 |
| 59 | + |
| 60 | +功能说明: |
| 61 | + |
| 62 | ++ 同一个KS集群,同一个任务执行周期中,所有KS主机都执行该任务。比如3台KS主机,3个Kafka集群,此时每台KS主机都会去重复采集这3个Kafka集群的指标。 |
| 63 | + |
| 64 | +代码例子: |
| 65 | + |
| 66 | +```java |
| 67 | +// 1、实现Job接口,重写excute方法; |
| 68 | +// 2、在类上添加@Task注解,并且配置好信息,指定为广播抢占模式; |
| 69 | +// 效果:KS集群中,每5秒,每台KS主机都会输出 "测试定时任务运行中"; |
| 70 | +@Task(name = "TestJob", |
| 71 | + description = "测试定时任务", |
| 72 | + cron = "*/5 * * * * ?", |
| 73 | + autoRegister = true, |
| 74 | + consensual = ConsensualEnum.BROADCAST, // 这里一定要设置为BROADCAST |
| 75 | + timeout = 6 * 60) |
| 76 | +public class TestJob implements Job { |
| 77 | + |
| 78 | + @Override |
| 79 | + public TaskResult execute(JobContext jobContext) throws Exception { |
| 80 | + |
| 81 | + System.out.println("测试定时任务运行中"); |
| 82 | + return new TaskResult(); |
| 83 | + |
| 84 | + } |
| 85 | + |
| 86 | +} |
| 87 | +``` |
| 88 | + |
| 89 | + |
| 90 | + |
| 91 | +### 2.3、广播模式——均衡执行 |
| 92 | + |
| 93 | +功能说明: |
| 94 | + |
| 95 | ++ 同一个KS集群,同一个任务执行周期中,每台KS主机仅执行该任务的一部分,所有的KS主机共同协作完成了任务。比如3台KS主机,3个Kafka集群,稳定运行情况下,每台KS主机将仅采集1个Kafka集群的指标,3台KS主机共同完成3个Kafka集群指标的采集。 |
| 96 | + |
| 97 | +代码例子: |
| 98 | + |
| 99 | ++ 该模式有点特殊,是KS基于Logi-Job的广播模式,做的一个扩展,以下为一个使用例子: |
| 100 | + |
| 101 | +```java |
| 102 | +// 1、继承AbstractClusterPhyDispatchTask,实现processSubTask方法; |
| 103 | +// 2、在类上添加@Task注解,并且配置好信息,指定为广播模式; |
| 104 | +// 效果:在本样例中,每隔1分钟ks会将所有的kafka集群列表在ks集群主机内均衡拆分,每台主机会将分发到自身的Kafka集群依次执行processSubTask方法,实现KS集群的任务协同处理。 |
| 105 | +@Task(name = "kmJobTask", |
| 106 | + description = "km job 模块调度执行任务", |
| 107 | + cron = "0 0/1 * * * ? *", |
| 108 | + autoRegister = true, |
| 109 | + consensual = ConsensualEnum.BROADCAST, |
| 110 | + timeout = 6 * 60) |
| 111 | +public class KMJobTask extends AbstractClusterPhyDispatchTask { |
| 112 | + |
| 113 | + @Autowired |
| 114 | + private JobService jobService; |
| 115 | + |
| 116 | + @Override |
| 117 | + protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { |
| 118 | + jobService.scheduleJobByClusterId(clusterPhy.getId()); |
| 119 | + return TaskResult.SUCCESS; |
| 120 | + } |
| 121 | +} |
| 122 | +``` |
| 123 | + |
| 124 | + |
| 125 | + |
| 126 | +## 3、原理简介 |
| 127 | + |
| 128 | +### 3.1、Task注解说明 |
| 129 | + |
| 130 | +```java |
| 131 | +public @interface Task { |
| 132 | + String name() default ""; //任务名称 |
| 133 | + String description() default ""; //任务描述 |
| 134 | + String owner() default "system"; //拥有者 |
| 135 | + String cron() default ""; //定时执行的时间策略 |
| 136 | + int retryTimes() default 0; //失败以后所能重试的最大次数 |
| 137 | + long timeout() default 0; //在超时时间里重试 |
| 138 | + //是否自动注册任务到数据库中 |
| 139 | + //如果设置为false,需要手动去数据库km_task表注册定时任务信息。数据库记录和@Task注解缺一不可 |
| 140 | + boolean autoRegister() default false; |
| 141 | + //执行模式:广播、随机抢占 |
| 142 | + //广播模式:同一集群下的所有服务器都会执行该定时任务 |
| 143 | + //随机抢占模式:同一集群下随机一台服务器执行该任务 |
| 144 | + ConsensualEnum consensual() default ConsensualEnum.RANDOM; |
| 145 | + } |
| 146 | +``` |
| 147 | + |
| 148 | +### 3.2、数据库表介绍 |
| 149 | + |
| 150 | ++ logi_task:记录项目中的定时任务信息,一个定时任务对应一条记录。 |
| 151 | ++ logi_job:具体任务执行信息。 |
| 152 | ++ logi_job_log:定时任务的执行日志。 |
| 153 | ++ logi_worker:记录机器信息,实现集群控制。 |
| 154 | + |
| 155 | +### 3.3、均衡执行简介 |
| 156 | + |
| 157 | +#### 3.3.1、类关系图 |
| 158 | + |
| 159 | +这里以KMJobTask为例,简单介绍KM中的定时任务实现逻辑。 |
| 160 | + |
| 161 | +  |
| 162 | + |
| 163 | ++ Job:使用logi组件实现定时任务,必须实现该接口。 |
| 164 | ++ Comparable & EntufyIdInterface:比较接口,实现任务的排序逻辑。 |
| 165 | ++ AbstractDispatchTask:实现广播模式下,任务的均衡分发。 |
| 166 | ++ AbstractClusterPhyDispatchTask:对分发到当前服务器的集群列表进行枚举。 |
| 167 | ++ KMJobTask:实现对单个集群的定时任务处理。 |
| 168 | + |
| 169 | +#### 3.3.2、关键类代码 |
| 170 | + |
| 171 | ++ **AbstractDispatchTask类** |
| 172 | + |
| 173 | +```java |
| 174 | +// 实现Job接口的抽象类,进行任务的负载均衡执行 |
| 175 | +public abstract class AbstractDispatchTask<E extends Comparable & EntifyIdInterface> implements Job { |
| 176 | + |
| 177 | + // 罗列所有的任务 |
| 178 | + protected abstract List<E> listAllTasks(); |
| 179 | + |
| 180 | + // 执行被分配给该KS主机的任务 |
| 181 | + protected abstract TaskResult processTask(List<E> subTaskList, long triggerTimeUnitMs); |
| 182 | + |
| 183 | + // 被Logi-Job触发执行该方法 |
| 184 | + // 该方法进行任务的分配 |
| 185 | + @Override |
| 186 | + public TaskResult execute(JobContext jobContext) { |
| 187 | + try { |
| 188 | + |
| 189 | + long triggerTimeUnitMs = System.currentTimeMillis(); |
| 190 | + |
| 191 | + // 获取所有的任务 |
| 192 | + List<E> allTaskList = this.listAllTasks(); |
| 193 | + |
| 194 | + // 计算当前KS机器需要执行的任务 |
| 195 | + List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode()); |
| 196 | + |
| 197 | + // 进行任务处理 |
| 198 | + return this.processTask(subTaskList, triggerTimeUnitMs); |
| 199 | + } catch (Exception e) { |
| 200 | + // ... |
| 201 | + } |
| 202 | + } |
| 203 | +} |
| 204 | +``` |
| 205 | + |
| 206 | ++ **AbstractClusterPhyDispatchTask类** |
| 207 | + |
| 208 | +```java |
| 209 | +// 继承AbstractDispatchTask的抽象类,对Kafka集群进行负载均衡执行 |
| 210 | +public abstract class AbstractClusterPhyDispatchTask extends AbstractDispatchTask<ClusterPhy> { |
| 211 | + |
| 212 | + // 执行被分配的任务,具体由子类实现 |
| 213 | + protected abstract TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception; |
| 214 | + |
| 215 | + // 返回所有的Kafka集群 |
| 216 | + @Override |
| 217 | + public List<ClusterPhy> listAllTasks() { |
| 218 | + return clusterPhyService.listAllClusters(); |
| 219 | + } |
| 220 | + |
| 221 | + // 执行被分配给该KS主机的Kafka集群任务 |
| 222 | + @Override |
| 223 | + public TaskResult processTask(List<ClusterPhy> subTaskList, long triggerTimeUnitMs) { // ... } |
| 224 | + |
| 225 | +} |
| 226 | +``` |
| 227 | + |
| 228 | ++ **KMJobTask类** |
| 229 | + |
| 230 | +```java |
| 231 | +// 加上@Task注解,并配置任务执行信息 |
| 232 | +@Task(name = "kmJobTask", |
| 233 | + description = "km job 模块调度执行任务", |
| 234 | + cron = "0 0/1 * * * ? *", |
| 235 | + autoRegister = true, |
| 236 | + consensual = ConsensualEnum.BROADCAST, |
| 237 | + timeout = 6 * 60) |
| 238 | +// 继承AbstractClusterPhyDispatchTask类 |
| 239 | +public class KMJobTask extends AbstractClusterPhyDispatchTask { |
| 240 | + |
| 241 | + @Autowired |
| 242 | + private JobService jobService; |
| 243 | + |
| 244 | + // 执行该Kafka集群的Job模块的任务 |
| 245 | + @Override |
| 246 | + protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { |
| 247 | + jobService.scheduleJobByClusterId(clusterPhy.getId()); |
| 248 | + return TaskResult.SUCCESS; |
| 249 | + } |
| 250 | +} |
| 251 | +``` |
| 252 | + |
| 253 | +#### 3.3.3、均衡执行总结 |
| 254 | + |
| 255 | +均衡执行的实现原理总结起来就是以下几点: |
| 256 | + |
| 257 | ++ Logi-Job设置为广播模式,触发所有的KS主机执行任务; |
| 258 | ++ 每台KS主机,被触发执行后,按照统一的规则,对任务列表,KS集群主机列表进行排序。然后按照顺序将任务列表均衡的分配给排序后的KS集群主机。KS集群稳定运行情况下,这一步保证了每台KS主机之间分配到的任务列表不重复,不丢失。 |
| 259 | ++ 最后每台KS主机,执行被分配到的任务。 |
| 260 | + |
| 261 | +## 4、注意事项 |
| 262 | + |
| 263 | ++ 不能100%保证任务在一个周期内,且仅且执行一次,可能出现重复执行或丢失的情况,所以必须严格是且仅且执行一次的任务,不建议基于Logi-Job进行任务控制。 |
| 264 | ++ 尽量让Logi-Job仅负责任务的触发,后续的执行建议放到自己创建的线程池中进行。 |
0 commit comments