Skip to content

Commit d948f20

Browse files
add cola-component-job
1 parent 09ce216 commit d948f20

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+3026
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# 主要功能
2+
可持久化、分布式任务管理组件:
3+
1. 可同步,可异步
4+
2. 可持久化(redis,数据库)
5+
3. Job可重复执行,Step失败可回滚,Step可断点继续
6+
4. 支持批量任务
7+
8+
# 核心领域概念
9+
6. JobLauncher:是任务启动器,通过它来启动任务,可以看做是程序的入口。
10+
7. BatchJobLauncher: 批量任务的处理入口
11+
8. BatchJob:批量任务,包含了多个JobInstance,一个JobInstance包含一个Job单例和一个ExecutionContext对象
12+
9. ExecutionContext: 任务执行的上下文,主要是用来承载用户传递的参数,以及不同Job、step之间传递参数
13+
10. Job: 代表着一个具体的任务。
14+
11. JobExecution: 代表一次具体的任务执行,可持久化,会管理任务执行状态。
15+
12. Step: 代表着一个具体的步骤,一个Job可以包含多个Step。在实际业务场景中,可能一个任务很复杂,这个时候可以将任务 拆分成多个step,分别对这些step 进行管理(将一个复杂任务简单化)。
16+
13. StepExecution: 代表一个具体的执行步骤,可持久化,会管理步骤的执行状态。
17+
14. JobRepository:批处理框架执行过程中的上下文(元数据),有三种实现,1)通过内存来管理,2)通过Redis持久化,3)通过数据库持久化。
18+
19+
# 详细设计解析
20+
https://blog.csdn.net/significantfrank/article/details/145314072
21+
22+
# 如何使用
23+
1. 引入依赖cola-component-job依赖。注意:为了减少冲突,依赖的存储redis和database均为provided,需要使用方自己提供,SpringBoot版本最好在3.2.0以上
24+
2. 在SpringBoot的启动类上添加@EnableColaJob注解,该注解会扫描注册需要的bean,以及负责初始化JobRepository
25+
3. 默认的数据库初始化脚本是schema-mysql.sql。如果使用其他数据库,可通过application.yml配置文件中的cola.job.database.ddl-location属性来指定脚本路径
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.alibaba.cola</groupId>
7+
<artifactId>cola-component-job</artifactId>
8+
<packaging>jar</packaging>
9+
<name>${project.artifactId}:${project.version}</name>
10+
<description>${project.artifactId}</description>
11+
<version>1.0.0-SNAPSHOT</version>
12+
13+
<properties>
14+
<maven.compiler.source>17</maven.compiler.source>
15+
<maven.compiler.target>17</maven.compiler.target>
16+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
17+
<spring.boot.version>3.2.9</spring.boot.version>
18+
</properties>
19+
20+
<dependencies>
21+
<!-- Lombok -->
22+
<dependency>
23+
<groupId>org.projectlombok</groupId>
24+
<artifactId>lombok</artifactId>
25+
<version>1.18.34</version>
26+
<scope>provided</scope>
27+
</dependency>
28+
<!-- Logback -->
29+
<dependency>
30+
<groupId>org.springframework.boot</groupId>
31+
<artifactId>spring-boot-starter-data-redis</artifactId>
32+
<version>${spring.boot.version}</version>
33+
<scope>provided</scope>
34+
</dependency>
35+
<dependency>
36+
<groupId>org.springframework.boot</groupId>
37+
<artifactId>spring-boot-starter-web</artifactId>
38+
<version>${spring.boot.version}</version>
39+
<scope>provided</scope>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.springframework.boot</groupId>
43+
<artifactId>spring-boot-starter-data-jpa</artifactId>
44+
<version>${spring.boot.version}</version>
45+
</dependency>
46+
<!-- test start-->
47+
<dependency>
48+
<groupId>mysql</groupId>
49+
<artifactId>mysql-connector-java</artifactId>
50+
<version>8.0.33</version>
51+
<scope>test</scope>
52+
</dependency>
53+
<dependency>
54+
<groupId>org.junit.jupiter</groupId>
55+
<artifactId>junit-jupiter-engine</artifactId>
56+
<version>5.10.2</version>
57+
<scope>test</scope>
58+
</dependency>
59+
<dependency>
60+
<groupId>com.alibaba.cola</groupId>
61+
<artifactId>cola-component-unittest</artifactId>
62+
<version>5.0.0</version>
63+
<exclusions>
64+
<exclusion>
65+
<groupId>org.springframework.kafka</groupId>
66+
<artifactId>spring-kafka</artifactId>
67+
</exclusion>
68+
<exclusion>
69+
<groupId>org.springframework.kafka</groupId>
70+
<artifactId>spring-kafka-test</artifactId>
71+
</exclusion>
72+
<exclusion>
73+
<groupId>org.postgresql</groupId>
74+
<artifactId>postgresql</artifactId>
75+
</exclusion>
76+
</exclusions>
77+
<scope>test</scope>
78+
</dependency>
79+
<dependency>
80+
<groupId>com.alibaba.cola</groupId>
81+
<artifactId>cola-component-test-container</artifactId>
82+
<version>5.0.0</version>
83+
<scope>test</scope>
84+
</dependency>
85+
<!-- test end-->
86+
</dependencies>
87+
<build>
88+
<plugins>
89+
<plugin>
90+
<groupId>org.apache.maven.plugins</groupId>
91+
<artifactId>maven-compiler-plugin</artifactId>
92+
<version>3.8.1</version>
93+
<configuration>
94+
<source>17</source>
95+
<target>17</target>
96+
</configuration>
97+
</plugin>
98+
</plugins>
99+
</build>
100+
</project>
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.alibaba.cola.job;
2+
3+
import com.alibaba.cola.job.model.*;
4+
import com.alibaba.cola.job.repository.JobRepository;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import org.springframework.util.CollectionUtils;
8+
9+
import java.time.LocalDateTime;
10+
import java.util.Set;
11+
12+
/**
13+
* 批量任务只支持单个Job的异步执行
14+
*/
15+
@Slf4j
16+
public class BatchJobLauncher {
17+
public static BatchJobExecution execute(BatchJob batchJob) {
18+
JobRepository jobRepository = batchJob.getJobRepository();
19+
BatchJobExecution batchJobExecution = new BatchJobExecution(UuidGenerator.nextBatchJobId());
20+
for (JobInstance jobInstance : batchJob.getJobInstances()) {
21+
jobInstance.getExecutionContext().setBatchJobId(batchJobExecution.getBatchJobId());
22+
String jobExecutionId = JobLauncher.executeAsync(jobInstance.getJob(), jobInstance.getExecutionContext(),
23+
batchJob.getExecutorService());
24+
batchJobExecution.addJobExecution(jobExecutionId);
25+
}
26+
batchJobExecution.setStartTime(LocalDateTime.now());
27+
batchJobExecution.setStatus(ExecutionStatus.STARTED);
28+
batchJobExecution.setJobType(batchJob.getJobType());
29+
jobRepository.saveBatchJobExecution(batchJobExecution);
30+
log.info("[BatchJob] started: {}", batchJobExecution);
31+
return batchJobExecution;
32+
}
33+
34+
public static boolean checkAndRefresh(BatchJob batchJob, String batchJobId) {
35+
JobRepository jobRepository = batchJob.getJobRepository();
36+
BatchJobExecution batchJobExecution = jobRepository.getBatchJobExecutionByBatchJobId(batchJobId);
37+
Set<String> jobIds = batchJobExecution.getJobExecutionResults().keySet();
38+
if (CollectionUtils.isEmpty(jobIds)) {
39+
return false;
40+
}
41+
// 如果已经完成,就不用费时查看每个job的状态了
42+
if (batchJobExecution.isBatchJobCompleted()) {
43+
return true;
44+
}
45+
for (String jobId : jobIds) {
46+
JobExecution jobExecution = jobRepository.getJobExecutionByJobId(jobId);
47+
batchJobExecution.put(jobExecution.getJobId(), jobExecution.getExecutionStatus());
48+
}
49+
boolean isChildrenCompleted = batchJobExecution.isChildrenJobsCompleted();
50+
if (isChildrenCompleted) {
51+
// 所有子任务都已完成
52+
batchJobExecution.setStatus(ExecutionStatus.COMPLETED);
53+
batchJobExecution.setEndTime(LocalDateTime.now());
54+
} else if (batchJobExecution.isFailed()) {
55+
batchJobExecution.setStatus(ExecutionStatus.FAILED);
56+
} else {
57+
batchJobExecution.setStatus(ExecutionStatus.UNKNOWN);
58+
}
59+
jobRepository.updateBatchJobExecution(batchJobExecution);
60+
return isChildrenCompleted;
61+
}
62+
}
63+
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package com.alibaba.cola.job;
2+
3+
import com.alibaba.cola.job.repository.JsonUtil;
4+
import com.fasterxml.jackson.annotation.JsonIgnore;
5+
import com.fasterxml.jackson.annotation.JsonProperty;
6+
7+
import lombok.ToString;
8+
9+
import org.springframework.lang.Nullable;
10+
11+
import java.util.Map;
12+
import java.util.concurrent.ConcurrentHashMap;
13+
14+
@ToString
15+
public class ExecutionContext<T> {
16+
/**
17+
* 这是任务执行的主参数,通常是client自定义的类
18+
*/
19+
private T param;
20+
21+
@JsonIgnore
22+
private Object rawParam;
23+
24+
/**
25+
* 为了灵活性,主参数之外的数据,放在extensions
26+
*/
27+
@JsonProperty("extensions")
28+
private final Map<String, Object> extensions;
29+
30+
31+
/**
32+
* 这里是暂存数据到context的,不会进行持久化
33+
*/
34+
@JsonIgnore
35+
private final Map<String, Object> temp;
36+
37+
/**
38+
* 当执行批量任务的时候,需要传入batchJobId
39+
*/
40+
@JsonIgnore
41+
private String batchJobId;
42+
43+
/**
44+
* 当重复执行一个Job的时候,需要传入上一次执行的jobExecutionId
45+
*/
46+
@JsonIgnore
47+
private String jobId;
48+
49+
public T getParam() {
50+
return (T) param;
51+
}
52+
53+
@SuppressWarnings("unchecked")
54+
public T getParam(Class<T> tClass) {
55+
if (rawParam != null) {
56+
return (T) rawParam;
57+
}
58+
if (param.getClass().equals(tClass)) {
59+
rawParam = param;
60+
}
61+
rawParam = JsonUtil.decode(JsonUtil.encode(param), tClass);
62+
return (T) rawParam;
63+
}
64+
65+
public void setParam(T param) {
66+
this.param = param;
67+
}
68+
69+
public String getJobId() {
70+
return jobId;
71+
}
72+
73+
public void setJobId(String jobId) {
74+
this.jobId = jobId;
75+
}
76+
77+
public String getBatchJobId() {
78+
return batchJobId;
79+
}
80+
81+
public void setBatchJobId(String batchJobId) {
82+
this.batchJobId = batchJobId;
83+
}
84+
85+
public ExecutionContext() {
86+
this.extensions = new ConcurrentHashMap();
87+
this.temp = new ConcurrentHashMap<>();
88+
}
89+
90+
public ExecutionContext(String jobId) {
91+
this();
92+
this.jobId = jobId;
93+
}
94+
95+
public ExecutionContext(Map<String, Object> extensions) {
96+
this.extensions = new ConcurrentHashMap(extensions);
97+
this.temp = new ConcurrentHashMap<>();
98+
}
99+
100+
public void putString(String key, @Nullable String value) {
101+
this.put(key, value);
102+
}
103+
104+
public void putTemp(String key, Object value){
105+
this.temp.put(key, value);
106+
}
107+
108+
public Object getTemp(String key){
109+
return this.temp.get(key);
110+
}
111+
112+
public String getString(String key) {
113+
return (String) this.get(key, String.class);
114+
}
115+
116+
public String getString(String key, String defaultString) {
117+
return !this.containsKey(key) ? defaultString : this.getString(key);
118+
}
119+
120+
public void put(String key, @Nullable Object value) {
121+
if (value != null) {
122+
this.extensions.put(key, value);
123+
} else {
124+
this.extensions.remove(key);
125+
}
126+
}
127+
128+
@Nullable
129+
public Object get(String key) {
130+
return this.extensions.get(key);
131+
}
132+
133+
@Nullable
134+
public <V> V get(String key, Class<V> type) {
135+
Object value = this.extensions.get(key);
136+
return value == null ? null : this.get(key, type, null);
137+
}
138+
139+
@Nullable
140+
public <V> V get(String key, Class<V> type, @Nullable V defaultValue) {
141+
Object value = this.extensions.get(key);
142+
if (value == null) {
143+
return defaultValue;
144+
} else if (!type.isInstance(value)) {
145+
throw new ClassCastException(
146+
"Value for key=[" + key + "] is not of type: [" + type + "], it is [(" + value.getClass() + ")" + value
147+
+ "]");
148+
} else {
149+
return type.cast(value);
150+
}
151+
}
152+
153+
public boolean containsKey(String key) {
154+
return this.extensions.containsKey(key);
155+
}
156+
157+
@Nullable
158+
public Object remove(String key) {
159+
return this.extensions.remove(key);
160+
}
161+
162+
public boolean containsValue(Object value) {
163+
return this.extensions.containsValue(value);
164+
}
165+
166+
public ExecutionContext<?> fromJsonString(String jsonString) {
167+
return JsonUtil.decode(jsonString, ExecutionContext.class);
168+
}
169+
170+
@Override
171+
public String toString() {
172+
return "ExecutionContext{" + "param=" + param + ", extensions=" + extensions + '}';
173+
}
174+
}
175+

0 commit comments

Comments
 (0)