Skip to content

Commit 3fd2918

Browse files
author
苏义超
committed
replace task code for Switch and Condition Task
1 parent e1ca74d commit 3fd2918

File tree

1 file changed

+52
-62
lines changed

1 file changed

+52
-62
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java

Lines changed: 52 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@
116116
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
117117
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
118118
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
119-
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
120119
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
121120
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
122121
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
@@ -2120,10 +2119,10 @@ protected void doBatchOperateWorkflowDefinition(User loginUser,
21202119
taskDefinitionLog.setVersion(0);
21212120
taskDefinitionLog.setName(taskDefinitionLog.getName());
21222121
if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) {
2123-
updateSwitchTaskParams(taskDefinitionLog, taskCodeMap);
2122+
replaceTaskCodeForSwitchTaskParams(taskDefinitionLog, taskCodeMap);
21242123
}
21252124
if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) {
2126-
updateConditionTaskParams(taskDefinitionLog, taskCodeMap);
2125+
replaceTaskCodeForConditionTaskParams(taskDefinitionLog, taskCodeMap);
21272126
}
21282127
}
21292128
for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) {
@@ -2198,118 +2197,109 @@ protected void doBatchOperateWorkflowDefinition(User loginUser,
21982197
}
21992198

22002199
/**
2201-
* Updates task code references inside the task parameters of a SWITCH-type task.
2202-
* Replaces {@code nextNode} in both the default branch and conditional branches
2203-
* using the provided {@code taskCodeMap}.
2204-
* <p>
2205-
* If parsing fails, an error is recorded in {@code result} and the method returns {@code false}.
2200+
* replace task code references inside the task parameters of a Switch task.
22062201
*
22072202
* @param taskDefinitionLog the task log to update
22082203
* @param taskCodeMap mapping from old task code to new task code
22092204
* @throws IllegalArgumentException if taskParams is invalid or cannot be parsed
22102205
*/
2211-
private void updateSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map<Long, Long> taskCodeMap) {
2206+
private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map<Long, Long> taskCodeMap) {
22122207
final String taskParams = taskDefinitionLog.getTaskParams();
2213-
final SwitchParameters switchParameters = JSONUtils.parseObject(taskParams, SwitchParameters.class);
2214-
2208+
final SwitchParameters switchParameters =
2209+
JSONUtils.parseObject(taskParams, SwitchParameters.class);
22152210
if (switchParameters == null) {
2216-
log.warn("Failed to parse SWITCH task params: {}", taskParams);
2211+
log.warn("Failed to parse Switch task params: {}", taskParams);
22172212
throw new IllegalArgumentException(
22182213
"Switch task params: " + taskParams + " is invalid.");
22192214
}
22202215

2221-
// Update top-level nextBranch if used
2216+
// SwitchParameters.nextBranch
22222217
if (switchParameters.getNextBranch() != null && taskCodeMap.containsKey(switchParameters.getNextBranch())) {
22232218
switchParameters.setNextBranch(taskCodeMap.get(switchParameters.getNextBranch()));
22242219
}
22252220

2226-
// Update switchResult block
2221+
// SwitchParameters.SwitchResult
22272222
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
22282223
if (switchResult != null) {
2229-
// Default branch
2224+
// SwitchParameters.SwitchResult.nextNode
22302225
if (switchResult.getNextNode() != null && taskCodeMap.containsKey(switchResult.getNextNode())) {
2231-
switchResult.setNextNode(taskCodeMap.get(switchResult.getNextNode()));
2226+
switchResult.setNextNode(
2227+
taskCodeMap.get(switchResult.getNextNode()));
22322228
}
22332229

2234-
// Conditional branches
2235-
if (CollectionUtils.isNotEmpty(switchResult.getDependTaskList())) {
2236-
for (SwitchResultVo vo : switchResult.getDependTaskList()) {
2237-
if (vo != null && vo.getNextNode() != null && taskCodeMap.containsKey(vo.getNextNode())) {
2238-
vo.setNextNode(taskCodeMap.get(vo.getNextNode()));
2239-
}
2240-
}
2241-
}
2230+
// SwitchParameters.SwitchResult.SwitchResultVo.nextNode
2231+
switchResult.getDependTaskList().forEach(switchResultVo -> {
2232+
switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
2233+
});
22422234
}
22432235

22442236
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
22452237
}
22462238

22472239
/**
2248-
* Updates task code references inside the task parameters of a CONDITIONS-type task.
2249-
* Replaces:
2250-
* - {@code depTaskCode} in each {@code ConditionDependentItem}
2251-
* - node IDs in {@code successNode} and {@code failedNode}
2252-
* using the provided {@code taskCodeMap}.
2240+
* replace task code references inside the task parameters of a Condition task.
22532241
*
22542242
* @param taskDefinitionLog the task log to update
22552243
* @param taskCodeMap mapping from old task code to new task code
22562244
* @throws IllegalArgumentException if taskParams is invalid or cannot be parsed
22572245
*/
2258-
private void updateConditionTaskParams(TaskDefinitionLog taskDefinitionLog, Map<Long, Long> taskCodeMap) {
2246+
private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefinitionLog,
2247+
Map<Long, Long> taskCodeMap) {
22592248
final String taskParams = taskDefinitionLog.getTaskParams();
2260-
final ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskParams, ConditionsParameters.class);
2249+
final ConditionsParameters conditionsParameters =
2250+
JSONUtils.parseObject(taskParams, ConditionsParameters.class);
22612251

22622252
if (conditionsParameters == null) {
2263-
log.warn("Failed to parse CONDITION task params: {}", taskParams);
2264-
throw new IllegalArgumentException("Condition task params: " + taskParams + " is invalid.");
2253+
log.warn("Failed to parse Condition task params: {}", taskParams);
2254+
throw new IllegalArgumentException(
2255+
"Condition task params: " + taskParams + " is invalid.");
22652256
}
22662257

2267-
// Update dependence -> depTaskCode
2268-
ConditionsParameters.ConditionDependency dependence = conditionsParameters.getDependence();
2269-
if (dependence != null && CollectionUtils.isNotEmpty(dependence.getDependTaskList())) {
2270-
for (ConditionDependentTaskModel dependTask : dependence.getDependTaskList()) {
2271-
if (CollectionUtils.isEmpty(dependTask.getDependItemList())) {
2258+
// ConditionsParameters.ConditionDependency
2259+
ConditionsParameters.ConditionDependency conditionDependency = conditionsParameters.getDependence();
2260+
if (conditionDependency != null && CollectionUtils.isNotEmpty(conditionDependency.getDependTaskList())) {
2261+
for (ConditionDependentTaskModel conditionDependentTaskModel : conditionDependency.getDependTaskList()) {
2262+
if (CollectionUtils.isEmpty(conditionDependentTaskModel.getDependItemList())) {
22722263
continue;
22732264
}
2274-
for (ConditionDependentItem item : dependTask.getDependItemList()) {
2275-
if (item == null) {
2265+
for (ConditionDependentItem conditionDependentItem : conditionDependentTaskModel.getDependItemList()) {
2266+
if (conditionDependentItem == null) {
22762267
continue;
22772268
}
2278-
Long oldCode = item.getDepTaskCode();
2279-
if (taskCodeMap.containsKey(oldCode)) {
2280-
item.setDepTaskCode(taskCodeMap.get(oldCode));
2269+
// ConditionsParameters.ConditionDependency.ConditionDependentTaskModel.ConditionDependentItem.depTaskCode
2270+
Long depTaskCode = conditionDependentItem.getDepTaskCode();
2271+
if (taskCodeMap.containsKey(depTaskCode)) {
2272+
conditionDependentItem.setDepTaskCode(taskCodeMap.get(depTaskCode));
22812273
}
22822274
}
22832275
}
22842276
}
22852277

2286-
// Update condition result branches
2278+
// ConditionsParameters.ConditionResult
22872279
ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult();
22882280
if (conditionResult != null) {
2289-
// Success branch
2281+
// ConditionsParameters.ConditionResult.successNode
22902282
if (CollectionUtils.isNotEmpty(conditionResult.getSuccessNode())) {
2291-
List<Long> updatedSuccess = conditionResult.getSuccessNode().stream()
2292-
.map(code -> {
2293-
if (code != null && taskCodeMap.containsKey(code)) {
2294-
return taskCodeMap.get(code);
2283+
List<Long> successNode = conditionResult.getSuccessNode().stream()
2284+
.map(taskCode -> {
2285+
if (taskCode != null && taskCodeMap.containsKey(taskCode)) {
2286+
return taskCodeMap.get(taskCode);
22952287
}
2296-
return code;
2297-
})
2298-
.collect(Collectors.toList());
2299-
conditionResult.setSuccessNode(updatedSuccess);
2288+
return taskCode;
2289+
}).collect(Collectors.toList());
2290+
conditionResult.setSuccessNode(successNode);
23002291
}
23012292

2302-
// Failed branch
2293+
// ConditionsParameters.ConditionResult.failedNode
23032294
if (CollectionUtils.isNotEmpty(conditionResult.getFailedNode())) {
2304-
List<Long> updatedFailed = conditionResult.getFailedNode().stream()
2305-
.map(code -> {
2306-
if (code != null && taskCodeMap.containsKey(code)) {
2307-
return taskCodeMap.get(code);
2295+
List<Long> failedNode = conditionResult.getFailedNode().stream()
2296+
.map(taskCode -> {
2297+
if (taskCode != null && taskCodeMap.containsKey(taskCode)) {
2298+
return taskCodeMap.get(taskCode);
23082299
}
2309-
return code;
2310-
})
2311-
.collect(Collectors.toList());
2312-
conditionResult.setFailedNode(updatedFailed);
2300+
return taskCode;
2301+
}).collect(Collectors.toList());
2302+
conditionResult.setFailedNode(failedNode);
23132303
}
23142304
}
23152305

0 commit comments

Comments
 (0)