Skip to content

Commit fbd7edf

Browse files
author
苏义超
committed
add dispatch-timeout-checker switch
1 parent bb089f3 commit fbd7edf

File tree

10 files changed

+34
-41
lines changed

10 files changed

+34
-41
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ public void validate(Object target, Errors errors) {
108108
// Validate dispatch timeout checker config
109109
MasterDispatchTimeoutCheckerConfig timeoutChecker = masterConfig.getDispatchTimeoutChecker();
110110
if (timeoutChecker != null && timeoutChecker.isEnabled()) {
111-
if (timeoutChecker.getTimeoutDuration() == null) {
112-
errors.rejectValue("master-dispatch-timeout-checker.timeout-duration", null,
111+
if (timeoutChecker.getMaxTaskDispatchDuration() == null) {
112+
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
113113
"must be specified when dispatch timeout checker is enabled");
114-
} else if (timeoutChecker.getTimeoutDuration().toMillis() <= 0) {
115-
errors.rejectValue("master-dispatch-timeout-checker.timeout-duration", null,
114+
} else if (timeoutChecker.getMaxTaskDispatchDuration().toMillis() <= 0) {
115+
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
116116
"must be a positive duration (e.g., '2m', '5m', '30m')");
117117
}
118118
}
@@ -142,6 +142,7 @@ private void printConfig() {
142142
"\n command-fetch-strategy: " + commandFetchStrategy +
143143
"\n worker-load-balancer-configuration-properties: "
144144
+ workerLoadBalancerConfigurationProperties +
145+
"\n dispatchTimeoutChecker: " + dispatchTimeoutChecker +
145146
"\n****************************Master Configuration**************************************";
146147
log.info(config);
147148
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterDispatchTimeoutCheckerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
/**
2525
* Configuration for the master's task dispatch timeout checker.
26-
* If enabled, tasks that remain in the dispatch queue longer than {@link #timeoutDuration} will be marked as failed to prevent indefinite queuing.
26+
* If enabled, tasks that remain in the dispatch queue longer than {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing.
2727
*/
2828
@Data
2929
public class MasterDispatchTimeoutCheckerConfig {
@@ -38,5 +38,5 @@ public class MasterDispatchTimeoutCheckerConfig {
3838
* Tasks exceeding this duration in the dispatch queue will be failed.
3939
* Examples: "2m", "5m", "30m". Defaults to 5 minutes.
4040
*/
41-
private Duration timeoutDuration = Duration.ofMinutes(5);
41+
private Duration maxTaskDispatchDuration = Duration.ofMinutes(5);
4242
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
2929
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3030
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
31+
import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
3132
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
3233
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
33-
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerNotFoundException;
3434
import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
3535
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
3636
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
@@ -67,20 +67,13 @@ public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) throws
6767
final String taskName = taskExecutionContext.getTaskName();
6868
final String workerGroup = taskExecutionContext.getWorkerGroup();
6969
if (!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
70-
throw new WorkerGroupNotFoundException(
71-
String.format("Cannot find worker group to dispatch Task[id=%s, name=%s, workerGroup=%s]",
72-
taskExecutionContext.getTaskInstanceId(), taskName,
73-
workerGroup));
70+
throw new WorkerGroupNotFoundException(workerGroup);
7471
}
7572
final String physicalTaskExecutorAddress = workerLoadBalancer
7673
.select(workerGroup)
7774
.map(Host::of)
7875
.map(Host::getAddress)
79-
.orElseThrow(() -> new WorkerNotFoundException(
80-
String.format(
81-
"Cannot find available worker host to dispatch Task[id=%s, name=%s, workerGroup=%s]",
82-
taskExecutionContext.getTaskInstanceId(), taskName,
83-
workerGroup)));
76+
.orElseThrow(() -> new NoAvailableWorkerException(workerGroup));
8477

8578
taskExecutionContext.setHost(physicalTaskExecutorAddress);
8679
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,21 +95,21 @@ public void run() {
9595
}
9696

9797
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
98-
final int taskId = taskExecutionRunnable.getId();
98+
final int taskInstanceId = taskExecutionRunnable.getId();
9999
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
100100
try {
101-
if (!waitingDispatchTaskIds.remove(taskId)) {
101+
if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
102102
log.info(
103103
"The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
104-
taskId);
104+
taskInstanceId);
105105
return;
106106
}
107107
taskExecutorClient.dispatch(taskExecutionRunnable);
108108
} catch (TaskDispatchException ex) {
109109
if (dispatchTimeoutChecker.isEnabled()) {
110110
// Checks whether the given task has exceeded its allowed dispatch timeout.
111-
long timeoutMs = this.dispatchTimeoutChecker.getTimeoutDuration().toMillis();
112-
long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchEnqueueTimeMs();
111+
long timeoutMs = this.dispatchTimeoutChecker.getMaxTaskDispatchDuration().toMillis();
112+
long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchTime();
113113
if (elapsed > timeoutMs) {
114114
handleDispatchFailure(taskExecutionRunnable, ex, elapsed, timeoutMs);
115115
return;
@@ -122,7 +122,7 @@ private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
122122
long waitingTimeMillis = Math.min(
123123
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
124124
dispatchTask(taskExecutionRunnable, waitingTimeMillis);
125-
log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskId,
125+
log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskInstanceId,
126126
waitingTimeMillis, ex);
127127
}
128128
}
@@ -138,31 +138,26 @@ private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
138138
*/
139139
private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception,
140140
long elapsed, long timeoutMs) {
141-
int taskId = taskExecutionRunnable.getId();
142-
int workflowId = taskExecutionRunnable.getWorkflowInstance().getId();
141+
final String taskName = taskExecutionRunnable.getName();
143142

144-
log.warn("[DISPATCH_FAILED] taskId: {}, workflowId: {}, timed out after {} ms (limit: {} ms))", taskId,
145-
workflowId, elapsed, timeoutMs);
143+
log.warn("[DISPATCH_FAILED] taskName: {}, timed out after {} ms (limit: {} ms))", taskName, elapsed, timeoutMs);
146144

147145
if (ExceptionUtils.isWorkerGroupNotFoundException(exception)) {
148-
log.error("[DISPATCH_FAILED] Worker group not found. taskId: {}, workflowId: {}", taskId, workflowId,
149-
exception);
146+
log.error("[DISPATCH_FAILED] taskName: {}, Worker group not found.", taskName, exception);
150147
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
151148
.taskExecutionRunnable(taskExecutionRunnable)
152149
.endTime(new Date())
153150
.build();
154151
taskExecutionRunnable.getWorkflowEventBus().publish(taskFatalEvent);
155152
} else if (ExceptionUtils.isWorkerNotFoundException(exception)) {
156-
log.error("[DISPATCH_FAILED] No available workers. taskId: {}, workflowId: {}", taskId, workflowId,
157-
exception);
153+
log.error("[DISPATCH_FAILED] taskName: {}, No available worker.", taskName, exception);
158154
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
159155
.taskExecutionRunnable(taskExecutionRunnable)
160156
.endTime(new Date())
161157
.build();
162158
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
163159
} else {
164-
log.error("[DISPATCH_FAILED] Unexpected dispatch error. taskId: {}, workflowId: {}", taskId, workflowId,
165-
exception);
160+
log.error("[DISPATCH_FAILED] taskName: {}, Unexpected dispatch error.", taskName, exception);
166161
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
167162
.taskExecutionRunnable(taskExecutionRunnable)
168163
.endTime(new Date())

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/WorkerNotFoundException.java renamed to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.dolphinscheduler.server.master.exception.dispatch;
1919

20-
public class WorkerNotFoundException extends TaskDispatchException {
20+
public class NoAvailableWorkerException extends TaskDispatchException {
2121

22-
public WorkerNotFoundException(String message) {
23-
super(message);
22+
public NoAvailableWorkerException(String workerGroup) {
23+
super("Cannot find available worker under worker group: " + workerGroup);
2424
}
2525
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/WorkerGroupNotFoundException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
public class WorkerGroupNotFoundException extends TaskDispatchException {
2121

22-
public WorkerGroupNotFoundException(String message) {
23-
super(message);
22+
public WorkerGroupNotFoundException(String workerGroup) {
23+
super("Cannot find worker group: " + workerGroup);
2424
}
2525
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.dolphinscheduler.server.master.utils;
1919

2020
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
21-
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerNotFoundException;
2221

2322
import org.springframework.dao.DataAccessResourceFailureException;
2423

dolphinscheduler-master/src/main/resources/application.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ master:
115115
# When enabled, tasks not dispatched within this duration are marked as failed.
116116
dispatch-timeout-checker:
117117
enabled: false
118-
timeout-duration: 5m
118+
max-task-dispatch-duration: 5m
119119
command-fetch-strategy:
120120
type: ID_SLOT_BASED
121121
config:

dolphinscheduler-master/src/test/resources/application.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ master:
7373
cpu-usage-weight: 30
7474
task-thread-pool-usage-weight: 30
7575
worker-group-refresh-interval: 5m
76+
# Task dispatch timeout check (currently disabled).
77+
# When enabled, tasks not dispatched within this duration are marked as failed.
78+
dispatch-timeout-checker:
79+
enabled: false
80+
max-task-dispatch-duration: 5m
7681
command-fetch-strategy:
7782
type: ID_SLOT_BASED
7883
config:

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ public class TaskExecutionContext implements Serializable {
127127
private boolean failover;
128128

129129
/**
130-
* Timestamp (ms) when the task was first enqueued for dispatch.
130+
* Timestamp (ms) when the task was first dispatched.
131131
*/
132-
private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis();
132+
private final long firstDispatchTime = System.currentTimeMillis();
133133

134134
public int increaseDispatchFailTimes() {
135135
return ++dispatchFailTimes;

0 commit comments

Comments
 (0)