|
93 | 93 | import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; |
94 | 94 | import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; |
95 | 95 | import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao; |
| 96 | +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem; |
| 97 | +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel; |
96 | 98 | import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; |
97 | 99 | import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
98 | 100 | import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
| 101 | +import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; |
| 102 | +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; |
99 | 103 | import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; |
100 | 104 | import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; |
101 | 105 | import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; |
|
120 | 124 | import java.util.Set; |
121 | 125 | import java.util.TreeSet; |
122 | 126 | import java.util.concurrent.ConcurrentHashMap; |
| 127 | +import java.util.function.Consumer; |
123 | 128 | import java.util.function.Function; |
| 129 | +import java.util.function.Supplier; |
124 | 130 | import java.util.stream.Collectors; |
125 | 131 | import java.util.stream.Stream; |
126 | 132 |
|
@@ -1492,25 +1498,16 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, |
1492 | 1498 | taskDefinitionLog.setProjectCode(targetProjectCode); |
1493 | 1499 | taskDefinitionLog.setVersion(0); |
1494 | 1500 | taskDefinitionLog.setName(taskDefinitionLog.getName()); |
| 1501 | + |
1495 | 1502 | if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) { |
1496 | | - final String taskParams = taskDefinitionLog.getTaskParams(); |
1497 | | - final SwitchParameters switchParameters = |
1498 | | - JSONUtils.parseObject(taskParams, SwitchParameters.class); |
1499 | | - if (switchParameters == null) { |
1500 | | - throw new IllegalArgumentException( |
1501 | | - "Switch task params: " + taskParams + " is invalid."); |
1502 | | - } |
1503 | | - SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); |
1504 | | - switchResult.getDependTaskList().forEach(switchResultVo -> { |
1505 | | - switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode())); |
1506 | | - }); |
1507 | | - if (switchResult.getNextNode() != null) { |
1508 | | - switchResult.setNextNode( |
1509 | | - taskCodeMap.get(switchResult.getNextNode())); |
1510 | | - } |
1511 | | - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); |
| 1503 | + replaceTaskCodeForSwitchTaskParams(taskDefinitionLog, taskCodeMap); |
| 1504 | + } |
| 1505 | + |
| 1506 | + if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) { |
| 1507 | + replaceTaskCodeForConditionTaskParams(taskDefinitionLog, taskCodeMap); |
1512 | 1508 | } |
1513 | 1509 | } |
| 1510 | + |
1514 | 1511 | for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) { |
1515 | 1512 | if (workflowTaskRelationLog.getPreTaskCode() > 0) { |
1516 | 1513 | workflowTaskRelationLog |
@@ -1582,6 +1579,109 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, |
1582 | 1579 | } |
1583 | 1580 | } |
1584 | 1581 |
|
| 1582 | + /** |
| 1583 | + * Replaces old task codes with new ones in the parameters of a Switch task. |
| 1584 | + * Used during workflow duplication or import to preserve correct task dependencies. |
| 1585 | + */ |
| 1586 | + private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefLog, Map<Long, Long> taskCodeMap) { |
| 1587 | + String taskParams = taskDefLog.getTaskParams(); |
| 1588 | + SwitchParameters params; |
| 1589 | + |
| 1590 | + try { |
| 1591 | + params = JSONUtils.parseObject(taskParams, SwitchParameters.class); |
| 1592 | + } catch (Exception e) { |
| 1593 | + log.warn("Invalid Switch task params: {}", taskParams, e); |
| 1594 | + throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams, e); |
| 1595 | + } |
| 1596 | + |
| 1597 | + if (params == null) { |
| 1598 | + log.warn("Parsed Switch task params is null: {}", taskParams); |
| 1599 | + throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams); |
| 1600 | + } |
| 1601 | + |
| 1602 | + // Update nextBranch if mapped |
| 1603 | + Long nextBranch = params.getNextBranch(); |
| 1604 | + if (nextBranch != null && taskCodeMap.containsKey(nextBranch)) { |
| 1605 | + params.setNextBranch(taskCodeMap.get(nextBranch)); |
| 1606 | + } |
| 1607 | + |
| 1608 | + // Update switch result nodes |
| 1609 | + SwitchParameters.SwitchResult result = params.getSwitchResult(); |
| 1610 | + if (result != null) { |
| 1611 | + Long nextNode = result.getNextNode(); |
| 1612 | + if (nextNode != null && taskCodeMap.containsKey(nextNode)) { |
| 1613 | + result.setNextNode(taskCodeMap.get(nextNode)); |
| 1614 | + } |
| 1615 | + |
| 1616 | + // Update depend task list in result |
| 1617 | + for (SwitchResultVo vo : result.getDependTaskList()) { |
| 1618 | + Long original = vo.getNextNode(); |
| 1619 | + if (original != null && taskCodeMap.containsKey(original)) { |
| 1620 | + vo.setNextNode(taskCodeMap.get(original)); |
| 1621 | + } |
| 1622 | + } |
| 1623 | + } |
| 1624 | + |
| 1625 | + taskDefLog.setTaskParams(JSONUtils.toJsonString(params)); |
| 1626 | + } |
| 1627 | + |
| 1628 | + /** |
| 1629 | + * Replaces old task codes with new ones in the parameters of a Condition task. |
| 1630 | + * Used during workflow duplication or import to preserve correct task dependencies. |
| 1631 | + */ |
| 1632 | + private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefLog, Map<Long, Long> taskCodeMap) { |
| 1633 | + String taskParams = taskDefLog.getTaskParams(); |
| 1634 | + ConditionsParameters params; |
| 1635 | + |
| 1636 | + try { |
| 1637 | + params = JSONUtils.parseObject(taskParams, ConditionsParameters.class); |
| 1638 | + } catch (Exception e) { |
| 1639 | + log.warn("Invalid Condition task params: {}", taskParams, e); |
| 1640 | + throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams, e); |
| 1641 | + } |
| 1642 | + |
| 1643 | + if (params == null) { |
| 1644 | + log.warn("Parsed Condition task params is null: {}", taskParams); |
| 1645 | + throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams); |
| 1646 | + } |
| 1647 | + |
| 1648 | + // Update dependency task codes |
| 1649 | + ConditionsParameters.ConditionDependency dep = params.getDependence(); |
| 1650 | + if (dep != null) { |
| 1651 | + for (ConditionDependentTaskModel taskModel : dep.getDependTaskList()) { |
| 1652 | + for (ConditionDependentItem item : taskModel.getDependItemList()) { |
| 1653 | + Long oldCode = item.getDepTaskCode(); |
| 1654 | + if (taskCodeMap.containsKey(oldCode)) { |
| 1655 | + item.setDepTaskCode(taskCodeMap.get(oldCode)); |
| 1656 | + } |
| 1657 | + } |
| 1658 | + } |
| 1659 | + } |
| 1660 | + |
| 1661 | + // Update success/failed node lists |
| 1662 | + ConditionsParameters.ConditionResult result = params.getConditionResult(); |
| 1663 | + if (result != null) { |
| 1664 | + replaceInNodeList(result::getSuccessNode, result::setSuccessNode, taskCodeMap); |
| 1665 | + replaceInNodeList(result::getFailedNode, result::setFailedNode, taskCodeMap); |
| 1666 | + } |
| 1667 | + |
| 1668 | + taskDefLog.setTaskParams(JSONUtils.toJsonString(params)); |
| 1669 | + } |
| 1670 | + |
| 1671 | + // Helper to avoid duplication for success/failed node lists |
| 1672 | + private void replaceInNodeList(Supplier<List<Long>> getter, Consumer<List<Long>> setter, |
| 1673 | + Map<Long, Long> taskCodeMap) { |
| 1674 | + List<Long> original = getter.get(); |
| 1675 | + if (CollectionUtils.isEmpty(original)) |
| 1676 | + return; |
| 1677 | + |
| 1678 | + List<Long> updated = original.stream() |
| 1679 | + .map(code -> code != null && taskCodeMap.containsKey(code) ? taskCodeMap.get(code) : code) |
| 1680 | + .collect(Collectors.toList()); |
| 1681 | + |
| 1682 | + setter.accept(updated); |
| 1683 | + } |
| 1684 | + |
1585 | 1685 | /** |
1586 | 1686 | * get new task name or workflow name when copy or import operate |
1587 | 1687 | * |
|
0 commit comments