Skip to content

Commit 11c9f03

Browse files
author
luxl
committed
fix review
1 parent 7f9d04d commit 11c9f03

File tree

2 files changed

+103
-16
lines changed

2 files changed

+103
-16
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,21 @@ public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<Backf
7474

7575
@Override
7676
public List<Integer> execute(final BackfillWorkflowDTO backfillWorkflowDTO) {
77+
return executeWithVisitedCodes(backfillWorkflowDTO, null);
78+
}
79+
80+
List<Integer> executeWithVisitedCodes(final BackfillWorkflowDTO backfillWorkflowDTO,
81+
final Set<Long> visitedCodes) {
7782
// todo: directly call the master api to do backfill
7883
if (backfillWorkflowDTO.getBackfillParams().getRunMode() == RunMode.RUN_MODE_SERIAL) {
79-
return doSerialBackfillWorkflow(backfillWorkflowDTO);
84+
return doSerialBackfillWorkflow(backfillWorkflowDTO, visitedCodes);
8085
} else {
81-
return doParallelBackfillWorkflow(backfillWorkflowDTO);
86+
return doParallelBackfillWorkflow(backfillWorkflowDTO, visitedCodes);
8287
}
8388
}
8489

85-
private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO) {
90+
private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
91+
final Set<Long> visitedCodes) {
8692
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
8793
final List<ZonedDateTime> backfillTimeList = backfillParams.getBackfillDateList();
8894
if (backfillParams.getExecutionOrder() == ExecutionOrder.DESC_ORDER) {
@@ -93,11 +99,13 @@ private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfil
9399

94100
final Integer workflowInstanceId = doBackfillWorkflow(
95101
backfillWorkflowDTO,
96-
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
102+
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
103+
visitedCodes);
97104
return Lists.newArrayList(workflowInstanceId);
98105
}
99106

100-
private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO) {
107+
private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
108+
final Set<Long> visitedCodes) {
101109
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
102110
Integer expectedParallelismNumber = backfillParams.getExpectedParallelismNumber();
103111

@@ -113,7 +121,8 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
113121
for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
114122
final Integer workflowInstanceId = doBackfillWorkflow(
115123
backfillWorkflowDTO,
116-
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
124+
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
125+
visitedCodes);
117126
workflowInstanceIdList.add(workflowInstanceId);
118127
}
119128
return workflowInstanceIdList;
@@ -144,7 +153,8 @@ private List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime> dateTimeList
144153
}
145154

146155
private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
147-
final List<String> backfillTimeList) {
156+
final List<String> backfillTimeList,
157+
final Set<Long> visitedCodes) {
148158
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
149159
if (masterServer == null) {
150160
throw new ServiceException("no master server available");
@@ -178,9 +188,9 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
178188
}
179189
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
180190
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
181-
final Set<Long> visitedCodes = new HashSet<>();
182-
visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
183-
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes);
191+
final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
192+
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
193+
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, effectiveVisitedCodes);
184194
}
185195
return backfillTriggerResponse.getWorkflowInstanceId();
186196
}
@@ -219,7 +229,7 @@ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkf
219229
WorkflowDefinition downstreamWorkflow =
220230
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
221231
if (downstreamWorkflow == null) {
222-
log.warn("Skip dependent workflow {}, definition not found", downstreamCode);
232+
log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
223233
continue;
224234
}
225235

@@ -271,7 +281,7 @@ private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkf
271281

272282
// 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill
273283
visitedCodes.add(downstreamCode);
274-
execute(dependentBackfillDTO);
284+
executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
275285
}
276286
}
277287
}

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.dolphinscheduler.api.executor.workflow;
1919

20+
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.ArgumentMatchers.anyLong;
2122
import static org.mockito.Mockito.doReturn;
2223
import static org.mockito.Mockito.never;
24+
import static org.mockito.Mockito.times;
2325
import static org.mockito.Mockito.verify;
2426
import static org.mockito.Mockito.when;
2527

@@ -41,6 +43,7 @@
4143
import java.util.List;
4244
import java.util.Optional;
4345
import java.util.Set;
46+
import java.util.stream.Collectors;
4447

4548
import org.junit.jupiter.api.Assertions;
4649
import org.junit.jupiter.api.Test;
@@ -146,7 +149,8 @@ public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() t
146149
.thenReturn(Optional.of(downstreamWorkflow));
147150

148151
ArgumentCaptor<BackfillWorkflowDTO> captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class);
149-
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture());
152+
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate)
153+
.executeWithVisitedCodes(captor.capture(), any());
150154

151155
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
152156
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class);
@@ -219,7 +223,8 @@ public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent(
219223
.thenReturn(Optional.of(downstreamWorkflow));
220224

221225
ArgumentCaptor<BackfillWorkflowDTO> captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class);
222-
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate).execute(captor.capture());
226+
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate)
227+
.executeWithVisitedCodes(captor.capture(), any());
223228

224229
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
225230
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class);
@@ -281,7 +286,7 @@ public void testDoBackfillDependentWorkflow_SkipWorkflowNotFound() throws Except
281286
method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"),
282287
visitedCodes);
283288

284-
verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any());
289+
verify(backfillWorkflowExecutorDelegate, never()).executeWithVisitedCodes(any(), any());
285290
}
286291

287292
@Test
@@ -324,6 +329,78 @@ public void testDoBackfillDependentWorkflow_SkipOfflineWorkflow() throws Excepti
324329
method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"),
325330
visitedCodes);
326331

327-
verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any());
332+
verify(backfillWorkflowExecutorDelegate, never()).executeWithVisitedCodes(any(), any());
333+
}
334+
335+
@Test
336+
public void testDoBackfillDependentWorkflow_MultiLevelAndCycle() throws Exception {
337+
long workflowA = 10L;
338+
long workflowB = 20L;
339+
long workflowC = 30L;
340+
341+
WorkflowDefinition upstreamA =
342+
WorkflowDefinition.builder().code(workflowA).releaseState(ReleaseState.ONLINE).build();
343+
WorkflowDefinition downstreamB =
344+
WorkflowDefinition.builder().code(workflowB).releaseState(ReleaseState.ONLINE).warningGroupId(1)
345+
.build();
346+
WorkflowDefinition downstreamC =
347+
WorkflowDefinition.builder().code(workflowC).releaseState(ReleaseState.ONLINE).warningGroupId(2)
348+
.build();
349+
350+
BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder()
351+
.runMode(RunMode.RUN_MODE_SERIAL)
352+
.backfillDateList(Collections.<ZonedDateTime>emptyList())
353+
.backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
354+
.allLevelDependent(true)
355+
.executionOrder(ExecutionOrder.ASC_ORDER)
356+
.build();
357+
358+
BackfillWorkflowDTO dtoA = BackfillWorkflowDTO.builder()
359+
.workflowDefinition(upstreamA)
360+
.backfillParams(params)
361+
.build();
362+
363+
DependentWorkflowDefinition depToB = new DependentWorkflowDefinition();
364+
depToB.setWorkflowDefinitionCode(workflowB);
365+
DependentWorkflowDefinition depToA = new DependentWorkflowDefinition();
366+
depToA.setWorkflowDefinitionCode(workflowA);
367+
DependentWorkflowDefinition depToC = new DependentWorkflowDefinition();
368+
depToC.setWorkflowDefinitionCode(workflowC);
369+
370+
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowA))
371+
.thenReturn(Collections.singletonList(depToB));
372+
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowB))
373+
.thenReturn(Arrays.asList(depToA, depToC));
374+
when(workflowDefinitionDao.queryByCode(workflowB)).thenReturn(Optional.of(downstreamB));
375+
when(workflowDefinitionDao.queryByCode(workflowC)).thenReturn(Optional.of(downstreamC));
376+
377+
ArgumentCaptor<BackfillWorkflowDTO> captor = ArgumentCaptor.forClass(BackfillWorkflowDTO.class);
378+
doReturn(Collections.singletonList(1)).when(backfillWorkflowExecutorDelegate)
379+
.executeWithVisitedCodes(captor.capture(), any());
380+
381+
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
382+
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class, Set.class);
383+
method.setAccessible(true);
384+
385+
List<String> backfillTimeList = Collections.singletonList("2026-02-01 00:00:00");
386+
Set<Long> visitedCodes = new HashSet<>();
387+
visitedCodes.add(workflowA);
388+
389+
// Level 1: A -> B
390+
method.invoke(backfillWorkflowExecutorDelegate, dtoA, backfillTimeList, visitedCodes);
391+
BackfillWorkflowDTO dtoB = captor.getAllValues().get(0);
392+
393+
// Level 2: B -> A(cycle, should skip) and B -> C(should trigger)
394+
method.invoke(backfillWorkflowExecutorDelegate, dtoB, backfillTimeList, visitedCodes);
395+
396+
verify(backfillWorkflowExecutorDelegate, times(2)).executeWithVisitedCodes(any(), any());
397+
verify(workflowDefinitionDao, never()).queryByCode(workflowA);
398+
399+
List<Long> triggeredCodes = captor.getAllValues().stream()
400+
.map(it -> it.getWorkflowDefinition().getCode())
401+
.collect(Collectors.toList());
402+
Assertions.assertEquals(Arrays.asList(workflowB, workflowC), triggeredCodes);
403+
Assertions.assertTrue(visitedCodes.contains(workflowB));
404+
Assertions.assertTrue(visitedCodes.contains(workflowC));
328405
}
329406
}

0 commit comments

Comments
 (0)