Skip to content

Commit d39a702

Browse files
authored
[Fix-17638][API] Optimize workflow lineage update logic (#17678)
1 parent 67b9a80 commit d39a702

File tree

5 files changed

+203
-19
lines changed

5 files changed

+203
-19
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,15 @@ List<DependentLineageTask> queryDependentWorkflowDefinitions(long projectCode, l
5757

5858
int createWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages);
5959

60-
int updateWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages);
60+
/**
61+
* Replace the lineage of given workflow definition by new lineage list.
62+
* When the list is empty, existing lineage data will be deleted.
63+
*
64+
* @param workflowDefinitionCode workflow definition to update
65+
* @param workflowTaskLineages new lineage list, can be empty
66+
* @return affected rows
67+
*/
68+
int updateWorkflowLineage(long workflowDefinitionCode, List<WorkflowTaskLineage> workflowTaskLineages);
6169

6270
int deleteWorkflowLineage(List<Long> workflowDefinitionCodes);
6371
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -409,23 +409,10 @@ public void saveWorkflowLineage(long projectCode,
409409
long workflowDefinitionCode,
410410
int workflowDefinitionVersion,
411411
List<TaskDefinitionLog> taskDefinitionLogList) {
412-
List<WorkflowTaskLineage> workflowTaskLineageList =
413-
generateWorkflowLineageList(taskDefinitionLogList, workflowDefinitionCode, workflowDefinitionVersion);
414-
if (workflowTaskLineageList.isEmpty()) {
415-
return;
416-
}
412+
List<WorkflowTaskLineage> workflowTaskLineageList = generateWorkflowLineageList(taskDefinitionLogList,
413+
workflowDefinitionCode, workflowDefinitionVersion);
417414

418-
int insertWorkflowLineageResult = workflowLineageService.updateWorkflowLineage(workflowTaskLineageList);
419-
if (insertWorkflowLineageResult <= 0) {
420-
log.error(
421-
"Save workflow lineage error, projectCode: {}, workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
422-
projectCode, workflowDefinitionCode, workflowDefinitionVersion);
423-
throw new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR);
424-
} else {
425-
log.info(
426-
"Save workflow lineage complete, projectCode: {}, workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
427-
projectCode, workflowDefinitionCode, workflowDefinitionVersion);
428-
}
415+
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, workflowTaskLineageList);
429416
}
430417

431418
private List<WorkflowTaskLineage> generateWorkflowLineageList(List<TaskDefinitionLog> taskDefinitionLogList,

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import java.text.MessageFormat;
4040
import java.util.ArrayList;
41+
import java.util.Collections;
4142
import java.util.List;
4243
import java.util.Optional;
4344
import java.util.stream.Collectors;
@@ -318,8 +319,26 @@ public int createWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages)
318319
}
319320

320321
@Override
321-
public int updateWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages) {
322-
return workflowTaskLineageDao.updateWorkflowTaskLineage(workflowTaskLineages);
322+
public int updateWorkflowLineage(long workflowDefinitionCode, List<WorkflowTaskLineage> workflowTaskLineages) {
323+
// Remove existing lineage first to keep data consistent
324+
workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(
325+
Collections.singletonList(workflowDefinitionCode));
326+
327+
if (CollectionUtils.isEmpty(workflowTaskLineages)) {
328+
log.info("Current lineage is empty, workflowDefinitionCode: {}",
329+
workflowDefinitionCode);
330+
return 0;
331+
}
332+
333+
int insertResult = workflowTaskLineageDao.batchInsert(workflowTaskLineages);
334+
if (insertResult <= 0) {
335+
log.error("Save workflow lineage error, workflowDefinitionCode: {}", workflowDefinitionCode);
336+
throw new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR);
337+
}
338+
339+
log.info("Save workflow lineage complete, workflowDefinitionCode: {}, inserted rows: {}",
340+
workflowDefinitionCode, insertResult);
341+
return insertResult;
323342
}
324343

325344
@Override

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static org.mockito.Mockito.doNothing;
3636
import static org.mockito.Mockito.doThrow;
3737
import static org.mockito.Mockito.times;
38+
import static org.mockito.Mockito.verify;
3839
import static org.mockito.Mockito.when;
3940

4041
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
@@ -1390,4 +1391,93 @@ private MultipartFile createMultipartFile(String filePath) throws URISyntaxExcep
13901391
content);
13911392
return multipartFile;
13921393
}
1394+
1395+
@Test
1396+
public void testSaveWorkflowLineageWithEmptyList() {
1397+
// Test case: Empty lineage list should delete historical lineage
1398+
long projectCode = 1L;
1399+
long workflowDefinitionCode = 100L;
1400+
int workflowDefinitionVersion = 1;
1401+
List<TaskDefinitionLog> emptyTaskDefinitionLogList = new ArrayList<>();
1402+
1403+
// Mock updateWorkflowLineage to return 0 for empty list
1404+
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode), anyList()))
1405+
.thenReturn(0);
1406+
1407+
// Execute - should not throw exception
1408+
Assertions.assertDoesNotThrow(() -> {
1409+
processDefinitionService.saveWorkflowLineage(projectCode, workflowDefinitionCode,
1410+
workflowDefinitionVersion, emptyTaskDefinitionLogList);
1411+
});
1412+
1413+
// Verify that updateWorkflowLineage was called with empty list
1414+
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode), anyList());
1415+
}
1416+
1417+
@Test
1418+
public void testSaveWorkflowLineageWithNonEmptyList() {
1419+
// Test case: Normal save with non-empty lineage list
1420+
long projectCode = 1L;
1421+
long workflowDefinitionCode = 100L;
1422+
int workflowDefinitionVersion = 1;
1423+
1424+
// Create task definition logs with dependent tasks
1425+
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
1426+
TaskDefinitionLog taskLog = new TaskDefinitionLog();
1427+
taskLog.setCode(200L);
1428+
taskLog.setVersion(1);
1429+
taskLog.setProjectCode(projectCode);
1430+
taskLog.setTaskType("DEPENDENT");
1431+
// Set taskParams with dependent parameters
1432+
String taskParams =
1433+
"{\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"definitionCode\":50,\"depTaskCode\":300}]}]}}";
1434+
taskLog.setTaskParams(taskParams);
1435+
taskDefinitionLogList.add(taskLog);
1436+
1437+
// Mock updateWorkflowLineage to return success
1438+
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode), anyList()))
1439+
.thenReturn(1);
1440+
1441+
// Execute - should not throw exception
1442+
Assertions.assertDoesNotThrow(() -> {
1443+
processDefinitionService.saveWorkflowLineage(projectCode, workflowDefinitionCode,
1444+
workflowDefinitionVersion, taskDefinitionLogList);
1445+
});
1446+
1447+
// Verify that updateWorkflowLineage was called
1448+
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode), anyList());
1449+
}
1450+
1451+
@Test
1452+
public void testSaveWorkflowLineageWithInsertFailure() {
1453+
// Test case: Should throw exception when insert fails
1454+
long projectCode = 1L;
1455+
long workflowDefinitionCode = 100L;
1456+
int workflowDefinitionVersion = 1;
1457+
1458+
// Create task definition logs
1459+
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
1460+
TaskDefinitionLog taskLog = new TaskDefinitionLog();
1461+
taskLog.setCode(200L);
1462+
taskLog.setVersion(1);
1463+
taskLog.setProjectCode(projectCode);
1464+
taskLog.setTaskType("DEPENDENT");
1465+
String taskParams =
1466+
"{\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"definitionCode\":50,\"depTaskCode\":300}]}]}}";
1467+
taskLog.setTaskParams(taskParams);
1468+
taskDefinitionLogList.add(taskLog);
1469+
1470+
// Mock updateWorkflowLineage to throw exception (insert failure)
1471+
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode), anyList()))
1472+
.thenThrow(new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR));
1473+
1474+
// Execute and verify exception
1475+
ServiceException exception = Assertions.assertThrows(ServiceException.class, () -> {
1476+
processDefinitionService.saveWorkflowLineage(projectCode, workflowDefinitionCode,
1477+
workflowDefinitionVersion, taskDefinitionLogList);
1478+
});
1479+
1480+
Assertions.assertEquals(Status.CREATE_WORKFLOW_LINEAGE_ERROR.getCode(), exception.getCode());
1481+
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode), anyList());
1482+
}
13931483
}

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717

1818
package org.apache.dolphinscheduler.api.service;
1919

20+
import static org.mockito.ArgumentMatchers.anyList;
21+
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.Mockito.verify;
2023
import static org.mockito.Mockito.when;
2124

25+
import org.apache.dolphinscheduler.api.enums.Status;
26+
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
2227
import org.apache.dolphinscheduler.api.service.impl.WorkflowLineageServiceImpl;
2328
import org.apache.dolphinscheduler.dao.entity.Project;
2429
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -256,4 +261,79 @@ public void testTaskDependentMsgWithEmptyListAfterFilteringOrphanedRecords() {
256261
Assertions.assertFalse(result.isPresent());
257262
}
258263

264+
@Test
265+
public void testUpdateWorkflowLineageWithNonEmptyList() {
266+
// Test case: Normal update with non-empty lineage list
267+
long workflowDefinitionCode = 100L;
268+
List<WorkflowTaskLineage> workflowTaskLineages = new ArrayList<>();
269+
270+
WorkflowTaskLineage lineage1 = new WorkflowTaskLineage();
271+
lineage1.setWorkflowDefinitionCode(workflowDefinitionCode);
272+
lineage1.setTaskDefinitionCode(200L);
273+
workflowTaskLineages.add(lineage1);
274+
275+
WorkflowTaskLineage lineage2 = new WorkflowTaskLineage();
276+
lineage2.setWorkflowDefinitionCode(workflowDefinitionCode);
277+
lineage2.setTaskDefinitionCode(300L);
278+
workflowTaskLineages.add(lineage2);
279+
280+
// Mock DAO methods
281+
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(2);
282+
when(workflowTaskLineageDao.batchInsert(workflowTaskLineages)).thenReturn(2);
283+
284+
// Execute
285+
int result = workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, workflowTaskLineages);
286+
287+
// Verify
288+
Assertions.assertEquals(2, result);
289+
verify(workflowTaskLineageDao)
290+
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
291+
verify(workflowTaskLineageDao).batchInsert(workflowTaskLineages);
292+
}
293+
294+
@Test
295+
public void testUpdateWorkflowLineageWithEmptyList() {
296+
// Test case: Empty list should delete historical lineage and return 0
297+
long workflowDefinitionCode = 100L;
298+
List<WorkflowTaskLineage> emptyList = new ArrayList<>();
299+
300+
// Mock DAO method
301+
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(1);
302+
303+
// Execute
304+
int result = workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, emptyList);
305+
306+
// Verify
307+
Assertions.assertEquals(0, result);
308+
verify(workflowTaskLineageDao)
309+
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
310+
// batchInsert should not be called when list is empty
311+
}
312+
313+
@Test
314+
public void testUpdateWorkflowLineageWithInsertFailure() {
315+
// Test case: Should throw exception when insert fails
316+
long workflowDefinitionCode = 100L;
317+
List<WorkflowTaskLineage> workflowTaskLineages = new ArrayList<>();
318+
319+
WorkflowTaskLineage lineage1 = new WorkflowTaskLineage();
320+
lineage1.setWorkflowDefinitionCode(workflowDefinitionCode);
321+
lineage1.setTaskDefinitionCode(200L);
322+
workflowTaskLineages.add(lineage1);
323+
324+
// Mock DAO methods
325+
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(1);
326+
when(workflowTaskLineageDao.batchInsert(workflowTaskLineages)).thenReturn(0); // Insert failure
327+
328+
// Execute and verify exception
329+
ServiceException exception = Assertions.assertThrows(ServiceException.class, () -> {
330+
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, workflowTaskLineages);
331+
});
332+
333+
Assertions.assertEquals(Status.CREATE_WORKFLOW_LINEAGE_ERROR.getCode(), exception.getCode());
334+
verify(workflowTaskLineageDao)
335+
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
336+
verify(workflowTaskLineageDao).batchInsert(workflowTaskLineages);
337+
}
338+
259339
}

0 commit comments

Comments
 (0)