Skip to content

Commit 5b2722d

Browse files
author
苏义超
committed
update TaskDispatchPolicy
1 parent 5e1dd52 commit 5b2722d

File tree

12 files changed

+59
-207
lines changed

12 files changed

+59
-207
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class MasterConfig implements Validator {
7979
* This controls whether the system enforces a time limit for dispatching tasks to workers,
8080
* and if so, how long to wait before marking a task as failed due to dispatch timeout.
8181
*/
82-
private MasterDispatchTimeoutCheckerConfig dispatchTimeoutChecker = new MasterDispatchTimeoutCheckerConfig();
82+
private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
8383

8484
@Override
8585
public boolean supports(Class<?> clazz) {
@@ -105,13 +105,13 @@ public void validate(Object target, Errors errors) {
105105
errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s");
106106
}
107107

108-
// Validate dispatch timeout checker config
109-
MasterDispatchTimeoutCheckerConfig timeoutChecker = masterConfig.getDispatchTimeoutChecker();
110-
if (timeoutChecker != null && timeoutChecker.isEnabled()) {
111-
if (timeoutChecker.getMaxTaskDispatchDuration() == null) {
108+
// Validate task dispatch policy config
109+
TaskDispatchPolicy configTaskDispatchPolicy = masterConfig.getTaskDispatchPolicy();
110+
if (configTaskDispatchPolicy != null && configTaskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
111+
if (configTaskDispatchPolicy.getMaxTaskDispatchDuration() == null) {
112112
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
113113
"must be specified when dispatch timeout checker is enabled");
114-
} else if (timeoutChecker.getMaxTaskDispatchDuration().toMillis() <= 0) {
114+
} else if (configTaskDispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) {
115115
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
116116
"must be a positive duration (e.g., '2m', '5m', '30m')");
117117
}
@@ -142,7 +142,7 @@ private void printConfig() {
142142
"\n command-fetch-strategy: " + commandFetchStrategy +
143143
"\n worker-load-balancer-configuration-properties: "
144144
+ workerLoadBalancerConfigurationProperties +
145-
"\n dispatchTimeoutChecker: " + dispatchTimeoutChecker +
145+
"\n taskDispatchPolicy: " + taskDispatchPolicy +
146146
"\n****************************Master Configuration**************************************";
147147
log.info(config);
148148
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterDispatchTimeoutCheckerConfig.java renamed to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,27 @@
2222
import lombok.Data;
2323

2424
/**
25-
* Configuration for the master's task dispatch timeout checker.
26-
* If enabled, tasks that remain in the dispatch queue longer than {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing.
25+
* Configuration for the master's task dispatch policy.
26+
* <p>
27+
* When enabled, tasks that remain in the dispatch queue longer than
28+
* {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing.
2729
*/
2830
@Data
29-
public class MasterDispatchTimeoutCheckerConfig {
31+
public class TaskDispatchPolicy {
3032

3133
/**
32-
* Whether to enable the dispatch timeout checking mechanism.
34+
* Indicates whether the dispatch timeout checking mechanism is enabled.
35+
* <p>
36+
* If {@code true}, tasks exceeding the configured dispatch duration will be failed automatically.
3337
*/
34-
private boolean enabled = false;
38+
private boolean dispatchTimeoutFailedEnabled = false;
3539

3640
/**
37-
* Maximum allowed time for a task to be dispatched to a worker.
38-
* Tasks exceeding this duration in the dispatch queue will be failed.
39-
* Examples: "2m", "5m", "30m". Defaults to 5 minutes.
41+
* The maximum allowed duration a task may wait in the dispatch queue before being assigned to a worker.
42+
* <p>
43+
* Tasks that exceed this duration will be marked as failed.
44+
* <p>
45+
* Examples: {@code "2m"}, {@code "5m"}, {@code "30m"}.
4046
*/
41-
private Duration maxTaskDispatchDuration = Duration.ofMinutes(5);
47+
private Duration maxTaskDispatchDuration;
4248
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2121
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2222
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
23-
import org.apache.dolphinscheduler.server.master.config.MasterDispatchTimeoutCheckerConfig;
23+
import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
2424
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2525
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
2626
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
27-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2827
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
29-
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
3028
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
3129
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
3230

@@ -55,15 +53,15 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
5553

5654
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
5755

58-
private final MasterDispatchTimeoutCheckerConfig dispatchTimeoutChecker;
56+
private final TaskDispatchPolicy taskDispatchPolicy;
5957

6058
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient,
61-
MasterDispatchTimeoutCheckerConfig dispatchTimeoutChecker) {
59+
TaskDispatchPolicy taskDispatchPolicy) {
6260
super("WorkerGroupTaskDispatcher-" + workerGroupName);
6361
this.taskExecutorClient = taskExecutorClient;
6462
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
6563
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
66-
this.dispatchTimeoutChecker = dispatchTimeoutChecker;
64+
this.taskDispatchPolicy = taskDispatchPolicy;
6765
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
6866
}
6967

@@ -105,10 +103,10 @@ private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
105103
return;
106104
}
107105
taskExecutorClient.dispatch(taskExecutionRunnable);
108-
} catch (TaskDispatchException ex) {
109-
if (dispatchTimeoutChecker.isEnabled()) {
106+
} catch (Exception ex) {
107+
if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
110108
// Checks whether the given task has exceeded its allowed dispatch timeout.
111-
long timeoutMs = this.dispatchTimeoutChecker.getMaxTaskDispatchDuration().toMillis();
109+
long timeoutMs = this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
112110
long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchTime();
113111
if (elapsed > timeoutMs) {
114112
handleDispatchFailure(taskExecutionRunnable, ex, elapsed, timeoutMs);
@@ -132,32 +130,32 @@ private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
132130
* Once this method is called, the task is considered permanently failed and will not be retried.
133131
*
134132
* @param taskExecutionRunnable the task to mark as fatally failed; must not be null
135-
* @param exception the dispatch exception that triggered this failure handling; must not be null
133+
* @param ex the dispatch exception that triggered this failure handling; must not be null
136134
* @param elapsed the time (in milliseconds) already spent attempting to dispatch the task
137135
* @param timeoutMs the configured dispatch timeout threshold (in milliseconds)
138136
*/
139-
private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception,
137+
private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,
140138
long elapsed, long timeoutMs) {
141139
final String taskName = taskExecutionRunnable.getName();
142140

143141
log.warn("[DISPATCH_FAILED] taskName: {}, timed out after {} ms (limit: {} ms))", taskName, elapsed, timeoutMs);
144142

145-
if (ExceptionUtils.isWorkerGroupNotFoundException(exception)) {
146-
log.error("[DISPATCH_FAILED] taskName: {}, Worker group not found.", taskName, exception);
147-
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
143+
if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) {
144+
log.error("[DISPATCH_FAILED] taskName: {}, Worker group not found.", taskName, ex);
145+
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
148146
.taskExecutionRunnable(taskExecutionRunnable)
149147
.endTime(new Date())
150148
.build();
151-
taskExecutionRunnable.getWorkflowEventBus().publish(taskFatalEvent);
152-
} else if (ExceptionUtils.isNoAvailableWorkerException(exception)) {
153-
log.error("[DISPATCH_FAILED] taskName: {}, No available worker.", taskName, exception);
149+
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
150+
} else if (ExceptionUtils.isNoAvailableWorkerException(ex)) {
151+
log.error("[DISPATCH_FAILED] taskName: {}, No available worker.", taskName, ex);
154152
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
155153
.taskExecutionRunnable(taskExecutionRunnable)
156154
.endTime(new Date())
157155
.build();
158156
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
159157
} else {
160-
log.error("[DISPATCH_FAILED] taskName: {}, Unexpected dispatch error.", taskName, exception);
158+
log.error("[DISPATCH_FAILED] taskName: {}, Unexpected dispatch error.", taskName, ex);
161159
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
162160
.taskExecutionRunnable(taskExecutionRunnable)
163161
.endTime(new Date())

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void close() throws Exception {
104104
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
105105
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
106106
WorkerGroupDispatcher workerGroupDispatcher =
107-
new WorkerGroupDispatcher(wg, taskExecutorClient, masterConfig.getDispatchTimeoutChecker());
107+
new WorkerGroupDispatcher(wg, taskExecutorClient, masterConfig.getTaskDispatchPolicy());
108108
workerGroupDispatcher.start();
109109
return workerGroupDispatcher;
110110
});

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/TaskLifecycleEventType.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
2929
* Dispatch the task instance to target.
3030
*/
3131
DISPATCH,
32-
/**
33-
* Task instance encounters catastrophic failure(such as initialization failure), it will enter a failed state.
34-
*/
35-
FATAL,
3632
/**
3733
* The task instance is dispatched to the target executor server.
3834
*/

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskFatalLifecycleEvent.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskFatalLifecycleEventHandler.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
3838
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
39-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
4039
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
4140
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
4241
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -100,38 +99,6 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
10099
}
101100
}
102101

103-
@Override
104-
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
105-
final ITaskExecutionRunnable taskExecutionRunnable,
106-
final TaskFatalLifecycleEvent taskFatalEvent) {
107-
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
108-
persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);
109-
110-
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
111-
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
112-
return;
113-
}
114-
115-
// If all successors are condition tasks, then the task will not be marked as failure.
116-
// And the DAG will continue to execute.
117-
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
118-
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
119-
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
120-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
121-
return;
122-
}
123-
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
124-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
125-
}
126-
127-
private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
128-
final TaskFatalLifecycleEvent taskFatalEvent) {
129-
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
130-
taskInstance.setState(TaskExecutionStatus.FAILURE);
131-
taskInstance.setEndTime(taskFatalEvent.getEndTime());
132-
taskInstanceDao.updateById(taskInstance);
133-
}
134-
135102
@Override
136103
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
137104
final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/ITaskStateAction.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2323
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
2424
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
25-
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
2625
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
2726
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
2827
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -92,14 +91,6 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
9291
final ITaskExecutionRunnable taskExecutionRunnable,
9392
final TaskDispatchLifecycleEvent taskDispatchEvent);
9493

95-
/**
96-
* Perform the necessary actions when the task in a certain state receive a {@link TaskFatalLifecycleEvent}.
97-
* <p> This method is called when the task encounters catastrophic failure (e.g., initialization failure).
98-
*/
99-
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
100-
final ITaskExecutionRunnable taskExecutionRunnable,
101-
final TaskFatalLifecycleEvent taskFatalEvent);
102-
10394
/**
10495
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
10596
* <p> This method is called when the task has been dispatched to executor.

0 commit comments

Comments
 (0)