Skip to content

Commit 9a16038

Browse files
段晓雄cursoragent
andcommitted
fix(flink-stream): refactor FlinkStreamTaskTest - remove Mockito, assert on generated files
Parameter replacement was moved from init() to getScript(), so tests must call getScript() and assert on generated script file content instead of flinkParams. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 460aaae commit 9a16038

File tree

1 file changed

+79
-57
lines changed

1 file changed

+79
-57
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTaskTest.java

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,88 +17,110 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.flink;
1919

20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.when;
20+
import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
2221

2322
import org.apache.dolphinscheduler.common.utils.JSONUtils;
24-
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
2523
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
26-
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
27-
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
2824
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
29-
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
3025

26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Files;
28+
import java.nio.file.Path;
3129
import java.util.HashMap;
3230
import java.util.Map;
3331

3432
import org.junit.jupiter.api.Assertions;
3533
import org.junit.jupiter.api.Test;
36-
import org.junit.jupiter.api.extension.ExtendWith;
37-
import org.mockito.MockedStatic;
38-
import org.mockito.Mockito;
39-
import org.mockito.junit.jupiter.MockitoExtension;
34+
import org.junit.jupiter.api.io.TempDir;
4035

4136
/**
42-
* Test FlinkStreamTask parameter replacement for initScript and rawScript.
37+
* FlinkStreamTask unit test. Verifies parameter replacement in initScript and rawScript without Mockito.
4338
*/
44-
@ExtendWith(MockitoExtension.class)
45-
class FlinkStreamTaskTest {
39+
public class FlinkStreamTaskTest {
40+
41+
@TempDir
42+
Path tempDir;
4643

4744
@Test
48-
void testInitReplacesPlaceholdersInInitScriptAndRawScript() {
45+
public void testParameterReplacementInScript() throws Exception {
46+
String executePath = tempDir.toString();
47+
String taskAppId = "test-app";
48+
49+
FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
50+
flinkParameters.setProgramType(ProgramType.SQL);
51+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
52+
flinkParameters.setParallelism(2);
53+
flinkParameters.setInitScript("SET 'date' = '${bizdate}';");
54+
flinkParameters.setRawScript("SELECT * FROM logs WHERE dt = '${bizdate}' AND env = '${env}'");
55+
4956
Map<String, Property> prepareParamsMap = new HashMap<>();
50-
prepareParamsMap.put("bizdate", new Property("bizdate", Direct.IN, DataType.VARCHAR, "20250601"));
51-
prepareParamsMap.put("env", new Property("env", Direct.IN, DataType.VARCHAR, "prod"));
52-
53-
FlinkStreamParameters taskParams = new FlinkStreamParameters();
54-
taskParams.setProgramType(ProgramType.SQL);
55-
taskParams.setInitScript("SET 'date' = '${bizdate}';");
56-
taskParams.setRawScript("SELECT * FROM logs WHERE dt = '${bizdate}' AND env = '${env}'");
57-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
58-
59-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
60-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
61-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
62-
63-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
64-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
65-
66-
FlinkStreamTask task = new FlinkStreamTask(taskExecutionContext);
67-
task.init();
68-
69-
AbstractParameters params = task.getParameters();
70-
Assertions.assertInstanceOf(FlinkStreamParameters.class, params);
71-
FlinkStreamParameters flinkParams = (FlinkStreamParameters) params;
72-
Assertions.assertEquals("SET 'date' = '20250601';", flinkParams.getInitScript());
73-
Assertions.assertEquals("SELECT * FROM logs WHERE dt = '20250601' AND env = 'prod'",
74-
flinkParams.getRawScript());
75-
}
57+
prepareParamsMap.put("bizdate", new Property("bizdate", null, null, "20250601"));
58+
prepareParamsMap.put("env", new Property("env", null, null, "prod"));
59+
60+
TaskExecutionContext context = new TaskExecutionContext();
61+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
62+
context.setExecutePath(executePath);
63+
context.setTaskAppId(taskAppId);
64+
context.setPrepareParamsMap(prepareParamsMap);
65+
66+
FlinkStreamTaskForTest task = new FlinkStreamTaskForTest(context);
67+
task.init();
68+
task.callGetScript();
69+
70+
String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
71+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
72+
73+
String initContent = Files.readString(Path.of(initScriptPath), StandardCharsets.UTF_8);
74+
String nodeContent = Files.readString(Path.of(nodeScriptPath), StandardCharsets.UTF_8);
75+
76+
Assertions.assertTrue(initContent.contains("SET 'date' = '20250601';"),
77+
"Expected ${bizdate} to be replaced, got: " + initContent);
78+
Assertions.assertTrue(nodeContent.contains("dt = '20250601'"),
79+
"Expected ${bizdate} to be replaced, got: " + nodeContent);
80+
Assertions.assertTrue(nodeContent.contains("env = 'prod'"),
81+
"Expected ${env} to be replaced, got: " + nodeContent);
7682
}
7783

7884
@Test
79-
void testInitReplacesTimePlaceholderWhenParamMapContainsScheduleTime() {
85+
public void testParameterReplacementTimePlaceholder() throws Exception {
86+
String executePath = tempDir.toString();
87+
String taskAppId = "test-time";
88+
89+
FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
90+
flinkParameters.setProgramType(ProgramType.SQL);
91+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
92+
flinkParameters.setParallelism(2);
93+
flinkParameters.setInitScript("");
94+
flinkParameters.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt = '$[yyyyMMdd]'");
95+
8096
Map<String, Property> prepareParamsMap = new HashMap<>();
81-
prepareParamsMap.put(TaskConstants.PARAMETER_DATETIME,
82-
new Property(TaskConstants.PARAMETER_DATETIME, Direct.IN, DataType.VARCHAR, "20210815080000"));
97+
prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20210815080000"));
98+
99+
TaskExecutionContext context = new TaskExecutionContext();
100+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
101+
context.setExecutePath(executePath);
102+
context.setTaskAppId(taskAppId);
103+
context.setPrepareParamsMap(prepareParamsMap);
83104

84-
FlinkStreamParameters taskParams = new FlinkStreamParameters();
85-
taskParams.setProgramType(ProgramType.SQL);
86-
taskParams.setInitScript("");
87-
taskParams.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt = '$[yyyyMMdd]'");
88-
String taskParamsJson = JSONUtils.toJsonString(taskParams);
105+
FlinkStreamTaskForTest task = new FlinkStreamTaskForTest(context);
106+
task.init();
107+
task.callGetScript();
89108

90-
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
91-
when(taskExecutionContext.getTaskParams()).thenReturn(taskParamsJson);
92-
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
109+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
110+
String nodeContent = Files.readString(Path.of(nodeScriptPath), StandardCharsets.UTF_8);
93111

94-
try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class)) {
95-
fileUtilsMock.when(() -> FileUtils.generateScriptFile(Mockito.any(), Mockito.any())).then(inv -> null);
112+
Assertions.assertTrue(nodeContent.contains("dt = '20210815'"),
113+
"Expected $[yyyyMMdd] to be replaced with 20210815, got: " + nodeContent);
114+
}
96115

97-
FlinkStreamTask task = new FlinkStreamTask(taskExecutionContext);
98-
task.init();
116+
private static class FlinkStreamTaskForTest extends FlinkStreamTask {
117+
118+
FlinkStreamTaskForTest(TaskExecutionContext context) {
119+
super(context);
120+
}
99121

100-
FlinkStreamParameters flinkParams = (FlinkStreamParameters) task.getParameters();
101-
Assertions.assertEquals("INSERT INTO t SELECT * FROM s WHERE dt = '20210815'", flinkParams.getRawScript());
122+
String callGetScript() {
123+
return getScript();
102124
}
103125
}
104126
}

0 commit comments

Comments
 (0)