[Improvement-17795][Master] Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available#17796
Conversation
| * Maximum time allowed for a task to be successfully dispatched. | ||
| * Default: 5 minutes. | ||
| */ | ||
| private Duration dispatchTimeout = Duration.ofMinutes(5); |
There was a problem hiding this comment.
We should make it configurable by users in the configuration file and it can be turned off.
There was a problem hiding this comment.
We should make it configurable by users in the configuration file and it can be turned off.
good idea!
| * Maximum time allowed for a task to be successfully dispatched. | ||
| * Default: 5 minutes. | ||
| */ | ||
| private Duration dispatchTimeout = Duration.ofMinutes(5); |
There was a problem hiding this comment.
| private Duration dispatchTimeout = Duration.ofMinutes(5); | |
| private Duration maxTaskDispatchDuration; |
dispatchTimeout might confuse with single RPC timeout when dispatch task, and the default value should be null or a large duration, should compatibility with the previous behavior.
And you need to add this in document.
There was a problem hiding this comment.
maxTaskDispatchDuration
ok
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||
| final int taskId = taskExecutionRunnable.getId(); | ||
| final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext(); | ||
| final long timeoutMs = this.dispatchTimeout.toMillis(); |
There was a problem hiding this comment.
It's better to store the mills rather than store duration, then you can avoid execute dispatchTimeout.toMillis() here.
There was a problem hiding this comment.
It's better to store the mills rather than store duration, then you can avoid execute
dispatchTimeout.toMillis()here.
Using a duration format like '5m' is more concise and readable. Most other configurations I’ve seen also avoid millisecond-level granularity.
There was a problem hiding this comment.
You can translate the duration into milliseconds in the constructor.
| } | ||
|
|
||
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||
| final int taskId = taskExecutionRunnable.getId(); |
There was a problem hiding this comment.
| final int taskId = taskExecutionRunnable.getId(); | |
| final int taskExecutionRunnableId = taskExecutionRunnable.getId(); |
Avoid confuse with task definition id.
There was a problem hiding this comment.
taskExecutionRunnableId
final int taskInstanceId = taskExecutionRunnable.getId(); is it better?
/**
* Get the task instance id.
*
Need to know the id might change since the task instance might be regenerated.
*/
default int getId() {
return getTaskInstance().getId();
}
There was a problem hiding this comment.
It's better to use taskExecutionRunnableId , since the outer layer is unaware that this is the taskInstanceId.
| * @param elapsed the time (in milliseconds) already spent attempting to dispatch the task | ||
| * @param timeoutMs the configured dispatch timeout threshold (in milliseconds) | ||
| */ | ||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception, |
There was a problem hiding this comment.
Please don't print the taskId and workflowId here, all ids should already be added by MDC. We should only need to print the exception here, the exception already contains failure message.
There was a problem hiding this comment.
Please don't print the
taskIdandworkflowIdhere, all ids should already be added by MDC. We should only need to print the exception here, theexceptionalready contains failure message.
ok
|
|
||
| public WorkerGroupNotFoundException(String workerGroup) { | ||
| super("Cannot find worker group: " + workerGroup); | ||
| public WorkerGroupNotFoundException(String message) { |
There was a problem hiding this comment.
Why do this change?
restore, remove the task info
|
|
||
| public class WorkerNotFoundException extends TaskDispatchException { | ||
|
|
||
| public WorkerNotFoundException(String message) { |
There was a problem hiding this comment.
| public WorkerNotFoundException(String message) { | |
| public NoAvailableWorkerException(String workerGroup) { | |
| super("Cannot find available worker under worker group: " + workerGroup); | |
| } |
| final MasterConfig masterConfig = new MasterConfig(); | ||
| dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getDispatchTimeout()); |
There was a problem hiding this comment.
You should add a validated test case.
There was a problem hiding this comment.
You should add a validated test case.
ok, add test for dispatch timeout checker
There was a problem hiding this comment.
You should add a validated test case.
I've already validated and tested it in a real environment.
The LogicFakeTask type runs on the Master and does not involve WorkerGroup dispatching.
I track the fix in a separate issue: #17872
| /** | ||
| * Timestamp (ms) when the task was first enqueued for dispatch. | ||
| */ | ||
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); |
There was a problem hiding this comment.
| /** | |
| * Timestamp (ms) when the task was first enqueued for dispatch. | |
| */ | |
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); | |
| private final long firstDispatchTime = System.currentTimeMillis(); |
This is more like the creation time of the taskExecutionContext.
There was a problem hiding this comment.
This is more like the creation time of the
taskExecutionContext.
ok
877db06 to
d7bf4fa
Compare
|
@ruanwenjun @SbloodyS Whenever you have time, I’d be grateful for your review. Thanks so much! |
| TaskDispatchPolicy dispatchPolicy = masterConfig.getTaskDispatchPolicy(); | ||
| if (dispatchPolicy != null && dispatchPolicy.isDispatchTimeoutFailedEnabled()) { | ||
| if (dispatchPolicy.getMaxTaskDispatchDuration() == null) { | ||
| errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null, | ||
| "must be specified when dispatch timeout checker is enabled"); | ||
| } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) { | ||
| errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null, | ||
| "must be a positive duration (e.g., '2m', '5m', '30m')"); | ||
| } | ||
| } |
There was a problem hiding this comment.
Can dispatchPolicy be null?
It indeed cannot be null.
private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
| } catch (Exception ex) { | ||
| if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) { | ||
| // If a dispatch timeout occurs, the task will not be put back into the queue. | ||
| long timeoutMs = this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis(); |
There was a problem hiding this comment.
| * Marks the specified task as fatally failed due to an unrecoverable dispatch error,such as timeout | ||
| * Once this method is called, the task is considered permanently failed and will not be retried. | ||
| */ | ||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, |
There was a problem hiding this comment.
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, | |
| private void onDispatchTimeout(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, |
There was a problem hiding this comment.
onDispatchTimeout
better
| if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) { | ||
| log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex); | ||
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | ||
| .taskExecutionRunnable(taskExecutionRunnable) | ||
| .endTime(new Date()) | ||
| .build(); | ||
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | ||
| } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) { | ||
| log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex); | ||
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | ||
| .taskExecutionRunnable(taskExecutionRunnable) | ||
| .endTime(new Date()) | ||
| .build(); | ||
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | ||
| } else { | ||
| log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex); | ||
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | ||
| .taskExecutionRunnable(taskExecutionRunnable) | ||
| .endTime(new Date()) | ||
| .build(); | ||
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); |
There was a problem hiding this comment.
| if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) { | |
| log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) { | |
| log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| } else { | |
| log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| log.error("Task: {} dispatch timeout.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); |
There was a problem hiding this comment.
log.error("Task: {} dispatch timeout.", taskName, ex);
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.endTime(new Date())
.build();
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
ok
| # When enabled, tasks not dispatched within this duration are marked as failed. | ||
| task-dispatch-policy: | ||
| dispatch-timeout-failed-enabled: false | ||
| max-task-dispatch-duration: 5m |
There was a problem hiding this comment.
| max-task-dispatch-duration: 5m | |
| max-task-dispatch-duration: 1h |
There was a problem hiding this comment.
We can remove the max-task-dispatch-duration here, since these is not used when dispatch-timeout-failed-enabled is false, only need to add these on the doc.
| /** | ||
| * Indicates whether the dispatch timeout checking mechanism is enabled. | ||
| */ | ||
| private boolean dispatchTimeoutFailedEnabled = false; |
There was a problem hiding this comment.
| private boolean dispatchTimeoutFailedEnabled = false; | |
| private boolean dispatchTimeoutEnabled = false; |
There was a problem hiding this comment.
dispatchTimeoutEnabled
better
| task-dispatch-policy: | ||
| dispatch-timeout-enabled: false | ||
| max-task-dispatch-duration: 1h |
There was a problem hiding this comment.
Please add to the doc.
ok, add it.
|
@SbloodyS Whenever you have time, I’d be grateful for your review. Thanks so much! |
|
#17932 could provide a more complete solution, effectively preventing abnormal tasks from consuming system resources. |
I personally don't think there's a conflict here. The dispatch timeout shouldn't be very long—such exceptions should raise errors as early as possible (e.g., after 10 minutes). In contrast, your workflow or task execution timeout should be configured with a sufficiently long duration, as some weekly tasks may easily run for more than a day. |
For the vast majority of tasks and scenarios, task dispatch should not take long, whereas task execution can legitimately run for a very long time. Therefore, the timeout configuration for task dispatch and task execution should not be handled with a single unified setting. It’s normal for a task to run for several hours or even days during execution, but if a task remains in the dispatch phase for hours or days, it most likely indicates an issue. Dispatch timeout is intended to avoid wasting Master resources, while workflow or task timeout is primarily aimed at preventing Worker resource waste. Wouldn't it be better to have both? |
|
We can keep the both config. |
|



Purpose of the pull request
close #17795
Brief change log
Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available
Verify this pull request
Successfully tested and verified in a real-world environment.
Pull Request Notice
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md