Skip to content

Commit 584ad2d

Browse files
author
苏义超
committed
add TaskFatalLifecycleEvent and handler
1 parent ab750bb commit 584ad2d

File tree

4 files changed

+50
-0
lines changed

4 files changed

+50
-0
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,13 @@ public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) throws
6666
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
6767
final String taskName = taskExecutionContext.getTaskName();
6868
final String workerGroup = taskExecutionContext.getWorkerGroup();
69+
70+
// workerGroup not exist
6971
if (!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
7072
throw new WorkerGroupNotFoundException(workerGroup);
7173
}
74+
75+
// select an available worker from the worker group; throws NoAvailableWorkerException if none is available.
7276
final String physicalTaskExecutorAddress = workerLoadBalancer
7377
.select(workerGroup)
7478
.map(Host::of)

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
2929
* Dispatch the task instance to target.
3030
*/
3131
DISPATCH,
32+
/**
33+
* Task instance encounters catastrophic failure(such as initialization failure), it will enter a failed state.
34+
*/
35+
FATAL,
3236
/**
3337
* The task instance is dispatched to the target executor server.
3438
*/

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
3838
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
39+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
3940
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
4041
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
4142
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -99,6 +100,38 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
99100
}
100101
}
101102

103+
@Override
104+
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
105+
final ITaskExecutionRunnable taskExecutionRunnable,
106+
final TaskFatalLifecycleEvent taskFatalEvent) {
107+
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
108+
persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);
109+
110+
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
111+
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
112+
return;
113+
}
114+
115+
// If all successors are condition tasks, then the task will not be marked as failure.
116+
// And the DAG will continue to execute.
117+
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
118+
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
119+
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
120+
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
121+
return;
122+
}
123+
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
124+
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
125+
}
126+
127+
private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
128+
final TaskFatalLifecycleEvent taskFatalEvent) {
129+
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
130+
taskInstance.setState(TaskExecutionStatus.FAILURE);
131+
taskInstance.setEndTime(taskFatalEvent.getEndTime());
132+
taskInstanceDao.updateById(taskInstance);
133+
}
134+
102135
@Override
103136
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
104137
final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2323
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
2424
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
25+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2526
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
2627
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
2728
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -91,6 +92,14 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
9192
final ITaskExecutionRunnable taskExecutionRunnable,
9293
final TaskDispatchLifecycleEvent taskDispatchEvent);
9394

95+
/**
96+
* Perform the necessary actions when the task in a certain state receive a {@link TaskFatalLifecycleEvent}.
97+
* <p> This method is called when the task encounters catastrophic failure (e.g., initialization failure).
98+
*/
99+
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
100+
final ITaskExecutionRunnable taskExecutionRunnable,
101+
final TaskFatalLifecycleEvent taskFatalEvent);
102+
94103
/**
95104
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
96105
* <p> This method is called when the task has been dispatched to executor.

0 commit comments

Comments
 (0)