diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 48a70b3a71..733ebd3ae2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.utils.JobStatusUtils; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; @@ -984,7 +985,8 @@ protected static Configuration removeOperatorConfigs(Configuration config) { config.toMap() .forEach( (k, v) -> { - if (!k.startsWith(K8S_OP_CONF_PREFIX)) { + if (!k.startsWith(K8S_OP_CONF_PREFIX) + && !k.startsWith(AutoScalerOptions.AUTOSCALER_CONF_PREFIX)) { newConfig.setString(k, v); } }); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index 9082ff358a..a57d60a8a3 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -987,10 +987,15 @@ public void fetchCheckpointInfoTest() throws Exception { @Test public void removeOperatorConfigTest() { - var key = "kubernetes.operator.meyKey"; - var deployConfig = Configuration.fromMap(Map.of("kubernetes.operator.meyKey", "v")); + var opKey1 = "kubernetes.operator.meyKey"; + var opKey2 = "job.autoscaler."; + var regularKey = "k"; + var deployConfig = + Configuration.fromMap(Map.of(opKey1, "v", opKey2, "v", regularKey, "v1")); var newConf = AbstractFlinkService.removeOperatorConfigs(deployConfig); - assertFalse(newConf.containsKey(key)); + assertFalse(newConf.containsKey(opKey1)); + assertFalse(newConf.containsKey(opKey2)); + assertTrue(newConf.containsKey(regularKey)); } @Test