Skip to content

Commit b760003

Browse files
苏义超ruanwenjun
authored andcommitted
add test for dispatch timeout checker
1 parent 6ad77d1 commit b760003

File tree

1 file changed

+296
-0
lines changed

1 file changed

+296
-0
lines changed

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,37 @@
1919

2020
import static org.awaitility.Awaitility.await;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.argThat;
23+
import static org.mockito.Mockito.atLeast;
24+
import static org.mockito.Mockito.doAnswer;
2225
import static org.mockito.Mockito.doThrow;
2326
import static org.mockito.Mockito.inOrder;
2427
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.never;
2529
import static org.mockito.Mockito.times;
2630
import static org.mockito.Mockito.verify;
2731
import static org.mockito.Mockito.when;
2832

2933
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
34+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
3035
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
3136
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
37+
import org.apache.dolphinscheduler.server.master.config.MasterDispatchTimeoutCheckerConfig;
38+
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
3239
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
40+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
41+
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
3342
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
43+
import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
3444
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
45+
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
3546

3647
import java.time.Duration;
48+
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.ThreadLocalRandom;
50+
import java.util.concurrent.TimeUnit;
3751

52+
import org.junit.jupiter.api.Assertions;
3853
import org.junit.jupiter.api.BeforeEach;
3954
import org.junit.jupiter.api.Test;
4055
import org.mockito.InOrder;
@@ -141,4 +156,285 @@ void dispatch_TaskDispatchFails_RetryLogicWorks() throws TaskDispatchException {
141156
.untilAsserted(() -> verify(taskExecutorClient, times(2)).dispatch(taskExecutionRunnable));
142157
}
143158

159+
@Test
160+
void dispatchTask_WorkerGroupNotFound_TimeoutDisabled_ShouldKeepRetrying() throws TaskDispatchException {
161+
// Given
162+
ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
163+
WorkerGroupNotFoundException ex = new WorkerGroupNotFoundException("no worker group");
164+
doThrow(ex).when(taskExecutorClient).dispatch(task);
165+
166+
dispatcher.start();
167+
dispatcher.dispatchTask(task, 0);
168+
169+
// When & Then
170+
await().atMost(Duration.ofSeconds(3))
171+
.untilAsserted(() -> {
172+
// Ensure it's retrying
173+
verify(taskExecutorClient, atLeast(2)).dispatch(task);
174+
175+
// Ensure NO event has been published during this time
176+
WorkflowEventBus eventBus = task.getWorkflowEventBus();
177+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
178+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
179+
});
180+
}
181+
182+
@Test
183+
void dispatchTask_WorkerGroupNotFound_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent() throws TaskDispatchException {
184+
// Given
185+
MasterDispatchTimeoutCheckerConfig dispatchTimeoutCheckerConfig = new MasterDispatchTimeoutCheckerConfig();
186+
dispatchTimeoutCheckerConfig.setEnabled(true);
187+
dispatchTimeoutCheckerConfig.setMaxTaskDispatchDuration(Duration.ofMillis(200));
188+
189+
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, dispatchTimeoutCheckerConfig);
190+
191+
ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
192+
System.currentTimeMillis() - 500);
193+
194+
WorkerGroupNotFoundException ex = new WorkerGroupNotFoundException("worker group not found");
195+
doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
196+
197+
dispatcher.start();
198+
dispatcher.dispatchTask(taskExecutionRunnable, 0);
199+
200+
// Then
201+
await().atMost(Duration.ofSeconds(2))
202+
.untilAsserted(() -> {
203+
verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
204+
WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
205+
verify(eventBus).publish(argThat(evt -> evt instanceof TaskFatalLifecycleEvent &&
206+
((TaskFatalLifecycleEvent) evt).getTaskExecutionRunnable() == taskExecutionRunnable));
207+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
208+
});
209+
}
210+
211+
@Test
212+
void dispatchTask_WorkerGroupNotFound_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent() throws TaskDispatchException, InterruptedException {
213+
// Given: Dispatcher configured with a 5-minute timeout (enabled)
214+
MasterDispatchTimeoutCheckerConfig dispatchTimeoutCheckerConfig = new MasterDispatchTimeoutCheckerConfig();
215+
dispatchTimeoutCheckerConfig.setEnabled(true);
216+
dispatchTimeoutCheckerConfig.setMaxTaskDispatchDuration(Duration.ofMinutes(5));
217+
218+
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, dispatchTimeoutCheckerConfig);
219+
220+
// Mock task with first dispatch time set to 100ms ago → well within timeout window
221+
ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
222+
System.currentTimeMillis() - 100);
223+
224+
// Use CountDownLatch to reliably detect actual dispatch invocation
225+
CountDownLatch dispatchCalled = new CountDownLatch(1);
226+
227+
// Stub client to throw WorkerGroupNotFoundException and signal the latch
228+
doAnswer(invocation -> {
229+
dispatchCalled.countDown(); // Confirm dispatch was attempted
230+
throw new WorkerGroupNotFoundException("Worker group 'TestGroup' does not exist");
231+
}).when(taskExecutorClient).dispatch(taskExecutionRunnable);
232+
233+
// When: Start dispatcher and dispatch the task
234+
dispatcher.start();
235+
dispatcher.dispatchTask(taskExecutionRunnable, 0);
236+
237+
// Wait up to 1 second for the dispatch attempt to complete
238+
boolean dispatched = dispatchCalled.await(1000, TimeUnit.MILLISECONDS);
239+
Assertions.assertTrue(dispatched, "Expected dispatch() to be called within 1 second");
240+
241+
// Then: Verify NO failure events are published because timeout has NOT been exceeded
242+
WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
243+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
244+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
245+
}
246+
247+
@Test
248+
void dispatchTask_NoAvailableWorker_TimeoutDisabled_ShouldKeepRetrying() throws TaskDispatchException {
249+
// Given
250+
ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
251+
NoAvailableWorkerException ex = new NoAvailableWorkerException("no worker");
252+
doThrow(ex).when(taskExecutorClient).dispatch(task);
253+
254+
dispatcher.start();
255+
dispatcher.dispatchTask(task, 0);
256+
257+
// When & Then
258+
await().atMost(Duration.ofSeconds(3))
259+
.untilAsserted(() -> {
260+
// Ensure it's retrying
261+
verify(taskExecutorClient, atLeast(2)).dispatch(task);
262+
263+
// Ensure NO event has been published during this time
264+
WorkflowEventBus eventBus = task.getWorkflowEventBus();
265+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
266+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
267+
});
268+
}
269+
270+
@Test
271+
void dispatchTask_NoAvailableWorker_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent() throws TaskDispatchException {
272+
// Given: enable timeout (200ms), task already waited 500ms
273+
MasterDispatchTimeoutCheckerConfig dispatchTimeoutCheckerConfig = new MasterDispatchTimeoutCheckerConfig();
274+
dispatchTimeoutCheckerConfig.setEnabled(true);
275+
dispatchTimeoutCheckerConfig.setMaxTaskDispatchDuration(Duration.ofMillis(200));
276+
277+
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, dispatchTimeoutCheckerConfig);
278+
279+
ITaskExecutionRunnable taskExecutionRunnable =
280+
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 500);
281+
282+
NoAvailableWorkerException ex = new NoAvailableWorkerException("no worker");
283+
doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
284+
285+
dispatcher.start();
286+
dispatcher.dispatchTask(taskExecutionRunnable, 0);
287+
288+
// Then
289+
await().atMost(Duration.ofSeconds(2))
290+
.untilAsserted(() -> {
291+
verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
292+
WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
293+
verify(eventBus).publish(argThat(evt -> evt instanceof TaskFailedLifecycleEvent &&
294+
((TaskFailedLifecycleEvent) evt).getTaskExecutionRunnable() == taskExecutionRunnable));
295+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
296+
});
297+
}
298+
299+
@Test
300+
void dispatchTask_NoAvailableWorker_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent() throws TaskDispatchException, InterruptedException {
301+
// Given: Configure dispatcher with a 5-minute dispatch timeout (enabled)
302+
MasterDispatchTimeoutCheckerConfig dispatchTimeoutCheckerConfig = new MasterDispatchTimeoutCheckerConfig();
303+
dispatchTimeoutCheckerConfig.setEnabled(true);
304+
dispatchTimeoutCheckerConfig.setMaxTaskDispatchDuration(Duration.ofMinutes(5));
305+
306+
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, dispatchTimeoutCheckerConfig);
307+
308+
// Mock task with first dispatch time set to 100ms ago → ensures it's NOT timed out yet
309+
ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
310+
System.currentTimeMillis() - 100);
311+
312+
// Use CountDownLatch to reliably detect when dispatch is actually invoked (avoids timing flakiness)
313+
CountDownLatch dispatchCalled = new CountDownLatch(1);
314+
315+
// Stub the client to throw NoAvailableWorkerException on dispatch and signal the latch
316+
doAnswer(invocation -> {
317+
dispatchCalled.countDown(); // Signal that dispatch was attempted
318+
throw new NoAvailableWorkerException("no worker");
319+
}).when(taskExecutorClient).dispatch(taskExecutionRunnable);
320+
321+
// When: Start dispatcher and trigger task dispatch
322+
dispatcher.start();
323+
dispatcher.dispatchTask(taskExecutionRunnable, 0);
324+
325+
// Wait up to 1 second for the dispatch attempt to occur (ensures async execution completes)
326+
boolean dispatched = dispatchCalled.await(1000, TimeUnit.MILLISECONDS);
327+
Assertions.assertTrue(dispatched, "Expected dispatch() to be called within 1 second");
328+
329+
// Then: Verify NO failure events are published since timeout has NOT been exceeded
330+
WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
331+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
332+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
333+
}
334+
335+
@Test
336+
void dispatchTask_GenericTaskDispatchException_TimeoutDisabled_ShouldKeepRetrying() throws TaskDispatchException {
337+
// Given
338+
ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
339+
TaskDispatchException ex = new TaskDispatchException("generic dispatch error");
340+
doThrow(ex).when(taskExecutorClient).dispatch(task);
341+
342+
dispatcher.start();
343+
dispatcher.dispatchTask(task, 0);
344+
345+
// When & Then
346+
await().atMost(Duration.ofSeconds(3))
347+
.untilAsserted(() -> {
348+
// Ensure it's retrying
349+
verify(taskExecutorClient, atLeast(2)).dispatch(task);
350+
351+
// Ensure NO event has been published during this time
352+
WorkflowEventBus eventBus = task.getWorkflowEventBus();
353+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
354+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
355+
});
356+
}
357+
358+
@Test
359+
void dispatchTask_GenericTaskDispatchException_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent() throws TaskDispatchException {
360+
// Given
361+
MasterDispatchTimeoutCheckerConfig dispatchTimeoutCheckerConfig = new MasterDispatchTimeoutCheckerConfig();
362+
dispatchTimeoutCheckerConfig.setEnabled(true);
363+
dispatchTimeoutCheckerConfig.setMaxTaskDispatchDuration(Duration.ofMillis(200));
364+
365+
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, dispatchTimeoutCheckerConfig);
366+
367+
ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
368+
System.currentTimeMillis() - 500);
369+
370+
TaskDispatchException ex = new TaskDispatchException("generic dispatch error");
371+
doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
372+
373+
dispatcher.start();
374+
dispatcher.dispatchTask(taskExecutionRunnable, 0);
375+
376+
// Then
377+
await().atMost(Duration.ofSeconds(2))
378+
.untilAsserted(() -> {
379+
verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
380+
WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
381+
verify(eventBus).publish(argThat(evt -> evt instanceof TaskFailedLifecycleEvent &&
382+
((TaskFailedLifecycleEvent) evt).getTaskExecutionRunnable() == taskExecutionRunnable));
383+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
384+
});
385+
}
386+
387+
@Test
388+
void dispatchTask_GenericTaskDispatchException_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent() throws TaskDispatchException, InterruptedException {
389+
// Given: Dispatcher configured with a 5-minute dispatch timeout (enabled)
390+
MasterDispatchTimeoutCheckerConfig config = new MasterDispatchTimeoutCheckerConfig();
391+
config.setEnabled(true);
392+
config.setMaxTaskDispatchDuration(Duration.ofMinutes(5));
393+
394+
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, config);
395+
396+
// Mock task with first dispatch time set to 100ms ago → well within timeout window
397+
ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(
398+
System.currentTimeMillis() - 100);
399+
400+
// Use CountDownLatch to reliably detect when dispatch is actually invoked
401+
CountDownLatch dispatchCalled = new CountDownLatch(1);
402+
403+
// Stub client to throw a generic TaskDispatchException and signal the latch
404+
doAnswer(invocation -> {
405+
dispatchCalled.countDown(); // Confirm dispatch attempt occurred
406+
throw new TaskDispatchException("Generic dispatch error");
407+
}).when(taskExecutorClient).dispatch(task);
408+
409+
// When: Start dispatcher and trigger task dispatch
410+
dispatcher.start();
411+
dispatcher.dispatchTask(task, 0);
412+
413+
// Wait up to 1 second for the dispatch attempt to complete (handles async execution)
414+
boolean dispatched = dispatchCalled.await(1000, TimeUnit.MILLISECONDS);
415+
Assertions.assertTrue(dispatched, "Expected dispatch() to be called within 1 second");
416+
417+
// Then: Verify NO failure events are published because timeout has NOT been exceeded
418+
WorkflowEventBus eventBus = task.getWorkflowEventBus();
419+
verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
420+
verify(eventBus, never()).publish(any(TaskFatalLifecycleEvent.class));
421+
}
422+
423+
private ITaskExecutionRunnable mockTaskExecutionRunnableWithFirstDispatchTime(long firstDispatchTime) {
424+
ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
425+
TaskInstance taskInstance = mock(TaskInstance.class);
426+
WorkflowInstance workflowInstance = mock(WorkflowInstance.class);
427+
WorkflowEventBus eventBus = mock(WorkflowEventBus.class);
428+
429+
TaskExecutionContext context = mock(TaskExecutionContext.class);
430+
when(context.getFirstDispatchTime()).thenReturn(firstDispatchTime);
431+
432+
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
433+
when(taskExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance);
434+
when(taskExecutionRunnable.getWorkflowEventBus()).thenReturn(eventBus);
435+
when(taskExecutionRunnable.getId()).thenReturn(ThreadLocalRandom.current().nextInt(1000, 9999));
436+
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(context);
437+
438+
return taskExecutionRunnable;
439+
}
144440
}

0 commit comments

Comments
 (0)