Skip to content

Commit 4e40f8d

Browse files
段晓雄cursoragent
andcommitted
feat(flink): move parameter replacement to getScript(), refactor tests without Mockito
- Move FileUtils.generateScriptFile() from init() to getScript() - Apply ParameterUtils.convertParameterPlaceholders() in getScript() before script generation - Refactor FlinkTaskTest and FlinkStreamTaskTest without Mockito - Use @tempdir and assert on generated file content Ref: PR apache#17987 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent f36114f commit 4e40f8d

File tree

4 files changed

+179
-178
lines changed
  • dolphinscheduler-task-plugin

4 files changed

+179
-178
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,27 +52,11 @@ public FlinkStreamTask(TaskExecutionContext taskExecutionContext) {
5252
public void init() {
5353

5454
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkStreamParameters.class);
55-
// Replace parameter placeholders (e.g. ${system.biz.date}, $[yyyyMMdd]) in init script and main script
56-
if (flinkParameters != null) {
57-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
58-
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
59-
60-
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
61-
flinkParameters.setInitScript(
62-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
63-
}
64-
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
65-
flinkParameters.setRawScript(
66-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
67-
}
68-
}
6955
log.info("Initialize Flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters));
7056

7157
if (flinkParameters == null || !flinkParameters.checkParameters()) {
7258
throw new RuntimeException("flink task params is not valid");
7359
}
74-
75-
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
7660
}
7761

7862
/**
@@ -82,6 +66,20 @@ public void init() {
8266
*/
8367
@Override
8468
protected String getScript() {
69+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
70+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
71+
72+
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
73+
flinkParameters.setInitScript(
74+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
75+
}
76+
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
77+
flinkParameters.setRawScript(
78+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
79+
}
80+
81+
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
82+
8583
// flink run/run-application [OPTIONS] <jar-file> <arguments>
8684
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
8785
return args.stream().collect(Collectors.joining(" "));

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,88 +17,110 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.flink;
1919

20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.when;
20+
import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
2221

2322
import org.apache.dolphinscheduler.common.utils.JSONUtils;
24-
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
2523
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
26-
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
27-
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
2824
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
29-
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
3025

26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Files;
28+
import java.nio.file.Path;
3129
import java.util.HashMap;
3230
import java.util.Map;
3331

3432
import org.junit.jupiter.api.Assertions;
3533
import org.junit.jupiter.api.Test;
36-
import org.junit.jupiter.api.extension.ExtendWith;
37-
import org.mockito.MockedStatic;
38-
import org.mockito.Mockito;
39-
import org.mockito.junit.jupiter.MockitoExtension;
34+
import org.junit.jupiter.api.io.TempDir;
4035

4136
/**
42-
* Test FlinkStreamTask parameter replacement for initScript and rawScript.
37+
* FlinkStreamTask unit test. Verifies parameter replacement in initScript and rawScript without Mockito.
4338
*/
44-
@ExtendWith(MockitoExtension.class)
45-
class FlinkStreamTaskTest {
39+
public class FlinkStreamTaskTest {
40+
41+
@TempDir
42+
Path tempDir;
4643

4744
@Test
48-
void testInitReplacesPlaceholdersInInitScriptAndRawScript() {
45+
public void testParameterReplacementInScript() throws Exception {
46+
String executePath = tempDir.toString();
47+
String taskAppId = "test-app";
48+
49+
FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
50+
flinkParameters.setProgramType(ProgramType.SQL);
51+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
52+
flinkParameters.setParallelism(2);
53+
flinkParameters.setInitScript("SET 'date' = '${bizdate}';");
54+
flinkParameters.setRawScript("SELECT * FROM logs WHERE dt = '${bizdate}' AND env = '${env}'");
55+
4956
Map<String, Property> prepareParamsMap = new HashMap<>();
50-
prepareParamsMap.put("bizdate", new Property("bizdate", Direct.IN, DataType.VARCHAR, "20250601"));
51-
prepareParamsMap.put("env", new Property("env", Direct.IN, DataType.VARCHAR, "prod"));
52-
53-
FlinkStreamParameters taskParams = new FlinkStreamParameters();
54-
taskParams.setProgramType(ProgramType.SQL);
55-
taskParams.setInitScript("SET 'date' = '${bizdate}';");
56-
taskParams.setRawScript("SELECT * FROM logs WHERE dt = '${bizdate}' AND env = '${env}'");
57-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
58-
59-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
60-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
61-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
62-
63-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
64-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
65-
66-
FlinkStreamTask task = new FlinkStreamTask(taskExecutionContext);
67-
task.init();
68-
69-
AbstractParameters params = task.getParameters();
70-
Assertions.assertInstanceOf(FlinkStreamParameters.class, params);
71-
FlinkStreamParameters flinkParams = (FlinkStreamParameters) params;
72-
Assertions.assertEquals("SET 'date' = '20250601';", flinkParams.getInitScript());
73-
Assertions.assertEquals("SELECT * FROM logs WHERE dt = '20250601' AND env = 'prod'",
74-
flinkParams.getRawScript());
75-
}
57+
prepareParamsMap.put("bizdate", new Property("bizdate", null, null, "20250601"));
58+
prepareParamsMap.put("env", new Property("env", null, null, "prod"));
59+
60+
TaskExecutionContext context = new TaskExecutionContext();
61+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
62+
context.setExecutePath(executePath);
63+
context.setTaskAppId(taskAppId);
64+
context.setPrepareParamsMap(prepareParamsMap);
65+
66+
FlinkStreamTaskForTest task = new FlinkStreamTaskForTest(context);
67+
task.init();
68+
task.callGetScript();
69+
70+
String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
71+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
72+
73+
String initContent = Files.readString(Path.of(initScriptPath), StandardCharsets.UTF_8);
74+
String nodeContent = Files.readString(Path.of(nodeScriptPath), StandardCharsets.UTF_8);
75+
76+
Assertions.assertTrue(initContent.contains("SET 'date' = '20250601';"),
77+
"Expected ${bizdate} to be replaced, got: " + initContent);
78+
Assertions.assertTrue(nodeContent.contains("dt = '20250601'"),
79+
"Expected ${bizdate} to be replaced, got: " + nodeContent);
80+
Assertions.assertTrue(nodeContent.contains("env = 'prod'"),
81+
"Expected ${env} to be replaced, got: " + nodeContent);
7682
}
7783

7884
@Test
79-
void testInitReplacesTimePlaceholderWhenParamMapContainsScheduleTime() {
85+
public void testParameterReplacementTimePlaceholder() throws Exception {
86+
String executePath = tempDir.toString();
87+
String taskAppId = "test-time";
88+
89+
FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
90+
flinkParameters.setProgramType(ProgramType.SQL);
91+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
92+
flinkParameters.setParallelism(2);
93+
flinkParameters.setInitScript("");
94+
flinkParameters.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt = '$[yyyyMMdd]'");
95+
8096
Map<String, Property> prepareParamsMap = new HashMap<>();
81-
prepareParamsMap.put(TaskConstants.PARAMETER_DATETIME,
82-
new Property(TaskConstants.PARAMETER_DATETIME, Direct.IN, DataType.VARCHAR, "20210815080000"));
97+
prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20210815080000"));
98+
99+
TaskExecutionContext context = new TaskExecutionContext();
100+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
101+
context.setExecutePath(executePath);
102+
context.setTaskAppId(taskAppId);
103+
context.setPrepareParamsMap(prepareParamsMap);
83104

84-
FlinkStreamParameters taskParams = new FlinkStreamParameters();
85-
taskParams.setProgramType(ProgramType.SQL);
86-
taskParams.setInitScript("");
87-
taskParams.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt = '$[yyyyMMdd]'");
88-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
105+
FlinkStreamTaskForTest task = new FlinkStreamTaskForTest(context);
106+
task.init();
107+
task.callGetScript();
89108

90-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
91-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
92-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
109+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
110+
String nodeContent = Files.readString(Path.of(nodeScriptPath), StandardCharsets.UTF_8);
93111

94-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
95-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
112+
Assertions.assertTrue(nodeContent.contains("dt = '20210815'"),
113+
"Expected $[yyyyMMdd] to be replaced with 20210815, got: " + nodeContent);
114+
}
96115

97-
FlinkStreamTask task = new FlinkStreamTask(taskExecutionContext);
98-
task.init();
116+
private static class FlinkStreamTaskForTest extends FlinkStreamTask {
117+
118+
FlinkStreamTaskForTest(TaskExecutionContext context) {
119+
super(context);
120+
}
99121

100-
FlinkStreamParameters flinkParams = (FlinkStreamParameters) task.getParameters();
101-
Assertions.assertEquals("INSERT INTO t SELECT * FROM s WHERE dt = '20210815'", flinkParams.getRawScript());
122+
String callGetScript() {
123+
return getScript();
102124
}
103125
}
104126
}

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,28 +62,11 @@ public FlinkTask(TaskExecutionContext taskExecutionContext) {
6262
public void init() {
6363

6464
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
65-
// Replace parameter placeholders (e.g. ${system.biz.date}, $[yyyyMMdd]) in init script and main script
66-
if (flinkParameters != null) {
67-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
68-
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
69-
70-
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
71-
flinkParameters.setInitScript(
72-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
73-
}
74-
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
75-
flinkParameters.setRawScript(
76-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
77-
}
78-
}
79-
8065
log.info("Initialize flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters));
8166

8267
if (flinkParameters == null || !flinkParameters.checkParameters()) {
8368
throw new RuntimeException("flink task params is not valid");
8469
}
85-
86-
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
8770
}
8871

8972
/**
@@ -93,6 +76,20 @@ public void init() {
9376
*/
9477
@Override
9578
protected String getScript() {
79+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
80+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
81+
82+
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
83+
flinkParameters.setInitScript(
84+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
85+
}
86+
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
87+
flinkParameters.setRawScript(
88+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
89+
}
90+
91+
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
92+
9693
// flink run/run-application [OPTIONS] <jar-file> <arguments>
9794
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
9895
return args.stream().collect(Collectors.joining(" "));

0 commit comments

Comments
 (0)