Skip to content

Commit 0bc817e

Browse files
authored
[Fix-17710][Master] Fix master task dispatch failure by filtering out null map keys. (#17711)
1 parent 9ce565e commit 0bc817e

File tree

7 files changed

+1038
-335
lines changed

7 files changed

+1038
-335
lines changed

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,50 @@ public void testStartWorkflow_fakeTask_usingLocalParamOverWriteByVarPool() {
775775
masterContainer.assertAllResourceReleased();
776776
}
777777

778+
@Test
779+
@DisplayName("Test start a workflow which using null key params")
780+
public void testStartWorkflow_usingNullKeyParam() {
781+
final String yaml = "/it/start/workflow_with_null_key_param.yaml";
782+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
783+
final WorkflowDefinition workflow = context.getOneWorkflow();
784+
785+
final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
786+
.commandParams(Lists.newArrayList(Property.builder()
787+
.prop(null)
788+
.direct(Direct.IN)
789+
.type(DataType.VARCHAR)
790+
.value("commandParam")
791+
.build()))
792+
.build();
793+
794+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
795+
.workflowDefinition(workflow)
796+
.runWorkflowCommandParam(runWorkflowCommandParam)
797+
.build();
798+
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
799+
800+
await()
801+
.atMost(Duration.ofMinutes(1))
802+
.untilAsserted(() -> {
803+
Assertions
804+
.assertThat(repository.queryWorkflowInstance(workflow))
805+
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
806+
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
807+
Assertions
808+
.assertThat(repository.queryTaskInstance(workflow))
809+
.hasSize(2)
810+
.anySatisfy(taskInstance -> {
811+
assertThat(taskInstance.getName()).isEqualTo("A");
812+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
813+
})
814+
.anySatisfy(taskInstance -> {
815+
assertThat(taskInstance.getName()).isEqualTo("B");
816+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
817+
});
818+
});
819+
masterContainer.assertAllResourceReleased();
820+
}
821+
778822
@Test
779823
@DisplayName("Test start a workflow with one fake task(A) failed")
780824
public void testStartWorkflow_with_oneFailedTask() {
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2024-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_fake_task_success
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with single task
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2024-08-12 00:00:00
36+
userId: 1
37+
globalParams: '[{"prop":null,"value":"workflowParam","direct":"IN","type":"VARCHAR"}]'
38+
executionType: PARALLEL
39+
40+
tasks:
41+
- name: A
42+
code: 1
43+
version: 1
44+
projectCode: 1
45+
userId: 1
46+
taskType: LogicFakeTask
47+
taskParams: >
48+
{
49+
"localParams": [
50+
{
51+
"prop": "",
52+
"direct": "IN",
53+
"type": "VARCHAR",
54+
"value": ""
55+
}
56+
],
57+
"shellScript": "echo 111",
58+
"resourceList": []
59+
}
60+
workerGroup: default
61+
createTime: 2024-08-12 00:00:00
62+
updateTime: 2024-08-12 00:00:00
63+
taskExecuteType: BATCH
64+
- name: B
65+
code: 2
66+
version: 1
67+
projectCode: 1
68+
userId: 1
69+
taskType: LogicFakeTask
70+
taskParams: >
71+
{
72+
"localParams": [
73+
{
74+
"prop": null,
75+
"direct": "IN",
76+
"type": "VARCHAR",
77+
"value": ""
78+
}
79+
],
80+
"shellScript": "echo 111",
81+
"resourceList": []
82+
}
83+
workerGroup: default
84+
createTime: 2024-08-12 00:00:00
85+
updateTime: 2024-08-12 00:00:00
86+
taskExecuteType: BATCH
87+
88+
89+
90+
taskRelations:
91+
- projectCode: 1
92+
workflowDefinitionCode: 1
93+
workflowDefinitionVersion: 1
94+
preTaskCode: 0
95+
preTaskVersion: 0
96+
postTaskCode: 1
97+
postTaskVersion: 1
98+
createTime: 2024-08-12 00:00:00
99+
updateTime: 2024-08-12 00:00:00
100+
- projectCode: 1
101+
workflowDefinitionCode: 1
102+
workflowDefinitionVersion: 1
103+
preTaskCode: 0
104+
preTaskVersion: 0
105+
postTaskCode: 2
106+
postTaskVersion: 1
107+
createTime: 2024-08-12 00:00:00
108+
updateTime: 2024-08-12 00:00:00

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java

Lines changed: 84 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@
4848
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
4949
import org.apache.dolphinscheduler.plugin.task.api.utils.PropertyUtils;
5050
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
51+
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
5152

5253
import org.apache.commons.collections4.CollectionUtils;
5354
import org.apache.commons.lang3.StringUtils;
5455

5556
import java.util.Collections;
5657
import java.util.Date;
5758
import java.util.HashMap;
58-
import java.util.Iterator;
5959
import java.util.LinkedHashMap;
6060
import java.util.List;
6161
import java.util.Map;
@@ -66,10 +66,12 @@
6666
import javax.annotation.Nullable;
6767

6868
import lombok.NonNull;
69+
import lombok.extern.slf4j.Slf4j;
6970

7071
import org.springframework.beans.factory.annotation.Autowired;
7172
import org.springframework.stereotype.Component;
7273

74+
@Slf4j
7375
@Component
7476
public class CuringParamsServiceImpl implements CuringParamsService {
7577

@@ -160,7 +162,13 @@ public Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, Stri
160162
}
161163

162164
/**
163-
* Generate prepare params include project params, global parameters, local parameters, built-in parameters, varpool, start-up params.
165+
* Prepares the final map of task execution parameters by merging parameters from multiple sources
166+
* in a well-defined priority order. The resulting map is guaranteed to contain only valid entries:
167+
* <ul>
168+
* <li>Keys are non-null and non-blank strings</li>
169+
* <li>Values are non-null {@link Property} objects</li>
170+
* </ul>
171+
*
164172
* <p> The priority of the parameters is as follows:
165173
* <p> varpool > command parameters > local parameters > global parameters > project parameters > built-in parameters
166174
* todo: Use TaskRuntimeParams to represent this.
@@ -180,87 +188,101 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
180188
String workflowDefinitionName) {
181189
Map<String, Property> prepareParamsMap = new HashMap<>();
182190

183-
// assign value to definedParams here
184-
Map<String, Property> globalParams = parseGlobalParamsMap(workflowInstance);
185-
186-
// combining local and global parameters
187-
Map<String, Property> localParams = parameters.getInputLocalParametersMap();
188-
189-
// stream pass params
190-
List<Property> varPools = parseVarPool(taskInstance);
191-
192-
// if it is a complement,
193-
// you need to pass in the task instance id to locate the time
194-
// of the process instance complement
191+
// If it is a complement, you need to pass in the task instance id
192+
// to locate the time of the process instance complement.
195193
ICommandParam commandParam = JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class);
194+
if (commandParam == null) {
195+
throw new ServiceException(String.format("Failed to parse command parameter for workflow instance %s",
196+
workflowInstance.getId()));
197+
}
196198
String timeZone = commandParam.getTimeZone();
197199

198-
// built-in params
199-
Map<String, String> builtInParams =
200-
setBuiltInParamsMap(taskInstance, workflowInstance, timeZone, projectName, workflowDefinitionName);
200+
// 1. Built-in parameters (lowest precedence)
201+
Map<String, String> builtInParams = setBuiltInParamsMap(
202+
taskInstance, workflowInstance, timeZone, projectName, workflowDefinitionName);
203+
safePutAll(prepareParamsMap, ParameterUtils.getUserDefParamsMap(builtInParams));
201204

202-
// project-level params
205+
// 2. Project-level parameters
203206
Map<String, Property> projectParams = getProjectParameterMap(taskInstance.getProjectCode());
207+
safePutAll(prepareParamsMap, projectParams);
204208

205-
if (MapUtils.isNotEmpty(builtInParams)) {
206-
prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(builtInParams));
207-
}
208-
209-
if (MapUtils.isNotEmpty(projectParams)) {
210-
prepareParamsMap.putAll(projectParams);
211-
}
212-
213-
if (MapUtils.isNotEmpty(globalParams)) {
214-
prepareParamsMap.putAll(globalParams);
215-
}
209+
// 3. Workflow global parameters
210+
Map<String, Property> globalParams = parseGlobalParamsMap(workflowInstance);
211+
safePutAll(prepareParamsMap, globalParams);
216212

217-
if (MapUtils.isNotEmpty(localParams)) {
218-
prepareParamsMap.putAll(localParams);
219-
}
213+
// 4. Task-local parameters
214+
Map<String, Property> localParams = parameters.getInputLocalParametersMap();
215+
safePutAll(prepareParamsMap, localParams);
220216

217+
// 5. Command-line / complement parameters
221218
if (CollectionUtils.isNotEmpty(commandParam.getCommandParams())) {
222-
prepareParamsMap.putAll(commandParam.getCommandParams().stream()
223-
.collect(Collectors.toMap(Property::getProp, Function.identity())));
219+
Map<String, Property> commandParamsMap = commandParam.getCommandParams().stream()
220+
.filter(prop -> StringUtils.isNotBlank(prop.getProp()))
221+
.collect(Collectors.toMap(
222+
Property::getProp,
223+
Function.identity(),
224+
(v1, v2) -> v2 // retain last on duplicate key
225+
));
226+
safePutAll(prepareParamsMap, commandParamsMap);
224227
}
225228

229+
// 6. VarPool: override values only for existing IN-direction parameters
230+
List<Property> varPools = parseVarPool(taskInstance);
226231
if (CollectionUtils.isNotEmpty(varPools)) {
227-
// overwrite the in parameter by varPool
228232
for (Property varPool : varPools) {
229-
Property property = prepareParamsMap.get(varPool.getProp());
230-
if (property == null || property.getDirect() != Direct.IN) {
233+
if (StringUtils.isBlank(varPool.getProp())) {
231234
continue;
232235
}
233-
property.setValue(varPool.getValue());
236+
Property targetParam = prepareParamsMap.get(varPool.getProp());
237+
if (targetParam != null && Direct.IN.equals(targetParam.getDirect())) {
238+
targetParam.setValue(varPool.getValue());
239+
}
234240
}
235241
}
236242

237-
Iterator<Map.Entry<String, Property>> iter = prepareParamsMap.entrySet().iterator();
238-
while (iter.hasNext()) {
239-
Map.Entry<String, Property> en = iter.next();
240-
Property property = en.getValue();
241-
242-
if (StringUtils.isNotEmpty(property.getValue())
243-
&& property.getValue().contains(Constants.FUNCTION_START_WITH)) {
244-
/**
245-
* local parameter refers to global parameter with the same name
246-
* note: the global parameters of the process instance here are solidified parameters,
247-
* and there are no variables in them.
248-
*/
249-
String val = property.getValue();
250-
251-
// handle some chain parameter assign, such as `{"var1": "${var2}", "var2": 1}` should be convert to
252-
// `{"var1": 1, "var2": 1}`
253-
val = convertParameterPlaceholders(val, prepareParamsMap);
254-
property.setValue(val);
255-
}
243+
// 7. Inject business/scheduling parameters (e.g., ${datetime}), which may contain or reference placeholders
244+
Map<String, Property> businessParams = preBuildBusinessParams(workflowInstance);
245+
safePutAll(prepareParamsMap, businessParams);
246+
247+
// 8. Resolve all placeholders (e.g., "${output_dir}") using the current parameter context
248+
resolvePlaceholders(prepareParamsMap);
249+
250+
return prepareParamsMap;
251+
}
252+
253+
/**
254+
* Safely merges entries from the {@code source} map into the {@code target} map,
255+
* skipping any entry with a {@code null}, empty, or blank key, or a {@code null} value.
256+
*
257+
* @param target the destination map to merge into (must not be null)
258+
* @param source the source map whose valid entries will be copied (may be null or empty)
259+
*/
260+
private void safePutAll(Map<String, Property> target, Map<String, Property> source) {
261+
if (MapUtils.isEmpty(source)) {
262+
return;
256263
}
264+
source.forEach((key, value) -> {
265+
if (StringUtils.isNotBlank(key) && value != null) {
266+
target.put(key, value);
267+
} else {
268+
log.warn("Skipped invalid parameter entry: key='{}', value={}", key, value);
269+
}
270+
});
271+
}
257272

258-
// put schedule time param to params map
259-
Map<String, Property> paramsMap = preBuildBusinessParams(workflowInstance);
260-
if (MapUtils.isNotEmpty(paramsMap)) {
261-
prepareParamsMap.putAll(paramsMap);
273+
/**
274+
* Resolves placeholder expressions (e.g., "${var}") in parameter values by substituting them
275+
* with actual values from the current {@code paramsMap}.
276+
*
277+
* @param paramsMap the map of parameters (key: parameter name, value: {@link Property}) to resolve
278+
*/
279+
private void resolvePlaceholders(Map<String, Property> paramsMap) {
280+
for (Property prop : paramsMap.values()) {
281+
String val = prop.getValue();
282+
if (StringUtils.isNotEmpty(val) && val.contains(Constants.FUNCTION_START_WITH)) {
283+
prop.setValue(convertParameterPlaceholders(val, paramsMap));
284+
}
262285
}
263-
return prepareParamsMap;
264286
}
265287

266288
/**

0 commit comments

Comments
 (0)