-
Notifications
You must be signed in to change notification settings - Fork 5k
[Improvement-17795][Master] Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available #17796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
00b8dc6
bb089f3
fbd7edf
ab84035
f193af6
ab750bb
584ad2d
877db06
cb44db7
1aa12df
0c29a82
6ad77d1
b760003
39b40c1
d7bf4fa
0c8c93e
5e1dd52
5b2722d
33c16e8
3c25528
99a5d7c
9966047
42a5aa2
879ba5e
77a5dfa
9a6d664
cca935f
e49549e
b5a703b
9f97478
ddb7eb7
93e398c
6864a2e
f7c7fe1
7437249
99d2640
a6302d3
7a01fab
8c4700a
d6f8ccd
5a47e02
743dcbc
62cfb30
edf818c
f772a55
5f19c7b
e12af5d
65e7359
6f103b3
099d3df
0c295a7
aed9460
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -74,6 +74,12 @@ public class MasterConfig implements Validator { | |||||
| */ | ||||||
| private String masterRegistryPath; | ||||||
|
|
||||||
| /** | ||||||
| * Maximum time allowed for a task to be successfully dispatched. | ||||||
| * Default: 5 minutes. | ||||||
| */ | ||||||
| private Duration dispatchTimeout = Duration.ofMinutes(5); | ||||||
|
||||||
| private Duration dispatchTimeout = Duration.ofMinutes(5); | |
| private Duration maxTaskDispatchDuration; |
dispatchTimeout might confuse with single RPC timeout when dispatch task, and the default value should be null or a large duration, should compatibility with the previous behavior.
And you need to add this in document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxTaskDispatchDuration
ok
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -18,12 +18,19 @@ | |||||
| package org.apache.dolphinscheduler.server.master.engine.task.dispatcher; | ||||||
|
|
||||||
| import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; | ||||||
| import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; | ||||||
| import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; | ||||||
| import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; | ||||||
| import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent; | ||||||
| import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent; | ||||||
| import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent; | ||||||
| import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; | ||||||
| import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException; | ||||||
| import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; | ||||||
| import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils; | ||||||
|
|
||||||
| import java.time.Duration; | ||||||
| import java.util.Date; | ||||||
| import java.util.Set; | ||||||
| import java.util.concurrent.ConcurrentHashMap; | ||||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||||
|
|
@@ -48,11 +55,15 @@ public class WorkerGroupDispatcher extends BaseDaemonThread { | |||||
|
|
||||||
| private final AtomicBoolean runningFlag = new AtomicBoolean(false); | ||||||
|
|
||||||
| public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) { | ||||||
| private final Duration dispatchTimeout; | ||||||
|
|
||||||
| public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient, | ||||||
| Duration dispatchTimeout) { | ||||||
| super("WorkerGroupTaskDispatcher-" + workerGroupName); | ||||||
| this.taskExecutorClient = taskExecutorClient; | ||||||
| this.workerGroupEventBus = new TaskDispatchableEventBus<>(); | ||||||
| this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet(); | ||||||
| this.dispatchTimeout = dispatchTimeout; | ||||||
| log.info("Initialize WorkerGroupDispatcher: {}", this.getName()); | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -84,23 +95,77 @@ public void run() { | |||||
| } | ||||||
|
|
||||||
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||||||
| final int taskId = taskExecutionRunnable.getId(); | ||||||
|
||||||
| final int taskId = taskExecutionRunnable.getId(); | |
| final int taskExecutionRunnableId = taskExecutionRunnable.getId(); |
Avoid confuse with task definition id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskExecutionRunnableId
final int taskInstanceId = taskExecutionRunnable.getId(); is it better?
/**
* Get the task instance id.
*
Need to know the id might change since the task instance might be regenerated.
*/
default int getId() {
return getTaskInstance().getId();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use taskExecutionRunnableId , since the outer layer is unaware that this is the taskInstanceId.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to store the mills rather than store duration, then you can avoid execute dispatchTimeout.toMillis() here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to store the mills rather than store duration, then you can avoid execute
dispatchTimeout.toMillis()here.
Using a duration format like '5m' is more concise and readable. Most other configurations I’ve seen also avoid millisecond-level granularity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can translate the duration into milliseconds in the constructor.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| } catch (TaskDispatchException ex) { | |
| } catch (Exception ex) { |
Currently, at least in this PR, you can't guarantee that other exceptions won't be thrown here, right?
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't print the taskId and workflowId here, all ids should already be added by MDC. We should only need to print the exception here, the exception already contains failure message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't print the
taskIdandworkflowIdhere, all ids should already be added by MDC. We should only need to print the exception here, theexceptionalready contains failure message.
ok
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ | |
|
|
||
| public class WorkerGroupNotFoundException extends TaskDispatchException { | ||
|
|
||
| public WorkerGroupNotFoundException(String workerGroup) { | ||
| super("Cannot find worker group: " + workerGroup); | ||
| public WorkerGroupNotFoundException(String message) { | ||
|
||
| super(message); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,25 @@ | ||||||||||
| /* | ||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||||||
| * this work for additional information regarding copyright ownership. | ||||||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||
| * (the "License"); you may not use this file except in compliance with | ||||||||||
| * the License. You may obtain a copy of the License at | ||||||||||
| * | ||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||
| * | ||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||
| * See the License for the specific language governing permissions and | ||||||||||
| * limitations under the License. | ||||||||||
| */ | ||||||||||
|
|
||||||||||
| package org.apache.dolphinscheduler.server.master.exception.dispatch; | ||||||||||
|
|
||||||||||
| public class WorkerNotFoundException extends TaskDispatchException { | ||||||||||
|
|
||||||||||
| public WorkerNotFoundException(String message) { | ||||||||||
|
||||||||||
| public WorkerNotFoundException(String message) { | |
| public NoAvailableWorkerException(String workerGroup) { | |
| super("Cannot find available worker under worker group: " + workerGroup); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
|
|
||
| import org.apache.dolphinscheduler.dao.entity.TaskInstance; | ||
| import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; | ||
| import org.apache.dolphinscheduler.server.master.config.MasterConfig; | ||
| import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; | ||
| import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; | ||
| import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException; | ||
|
|
@@ -46,7 +47,8 @@ class WorkerGroupDispatcherTest { | |
| @BeforeEach | ||
| void setUp() { | ||
| taskExecutorClient = mock(ITaskExecutorClient.class); | ||
| dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient); | ||
| final MasterConfig masterConfig = new MasterConfig(); | ||
| dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getDispatchTimeout()); | ||
|
||
| } | ||
|
|
||
| @Test | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -126,6 +126,11 @@ public class TaskExecutionContext implements Serializable { | |||||||||||
|
|
||||||||||||
| private boolean failover; | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Timestamp (ms) when the task was first enqueued for dispatch. | ||||||||||||
| */ | ||||||||||||
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); | ||||||||||||
|
||||||||||||
| /** | |
| * Timestamp (ms) when the task was first enqueued for dispatch. | |
| */ | |
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); | |
| private final long firstDispatchTime = System.currentTimeMillis(); |
This is more like the creation time of the taskExecutionContext.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like the creation time of the
taskExecutionContext.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make it configurable by users in the configuration file and it can be turned off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea!