Skip to content

Commit e248fdd

Browse files
committed
[Fix-17732] Change workflow instance status to failure when command handle failed
1 parent 3aeabb8 commit e248fdd

File tree

8 files changed

+69
-12
lines changed

8 files changed

+69
-12
lines changed

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ int updateWorkflowInstanceState(
138138
@Param("originState") WorkflowExecutionStatus originState,
139139
@Param("targetState") WorkflowExecutionStatus targetState);
140140

141+
int forceUpdateWorkflowInstanceState(@Param("id") Integer id, @Param("status") WorkflowExecutionStatus status);
142+
141143
/**
142144
* update workflow instance by tenantCode
143145
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ void updateWorkflowInstanceState(Integer workflowInstanceId,
3939
WorkflowExecutionStatus originState,
4040
WorkflowExecutionStatus targetState);
4141

42+
void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status);
43+
4244
/**
4345
* performs an "upsert" operation (update or insert) on a WorkflowInstance object within a new transaction
4446
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec
7575
}
7676
}
7777

78+
@Override
79+
public void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status) {
80+
mybatisMapper.forceUpdateWorkflowInstanceState(id, status);
81+
}
82+
7883
@Override
7984
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
8085
public void performTransactionalUpsert(WorkflowInstance workflowInstance) {

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@
158158
where id = #{workflowInstanceId} and state = #{originState}
159159
</update>
160160

161+
<update id="forceUpdateWorkflowInstanceState">
162+
update t_ds_workflow_instance set state = #{status} where id = #{id}
163+
</update>
164+
161165
<update id="updateWorkflowInstanceByTenantCode">
162166
update t_ds_workflow_instance
163167
set tenant_code = #{destTenantCode}

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ void updateWorkflowInstanceState_failed() {
9696
unsupportedOperationException.getMessage());
9797
}
9898

99+
@Test
100+
void forceUpdateWorkflowInstanceState() {
101+
WorkflowInstance workflowInstance = createWorkflowInstance(1L, 1, WorkflowExecutionStatus.RUNNING_EXECUTION);
102+
workflowInstanceDao.insert(workflowInstance);
103+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(), WorkflowExecutionStatus.FAILURE);
104+
assertEquals(WorkflowExecutionStatus.FAILURE,
105+
workflowInstanceDao.queryById(workflowInstance.getId()).getState());
106+
}
107+
99108
@Test
100109
void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
101110
long workflowDefinitionCode = 1L;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2727
import org.apache.dolphinscheduler.dao.entity.Command;
2828
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
29+
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2930
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
3031
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
3132
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -52,6 +53,7 @@
5253

5354
import org.springframework.beans.factory.annotation.Autowired;
5455
import org.springframework.stereotype.Service;
56+
import org.springframework.transaction.support.TransactionTemplate;
5557

5658
/**
5759
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
@@ -75,6 +77,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
7577
@Autowired
7678
private IWorkflowRepository workflowRepository;
7779

80+
@Autowired
81+
private WorkflowInstanceDao workflowInstanceDao;
82+
7883
@Autowired
7984
private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory;
8085

@@ -84,6 +89,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
8489
@Autowired
8590
private WorkflowEventBusCoordinator workflowEventBusCoordinator;
8691

92+
@Autowired
93+
private TransactionTemplate transactionTemplate;
94+
8795
private ExecutorService commandHandleThreadPool;
8896

8997
private boolean flag = false;
@@ -189,8 +197,18 @@ private Void bootstrapError(Command command, Throwable throwable) {
189197
throwable);
190198
return null;
191199
}
192-
log.error("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
193-
commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
200+
201+
transactionTemplate.execute(status -> {
202+
log.warn("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
203+
final int workflowInstanceId = command.getWorkflowInstanceId();
204+
205+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE);
206+
log.info("Set workflow instance {} state to FAILURE", workflowInstanceId);
207+
208+
commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
209+
log.info("Move command {} to error command table", command.getId());
210+
return null;
211+
});
194212
return null;
195213
}
196214

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.Set;
31-
import java.util.function.Function;
3231
import java.util.stream.Collectors;
3332

3433
public class WorkflowGraph implements IWorkflowGraph {
@@ -46,12 +45,20 @@ public WorkflowGraph(List<WorkflowTaskRelation> workflowTaskRelations, List<Task
4645
this.predecessors = new HashMap<>();
4746
this.successors = new HashMap<>();
4847

49-
this.taskDefinitionMap = taskDefinitions
50-
.stream()
51-
.collect(Collectors.toMap(TaskDefinition::getName, Function.identity()));
52-
this.taskDefinitionCodeMap = taskDefinitions
53-
.stream()
54-
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
48+
this.taskDefinitionMap = new HashMap<>(taskDefinitions.size());
49+
this.taskDefinitionCodeMap = new HashMap<>(taskDefinitions.size());
50+
for (TaskDefinition taskDefinition : taskDefinitions) {
51+
if (taskDefinitionMap.containsKey(taskDefinition.getName())) {
52+
throw new IllegalArgumentException(
53+
"Duplicate task name: " + taskDefinition.getName() + " in the workflow");
54+
}
55+
taskDefinitionMap.put(taskDefinition.getName(), taskDefinition);
56+
if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) {
57+
throw new IllegalArgumentException(
58+
"Duplicate task code: " + taskDefinition.getCode() + " in the workflow");
59+
}
60+
taskDefinitionCodeMap.put(taskDefinition.getCode(), taskDefinition);
61+
}
5562

5663
addTaskNodes(taskDefinitions);
5764
addTaskEdge(workflowTaskRelations);

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.dolphinscheduler.common.constants.Constants;
2323
import org.apache.dolphinscheduler.common.enums.CommandType;
24+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2425
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2526
import org.apache.dolphinscheduler.dao.entity.Command;
2627
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
@@ -29,6 +30,7 @@
2930
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
3031
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
3132
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
33+
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
3234

3335
import org.apache.commons.lang3.StringUtils;
3436

@@ -41,6 +43,7 @@
4143

4244
import org.springframework.beans.factory.annotation.Autowired;
4345
import org.springframework.stereotype.Component;
46+
import org.springframework.transaction.annotation.Transactional;
4447

4548
import com.fasterxml.jackson.databind.node.ObjectNode;
4649
import io.micrometer.core.annotation.Counted;
@@ -64,11 +67,18 @@ public class CommandServiceImpl implements CommandService {
6467
@Autowired
6568
private WorkflowDefinitionMapper processDefineMapper;
6669

70+
@Autowired
71+
private WorkflowInstanceDao workflowInstanceDao;
72+
6773
@Override
74+
@Transactional
6875
public void moveToErrorCommand(Command command, String message) {
69-
ErrorCommand errorCommand = new ErrorCommand(command, message);
70-
this.errorCommandMapper.insert(errorCommand);
71-
this.commandMapper.deleteById(command.getId());
76+
final ErrorCommand errorCommand = new ErrorCommand(command, message);
77+
errorCommandMapper.insert(errorCommand);
78+
commandMapper.deleteById(command.getId());
79+
workflowInstanceDao.forceUpdateWorkflowInstanceState(command.getWorkflowInstanceId(),
80+
WorkflowExecutionStatus.FAILURE);
81+
log.info("Set workflow instance {} state to FAILURE", command.getWorkflowInstanceId());
7282
}
7383

7484
@Override

0 commit comments

Comments
 (0)