-
Notifications
You must be signed in to change notification settings - Fork 5k
[Improvement-17795][Master] Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available #17796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 43 commits
00b8dc6
bb089f3
fbd7edf
ab84035
f193af6
ab750bb
584ad2d
877db06
cb44db7
1aa12df
0c29a82
6ad77d1
b760003
39b40c1
d7bf4fa
0c8c93e
5e1dd52
5b2722d
33c16e8
3c25528
99a5d7c
9966047
42a5aa2
879ba5e
77a5dfa
9a6d664
cca935f
e49549e
b5a703b
9f97478
ddb7eb7
93e398c
6864a2e
f7c7fe1
7437249
99d2640
a6302d3
7a01fab
8c4700a
d6f8ccd
5a47e02
743dcbc
62cfb30
edf818c
f772a55
5f19c7b
e12af5d
65e7359
6f103b3
099d3df
0c295a7
aed9460
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,43 @@ | ||||||
| /* | ||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||
| * this work for additional information regarding copyright ownership. | ||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
| * (the "License"); you may not use this file except in compliance with | ||||||
| * the License. You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| * See the License for the specific language governing permissions and | ||||||
| * limitations under the License. | ||||||
| */ | ||||||
|
|
||||||
| package org.apache.dolphinscheduler.server.master.config; | ||||||
|
|
||||||
| import java.time.Duration; | ||||||
|
|
||||||
| import lombok.Data; | ||||||
|
|
||||||
| /** | ||||||
| * Configuration for the master's task dispatch policy. | ||||||
| * When enabled, tasks that remain in the dispatch queue longer than | ||||||
| * {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing. | ||||||
| */ | ||||||
| @Data | ||||||
| public class TaskDispatchPolicy { | ||||||
|
|
||||||
| /** | ||||||
| * Indicates whether the dispatch timeout checking mechanism is enabled. | ||||||
| */ | ||||||
| private boolean dispatchTimeoutFailedEnabled = false; | ||||||
|
||||||
| private boolean dispatchTimeoutFailedEnabled = false; | |
| private boolean dispatchTimeoutEnabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dispatchTimeoutEnabled
better
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,12 +18,17 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| package org.apache.dolphinscheduler.server.master.engine.task.dispatcher; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.Date; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.Set; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.ConcurrentHashMap; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -48,11 +53,15 @@ public class WorkerGroupDispatcher extends BaseDaemonThread { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final AtomicBoolean runningFlag = new AtomicBoolean(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final TaskDispatchPolicy taskDispatchPolicy; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TaskDispatchPolicy taskDispatchPolicy) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super("WorkerGroupTaskDispatcher-" + workerGroupName); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.taskExecutorClient = taskExecutorClient; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.workerGroupEventBus = new TaskDispatchableEventBus<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.taskDispatchPolicy = taskDispatchPolicy; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.info("Initialize WorkerGroupDispatcher: {}", this.getName()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -84,23 +93,69 @@ public void run() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final int taskInstanceId = taskExecutionRunnable.getId(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!waitingDispatchTaskIds.remove(taskInstanceId)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskExecutionRunnable.getId()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskInstanceId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskExecutorClient.dispatch(taskExecutionRunnable); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (Exception ex) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If a dispatch timeout occurs, the task will not be put back into the queue. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| long timeoutMs = this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchTime(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (elapsed > timeoutMs) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| handleDispatchFailure(taskExecutionRunnable, ex, elapsed, timeoutMs); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If dispatch failed, will put the task back to the queue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // The task will be dispatched after waiting time. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // the waiting time will increase multiple of times, but will not exceed 60 seconds | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| long waitingTimeMills = Math.min( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| long waitingTimeMillis = Math.min( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dispatchTask(taskExecutionRunnable, waitingTimeMills); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getId(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| waitingTimeMills, e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dispatchTask(taskExecutionRunnable, waitingTimeMillis); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskInstanceId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| waitingTimeMillis, ex); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Marks the specified task as fatally failed due to an unrecoverable dispatch error,such as timeout | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Once this method is called, the task is considered permanently failed and will not be retried. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, | |
| private void onDispatchTimeout(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onDispatchTimeout
better
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) { | |
| log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) { | |
| log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| } else { | |
| log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| log.error("Task: {} dispatch timeout.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error("Task: {} dispatch timeout.", taskName, ex);
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.endTime(new Date())
.build();
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
ok
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.dolphinscheduler.server.master.exception.dispatch; | ||
|
|
||
| public class NoAvailableWorkerException extends TaskDispatchException { | ||
|
|
||
| public NoAvailableWorkerException(String workerGroup) { | ||
| super("Cannot find available worker under worker group: " + workerGroup); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -111,6 +111,11 @@ master: | |||||
| # Master max concurrent workflow instances, when the master's workflow instance count exceeds this value, master server will be marked as busy. | ||||||
| max-concurrent-workflow-instances: 2147483647 | ||||||
| worker-group-refresh-interval: 5m | ||||||
| # Task dispatch timeout check (currently disabled). | ||||||
| # When enabled, tasks not dispatched within this duration are marked as failed. | ||||||
| task-dispatch-policy: | ||||||
| dispatch-timeout-failed-enabled: false | ||||||
| max-task-dispatch-duration: 5m | ||||||
|
||||||
| max-task-dispatch-duration: 5m | |
| max-task-dispatch-duration: 1h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove the max-task-dispatch-duration here, since these is not used when dispatch-timeout-failed-enabled is false, only need to add these on the doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1h
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can dispatchPolicy be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.