Skip to content

Commit 3ccf059

Browse files
authored
Revert "[Fix-17758][Master] Mark task as failed if TaskExecutionContext initialization fails" (#17807)
This reverts commit 8c4d921.
1 parent 8c4d921 commit 3ccf059

File tree

11 files changed

+2
-573
lines changed

11 files changed

+2
-573
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2525
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2626
import org.apache.dolphinscheduler.server.master.engine.exceptions.WorkflowEventFireException;
27-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
28-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2927
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
3028
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
3129
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
@@ -34,7 +32,6 @@
3432
import org.apache.commons.collections4.MapUtils;
3533

3634
import java.util.Collections;
37-
import java.util.Date;
3835
import java.util.List;
3936
import java.util.Map;
4037
import java.util.Optional;
@@ -130,19 +127,6 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
130127
workflowEventBus.publish(lifecycleEvent);
131128
ThreadUtils.sleep(5_000);
132129
return;
133-
} else {
134-
log.warn("exception occurred during event handling: {}", lifecycleEvent, ex);
135-
// If other exceptions and the event is task-related, construct and publish a dedicated
136-
// TaskFatalLifecycleEvent
137-
// so that the event will be handled by TaskFatalLifecycleEventHandler
138-
if (lifecycleEvent instanceof AbstractTaskLifecycleEvent) {
139-
AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent) lifecycleEvent;
140-
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
141-
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
142-
.endTime(new Date())
143-
.build();
144-
workflowEventBus.publish(taskFatalEvent);
145-
}
146130
}
147131
workflowEventBus.getWorkflowEventBusSummary().decreaseFireSuccessEventCount();
148132
workflowEventBus.getWorkflowEventBusSummary().increaseFireFailedEventCount();

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
2929
* Dispatch the task instance to target.
3030
*/
3131
DISPATCH,
32-
/**
33-
* Task instance encounters catastrophic failure(such as initialization failure), it will enter a failed state.
34-
*/
35-
FATAL,
3632
/**
3733
* The task instance is dispatched to the target executor server.
3834
*/

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskFatalLifecycleEvent.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskFatalLifecycleEventHandler.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
3838
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
39-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
4039
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
4140
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
4241
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -100,38 +99,6 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
10099
}
101100
}
102101

103-
@Override
104-
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
105-
final ITaskExecutionRunnable taskExecutionRunnable,
106-
final TaskFatalLifecycleEvent taskFatalEvent) {
107-
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
108-
persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);
109-
110-
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
111-
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
112-
return;
113-
}
114-
115-
// If all successors are condition tasks, then the task will not be marked as failure.
116-
// And the DAG will continue to execute.
117-
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
118-
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
119-
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
120-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
121-
return;
122-
}
123-
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
124-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
125-
}
126-
127-
private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
128-
final TaskFatalLifecycleEvent taskFatalEvent) {
129-
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
130-
taskInstance.setState(TaskExecutionStatus.FAILURE);
131-
taskInstance.setEndTime(taskFatalEvent.getEndTime());
132-
taskInstanceDao.updateById(taskInstance);
133-
}
134-
135102
@Override
136103
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
137104
final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2323
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
2424
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
25-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2625
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
2726
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
2827
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -92,14 +91,6 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
9291
final ITaskExecutionRunnable taskExecutionRunnable,
9392
final TaskDispatchLifecycleEvent taskDispatchEvent);
9493

95-
/**
96-
* Perform the necessary actions when the task in a certain state receive a {@link TaskFatalLifecycleEvent}.
97-
* <p> This method is called when the task encounters catastrophic failure (e.g., initialization failure).
98-
*/
99-
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
100-
final ITaskExecutionRunnable taskExecutionRunnable,
101-
final TaskFatalLifecycleEvent taskFatalEvent);
102-
10394
/**
10495
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
10596
* <p> This method is called when the task has been dispatched to executor.

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 0 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -849,36 +849,6 @@ public void testStartWorkflow_with_oneFailedTask() {
849849
masterContainer.assertAllResourceReleased();
850850
}
851851

852-
@Test
853-
@DisplayName("Test start a workflow with one fake task(A) fatal")
854-
public void testStartWorkflow_with_oneFatalTask() {
855-
final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
856-
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
857-
final WorkflowDefinition workflow = context.getOneWorkflow();
858-
859-
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
860-
.workflowDefinition(workflow)
861-
.runWorkflowCommandParam(new RunWorkflowCommandParam())
862-
.build();
863-
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
864-
865-
await()
866-
.atMost(Duration.ofMinutes(1))
867-
.untilAsserted(() -> {
868-
Assertions
869-
.assertThat(repository.queryWorkflowInstance(workflow))
870-
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
871-
.isEqualTo(WorkflowExecutionStatus.FAILURE));
872-
Assertions
873-
.assertThat(repository.queryTaskInstance(workflow))
874-
.satisfiesExactly(taskInstance -> {
875-
assertThat(taskInstance.getName()).isEqualTo("A");
876-
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
877-
});
878-
});
879-
masterContainer.assertAllResourceReleased();
880-
}
881-
882852
@Test
883853
@DisplayName("Test start a workflow with one fake task(A) failed")
884854
public void testStartWorkflow_with_oneFailedTaskWithRetry() {
@@ -1433,46 +1403,6 @@ void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
14331403
masterContainer.assertAllResourceReleased();
14341404
}
14351405

1436-
@Test
1437-
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run fatal")
1438-
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
1439-
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
1440-
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1441-
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1442-
1443-
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1444-
.workflowDefinition(parentWorkflow)
1445-
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1446-
.build();
1447-
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1448-
1449-
await()
1450-
.atMost(Duration.ofMinutes(1))
1451-
.untilAsserted(() -> {
1452-
Assertions
1453-
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1454-
.matches(
1455-
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
1456-
1457-
Assertions
1458-
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1459-
.hasSize(3)
1460-
.anySatisfy(taskInstance -> {
1461-
assertThat(taskInstance.getName()).isEqualTo("A");
1462-
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1463-
})
1464-
.anySatisfy(taskInstance -> {
1465-
assertThat(taskInstance.getName()).isEqualTo("B");
1466-
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1467-
})
1468-
.anySatisfy(taskInstance -> {
1469-
assertThat(taskInstance.getName()).isEqualTo("D");
1470-
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1471-
});
1472-
});
1473-
masterContainer.assertAllResourceReleased();
1474-
}
1475-
14761406
@Test
14771407
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run failed")
14781408
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed() {
@@ -1505,37 +1435,4 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF
15051435
});
15061436
masterContainer.assertAllResourceReleased();
15071437
}
1508-
1509-
@Test
1510-
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run fatal")
1511-
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal() {
1512-
final String yaml =
1513-
"/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
1514-
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1515-
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1516-
1517-
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1518-
.workflowDefinition(parentWorkflow)
1519-
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1520-
.build();
1521-
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1522-
1523-
await()
1524-
.atMost(Duration.ofMinutes(1))
1525-
.untilAsserted(() -> {
1526-
Assertions
1527-
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1528-
.matches(
1529-
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);
1530-
1531-
Assertions
1532-
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1533-
.hasSize(1)
1534-
.satisfiesExactly(taskInstance -> {
1535-
assertThat(taskInstance.getName()).isEqualTo("A");
1536-
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1537-
});
1538-
});
1539-
masterContainer.assertAllResourceReleased();
1540-
}
15411438
}

0 commit comments

Comments
 (0)