Skip to content

Commit f4f4a8a

Browse files
committed
[FLINK-37405] Validate config prefixes for Flink 2.0
1 parent 9eb3c38 commit f4f4a8a

File tree

2 files changed

+58
-0
lines changed

2 files changed

+58
-0
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.configuration.Configuration;
2323
import org.apache.flink.configuration.ConfigurationUtils;
2424
import org.apache.flink.configuration.DeploymentOptionsInternal;
25+
import org.apache.flink.configuration.IllegalConfigurationException;
2526
import org.apache.flink.configuration.JobManagerOptions;
2627
import org.apache.flink.configuration.RestOptions;
2728
import org.apache.flink.configuration.TaskManagerOptions;
@@ -166,9 +167,32 @@ private List<String> getClusterSideConfData(Configuration flinkConfig) {
166167
clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
167168
clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
168169

170+
validateConfigKeysForV2(clusterSideConfig);
171+
169172
return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
170173
}
171174

175+
private void validateConfigKeysForV2(Configuration clusterSideConfig) {
176+
177+
// Only validate Flink 2.0 yaml configs
178+
if (!useStandardYamlConfig()) {
179+
return;
180+
}
181+
182+
var keys = clusterSideConfig.keySet();
183+
184+
for (var key1 : keys) {
185+
for (var key2 : keys) {
186+
if (key2.startsWith(key1 + ".")) {
187+
throw new IllegalConfigurationException(
188+
String.format(
189+
"Overlapping key prefixes detected (%s -> %s), please replace with Flink v2 compatible, non-deprecated keys.",
190+
key1, key2));
191+
}
192+
}
193+
}
194+
}
195+
172196
@VisibleForTesting
173197
String getFlinkConfData(List<String> confData) throws IOException {
174198
try (StringWriter sw = new StringWriter();

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.client.deployment.ClusterSpecification;
2222
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.IllegalConfigurationException;
2324
import org.apache.flink.configuration.YamlParserUtils;
2425
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
2526
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
@@ -32,6 +33,7 @@
3233
import org.junit.jupiter.params.ParameterizedTest;
3334
import org.junit.jupiter.params.provider.Arguments;
3435
import org.junit.jupiter.params.provider.MethodSource;
36+
import org.junit.jupiter.params.provider.ValueSource;
3537

3638
import java.io.IOException;
3739
import java.util.HashMap;
@@ -95,6 +97,38 @@ void testConfigMap(FlinkVersion version, String expectedConfName, boolean standa
9597
}
9698
}
9799

100+
@ParameterizedTest
101+
@ValueSource(booleans = {true, false})
102+
void testOverlappingKeyDetection(boolean standardYaml) throws IOException {
103+
104+
flinkConfig.set(
105+
FlinkConfigBuilder.FLINK_VERSION,
106+
standardYaml ? FlinkVersion.v2_0 : FlinkVersion.v1_20);
107+
flinkConfig.setString("k", "v");
108+
flinkConfig.setString("kv", "v2");
109+
110+
// Non overlapping keys
111+
flinkConfMountDecorator.buildAccompanyingKubernetesResources();
112+
flinkConfig.setString("k.v", "v3");
113+
114+
IllegalConfigurationException err = null;
115+
try {
116+
var additionalResources =
117+
flinkConfMountDecorator.buildAccompanyingKubernetesResources();
118+
assertThat(additionalResources).hasSize(1);
119+
} catch (IllegalConfigurationException e) {
120+
err = e;
121+
}
122+
123+
if (standardYaml) {
124+
assertThat(err)
125+
.isNotNull()
126+
.hasMessageContaining("Overlapping key prefixes detected (k -> k.v)");
127+
} else {
128+
assertThat(err).isNull();
129+
}
130+
}
131+
98132
private static Stream<Arguments> testArgs() {
99133
return Stream.of(
100134
Arguments.arguments(FlinkVersion.v1_19, "flink-conf.yaml", false),

0 commit comments

Comments
 (0)