Skip to content

Commit 3a00aae

Browse files
段晓雄cursoragent
andcommitted
refactor(flink-sqlgateway): move parameter replacement to script usage site
- Remove param replacement from init(), apply in resolveFlinkJdbcUrl/resolveInitScriptContent/resolveMainScriptContent - Consistent with PR apache#17987 review: move replacement to where values are used Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9c42d0b commit 3a00aae

File tree

1 file changed

+25
-24
lines changed

1 file changed

+25
-24
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-sqlgateway/src/main/java/org/apache/dolphinscheduler/plugin/task/flinksqlgateway/FlinkSqlGatewayTask.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -65,24 +65,6 @@ public void init() {
6565
parameters =
6666
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkSqlGatewayParameters.class);
6767

68-
// Replace parameter placeholders (e.g. ${system.biz.date}, $[yyyyMMdd]); empty params still allow time placeholders
69-
if (parameters != null) {
70-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
71-
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
72-
73-
if (StringUtils.isNotBlank(parameters.getFlinkJdbcUrl())) {
74-
parameters.setFlinkJdbcUrl(ParameterUtils.convertParameterPlaceholders(parameters.getFlinkJdbcUrl(), stringParams));
75-
}
76-
if (FlinkSqlGatewayParameters.SCRIPT_SOURCE_SCRIPT.equals(parameters.getInitScriptType())
77-
&& StringUtils.isNotBlank(parameters.getInitScript())) {
78-
parameters.setInitScript(ParameterUtils.convertParameterPlaceholders(parameters.getInitScript(), stringParams));
79-
}
80-
if (FlinkSqlGatewayParameters.SCRIPT_SOURCE_SCRIPT.equals(parameters.getRawScriptType())
81-
&& StringUtils.isNotBlank(parameters.getRawScript())) {
82-
parameters.setRawScript(ParameterUtils.convertParameterPlaceholders(parameters.getRawScript(), stringParams));
83-
}
84-
}
85-
8668
log.info("Initialize flink sqlgateway task params {}", JSONUtils.toPrettyJsonString(parameters));
8769

8870
if (parameters == null || !parameters.checkParameters()) {
@@ -98,8 +80,9 @@ public AbstractParameters getParameters() {
9880
@Override
9981
public void handle(TaskCallBack taskCallBack) throws TaskException {
10082
try {
83+
String jdbcUrl = resolveFlinkJdbcUrl();
10184
Properties props = parameters.toJdbcProperties();
102-
connection = DriverManager.getConnection(parameters.getFlinkJdbcUrl(), props);
85+
connection = DriverManager.getConnection(jdbcUrl, props);
10386
statement = connection.createStatement();
10487

10588
executeScriptIfPresent(resolveInitScriptContent(), "init");
@@ -135,33 +118,51 @@ public void cancel() throws TaskException {
135118
}
136119
}
137120

121+
private String resolveFlinkJdbcUrl() {
122+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
123+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
124+
return ParameterUtils.convertParameterPlaceholders(parameters.getFlinkJdbcUrl(), stringParams);
125+
}
126+
138127
private String resolveInitScriptContent() throws Exception {
128+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
129+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
130+
139131
if (FlinkSqlGatewayParameters.SCRIPT_SOURCE_FILE.equals(parameters.getInitScriptType())) {
140132
List<ResourceInfo> initList = parameters.getInitScriptResourceList();
141133
if (initList != null && !initList.isEmpty()) {
142134
String resourceName = initList.get(0).getResourceName();
143135
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
144136
String localPath = resourceContext.getResourceItem(resourceName).getResourceAbsolutePathInLocal();
145137
String content = FileUtils.readFileToString(new File(localPath), StandardCharsets.UTF_8);
146-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
147-
return ParameterUtils.convertParameterPlaceholders(content, ParameterUtils.convert(paramsMap));
138+
return ParameterUtils.convertParameterPlaceholders(content, stringParams);
148139
}
149140
}
141+
if (FlinkSqlGatewayParameters.SCRIPT_SOURCE_SCRIPT.equals(parameters.getInitScriptType())
142+
&& StringUtils.isNotBlank(parameters.getInitScript())) {
143+
return ParameterUtils.convertParameterPlaceholders(parameters.getInitScript(), stringParams);
144+
}
150145
return parameters.getInitScript();
151146
}
152147

153148
private String resolveMainScriptContent() throws Exception {
149+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
150+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
151+
154152
if (FlinkSqlGatewayParameters.SCRIPT_SOURCE_FILE.equals(parameters.getRawScriptType())) {
155-
List<ResourceInfo> resourceList = parameters.getResourceList();
153+
List<ResourceInfo> resourceList = parameters.getResourceList();
156154
if (resourceList != null && !resourceList.isEmpty()) {
157155
String resourceName = resourceList.get(0).getResourceName();
158156
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
159157
String localPath = resourceContext.getResourceItem(resourceName).getResourceAbsolutePathInLocal();
160158
String content = FileUtils.readFileToString(new File(localPath), StandardCharsets.UTF_8);
161-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
162-
return ParameterUtils.convertParameterPlaceholders(content, ParameterUtils.convert(paramsMap));
159+
return ParameterUtils.convertParameterPlaceholders(content, stringParams);
163160
}
164161
}
162+
if (FlinkSqlGatewayParameters.SCRIPT_SOURCE_SCRIPT.equals(parameters.getRawScriptType())
163+
&& StringUtils.isNotBlank(parameters.getRawScript())) {
164+
return ParameterUtils.convertParameterPlaceholders(parameters.getRawScript(), stringParams);
165+
}
165166
return parameters.getRawScript();
166167
}
167168

0 commit comments

Comments
 (0)