Skip to content

Commit 5aeca87

Browse files
authored
[Fix-17758][Master] Mark task as failed if TaskExecutionContext initialization fails (#17821)
1 parent 4907243 commit 5aeca87

14 files changed

+591
-4
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2525
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2626
import org.apache.dolphinscheduler.server.master.engine.exceptions.WorkflowEventFireException;
27+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
28+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2729
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
2830
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
2931
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
@@ -32,6 +34,7 @@
3234
import org.apache.commons.collections4.MapUtils;
3335

3436
import java.util.Collections;
37+
import java.util.Date;
3538
import java.util.List;
3639
import java.util.Map;
3740
import java.util.Optional;
@@ -128,6 +131,17 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
128131
ThreadUtils.sleep(5_000);
129132
return;
130133
}
134+
135+
// If task context init fails, publish a fatal error event
136+
if (ExceptionUtils.isTaskExecutionContextCreateException(ex)) {
137+
AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent) lifecycleEvent;
138+
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
139+
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
140+
.endTime(new Date())
141+
.build();
142+
workflowEventBus.publish(taskFatalEvent);
143+
}
144+
131145
workflowEventBus.getWorkflowEventBusSummary().decreaseFireSuccessEventCount();
132146
workflowEventBus.getWorkflowEventBusSummary().increaseFireFailedEventCount();
133147
throw new WorkflowEventFireException(lifecycleEvent, ex);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
2929
* Dispatch the task instance to target.
3030
*/
3131
DISPATCH,
32+
/**
33+
* The task instance encountered a catastrophic failure (such as an initialization failure); it will enter a failed state.
34+
*/
35+
FATAL,
3236
/**
3337
* The task instance is dispatched to the target executor server.
3438
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
21+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
22+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
23+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
24+
25+
import java.util.Date;
26+
27+
import lombok.AllArgsConstructor;
28+
import lombok.Builder;
29+
import lombok.Data;
30+
31+
@Data
32+
@Builder
33+
@AllArgsConstructor
34+
public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent {
35+
36+
private final ITaskExecutionRunnable taskExecutionRunnable;
37+
38+
private final Date endTime;
39+
40+
@Override
41+
public ILifecycleEventType getEventType() {
42+
return TaskLifecycleEventType.FATAL;
43+
}
44+
45+
@Override
46+
public String toString() {
47+
return "TaskFatalLifecycleEvent{" +
48+
"task=" + taskExecutionRunnable.getName() +
49+
", endTime=" + endTime +
50+
'}';
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
21+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
22+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
23+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
24+
import org.apache.dolphinscheduler.server.master.engine.task.statemachine.ITaskStateAction;
25+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
26+
27+
import org.springframework.stereotype.Component;
28+
29+
@Component
30+
public class TaskFatalLifecycleEventHandler extends AbstractTaskLifecycleEventHandler<TaskFatalLifecycleEvent> {
31+
32+
@Override
33+
public void handle(final ITaskStateAction taskStateAction,
34+
final IWorkflowExecutionRunnable workflowExecutionRunnable,
35+
final ITaskExecutionRunnable taskExecutionRunnable,
36+
final TaskFatalLifecycleEvent taskFatalEvent) {
37+
taskStateAction.onFatalEvent(workflowExecutionRunnable, taskExecutionRunnable, taskFatalEvent);
38+
}
39+
40+
@Override
41+
public ILifecycleEventType matchEventType() {
42+
return TaskLifecycleEventType.FATAL;
43+
}
44+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
3838
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
39+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
3940
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
4041
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
4142
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -99,6 +100,39 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
99100
}
100101
}
101102

103+
@Override
104+
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
105+
final ITaskExecutionRunnable taskExecutionRunnable,
106+
final TaskFatalLifecycleEvent taskFatalEvent) {
107+
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
108+
persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);
109+
110+
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
111+
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
112+
return;
113+
}
114+
115+
// If all successors are condition tasks, then the task will not be marked as failure.
116+
// And the DAG will continue to execute.
117+
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
118+
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
119+
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
120+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
121+
return;
122+
}
123+
124+
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
125+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
126+
}
127+
128+
private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
129+
final TaskFatalLifecycleEvent taskFatalEvent) {
130+
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
131+
taskInstance.setState(TaskExecutionStatus.FAILURE);
132+
taskInstance.setEndTime(taskFatalEvent.getEndTime());
133+
taskInstanceDao.updateById(taskInstance);
134+
}
135+
102136
@Override
103137
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
104138
final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2323
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
2424
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
25+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2526
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
2627
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
2728
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -91,6 +92,14 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
9192
final ITaskExecutionRunnable taskExecutionRunnable,
9293
final TaskDispatchLifecycleEvent taskDispatchEvent);
9394

95+
/**
96+
* Perform the necessary actions when a task in a certain state receives a {@link TaskFatalLifecycleEvent}.
97+
* <p> This method is called when the task encounters catastrophic failure (e.g., initialization failure).
98+
*/
99+
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
100+
final ITaskExecutionRunnable taskExecutionRunnable,
101+
final TaskFatalLifecycleEvent taskFatalEvent);
102+
94103
/**
95104
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
96105
* <p> This method is called when the task has been dispatched to executor.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
3838
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
39+
import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
40+
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
3941

4042
import java.util.concurrent.TimeUnit;
4143

@@ -109,7 +111,17 @@ public void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRu
109111
taskInstance.getDelayTime(),
110112
remainTimeMills);
111113
}
112-
taskExecutionRunnable.initializeTaskExecutionContext();
114+
115+
try {
116+
taskExecutionRunnable.initializeTaskExecutionContext();
117+
} catch (Exception ex) {
118+
if (ExceptionUtils.isDatabaseConnectedFailedException(ex)) {
119+
throw ex;
120+
}
121+
log.error("Failed to initialize task execution context, taskName: {}", taskInstance.getName(), ex);
122+
throw new TaskExecutionContextCreateException(ex.getMessage());
123+
}
124+
113125
workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);
114126
}
115127

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/TaskExecutionContextCreateException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.server.master.exception;
1919

20-
public class TaskExecutionContextCreateException extends MasterException {
20+
public class TaskExecutionContextCreateException extends RuntimeException {
2121

2222
public TaskExecutionContextCreateException(String message) {
2323
super(message);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.dolphinscheduler.server.master.utils;
1919

20+
import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
21+
2022
import org.springframework.dao.DataAccessResourceFailureException;
2123

2224
public class ExceptionUtils {
@@ -25,4 +27,7 @@ public static boolean isDatabaseConnectedFailedException(Throwable e) {
2527
return e instanceof DataAccessResourceFailureException;
2628
}
2729

30+
public static boolean isTaskExecutionContextCreateException(Throwable e) {
31+
return e instanceof TaskExecutionContextCreateException;
32+
}
2833
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,36 @@ public void testStartWorkflow_with_oneFailedTask() {
889889
masterContainer.assertAllResourceReleased();
890890
}
891891

892+
@Test
893+
@DisplayName("Test start a workflow with one fake task(A) fatal")
894+
public void testStartWorkflow_with_oneFatalTask() {
895+
final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
896+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
897+
final WorkflowDefinition workflow = context.getOneWorkflow();
898+
899+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
900+
.workflowDefinition(workflow)
901+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
902+
.build();
903+
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
904+
905+
await()
906+
.atMost(Duration.ofMinutes(1))
907+
.untilAsserted(() -> {
908+
Assertions
909+
.assertThat(repository.queryWorkflowInstance(workflow))
910+
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
911+
.isEqualTo(WorkflowExecutionStatus.FAILURE));
912+
Assertions
913+
.assertThat(repository.queryTaskInstance(workflow))
914+
.satisfiesExactly(taskInstance -> {
915+
assertThat(taskInstance.getName()).isEqualTo("A");
916+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
917+
});
918+
});
919+
masterContainer.assertAllResourceReleased();
920+
}
921+
892922
@Test
893923
@DisplayName("Test start a workflow with one fake task(A) failed")
894924
public void testStartWorkflow_with_oneFailedTaskWithRetry() {
@@ -1443,6 +1473,46 @@ void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
14431473
masterContainer.assertAllResourceReleased();
14441474
}
14451475

1476+
@Test
1477+
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run fatal")
1478+
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
1479+
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
1480+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1481+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1482+
1483+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1484+
.workflowDefinition(parentWorkflow)
1485+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1486+
.build();
1487+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1488+
1489+
await()
1490+
.atMost(Duration.ofMinutes(1))
1491+
.untilAsserted(() -> {
1492+
Assertions
1493+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1494+
.matches(
1495+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
1496+
1497+
Assertions
1498+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1499+
.hasSize(3)
1500+
.anySatisfy(taskInstance -> {
1501+
assertThat(taskInstance.getName()).isEqualTo("A");
1502+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1503+
})
1504+
.anySatisfy(taskInstance -> {
1505+
assertThat(taskInstance.getName()).isEqualTo("B");
1506+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1507+
})
1508+
.anySatisfy(taskInstance -> {
1509+
assertThat(taskInstance.getName()).isEqualTo("D");
1510+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1511+
});
1512+
});
1513+
masterContainer.assertAllResourceReleased();
1514+
}
1515+
14461516
@Test
14471517
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run failed")
14481518
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed() {
@@ -1475,4 +1545,37 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF
14751545
});
14761546
masterContainer.assertAllResourceReleased();
14771547
}
1548+
1549+
@Test
1550+
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run fatal")
1551+
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal() {
1552+
final String yaml =
1553+
"/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
1554+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1555+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1556+
1557+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1558+
.workflowDefinition(parentWorkflow)
1559+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1560+
.build();
1561+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1562+
1563+
await()
1564+
.atMost(Duration.ofMinutes(1))
1565+
.untilAsserted(() -> {
1566+
Assertions
1567+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1568+
.matches(
1569+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
1570+
1571+
Assertions
1572+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1573+
.hasSize(1)
1574+
.satisfiesExactly(taskInstance -> {
1575+
assertThat(taskInstance.getName()).isEqualTo("A");
1576+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1577+
});
1578+
});
1579+
masterContainer.assertAllResourceReleased();
1580+
}
14781581
}

0 commit comments

Comments
 (0)