Skip to content

Commit e245f35

Browse files
段晓雄cursoragent
andcommitted
feat(flink): move parameter replacement and script generation to getScript(), remove Mockito from FlinkTaskTest
- Move FileUtils.generateScriptFile() from init() to getScript() - Apply ParameterUtils.convertParameterPlaceholders() in getScript() before script generation - Refactor FlinkTaskTest without Mockito, use @tempdir and assert on generated file content Ref: PR #17987 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent f36114f commit e245f35

File tree

3 files changed

+99
-121
lines changed
  • dolphinscheduler-task-plugin
    • dolphinscheduler-task-flink/src

3 files changed

+99
-121
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,27 +52,11 @@ public FlinkStreamTask(TaskExecutionContext taskExecutionContext) {
5252
public void init() {
5353

5454
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkStreamParameters.class);
55-
// Replace parameter placeholders (e.g. ${system.biz.date}, $[yyyyMMdd]) in init script and main script
56-
if (flinkParameters != null) {
57-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
58-
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
59-
60-
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
61-
flinkParameters.setInitScript(
62-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
63-
}
64-
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
65-
flinkParameters.setRawScript(
66-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
67-
}
68-
}
6955
log.info("Initialize Flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters));
7056

7157
if (flinkParameters == null || !flinkParameters.checkParameters()) {
7258
throw new RuntimeException("flink task params is not valid");
7359
}
74-
75-
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
7660
}
7761

7862
/**
@@ -82,6 +66,20 @@ public void init() {
8266
*/
8367
@Override
8468
protected String getScript() {
69+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
70+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
71+
72+
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
73+
flinkParameters.setInitScript(
74+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
75+
}
76+
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
77+
flinkParameters.setRawScript(
78+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
79+
}
80+
81+
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
82+
8583
// flink run/run-application [OPTIONS] <jar-file> <arguments>
8684
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
8785
return args.stream().collect(Collectors.joining(" "));

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,28 +62,11 @@ public FlinkTask(TaskExecutionContext taskExecutionContext) {
6262
public void init() {
6363

6464
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
65-
// Replace parameter placeholders (e.g. ${system.biz.date}, $[yyyyMMdd]) in init script and main script
66-
if (flinkParameters != null) {
67-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
68-
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
69-
70-
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
71-
flinkParameters.setInitScript(
72-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
73-
}
74-
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
75-
flinkParameters.setRawScript(
76-
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
77-
}
78-
}
79-
8065
log.info("Initialize flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters));
8166

8267
if (flinkParameters == null || !flinkParameters.checkParameters()) {
8368
throw new RuntimeException("flink task params is not valid");
8469
}
85-
86-
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
8770
}
8871

8972
/**
@@ -93,6 +76,20 @@ public void init() {
9376
*/
9477
@Override
9578
protected String getScript() {
79+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
80+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
81+
82+
if (StringUtils.isNotBlank(flinkParameters.getInitScript())) {
83+
flinkParameters.setInitScript(
84+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getInitScript(), stringParams));
85+
}
86+
if (StringUtils.isNotBlank(flinkParameters.getRawScript())) {
87+
flinkParameters.setRawScript(
88+
ParameterUtils.convertParameterPlaceholders(flinkParameters.getRawScript(), stringParams));
89+
}
90+
91+
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
92+
9693
// flink run/run-application [OPTIONS] <jar-file> <arguments>
9794
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
9895
return args.stream().collect(Collectors.joining(" "));

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java

Lines changed: 71 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -17,120 +17,103 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.flink;
1919

20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.when;
20+
import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
2221

2322
import org.apache.dolphinscheduler.common.utils.JSONUtils;
24-
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
2523
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
26-
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
27-
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
2824
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
29-
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
3025

26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Files;
28+
import java.nio.file.Path;
3129
import java.util.HashMap;
3230
import java.util.Map;
3331

3432
import org.junit.jupiter.api.Assertions;
3533
import org.junit.jupiter.api.Test;
36-
import org.junit.jupiter.api.extension.ExtendWith;
37-
import org.mockito.MockedStatic;
38-
import org.mockito.Mockito;
39-
import org.mockito.junit.jupiter.MockitoExtension;
34+
import org.junit.jupiter.api.io.TempDir;
4035

4136
/**
42-
* Test FlinkTask parameter replacement for initScript and rawScript.
37+
* FlinkTask unit test. Verifies parameter replacement in initScript and rawScript without Mockito.
4338
*/
44-
@ExtendWith(MockitoExtension.class)
45-
class FlinkTaskTest {
39+
public class FlinkTaskTest {
4640

47-
@Test
48-
void testInitReplacesPlaceholdersInInitScriptAndRawScript() {
49-
Map<String, Property> prepareParamsMap = new HashMap<>();
50-
prepareParamsMap.put("bizdate", new Property("bizdate", Direct.IN, DataType.VARCHAR, "20250101"));
51-
prepareParamsMap.put("customVar", new Property("customVar", Direct.IN, DataType.VARCHAR, "hello"));
52-
53-
FlinkParameters taskParams = new FlinkParameters();
54-
taskParams.setProgramType(ProgramType.SQL);
55-
taskParams.setInitScript("SET 'dt' = '${bizdate}';");
56-
taskParams.setRawScript("SELECT * FROM orders WHERE dt = '${bizdate}' AND tag = '${customVar}'");
57-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
58-
59-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
60-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
61-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
62-
63-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
64-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
65-
66-
FlinkTask task = new FlinkTask(taskExecutionContext);
67-
task.init();
68-
69-
AbstractParameters params = task.getParameters();
70-
Assertions.assertInstanceOf(FlinkParameters.class, params);
71-
FlinkParameters flinkParams = (FlinkParameters) params;
72-
Assertions.assertEquals("SET 'dt' = '20250101';", flinkParams.getInitScript());
73-
Assertions.assertEquals("SELECT * FROM orders WHERE dt = '20250101' AND tag = 'hello'",
74-
flinkParams.getRawScript());
75-
}
76-
}
41+
@TempDir
42+
Path tempDir;
7743

7844
@Test
79-
void testInitReplacesTimePlaceholderWhenParamMapContainsScheduleTime() {
45+
public void testParameterReplacementInScript() throws Exception {
46+
String executePath = tempDir.toString();
47+
String taskAppId = "test-app";
48+
49+
FlinkParameters flinkParameters = new FlinkParameters();
50+
flinkParameters.setProgramType(ProgramType.SQL);
51+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
52+
flinkParameters.setParallelism(2);
53+
flinkParameters.setInitScript("set batch_size=${batch_size};");
54+
flinkParameters.setRawScript("SELECT * FROM logs WHERE dt='$[yyyyMMdd]';");
55+
8056
Map<String, Property> prepareParamsMap = new HashMap<>();
81-
prepareParamsMap.put(TaskConstants.PARAMETER_DATETIME,
82-
new Property(TaskConstants.PARAMETER_DATETIME, Direct.IN, DataType.VARCHAR, "20201201120000"));
57+
prepareParamsMap.put("batch_size", new Property("batch_size", null, null, "1000"));
58+
prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20201201120000"));
8359

84-
FlinkParameters taskParams = new FlinkParameters();
85-
taskParams.setProgramType(ProgramType.SQL);
86-
taskParams.setInitScript("");
87-
taskParams.setRawScript("SELECT * FROM t WHERE dt = '$[yyyyMMdd]'");
88-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
60+
TaskExecutionContext context = new TaskExecutionContext();
61+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
62+
context.setExecutePath(executePath);
63+
context.setTaskAppId(taskAppId);
64+
context.setPrepareParamsMap(prepareParamsMap);
8965

90-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
91-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
92-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
66+
FlinkTaskForTest task = new FlinkTaskForTest(context);
67+
task.init();
68+
task.callGetScript();
9369

94-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
95-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
70+
String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
71+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
9672

97-
FlinkTask task = new FlinkTask(taskExecutionContext);
98-
task.init();
73+
String initContent = Files.readString(Path.of(initScriptPath), StandardCharsets.UTF_8);
74+
String nodeContent = Files.readString(Path.of(nodeScriptPath), StandardCharsets.UTF_8);
9975

100-
FlinkParameters flinkParams = (FlinkParameters) task.getParameters();
101-
// $[yyyyMMdd] with schedule time 20201201120000 -> 20201201
102-
Assertions.assertEquals("SELECT * FROM t WHERE dt = '20201201'", flinkParams.getRawScript());
103-
}
76+
Assertions.assertTrue(initContent.contains("set batch_size=1000;"),
77+
"Expected ${batch_size} to be replaced with 1000, got: " + initContent);
78+
Assertions.assertTrue(nodeContent.contains("dt='20201201'"),
79+
"Expected $[yyyyMMdd] to be replaced with 20201201, got: " + nodeContent);
10480
}
10581

10682
@Test
107-
void testInitWithEmptyPrepareParamsMapStillReplacesTimePlaceholders() {
108-
Map<String, Property> prepareParamsMap = new HashMap<>();
83+
public void testParameterReplacementWithNullParamsMap() throws Exception {
84+
String executePath = tempDir.toString();
85+
String taskAppId = "test-null-params";
86+
87+
FlinkParameters flinkParameters = new FlinkParameters();
88+
flinkParameters.setProgramType(ProgramType.SQL);
89+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
90+
flinkParameters.setParallelism(2);
91+
flinkParameters.setInitScript("");
92+
flinkParameters.setRawScript("SELECT 1;");
93+
94+
TaskExecutionContext context = new TaskExecutionContext();
95+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
96+
context.setExecutePath(executePath);
97+
context.setTaskAppId(taskAppId);
98+
context.setPrepareParamsMap(null);
99+
100+
FlinkTaskForTest task = new FlinkTaskForTest(context);
101+
task.init();
102+
String script = task.callGetScript();
103+
104+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
105+
String nodeContent = Files.readString(Path.of(nodeScriptPath), StandardCharsets.UTF_8);
106+
Assertions.assertEquals("SELECT 1;", nodeContent.trim());
107+
Assertions.assertNotNull(script);
108+
}
109+
110+
private static class FlinkTaskForTest extends FlinkTask {
111+
FlinkTaskForTest(TaskExecutionContext context) {
112+
super(context);
113+
}
109114

110-
FlinkParameters taskParams = new FlinkParameters();
111-
taskParams.setProgramType(ProgramType.SQL);
112-
taskParams.setInitScript("SET dt = '$[yyyyMMdd]';");
113-
taskParams.setRawScript("SELECT * FROM t WHERE dt = '$[yyyyMMdd]'");
114-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
115-
116-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
117-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
118-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
119-
120-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
121-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
122-
123-
FlinkTask task = new FlinkTask(taskExecutionContext);
124-
task.init();
125-
126-
FlinkParameters flinkParams = (FlinkParameters) task.getParameters();
127-
// Even with empty paramsMap, time placeholders $[yyyyMMdd] should still be replaced
128-
// (using current date/time)
129-
String today = java.time.LocalDate.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMdd"));
130-
Assertions.assertTrue(flinkParams.getInitScript().contains(today) ||
131-
flinkParams.getInitScript().matches("SET dt = '\\d{8}';"));
132-
Assertions.assertTrue(flinkParams.getRawScript().contains(today) ||
133-
flinkParams.getRawScript().matches("SELECT \\* FROM t WHERE dt = '\\d{8}'"));
115+
String callGetScript() {
116+
return getScript();
134117
}
135118
}
136119
}

0 commit comments

Comments
 (0)