Skip to content

Commit 87822a2

Browse files
authored
Merge pull request #87 from stelin/main
Update
2 parents ca89773 + 4f1c9b9 commit 87822a2

File tree

29 files changed

+177
-57
lines changed

29 files changed

+177
-57
lines changed

openjob-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>openjob</artifactId>
77
<groupId>io.openjob</groupId>
8-
<version>1.0.0</version>
8+
<version>1.0.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<artifactId>openjob-common</artifactId>

openjob-common/src/main/java/io/openjob/common/constant/TimeExpressionTypeEnum.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,8 @@ public static Boolean isCron(String type) {
5151
public static Boolean isSecondDelay(String type) {
5252
return SECOND_DELAY.getType().equals(type);
5353
}
54+
55+
public static Boolean isOneTime(String type) {
56+
return ONE_TIME.getType().equals(type);
57+
}
5458
}

openjob-server/openjob-server-admin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>openjob-server</artifactId>
77
<groupId>io.openjob</groupId>
8-
<version>1.0.0</version>
8+
<version>1.0.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<artifactId>openjob-server-admin</artifactId>

openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55
import lombok.Getter;
66

77
/**
8+
* user: 100+
9+
* namespace: 200+
10+
* application: 300+
11+
* job: 400+
12+
* delay: 500+
13+
*
814
* @author stelin swoft@qq.com
915
* @since 1.0.0
1016
*/
@@ -15,7 +21,9 @@ public enum CodeEnum implements CodeExceptionAssert {
1521
/**
1622
* App name not exist
1723
*/
18-
NAME_EXIST(100, "App name must be globally unique!");
24+
NAME_EXIST(100, "App name must be globally unique!"),
25+
26+
TIME_EXPRESSION_INVALID(400, "Time expression is invalid");
1927

2028
/**
2129
* Value

openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.openjob.common.constant.CommonConstant;
55
import io.openjob.common.constant.TimeExpressionTypeEnum;
66
import io.openjob.server.admin.constant.AdminConstant;
7+
import io.openjob.server.admin.constant.CodeEnum;
78
import io.openjob.server.admin.request.job.AddJobRequest;
89
import io.openjob.server.admin.request.job.DeleteJobRequest;
910
import io.openjob.server.admin.request.job.ExecuteJobRequest;
@@ -24,13 +25,15 @@
2425
import io.openjob.server.common.util.BeanMapperUtil;
2526
import io.openjob.server.common.util.PageUtil;
2627
import io.openjob.server.common.vo.PageVO;
28+
import io.openjob.server.repository.constant.JobStatusEnum;
2729
import io.openjob.server.repository.dao.AppDAO;
2830
import io.openjob.server.repository.dao.JobDAO;
2931
import io.openjob.server.repository.dto.JobPageDTO;
3032
import io.openjob.server.repository.entity.App;
3133
import io.openjob.server.repository.entity.Job;
3234
import io.openjob.server.scheduler.dto.JobExecuteRequestDTO;
3335
import io.openjob.server.scheduler.service.JobSchedulingService;
36+
import lombok.extern.slf4j.Slf4j;
3437
import org.springframework.beans.factory.annotation.Autowired;
3538
import org.springframework.stereotype.Component;
3639
import org.springframework.util.CollectionUtils;
@@ -47,6 +50,7 @@
4750
* @author zhenghongyang <sakuraovq@gmail.com>
4851
* @since 1.0.0
4952
*/
53+
@Slf4j
5054
@Component
5155
public class JobServiceImpl implements JobService {
5256

@@ -67,7 +71,13 @@ public AddJobVO add(AddJobRequest addJobRequest) {
6771
addJobRequest.setTimeExpression(String.valueOf(addJobRequest.getTimeExpressionValue()));
6872
}
6973

70-
long id = this.jobDAO.save(BeanMapperUtil.map(addJobRequest, Job.class));
74+
// Running status to parse next execute time.
75+
Job job = BeanMapperUtil.map(addJobRequest, Job.class);
76+
if (TimeExpressionTypeEnum.isCron(addJobRequest.getTimeExpressionType()) && JobStatusEnum.isRunning(job.getStatus())) {
77+
job.setNextExecuteTime(this.parseTimeExpression(job.getTimeExpression()));
78+
}
79+
80+
long id = this.jobDAO.save(job);
7181
return new AddJobVO().setId(id);
7282
}
7383

@@ -77,19 +87,39 @@ public UpdateJobVO update(UpdateJobRequest updateJobRequest) {
7787
updateJobRequest.setTimeExpression(String.valueOf(updateJobRequest.getTimeExpressionValue()));
7888
}
7989

80-
this.jobDAO.update(BeanMapperUtil.map(updateJobRequest, Job.class));
90+
// Job
91+
Job originJob = this.jobDAO.getById(updateJobRequest.getId());
92+
Job updateJob = BeanMapperUtil.map(updateJobRequest, Job.class);
93+
94+
// Condition
95+
boolean isUpdateCron = TimeExpressionTypeEnum.isCron(updateJob.getTimeExpressionType());
96+
boolean isUpdateExpression = !updateJob.getTimeExpression().equals(originJob.getTimeExpression());
97+
boolean isRunningStatus = JobStatusEnum.isRunning(updateJob.getStatus());
98+
99+
// Parse time expression
100+
if (isRunningStatus && isUpdateCron && isUpdateExpression) {
101+
updateJob.setNextExecuteTime(this.parseTimeExpression(updateJob.getTimeExpression()));
102+
}
103+
104+
this.jobDAO.update(updateJob);
81105
return new UpdateJobVO();
82106
}
83107

84108
@Override
85109
public DeleteJobVO delete(DeleteJobRequest deleteJobRequest) {
86-
this.jobDAO.updateByStatusOrDeleted(deleteJobRequest.getId(), null, CommonConstant.YES);
110+
this.jobDAO.updateByStatusOrDeleted(deleteJobRequest.getId(), null, CommonConstant.YES, null);
87111
return new DeleteJobVO();
88112
}
89113

90114
@Override
91115
public UpdateJobStatusVO updateStatus(UpdateJobStatusRequest request) {
92-
this.jobDAO.updateByStatusOrDeleted(request.getId(), request.getStatus(), null);
116+
Long nextExecuteTime = null;
117+
Job originJob = this.jobDAO.getById(request.getId());
118+
if (TimeExpressionTypeEnum.isCron(originJob.getTimeExpressionType()) && JobStatusEnum.isRunning(request.getStatus())) {
119+
nextExecuteTime = this.parseTimeExpression(originJob.getTimeExpression());
120+
}
121+
122+
this.jobDAO.updateByStatusOrDeleted(request.getId(), request.getStatus(), null, nextExecuteTime);
93123
return new UpdateJobStatusVO();
94124
}
95125

@@ -150,4 +180,21 @@ public PageVO<ListJobVO> getPageList(ListJobRequest request) {
150180
return listJobVO;
151181
});
152182
}
183+
184+
/**
185+
* Parse time expression
186+
*
187+
* @param timeExpression timeExpression
188+
* @return Long
189+
*/
190+
private Long parseTimeExpression(String timeExpression) {
191+
try {
192+
CronExpression cronExpression = new CronExpression(timeExpression);
193+
return cronExpression.getNextValidTimeAfter(new Date()).toInstant().getEpochSecond();
194+
} catch (ParseException e) {
195+
CodeEnum.TIME_EXPRESSION_INVALID.throwException();
196+
log.error("Parse time expression failed! timeExpression=" + timeExpression, e);
197+
return null;
198+
}
199+
}
153200
}

openjob-server/openjob-server-cluster/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>openjob-server</artifactId>
77
<groupId>io.openjob</groupId>
8-
<version>1.0.0</version>
8+
<version>1.0.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/autoconfigure/ClusterProperties.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
@ConfigurationProperties(prefix = "openjob.cluster")
1212
public class ClusterProperties {
1313

14+
/**
15+
* Cluster worker
16+
*/
17+
private ClusterProperties.Worker worker = new ClusterProperties.Worker();
18+
1419
/**
1520
* Ping timeout(ms).
1621
*/
@@ -45,4 +50,18 @@ public class ClusterProperties {
4550
* Cluster node success period time(ms).
4651
*/
4752
private Integer nodeSuccessPeriodTime = 15000;
53+
54+
@Data
55+
public static class Worker {
56+
57+
/**
58+
* Worker online period(s).
59+
*/
60+
private Integer onlinePeriod = 30;
61+
62+
/**
63+
* Worker offline period(s).
64+
*/
65+
private Integer offlinePeriod = 60;
66+
}
4867
}

openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/constant/ClusterConstant.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,9 @@
55
* @since 1.0.0
66
*/
77
public class ClusterConstant {
8+
89
/**
910
* Worker check delay(ms)
1011
*/
1112
public static final long WORKER_CHECK_DELAY = 20000L;
12-
13-
/**
14-
* Worker check period(s).
15-
*/
16-
public static final Integer WORKER_CHECK_PERIOD = 15;
1713
}

openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ public void doWorkerStop(WorkerStopRequest stopReq) {
150150
* Worker check
151151
*/
152152
public void workerCheck() {
153-
// Query all workers.
154-
long timePos = DateUtil.timestamp() - ClusterConstant.WORKER_CHECK_PERIOD;
155153
Set<Long> currentSlots = ClusterContext.getCurrentSlots();
156154

157155
// Query slot ids.
@@ -168,11 +166,12 @@ public void workerCheck() {
168166
.collect(Collectors.groupingBy(Worker::getStatus));
169167

170168
// New join worker
169+
long onlinePos = DateUtil.timestamp() - this.clusterProperties.getWorker().getOnlinePeriod();
171170
List<Worker> offlineWorkers = Optional.ofNullable(workerMap.get(WorkerStatusEnum.OFFLINE.getStatus())).orElseGet(ArrayList::new);
172171
offlineWorkers.forEach(w -> {
173172
// Join worker
174173
// Ignore just off the line
175-
if (w.getUpdateTime() < timePos && w.getLastHeartbeatTime() > timePos) {
174+
if (w.getUpdateTime() < onlinePos && w.getLastHeartbeatTime() > onlinePos) {
176175
WorkerStartRequest workerStartRequest = new WorkerStartRequest();
177176
workerStartRequest.setAddress(w.getAddress());
178177
workerStartRequest.setAppName(w.getAppName());
@@ -184,10 +183,11 @@ public void workerCheck() {
184183
});
185184

186185
// New fail worker.
186+
long offlinePos = DateUtil.timestamp() - this.clusterProperties.getWorker().getOfflinePeriod();
187187
List<Worker> onlineWorkers = Optional.ofNullable(workerMap.get(WorkerStatusEnum.ONLINE.getStatus())).orElseGet(ArrayList::new);
188188
onlineWorkers.forEach(w -> {
189189
// Fail worker
190-
if (w.getLastHeartbeatTime() < timePos) {
190+
if (w.getLastHeartbeatTime() < offlinePos) {
191191
WorkerStopRequest workerStopRequest = new WorkerStopRequest();
192192
workerStopRequest.setWorkerKey(w.getWorkerKey());
193193
workerStopRequest.setAddress(w.getAddress());

openjob-server/openjob-server-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>openjob-server</artifactId>
77
<groupId>io.openjob</groupId>
8-
<version>1.0.0</version>
8+
<version>1.0.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<artifactId>openjob-server-common</artifactId>

0 commit comments

Comments
 (0)