Skip to content

Commit 4505d4b

Browse files
authored
[Fix-17721] [API] Optimize the logic of queryDownstreamDependentWorkflowDefinitions (#17746)
1 parent 0a28d33 commit 4505d4b

File tree

2 files changed

+169
-14
lines changed

2 files changed

+169
-14
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -242,29 +242,38 @@ public List<DependentWorkflowDefinition> queryDownstreamDependentWorkflowDefinit
242242

243243
List<WorkflowDefinition> workflowDefinitionList =
244244
workflowDefinitionMapper.queryByCodes(workflowTaskLineageList.stream()
245-
.map(WorkflowTaskLineage::getDeptWorkflowDefinitionCode).distinct()
245+
.map(WorkflowTaskLineage::getWorkflowDefinitionCode).distinct()
246246
.collect(Collectors.toList()));
247247
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.queryByCodeList(workflowTaskLineageList.stream()
248-
.map(WorkflowTaskLineage::getDeptTaskDefinitionCode).distinct().collect(Collectors.toList()));
249-
for (TaskDefinition taskDefinition : taskDefinitionList) {
248+
.map(WorkflowTaskLineage::getTaskDefinitionCode).filter(code -> code != 0).distinct()
249+
.collect(Collectors.toList()));
250+
251+
for (WorkflowTaskLineage workflowLineage : workflowTaskLineageList) {
250252
DependentWorkflowDefinition dependentWorkflowDefinition = new DependentWorkflowDefinition();
251-
workflowTaskLineageList.stream()
252-
.filter(workflowLineage -> workflowLineage.getDeptTaskDefinitionCode() == taskDefinition.getCode())
253-
.findFirst()
254-
.ifPresent(workflowLineage -> {
255-
dependentWorkflowDefinition
256-
.setWorkflowDefinitionCode(workflowLineage.getDeptWorkflowDefinitionCode());
257-
dependentWorkflowDefinition.setTaskDefinitionCode(taskDefinition.getCode());
258-
dependentWorkflowDefinition.setTaskParams(taskDefinition.getTaskParams());
259-
dependentWorkflowDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
260-
});
253+
dependentWorkflowDefinition.setWorkflowDefinitionCode(workflowLineage.getWorkflowDefinitionCode());
254+
dependentWorkflowDefinition.setTaskDefinitionCode(workflowLineage.getTaskDefinitionCode());
255+
256+
// If taskDefinitionCode is 0, it means dependency on entire workflow, taskParams and workerGroup remain
257+
// null
258+
if (workflowLineage.getTaskDefinitionCode() != 0) {
259+
taskDefinitionList.stream()
260+
.filter(taskDefinition -> taskDefinition.getCode() == workflowLineage.getTaskDefinitionCode())
261+
.findFirst()
262+
.ifPresent(taskDefinition -> {
263+
dependentWorkflowDefinition.setTaskParams(taskDefinition.getTaskParams());
264+
dependentWorkflowDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
265+
});
266+
}
267+
261268
workflowDefinitionList.stream()
262-
.filter(workflowDefinition -> workflowDefinition.getCode() == dependentWorkflowDefinition
269+
.filter(workflowDefinition -> workflowDefinition.getCode() == workflowLineage
263270
.getWorkflowDefinitionCode())
264271
.findFirst()
265272
.ifPresent(workflowDefinition -> {
266273
dependentWorkflowDefinition.setWorkflowDefinitionVersion(workflowDefinition.getVersion());
267274
});
275+
276+
dependentWorkflowDefinitionList.add(dependentWorkflowDefinition);
268277
}
269278

270279
return dependentWorkflowDefinitionList;
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.service.impl;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.verifyNoInteractions;
23+
import static org.mockito.Mockito.when;
24+
25+
import org.apache.dolphinscheduler.common.constants.Constants;
26+
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
27+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
28+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
29+
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskLineage;
30+
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
31+
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
32+
import org.apache.dolphinscheduler.dao.repository.WorkflowTaskLineageDao;
33+
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.List;
37+
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.extension.ExtendWith;
41+
import org.mockito.InjectMocks;
42+
import org.mockito.Mock;
43+
import org.mockito.junit.jupiter.MockitoExtension;
44+
import org.springframework.test.util.ReflectionTestUtils;
45+
46+
@ExtendWith(MockitoExtension.class)
47+
class WorkflowLineageServiceImplTest {
48+
49+
@InjectMocks
50+
private WorkflowLineageServiceImpl workflowLineageService;
51+
52+
@Mock
53+
private WorkflowTaskLineageDao workflowTaskLineageDao;
54+
55+
@Mock
56+
private WorkflowDefinitionMapper workflowDefinitionMapper;
57+
58+
@Mock
59+
private TaskDefinitionMapper taskDefinitionMapper;
60+
61+
@BeforeEach
62+
void setUp() {
63+
ReflectionTestUtils.setField(workflowLineageService, "workflowTaskLineageDao", workflowTaskLineageDao);
64+
ReflectionTestUtils.setField(workflowLineageService, "workflowDefinitionMapper", workflowDefinitionMapper);
65+
ReflectionTestUtils.setField(workflowLineageService, "taskDefinitionMapper", taskDefinitionMapper);
66+
}
67+
68+
@Test
69+
void shouldReturnEmptyListWhenNoLineageExist() {
70+
long workflowCode = 100L;
71+
when(workflowTaskLineageDao
72+
.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE, workflowCode, Constants.DEPENDENT_ALL_TASK))
73+
.thenReturn(Collections.emptyList());
74+
75+
List<DependentWorkflowDefinition> result =
76+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowCode);
77+
78+
assertThat(result).isEmpty();
79+
verifyNoInteractions(workflowDefinitionMapper, taskDefinitionMapper);
80+
}
81+
82+
@Test
83+
void shouldBuildDependentWorkflowDefinitions() {
84+
long upstreamWorkflowCode = 1L;
85+
86+
WorkflowTaskLineage taskLineage = new WorkflowTaskLineage();
87+
taskLineage.setWorkflowDefinitionCode(200L);
88+
taskLineage.setDeptWorkflowDefinitionCode(upstreamWorkflowCode);
89+
taskLineage.setTaskDefinitionCode(300L);
90+
taskLineage.setDeptTaskDefinitionCode(0L);
91+
92+
WorkflowTaskLineage workflowLineage = new WorkflowTaskLineage();
93+
workflowLineage.setWorkflowDefinitionCode(201L);
94+
workflowLineage.setDeptWorkflowDefinitionCode(upstreamWorkflowCode);
95+
workflowLineage.setTaskDefinitionCode(0L);
96+
97+
when(workflowTaskLineageDao
98+
.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE, upstreamWorkflowCode,
99+
Constants.DEPENDENT_ALL_TASK))
100+
.thenReturn(Arrays.asList(taskLineage, workflowLineage));
101+
102+
WorkflowDefinition workflowDefinition200 = new WorkflowDefinition();
103+
workflowDefinition200.setCode(200L);
104+
workflowDefinition200.setVersion(3);
105+
106+
WorkflowDefinition workflowDefinition201 = new WorkflowDefinition();
107+
workflowDefinition201.setCode(201L);
108+
workflowDefinition201.setVersion(4);
109+
110+
when(workflowDefinitionMapper.queryByCodes(Arrays.asList(200L, 201L)))
111+
.thenReturn(Arrays.asList(workflowDefinition200, workflowDefinition201));
112+
113+
TaskDefinition taskDefinition = new TaskDefinition();
114+
taskDefinition.setCode(300L);
115+
taskDefinition.setTaskParams("task-params");
116+
taskDefinition.setWorkerGroup("test-group");
117+
118+
when(taskDefinitionMapper.queryByCodeList(Collections.singletonList(300L)))
119+
.thenReturn(Collections.singletonList(taskDefinition));
120+
121+
List<DependentWorkflowDefinition> result =
122+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
123+
124+
assertThat(result).hasSize(2);
125+
126+
DependentWorkflowDefinition taskDependent = result.stream()
127+
.filter(dependent -> dependent.getWorkflowDefinitionCode() == 200L)
128+
.findFirst()
129+
.orElseThrow(() -> new AssertionError("Expected DependentWorkflowDefinition with code 200 not found"));
130+
assertThat(taskDependent.getTaskDefinitionCode()).isEqualTo(300L);
131+
assertThat(taskDependent.getTaskParams()).isEqualTo("task-params");
132+
assertThat(taskDependent.getWorkerGroup()).isEqualTo("test-group");
133+
assertThat(taskDependent.getWorkflowDefinitionVersion()).isEqualTo(3);
134+
135+
DependentWorkflowDefinition workflowDependent = result.stream()
136+
.filter(dependent -> dependent.getWorkflowDefinitionCode() == 201L)
137+
.findFirst()
138+
.orElseThrow(() -> new AssertionError("Expected DependentWorkflowDefinition with code 201 not found"));
139+
assertThat(workflowDependent.getTaskDefinitionCode()).isEqualTo(0L);
140+
assertThat(workflowDependent.getTaskParams()).isNull();
141+
assertThat(workflowDependent.getWorkerGroup()).isNull();
142+
assertThat(workflowDependent.getWorkflowDefinitionVersion()).isEqualTo(4);
143+
144+
verify(taskDefinitionMapper).queryByCodeList(Collections.singletonList(300L));
145+
}
146+
}

0 commit comments

Comments
 (0)