Skip to content

Commit f4d540a

Browse files
authored
[Fix-17534][Service/Master] Add global parameters and varpool from current workflow instance and add them to start params list of the trigger request of a sub workflow (#17578)
1 parent 5838237 commit f4d540a

File tree

3 files changed

+294
-2
lines changed

3 files changed

+294
-2
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.subworkflow;
1919

20+
import static java.util.Arrays.asList;
21+
import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.deserializeVarPool;
22+
2023
import org.apache.dolphinscheduler.common.enums.Flag;
2124
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2225
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -35,6 +38,7 @@
3538
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
3639
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
3740
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
41+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3842
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters;
3943
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask;
4044
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
@@ -43,6 +47,14 @@
4347
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
4448
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
4549

50+
import org.apache.commons.collections4.CollectionUtils;
51+
52+
import java.util.ArrayList;
53+
import java.util.Collections;
54+
import java.util.HashMap;
55+
import java.util.List;
56+
import java.util.Map;
57+
4658
import lombok.extern.slf4j.Slf4j;
4759

4860
import org.springframework.context.ApplicationContext;
@@ -222,6 +234,11 @@ private SubWorkflowLogicTaskRuntimeContext triggerNewSubWorkflow() {
222234
final ICommandParam commandParam =
223235
JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class);
224236

237+
final List<Property> paramList = mergeParams(asList(
238+
new ArrayList<>(deserializeVarPool(workflowInstance.getGlobalParams())),
239+
commandParam.getCommandParams(),
240+
new ArrayList<>(deserializeVarPool(workflowInstance.getVarPool()))));
241+
225242
final WorkflowManualTriggerRequest workflowManualTriggerRequest = WorkflowManualTriggerRequest.builder()
226243
.userId(taskExecutionContext.getExecutorId())
227244
.workflowDefinitionCode(subWorkflowDefinition.getCode())
@@ -233,8 +250,7 @@ private SubWorkflowLogicTaskRuntimeContext triggerNewSubWorkflow() {
233250
.workerGroup(workflowInstance.getWorkerGroup())
234251
.tenantCode(workflowInstance.getTenantCode())
235252
.environmentCode(workflowInstance.getEnvironmentCode())
236-
// todo: transport varpool and local params
237-
.startParamList(commandParam.getCommandParams())
253+
.startParamList(paramList)
238254
.dryRun(Flag.of(workflowInstance.getDryRun()))
239255
.build();
240256
final Integer subWorkflowInstanceId = applicationContext
@@ -243,6 +259,25 @@ private SubWorkflowLogicTaskRuntimeContext triggerNewSubWorkflow() {
243259
return SubWorkflowLogicTaskRuntimeContext.of(subWorkflowInstanceId);
244260
}
245261

262+
private List<Property> mergeParams(List<List<Property>> params) {
263+
if (CollectionUtils.isEmpty(params)) {
264+
return Collections.emptyList();
265+
}
266+
if (params.size() == 1) {
267+
return params.get(0);
268+
}
269+
Map<String, Property> result = new HashMap<>();
270+
for (List<Property> param : params) {
271+
if (CollectionUtils.isEmpty(param)) {
272+
continue;
273+
}
274+
for (Property property : param) {
275+
result.put(property.getProp(), property);
276+
}
277+
}
278+
return new ArrayList<>(result.values());
279+
}
280+
246281
private void upsertSubWorkflowRelation() {
247282
final WorkflowInstanceMapDao workflowInstanceMapDao = applicationContext.getBean(WorkflowInstanceMapDao.class);
248283
WorkflowInstanceRelation workflowInstanceRelation = workflowInstanceMapDao.queryWorkflowMapByParent(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.integration.cases;
19+
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.awaitility.Awaitility.await;
22+
23+
import org.apache.dolphinscheduler.common.enums.Flag;
24+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
25+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
26+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
27+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
28+
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
29+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
30+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
31+
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
32+
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
33+
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
34+
35+
import java.time.Duration;
36+
import java.util.List;
37+
38+
import org.assertj.core.api.Assertions;
39+
import org.junit.jupiter.api.DisplayName;
40+
import org.junit.jupiter.api.Test;
41+
42+
/**
43+
* The integration test for validating sub workflow instances inherit global params from all parent flows.
44+
* Global params are asserted in each sub workflow instance and the fake_task in the sub-sub-workflow is used to verify the global params.
45+
*/
46+
class SubWorkflowInstanceGlobalParamsInheritanceTestCase extends AbstractMasterIntegrationTestCase {
47+
48+
@Test
49+
@DisplayName("Test subflows inherit global params from all parent flows")
50+
void testSubflowInheritsGlobalParamsFromParentFlows_with_oneSuccessTask() {
51+
final String yaml = "/it/start/workflow_with_sub_workflows_with_global_params.yaml";
52+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
53+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
54+
55+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
56+
.workflowDefinition(parentWorkflow)
57+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
58+
.build();
59+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
60+
61+
await()
62+
.atMost(Duration.ofMinutes(1))
63+
.untilAsserted(() -> {
64+
Assertions
65+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
66+
.matches(workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
67+
.matches(workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO);
68+
69+
final List<WorkflowInstance> subWorkflowInstance =
70+
repository.queryWorkflowInstance(context.getWorkflows().get(1));
71+
Assertions
72+
.assertThat(subWorkflowInstance)
73+
.hasSize(1)
74+
.satisfiesExactly(workflowInstance -> {
75+
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
76+
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
77+
78+
Assertions
79+
.assertThat(
80+
JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class))
81+
.hasSize(1)
82+
.anySatisfy(property -> {
83+
assertThat(property.getProp()).isEqualTo("parentWorkflowParam");
84+
assertThat(property.getValue()).isEqualTo("parentWorkflowParamValue");
85+
});
86+
});
87+
88+
final List<WorkflowInstance> subSubWorkflowInstance =
89+
repository.queryWorkflowInstance(context.getWorkflows().get(2));
90+
Assertions
91+
.assertThat(subSubWorkflowInstance)
92+
.hasSize(1)
93+
.satisfiesExactly(workflowInstance -> {
94+
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
95+
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
96+
97+
Assertions
98+
.assertThat(
99+
JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class))
100+
.hasSize(2)
101+
.anySatisfy(property -> {
102+
assertThat(property.getProp()).isEqualTo("parentWorkflowParam");
103+
assertThat(property.getValue()).isEqualTo("parentWorkflowParamValue");
104+
})
105+
.anySatisfy(property -> {
106+
assertThat(property.getProp()).isEqualTo("subWorkflowParam");
107+
assertThat(property.getValue()).isEqualTo("subWorkflowParamValue");
108+
});
109+
});
110+
111+
Assertions
112+
.assertThat(repository.queryTaskInstance(subSubWorkflowInstance.get(0).getId()))
113+
.satisfiesExactly(taskInstance -> {
114+
assertThat(taskInstance.getName()).isEqualTo("fake_task");
115+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
116+
});
117+
});
118+
119+
masterContainer.assertAllResourceReleased();
120+
}
121+
122+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_sub_workflow
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with single subflow task
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
globalParams: >
38+
[{
39+
"prop": "parentWorkflowParam",
40+
"value": "parentWorkflowParamValue",
41+
"direct": "IN",
42+
"type":"VARCHAR"
43+
}]
44+
executionType: PARALLEL
45+
- name: subworkflow_with_another_subworkflow
46+
code: 2
47+
version: 1
48+
projectCode: 1
49+
description: This is a fake sub workflow with another single subflow task
50+
releaseState: ONLINE
51+
createTime: 2024-08-12 00:00:00
52+
updateTime: 2021-08-12 00:00:00
53+
userId: 1
54+
executionType: PARALLEL
55+
- name: subworkflow_with_single_task
56+
code: 3
57+
version: 1
58+
projectCode: 1
59+
description: This is a fake sub sub workflow with single task
60+
releaseState: ONLINE
61+
createTime: 2024-08-12 00:00:00
62+
updateTime: 2021-08-12 00:00:00
63+
userId: 1
64+
globalParams: >
65+
[{
66+
"prop": "subWorkflowParam",
67+
"value": "subWorkflowParamValue",
68+
"direct": "IN",
69+
"type":"VARCHAR"
70+
}]
71+
executionType: PARALLEL
72+
73+
tasks:
74+
- name: sub_workflow_task
75+
code: 1
76+
version: 1
77+
projectCode: 1
78+
userId: 1
79+
taskType: SUB_WORKFLOW
80+
taskParams: '{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
81+
workerGroup: default
82+
createTime: 2024-08-12 00:00:00
83+
updateTime: 2021-08-12 00:00:00
84+
taskExecuteType: BATCH
85+
- name: sub_sub_workflow_task
86+
code: 2
87+
version: 1
88+
projectCode: 1
89+
userId: 1
90+
taskType: SUB_WORKFLOW
91+
taskParams: '{"localParams":[],"resourceList":[],"workflowDefinitionCode":3}'
92+
workerGroup: default
93+
createTime: 2024-08-12 00:00:00
94+
updateTime: 2021-08-12 00:00:00
95+
taskExecuteType: BATCH
96+
- name: fake_task
97+
code: 3
98+
version: 1
99+
projectCode: 1
100+
userId: 1
101+
taskType: LogicFakeTask
102+
taskParams: '{"localParams":null,"varPool":[],"shellScript": "if [ \"${parentWorkflowParam}\" != \"parentWorkflowParamValue\" ]; then\n exit 1\nelif [ \"${subWorkflowParam}\" != \"subWorkflowParamValue\" ]; then\n exit 1\nelse\n exit 0\nfi"}'
103+
workerGroup: default
104+
createTime: 2024-08-12 00:00:00
105+
updateTime: 2021-08-12 00:00:00
106+
taskExecuteType: BATCH
107+
108+
taskRelations:
109+
- projectCode: 1
110+
workflowDefinitionCode: 1
111+
workflowDefinitionVersion: 1
112+
preTaskCode: 0
113+
preTaskVersion: 0
114+
postTaskCode: 1
115+
postTaskVersion: 1
116+
createTime: 2024-08-12 00:00:00
117+
updateTime: 2024-08-12 00:00:00
118+
- projectCode: 1
119+
workflowDefinitionCode: 2
120+
workflowDefinitionVersion: 1
121+
preTaskCode: 0
122+
preTaskVersion: 0
123+
postTaskCode: 2
124+
postTaskVersion: 1
125+
createTime: 2024-08-12 00:00:00
126+
updateTime: 2024-08-12 00:00:00
127+
- projectCode: 1
128+
workflowDefinitionCode: 3
129+
workflowDefinitionVersion: 1
130+
preTaskCode: 0
131+
preTaskVersion: 0
132+
postTaskCode: 3
133+
postTaskVersion: 1
134+
createTime: 2024-08-12 00:00:00
135+
updateTime: 2024-08-12 00:00:00

0 commit comments

Comments
 (0)