Skip to content

Commit 7e04477

Browse files
committed
fix comment
1 parent 91123c5 commit 7e04477

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
6464
private static final String TRANSFORM_KEY = "transform";
6565
private static final String PIPELINE_KEY = "pipeline";
6666
private static final String MODEL_KEY = "model";
67-
private static final String FLINK_KEY = "flink";
67+
private static final String FLINK_KEY = "flink-conf";
6868

6969
// Source / sink keys
7070
private static final String TYPE_KEY = "type";

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,20 @@ private static void mergeCommandLineFlinkConfig(
131131

132132
LOG.info("Dynamic flink config items found from command-line: {}", commandLineProperties);
133133
commandLineProperties.forEach(
134-
(key, value) -> {
135-
String keyStr = key.toString();
136-
String valueStr = value.toString();
137-
if (StringUtils.isNullOrWhitespaceOnly(keyStr)
138-
|| StringUtils.isNullOrWhitespaceOnly(valueStr)) {
139-
throw new IllegalArgumentException(
140-
String.format(
141-
"null or white space argument for key or value: %s=%s",
142-
key, value));
143-
}
144-
flinkConfig.setString(keyStr.trim(), valueStr.trim());
145-
});
134+
(key, value) -> validateAndApplyCommandLineEntry(flinkConfig, key, value));
135+
}
136+
137+
private static void validateAndApplyCommandLineEntry(
138+
Configuration flinkConfig, Object key, Object value) {
139+
String keyStr = key.toString();
140+
String valueStr = value.toString();
141+
if (StringUtils.isNullOrWhitespaceOnly(keyStr)
142+
|| StringUtils.isNullOrWhitespaceOnly(valueStr)) {
143+
throw new IllegalArgumentException(
144+
String.format(
145+
"null or white space argument for key or value: %s=%s", key, value));
146+
}
147+
flinkConfig.setString(keyStr.trim(), valueStr.trim());
146148
}
147149

148150
private static void mergePipelineFlinkConfig(

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,5 @@ pipeline:
6464
openai.model: text-embedding-3-small
6565
openai.host: https://xxxx
6666
openai.apikey: abcd1234
67-
flink:
67+
flink-conf:
6868
execution.checkpointing.timeout: 12min

0 commit comments

Comments
 (0)