Skip to content

Commit e119b1d

Browse files
authored
[Fix-17732] Change workflow instance status to failure when command handle failed (#17745)
1 parent 91c1b57 commit e119b1d

File tree

10 files changed: 164 additions and 16 deletions

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ int updateWorkflowInstanceState(
138138
@Param("originState") WorkflowExecutionStatus originState,
139139
@Param("targetState") WorkflowExecutionStatus targetState);
140140

141+
int forceUpdateWorkflowInstanceState(@Param("id") Integer id, @Param("status") WorkflowExecutionStatus status);
142+
141143
/**
142144
* update workflow instance by tenantCode
143145
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ void updateWorkflowInstanceState(Integer workflowInstanceId,
3939
WorkflowExecutionStatus originState,
4040
WorkflowExecutionStatus targetState);
4141

42+
void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status);
43+
4244
/**
4345
* find last scheduler workflow instance in the date interval
4446
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec
7272
}
7373
}
7474

75+
@Override
76+
public void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status) {
77+
mybatisMapper.forceUpdateWorkflowInstanceState(id, status);
78+
}
79+
7580
/**
7681
* find last scheduler process instance in the date interval
7782
*

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@
158158
where id = #{workflowInstanceId} and state = #{originState}
159159
</update>
160160

161+
<update id="forceUpdateWorkflowInstanceState">
162+
update t_ds_workflow_instance set state = #{status} where id = #{id}
163+
</update>
164+
161165
<update id="updateWorkflowInstanceByTenantCode">
162166
update t_ds_workflow_instance
163167
set tenant_code = #{destTenantCode}

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ void updateWorkflowInstanceState_failed() {
9696
unsupportedOperationException.getMessage());
9797
}
9898

99+
@Test
100+
void forceUpdateWorkflowInstanceState() {
101+
WorkflowInstance workflowInstance = createWorkflowInstance(1L, 1, WorkflowExecutionStatus.RUNNING_EXECUTION);
102+
workflowInstanceDao.insert(workflowInstance);
103+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(), WorkflowExecutionStatus.FAILURE);
104+
assertEquals(WorkflowExecutionStatus.FAILURE,
105+
workflowInstanceDao.queryById(workflowInstance.getId()).getState());
106+
}
107+
99108
@Test
100109
void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
101110
long workflowDefinitionCode = 1L;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2727
import org.apache.dolphinscheduler.dao.entity.Command;
2828
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
29+
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2930
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
3031
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
3132
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -52,6 +53,7 @@
5253

5354
import org.springframework.beans.factory.annotation.Autowired;
5455
import org.springframework.stereotype.Service;
56+
import org.springframework.transaction.support.TransactionTemplate;
5557

5658
/**
5759
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
@@ -75,6 +77,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
7577
@Autowired
7678
private IWorkflowRepository workflowRepository;
7779

80+
@Autowired
81+
private WorkflowInstanceDao workflowInstanceDao;
82+
7883
@Autowired
7984
private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory;
8085

@@ -84,6 +89,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
8489
@Autowired
8590
private WorkflowEventBusCoordinator workflowEventBusCoordinator;
8691

92+
@Autowired
93+
private TransactionTemplate transactionTemplate;
94+
8795
private ExecutorService commandHandleThreadPool;
8896

8997
private boolean flag = false;
@@ -189,8 +197,18 @@ private Void bootstrapError(Command command, Throwable throwable) {
189197
throwable);
190198
return null;
191199
}
192-
log.error("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
193-
commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
200+
201+
transactionTemplate.execute(status -> {
202+
log.warn("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
203+
final int workflowInstanceId = command.getWorkflowInstanceId();
204+
205+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE);
206+
log.info("Set workflow instance {} state to FAILURE", workflowInstanceId);
207+
208+
commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
209+
log.info("Move command {} to error command table", command.getId());
210+
return null;
211+
});
194212
return null;
195213
}
196214

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.Set;
31-
import java.util.function.Function;
3231
import java.util.stream.Collectors;
3332

3433
public class WorkflowGraph implements IWorkflowGraph {
@@ -46,12 +45,20 @@ public WorkflowGraph(List<WorkflowTaskRelation> workflowTaskRelations, List<Task
4645
this.predecessors = new HashMap<>();
4746
this.successors = new HashMap<>();
4847

49-
this.taskDefinitionMap = taskDefinitions
50-
.stream()
51-
.collect(Collectors.toMap(TaskDefinition::getName, Function.identity()));
52-
this.taskDefinitionCodeMap = taskDefinitions
53-
.stream()
54-
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
48+
this.taskDefinitionMap = new HashMap<>(taskDefinitions.size());
49+
this.taskDefinitionCodeMap = new HashMap<>(taskDefinitions.size());
50+
for (TaskDefinition taskDefinition : taskDefinitions) {
51+
if (taskDefinitionMap.containsKey(taskDefinition.getName())) {
52+
throw new IllegalArgumentException(
53+
"Duplicate task name: " + taskDefinition.getName() + " in the workflow");
54+
}
55+
taskDefinitionMap.put(taskDefinition.getName(), taskDefinition);
56+
if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) {
57+
throw new IllegalArgumentException(
58+
"Duplicate task code: " + taskDefinition.getCode() + " in the workflow");
59+
}
60+
taskDefinitionCodeMap.put(taskDefinition.getCode(), taskDefinition);
61+
}
5562

5663
addTaskNodes(taskDefinitions);
5764
addTaskEdge(workflowTaskRelations);

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,30 @@ public void testStartWorkflow_with_oneSuccessTaskDryRun() {
124124
masterContainer.assertAllResourceReleased();
125125
}
126126

127+
@Test
128+
@DisplayName("Test start a workflow with two fake task(A) has the same name")
129+
public void testStartWorkflow_contains_duplicateTaskName() {
130+
final String yaml = "/it/start/workflow_with_duplicate_task_name.yaml";
131+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
132+
final WorkflowDefinition workflow = context.getOneWorkflow();
133+
134+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
135+
.workflowDefinition(workflow)
136+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
137+
.build();
138+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
139+
140+
await()
141+
.atMost(Duration.ofMinutes(1))
142+
.untilAsserted(() -> {
143+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
144+
.isEqualTo(WorkflowExecutionStatus.FAILURE);
145+
assertThat(repository.queryTaskInstance(workflowInstanceId)).isEmpty();
146+
});
147+
148+
masterContainer.assertAllResourceReleased();
149+
}
150+
127151
@Test
128152
@DisplayName("Test start a workflow with one fake task(A) using serial wait strategy")
129153
public void testStartWorkflow_with_serialWaitStrategy() {
dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_duplicate_task_name
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with two parallel failed tasks
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
tasks:
40+
- name: A
41+
code: 1
42+
version: 1
43+
projectCode: 1
44+
userId: 1
45+
taskType: LogicFakeTask
46+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
47+
workerGroup: default
48+
createTime: 2024-08-12 00:00:00
49+
updateTime: 2021-08-12 00:00:00
50+
taskExecuteType: BATCH
51+
- name: A
52+
code: 2
53+
version: 1
54+
projectCode: 1
55+
userId: 1
56+
taskType: LogicFakeTask
57+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
58+
workerGroup: default
59+
createTime: 2024-08-12 00:00:00
60+
updateTime: 2021-08-12 00:00:00
61+
taskExecuteType: BATCH
62+
63+
taskRelations:
64+
- projectCode: 1
65+
workflowDefinitionCode: 1
66+
workflowDefinitionVersion: 1
67+
preTaskCode: 0
68+
preTaskVersion: 0
69+
postTaskCode: 1
70+
postTaskVersion: 1
71+
createTime: 2024-08-12 00:00:00
72+
updateTime: 2024-08-12 00:00:00
73+
- projectCode: 1
74+
workflowDefinitionCode: 1
75+
workflowDefinitionVersion: 1
76+
preTaskCode: 0
77+
preTaskVersion: 0
78+
postTaskCode: 2
79+
postTaskVersion: 1
80+
createTime: 2024-08-12 00:00:00
81+
updateTime: 2024-08-12 00:00:00

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
2929
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
3030
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
31-
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
3231

3332
import org.apache.commons.lang3.StringUtils;
3433

@@ -61,14 +60,11 @@ public class CommandServiceImpl implements CommandService {
6160
@Autowired
6261
private ScheduleMapper scheduleMapper;
6362

64-
@Autowired
65-
private WorkflowDefinitionMapper processDefineMapper;
66-
6763
@Override
6864
public void moveToErrorCommand(Command command, String message) {
69-
ErrorCommand errorCommand = new ErrorCommand(command, message);
70-
this.errorCommandMapper.insert(errorCommand);
71-
this.commandMapper.deleteById(command.getId());
65+
final ErrorCommand errorCommand = new ErrorCommand(command, message);
66+
errorCommandMapper.insert(errorCommand);
67+
commandMapper.deleteById(command.getId());
7268
}
7369

7470
@Override

0 commit comments

Comments
 (0)