Skip to content

Commit c2de965

Browse files
authored
Merge branch 'dev' into Fix-17854
2 parents 505efee + b12b682 commit c2de965

File tree

18 files changed

+728
-18
lines changed

18 files changed

+728
-18
lines changed

docs/docs/en/architecture/configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ Location: `master-server/conf/application.yaml`
291291
| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` |
292292
| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db |
293293
| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master |
294+
| master.task-dispatch-policy.dispatch-timeout-enabled | false | Indicates whether the dispatch timeout checking mechanism is enabled |
295+
| master.task-dispatch-policy.max-task-dispatch-duration | 1h | The maximum allowed duration a task may wait in the dispatch queue before being assigned to a worker |
294296

295297
### Worker Server related configuration
296298

docs/docs/zh/architecture/configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
298298
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` |
299299
| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 |
300300
| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 |
301+
| master.task-dispatch-policy.dispatch-timeout-enabled | false | 是否开启master分派超时检测功能 |
302+
| master.task-dispatch-policy.max-task-dispatch-duration | 1h | master分派检测的超时时长,默认为一小时 |
301303

302304
## Worker Server相关配置
303305

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class MasterConfig implements Validator {
7474
*/
7575
private String masterRegistryPath;
7676

77+
private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
78+
7779
@Override
7880
public boolean supports(Class<?> clazz) {
7981
return MasterConfig.class.isAssignableFrom(clazz);
@@ -97,6 +99,18 @@ public void validate(Object target, Errors errors) {
9799
if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
98100
errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s");
99101
}
102+
103+
TaskDispatchPolicy dispatchPolicy = masterConfig.getTaskDispatchPolicy();
104+
if (dispatchPolicy.isDispatchTimeoutEnabled()) {
105+
if (dispatchPolicy.getMaxTaskDispatchDuration() == null) {
106+
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
107+
"must be specified when dispatch timeout checker is enabled");
108+
} else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) {
109+
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
110+
"must be a positive duration (e.g., '10m', '30m', '1h')");
111+
}
112+
}
113+
100114
if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
101115
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
102116
}
@@ -122,6 +136,7 @@ private void printConfig() {
122136
"\n command-fetch-strategy: " + commandFetchStrategy +
123137
"\n worker-load-balancer-configuration-properties: "
124138
+ workerLoadBalancerConfigurationProperties +
139+
"\n taskDispatchPolicy: " + taskDispatchPolicy +
125140
"\n****************************Master Configuration**************************************";
126141
log.info(config);
127142
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.config;
19+
20+
import java.time.Duration;
21+
22+
import lombok.Data;
23+
24+
/**
25+
* Configuration for the master's task dispatch policy.
26+
* When enabled, tasks that remain in the dispatch queue longer than
27+
* {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing.
28+
*/
29+
@Data
30+
public class TaskDispatchPolicy {
31+
32+
/**
33+
* Indicates whether the dispatch timeout checking mechanism is enabled.
34+
*/
35+
private boolean dispatchTimeoutEnabled = false;
36+
37+
/**
38+
* The maximum allowed duration a task may wait in the dispatch queue before being assigned to a worker.
39+
* Tasks that exceed this duration will be marked as failed.
40+
* Examples: {@code "10m"}, {@code "30m"}, {@code "1h"}.
41+
*/
42+
private Duration maxTaskDispatchDuration;
43+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import org.apache.dolphinscheduler.extract.base.utils.Host;
2525
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
2626
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
27+
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
2728
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
2829
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2930
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
31+
import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
3032
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
33+
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
3134
import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
3235
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
3336
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
@@ -55,18 +58,26 @@ public class PhysicalTaskExecutorClientDelegator implements ITaskExecutorClientD
5558
@Autowired
5659
private IWorkerLoadBalancer workerLoadBalancer;
5760

61+
@Autowired
62+
private ClusterManager clusterManager;
63+
5864
@Override
5965
public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) throws TaskDispatchException {
6066
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
6167
final String taskName = taskExecutionContext.getTaskName();
68+
final String workerGroup = taskExecutionContext.getWorkerGroup();
69+
70+
// workerGroup not exist
71+
if (!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
72+
throw new WorkerGroupNotFoundException(workerGroup);
73+
}
74+
75+
// select an available worker from the worker group; throws NoAvailableWorkerException if none is available.
6276
final String physicalTaskExecutorAddress = workerLoadBalancer
63-
.select(taskExecutionContext.getWorkerGroup())
77+
.select(workerGroup)
6478
.map(Host::of)
6579
.map(Host::getAddress)
66-
.orElseThrow(() -> new TaskDispatchException(
67-
String.format("Cannot find the host to dispatch Task[id=%s, name=%s, workerGroup=%s]",
68-
taskExecutionContext.getTaskInstanceId(), taskName,
69-
taskExecutionContext.getWorkerGroup())));
80+
.orElseThrow(() -> new NoAvailableWorkerException(workerGroup));
7081

7182
taskExecutionContext.setHost(physicalTaskExecutorAddress);
7283
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
1919

2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
21+
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2122
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
23+
import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
2224
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2325
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
26+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
2427
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2528
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
2629

30+
import java.util.Date;
2731
import java.util.Set;
2832
import java.util.concurrent.ConcurrentHashMap;
2933
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,11 +52,22 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
4852

4953
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
5054

51-
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
55+
private final TaskDispatchPolicy taskDispatchPolicy;
56+
57+
private final long maxTaskDispatchMillis;
58+
59+
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient,
60+
TaskDispatchPolicy taskDispatchPolicy) {
5261
super("WorkerGroupTaskDispatcher-" + workerGroupName);
5362
this.taskExecutorClient = taskExecutorClient;
5463
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
5564
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
65+
this.taskDispatchPolicy = taskDispatchPolicy;
66+
if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
67+
this.maxTaskDispatchMillis = taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
68+
} else {
69+
this.maxTaskDispatchMillis = 0L;
70+
}
5671
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
5772
}
5873

@@ -84,26 +99,54 @@ public void run() {
8499
}
85100

86101
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
102+
final int taskInstanceId = taskExecutionRunnable.getId();
103+
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
87104
try {
88-
if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) {
105+
if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
89106
log.info(
90107
"The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
91-
taskExecutionRunnable.getId());
108+
taskInstanceId);
92109
return;
93110
}
94111
taskExecutorClient.dispatch(taskExecutionRunnable);
95-
} catch (Exception e) {
112+
} catch (Exception ex) {
113+
if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
114+
// If a dispatch timeout occurs, the task will not be put back into the queue.
115+
long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchTime();
116+
if (elapsed > maxTaskDispatchMillis) {
117+
onDispatchTimeout(taskExecutionRunnable, ex, elapsed, maxTaskDispatchMillis);
118+
return;
119+
}
120+
}
121+
96122
// If dispatch failed, will put the task back to the queue
97123
// The task will be dispatched after waiting time.
98124
// the waiting time will increase multiple of times, but will not exceed 60 seconds
99-
long waitingTimeMills = Math.min(
125+
long waitingTimeMillis = Math.min(
100126
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
101-
dispatchTask(taskExecutionRunnable, waitingTimeMills);
102-
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getId(),
103-
waitingTimeMills, e);
127+
dispatchTask(taskExecutionRunnable, waitingTimeMillis);
128+
log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskInstanceId,
129+
waitingTimeMillis, ex);
104130
}
105131
}
106132

133+
/**
134+
* Marks a task as permanently failed due to dispatch timeout.
135+
* Once called, the task is considered permanently failed and will not be retried.
136+
*/
137+
private void onDispatchTimeout(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,
138+
long elapsed, long timeout) {
139+
String taskName = taskExecutionRunnable.getName();
140+
log.error("Task: {} dispatch timeout after {}ms (limit: {}ms)",
141+
taskName, elapsed, timeout, ex);
142+
143+
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
144+
.taskExecutionRunnable(taskExecutionRunnable)
145+
.endTime(new Date())
146+
.build();
147+
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
148+
}
149+
107150
/**
108151
* Adds a task to the worker group queue.
109152
* This method wraps the given task execution object into a priority and delay-based task entry and adds it to the worker group queue.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
1919

20+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2021
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2122
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2223

@@ -41,8 +42,11 @@ public class WorkerGroupDispatcherCoordinator implements AutoCloseable {
4142

4243
private final ConcurrentHashMap<String, WorkerGroupDispatcher> workerGroupDispatcherMap;
4344

44-
public WorkerGroupDispatcherCoordinator() {
45+
private final MasterConfig masterConfig;
46+
47+
public WorkerGroupDispatcherCoordinator(final MasterConfig masterConfig) {
4548
workerGroupDispatcherMap = new ConcurrentHashMap<>();
49+
this.masterConfig = masterConfig;
4650
}
4751

4852
public void start() {
@@ -99,7 +103,8 @@ public void close() throws Exception {
99103

100104
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
101105
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
102-
WorkerGroupDispatcher workerGroupDispatcher = new WorkerGroupDispatcher(wg, taskExecutorClient);
106+
WorkerGroupDispatcher workerGroupDispatcher =
107+
new WorkerGroupDispatcher(wg, taskExecutorClient, masterConfig.getTaskDispatchPolicy());
103108
workerGroupDispatcher.start();
104109
return workerGroupDispatcher;
105110
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.exception.dispatch;
19+
20+
public class NoAvailableWorkerException extends TaskDispatchException {
21+
22+
public NoAvailableWorkerException(String workerGroup) {
23+
super("Cannot find available worker under worker group: " + workerGroup);
24+
}
25+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.dolphinscheduler.server.master.utils;
1919

2020
import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
21+
import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
22+
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
2123

2224
import org.springframework.dao.DataAccessResourceFailureException;
2325

@@ -30,4 +32,12 @@ public static boolean isDatabaseConnectedFailedException(Throwable e) {
3032
public static boolean isTaskExecutionContextCreateException(Throwable e) {
3133
return e instanceof TaskExecutionContextCreateException;
3234
}
35+
36+
public static boolean isWorkerGroupNotFoundException(Throwable e) {
37+
return e instanceof WorkerGroupNotFoundException;
38+
}
39+
40+
public static boolean isNoAvailableWorkerException(Throwable e) {
41+
return e instanceof NoAvailableWorkerException;
42+
}
3343
}

dolphinscheduler-master/src/main/resources/application.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ master:
111111
# Master max concurrent workflow instances, when the master's workflow instance count exceeds this value, master server will be marked as busy.
112112
max-concurrent-workflow-instances: 2147483647
113113
worker-group-refresh-interval: 5m
114+
# Task dispatch timeout check (currently disabled).
115+
# When enabled, tasks not dispatched within this duration are marked as failed.
116+
task-dispatch-policy:
117+
dispatch-timeout-enabled: false
118+
max-task-dispatch-duration: 1h
114119
command-fetch-strategy:
115120
type: ID_SLOT_BASED
116121
config:

0 commit comments

Comments
 (0)