Skip to content

Commit 8c4d921

Browse files
authored
[Fix-17758][Master] Mark task as failed if TaskExecutionContext initialization fails (#17759)
1 parent 41d4e4c commit 8c4d921

File tree

11 files changed

+573
-2
lines changed

11 files changed

+573
-2
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2525
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2626
import org.apache.dolphinscheduler.server.master.engine.exceptions.WorkflowEventFireException;
27+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
28+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2729
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
2830
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
2931
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
@@ -32,6 +34,7 @@
3234
import org.apache.commons.collections4.MapUtils;
3335

3436
import java.util.Collections;
37+
import java.util.Date;
3538
import java.util.List;
3639
import java.util.Map;
3740
import java.util.Optional;
@@ -127,6 +130,19 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
127130
workflowEventBus.publish(lifecycleEvent);
128131
ThreadUtils.sleep(5_000);
129132
return;
133+
} else {
134+
log.warn("exception occurred during event handling: {}", lifecycleEvent, ex);
135+
// If other exceptions and the event is task-related, construct and publish a dedicated
136+
// TaskFatalLifecycleEvent
137+
// so that the event will be handled by TaskFatalLifecycleEventHandler
138+
if (lifecycleEvent instanceof AbstractTaskLifecycleEvent) {
139+
AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent) lifecycleEvent;
140+
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
141+
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
142+
.endTime(new Date())
143+
.build();
144+
workflowEventBus.publish(taskFatalEvent);
145+
}
130146
}
131147
workflowEventBus.getWorkflowEventBusSummary().decreaseFireSuccessEventCount();
132148
workflowEventBus.getWorkflowEventBusSummary().increaseFireFailedEventCount();

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
2929
* Dispatch the task instance to target.
3030
*/
3131
DISPATCH,
32+
/**
33+
* Task instance encounters catastrophic failure(such as initialization failure), it will enter a failed state.
34+
*/
35+
FATAL,
3236
/**
3337
* The task instance is dispatched to the target executor server.
3438
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
21+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
22+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
23+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
24+
25+
import java.util.Date;
26+
27+
import lombok.AllArgsConstructor;
28+
import lombok.Builder;
29+
import lombok.Data;
30+
31+
@Data
32+
@Builder
33+
@AllArgsConstructor
34+
public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent {
35+
36+
private final ITaskExecutionRunnable taskExecutionRunnable;
37+
38+
private final Date endTime;
39+
40+
@Override
41+
public ILifecycleEventType getEventType() {
42+
return TaskLifecycleEventType.FATAL;
43+
}
44+
45+
@Override
46+
public String toString() {
47+
return "TaskFatalLifecycleEvent{" +
48+
"task=" + taskExecutionRunnable.getName() +
49+
", endTime=" + endTime +
50+
'}';
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
21+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
22+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
23+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
24+
import org.apache.dolphinscheduler.server.master.engine.task.statemachine.ITaskStateAction;
25+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
26+
27+
import org.springframework.stereotype.Component;
28+
29+
@Component
30+
public class TaskFatalLifecycleEventHandler extends AbstractTaskLifecycleEventHandler<TaskFatalLifecycleEvent> {
31+
32+
@Override
33+
public void handle(final ITaskStateAction taskStateAction,
34+
final IWorkflowExecutionRunnable workflowExecutionRunnable,
35+
final ITaskExecutionRunnable taskExecutionRunnable,
36+
final TaskFatalLifecycleEvent taskFatalEvent) {
37+
taskStateAction.onFatalEvent(workflowExecutionRunnable, taskExecutionRunnable, taskFatalEvent);
38+
}
39+
40+
@Override
41+
public ILifecycleEventType matchEventType() {
42+
return TaskLifecycleEventType.FATAL;
43+
}
44+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
3838
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
39+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
3940
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
4041
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
4142
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -99,6 +100,38 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
99100
}
100101
}
101102

103+
@Override
104+
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
105+
final ITaskExecutionRunnable taskExecutionRunnable,
106+
final TaskFatalLifecycleEvent taskFatalEvent) {
107+
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
108+
persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);
109+
110+
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
111+
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
112+
return;
113+
}
114+
115+
// If all successors are condition tasks, then the task will not be marked as failure.
116+
// And the DAG will continue to execute.
117+
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
118+
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
119+
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
120+
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
121+
return;
122+
}
123+
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
124+
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
125+
}
126+
127+
private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
128+
final TaskFatalLifecycleEvent taskFatalEvent) {
129+
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
130+
taskInstance.setState(TaskExecutionStatus.FAILURE);
131+
taskInstance.setEndTime(taskFatalEvent.getEndTime());
132+
taskInstanceDao.updateById(taskInstance);
133+
}
134+
102135
@Override
103136
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
104137
final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2323
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
2424
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
25+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2526
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
2627
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
2728
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -91,6 +92,14 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
9192
final ITaskExecutionRunnable taskExecutionRunnable,
9293
final TaskDispatchLifecycleEvent taskDispatchEvent);
9394

95+
/**
96+
* Perform the necessary actions when the task in a certain state receive a {@link TaskFatalLifecycleEvent}.
97+
* <p> This method is called when the task encounters catastrophic failure (e.g., initialization failure).
98+
*/
99+
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
100+
final ITaskExecutionRunnable taskExecutionRunnable,
101+
final TaskFatalLifecycleEvent taskFatalEvent);
102+
94103
/**
95104
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
96105
* <p> This method is called when the task has been dispatched to executor.

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,36 @@ public void testStartWorkflow_with_oneFailedTask() {
849849
masterContainer.assertAllResourceReleased();
850850
}
851851

852+
@Test
853+
@DisplayName("Test start a workflow with one fake task(A) fatal")
854+
public void testStartWorkflow_with_oneFatalTask() {
855+
final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
856+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
857+
final WorkflowDefinition workflow = context.getOneWorkflow();
858+
859+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
860+
.workflowDefinition(workflow)
861+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
862+
.build();
863+
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
864+
865+
await()
866+
.atMost(Duration.ofMinutes(1))
867+
.untilAsserted(() -> {
868+
Assertions
869+
.assertThat(repository.queryWorkflowInstance(workflow))
870+
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
871+
.isEqualTo(WorkflowExecutionStatus.FAILURE));
872+
Assertions
873+
.assertThat(repository.queryTaskInstance(workflow))
874+
.satisfiesExactly(taskInstance -> {
875+
assertThat(taskInstance.getName()).isEqualTo("A");
876+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
877+
});
878+
});
879+
masterContainer.assertAllResourceReleased();
880+
}
881+
852882
@Test
853883
@DisplayName("Test start a workflow with one fake task(A) failed")
854884
public void testStartWorkflow_with_oneFailedTaskWithRetry() {
@@ -1403,6 +1433,46 @@ void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
14031433
masterContainer.assertAllResourceReleased();
14041434
}
14051435

1436+
@Test
1437+
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run fatal")
1438+
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
1439+
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
1440+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1441+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1442+
1443+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1444+
.workflowDefinition(parentWorkflow)
1445+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1446+
.build();
1447+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1448+
1449+
await()
1450+
.atMost(Duration.ofMinutes(1))
1451+
.untilAsserted(() -> {
1452+
Assertions
1453+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1454+
.matches(
1455+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
1456+
1457+
Assertions
1458+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1459+
.hasSize(3)
1460+
.anySatisfy(taskInstance -> {
1461+
assertThat(taskInstance.getName()).isEqualTo("A");
1462+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1463+
})
1464+
.anySatisfy(taskInstance -> {
1465+
assertThat(taskInstance.getName()).isEqualTo("B");
1466+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1467+
})
1468+
.anySatisfy(taskInstance -> {
1469+
assertThat(taskInstance.getName()).isEqualTo("D");
1470+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1471+
});
1472+
});
1473+
masterContainer.assertAllResourceReleased();
1474+
}
1475+
14061476
@Test
14071477
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run failed")
14081478
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed() {
@@ -1435,4 +1505,37 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF
14351505
});
14361506
masterContainer.assertAllResourceReleased();
14371507
}
1508+
1509+
@Test
1510+
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run fatal")
1511+
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal() {
1512+
final String yaml =
1513+
"/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
1514+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1515+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1516+
1517+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1518+
.workflowDefinition(parentWorkflow)
1519+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1520+
.build();
1521+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1522+
1523+
await()
1524+
.atMost(Duration.ofMinutes(1))
1525+
.untilAsserted(() -> {
1526+
Assertions
1527+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1528+
.matches(
1529+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
1530+
1531+
Assertions
1532+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1533+
.hasSize(1)
1534+
.satisfiesExactly(taskInstance -> {
1535+
assertThat(taskInstance.getName()).isEqualTo("A");
1536+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1537+
});
1538+
});
1539+
masterContainer.assertAllResourceReleased();
1540+
}
14381541
}

0 commit comments

Comments
 (0)