Skip to content

Commit 01d2181

Browse files
committed
[Improvement-17001] Once workflow is not exist, delete scheduler task
1 parent 806f051 commit 01d2181

File tree

3 files changed

+97
-22
lines changed

3 files changed

+97
-22
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository;
19+
20+
import org.apache.dolphinscheduler.dao.entity.Schedule;
21+
22+
public interface ScheduleDao extends IDao<Schedule> {
23+
24+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository.impl;
19+
20+
import org.apache.dolphinscheduler.dao.entity.Schedule;
21+
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
22+
import org.apache.dolphinscheduler.dao.repository.BaseDao;
23+
import org.apache.dolphinscheduler.dao.repository.ScheduleDao;
24+
25+
import lombok.NonNull;
26+
27+
import org.springframework.stereotype.Repository;
28+
29+
@Repository
30+
public class ScheduleDaoImpl extends BaseDao<Schedule, ScheduleMapper> implements ScheduleDao {
31+
32+
public ScheduleDaoImpl(@NonNull ScheduleMapper scheduleMapper) {
33+
super(scheduleMapper);
34+
}
35+
36+
}

dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import org.apache.dolphinscheduler.common.enums.TaskDependType;
2323
import org.apache.dolphinscheduler.dao.entity.Schedule;
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
25+
import org.apache.dolphinscheduler.dao.repository.ScheduleDao;
26+
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
2527
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
2628
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
2729
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest;
28-
import org.apache.dolphinscheduler.service.process.ProcessService;
2930

3031
import java.util.Date;
32+
import java.util.Optional;
3133

3234
import lombok.extern.slf4j.Slf4j;
3335

@@ -44,7 +46,10 @@
4446
public class ProcessScheduleTask extends QuartzJobBean {
4547

4648
@Autowired
47-
private ProcessService processService;
49+
private ScheduleDao scheduleDao;
50+
51+
@Autowired
52+
private WorkflowDefinitionDao workflowDefinitionDao;
4853

4954
@Autowired
5055
private IWorkflowControlClient workflowInstanceController;
@@ -53,34 +58,44 @@ public class ProcessScheduleTask extends QuartzJobBean {
5358
@Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
5459
@Override
5560
protected void executeInternal(JobExecutionContext context) {
56-
QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
57-
int projectId = quartzJobData.getProjectId();
58-
int scheduleId = quartzJobData.getScheduleId();
61+
final QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
62+
final int projectId = quartzJobData.getProjectId();
63+
final int scheduleId = quartzJobData.getScheduleId();
5964

60-
Date scheduledFireTime = context.getScheduledFireTime();
65+
final Date scheduledFireTime = context.getScheduledFireTime();
66+
final Date fireTime = context.getFireTime();
6167

62-
Date fireTime = context.getFireTime();
68+
log.info("Schedule: {} expect fire time is {}, actual fire time is {}", scheduleId, scheduledFireTime,
69+
fireTime);
6370

64-
log.info("scheduled fire time :{}, fire time :{}, scheduleId :{}", scheduledFireTime, fireTime, scheduleId);
65-
66-
// query schedule
67-
Schedule schedule = processService.querySchedule(scheduleId);
71+
// If the schedule does not exist or offline, then delete the corn job
72+
final Schedule schedule = scheduleDao.queryById(scheduleId);
6873
if (schedule == null || ReleaseState.OFFLINE == schedule.getReleaseState()) {
74+
log.warn("Schedule: {} does not exist in db,delete schedule job in quartz", scheduleId);
75+
deleteJob(context, projectId, scheduleId);
76+
return;
77+
}
78+
79+
final Optional<WorkflowDefinition> workflowDefinitionOptional =
80+
workflowDefinitionDao.queryByCode(schedule.getWorkflowDefinitionCode());
81+
if (!workflowDefinitionOptional.isPresent()) {
6982
log.warn(
70-
"process schedule does not exist in db or process schedule offline,delete schedule job in quartz, projectId:{}, scheduleId:{}",
71-
projectId, scheduleId);
83+
"Schedule: {} bind workflow: {} does not exist in db,delete the schedule and delete schedule job in quartz",
84+
scheduleId, schedule.getWorkflowDefinitionCode());
85+
scheduleDao.deleteById(scheduleId);
7286
deleteJob(context, projectId, scheduleId);
7387
return;
7488
}
7589

76-
WorkflowDefinition workflowDefinition =
77-
processService.findWorkflowDefinitionByCode(schedule.getWorkflowDefinitionCode());
78-
// release state : online/offline
79-
ReleaseState releaseState = workflowDefinition.getReleaseState();
80-
if (releaseState == ReleaseState.OFFLINE) {
90+
final WorkflowDefinition workflowDefinition = workflowDefinitionOptional.get();
91+
if (workflowDefinition.getReleaseState() == ReleaseState.OFFLINE) {
8192
log.warn(
82-
"process definition does not exist in db or offline,need not to create command, projectId:{}, processDefinitionId:{}",
83-
projectId, workflowDefinition.getId());
93+
"Schedule: {} bind workflow: {} state is OFFLINE,update the schedule status to OFFLINE and delete schedule job in quartz",
94+
scheduleId, schedule.getWorkflowDefinitionCode());
95+
schedule.setReleaseState(ReleaseState.OFFLINE);
96+
schedule.setUpdateTime(new Date());
97+
scheduleDao.updateById(schedule);
98+
deleteJob(context, projectId, scheduleId);
8499
return;
85100
}
86101

@@ -106,10 +121,10 @@ protected void executeInternal(JobExecutionContext context) {
106121

107122
private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
108123
final Scheduler scheduler = context.getScheduler();
109-
JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
124+
final JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
110125
try {
111126
if (scheduler.checkExists(jobKey)) {
112-
log.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId);
127+
log.info("Try to delete job: {}, projectId: {}, schedulerId: {}", jobKey, projectId, scheduleId);
113128
scheduler.deleteJob(jobKey);
114129
}
115130
} catch (Exception e) {

0 commit comments

Comments
 (0)