2222import org .apache .flink .configuration .Configuration ;
2323import org .apache .flink .configuration .ConfigurationUtils ;
2424import org .apache .flink .configuration .DeploymentOptionsInternal ;
25- import org .apache .flink .configuration .GlobalConfiguration ;
2625import org .apache .flink .configuration .JobManagerOptions ;
2726import org .apache .flink .configuration .RestOptions ;
2827import org .apache .flink .configuration .TaskManagerOptions ;
2928import org .apache .flink .kubernetes .configuration .KubernetesConfigOptions ;
3029import org .apache .flink .kubernetes .kubeclient .FlinkPod ;
3130import org .apache .flink .kubernetes .kubeclient .parameters .AbstractKubernetesParameters ;
31+ import org .apache .flink .kubernetes .operator .api .spec .FlinkVersion ;
32+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .ConfigMap ;
33+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .ConfigMapBuilder ;
34+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .Container ;
35+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .ContainerBuilder ;
36+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .HasMetadata ;
37+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .KeyToPath ;
38+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .KeyToPathBuilder ;
39+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .Pod ;
40+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .PodBuilder ;
41+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .Volume ;
42+ import org .apache .flink .kubernetes .shaded .io .fabric8 .kubernetes .api .model .VolumeBuilder ;
3243import org .apache .flink .kubernetes .utils .Constants ;
3344
3445import org .apache .flink .shaded .guava31 .com .google .common .io .Files ;
3546
36- import io .fabric8 .kubernetes .api .model .ConfigMap ;
37- import io .fabric8 .kubernetes .api .model .ConfigMapBuilder ;
38- import io .fabric8 .kubernetes .api .model .Container ;
39- import io .fabric8 .kubernetes .api .model .ContainerBuilder ;
40- import io .fabric8 .kubernetes .api .model .HasMetadata ;
41- import io .fabric8 .kubernetes .api .model .KeyToPath ;
42- import io .fabric8 .kubernetes .api .model .KeyToPathBuilder ;
43- import io .fabric8 .kubernetes .api .model .Pod ;
44- import io .fabric8 .kubernetes .api .model .PodBuilder ;
45- import io .fabric8 .kubernetes .api .model .Volume ;
46- import io .fabric8 .kubernetes .api .model .VolumeBuilder ;
47-
4847import java .io .File ;
4948import java .io .IOException ;
5049import java .io .PrintWriter ;
5756import java .util .Map ;
5857import java .util .stream .Collectors ;
5958
59+ import static org .apache .flink .kubernetes .operator .config .FlinkConfigBuilder .FLINK_VERSION ;
6060import static org .apache .flink .kubernetes .utils .Constants .CONFIG_FILE_LOG4J_NAME ;
6161import static org .apache .flink .kubernetes .utils .Constants .CONFIG_FILE_LOGBACK_NAME ;
6262import static org .apache .flink .kubernetes .utils .Constants .CONFIG_MAP_PREFIX ;
6363import static org .apache .flink .kubernetes .utils .Constants .FLINK_CONF_VOLUME ;
6464import static org .apache .flink .util .Preconditions .checkNotNull ;
6565
66- /**
67- * Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or
68- * TaskManager pod.
69- */
66+ /** Copied from Flink core and modified to handle all Flink versions. */
7067public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
7168
7269 private final AbstractKubernetesParameters kubernetesComponentConf ;
@@ -105,8 +102,8 @@ private Pod decoratePod(Pod pod) {
105102 .collect (Collectors .toList ());
106103 keyToPaths .add (
107104 new KeyToPathBuilder ()
108- .withKey (GlobalConfiguration . getFlinkConfFilename ())
109- .withPath (GlobalConfiguration . getFlinkConfFilename ())
105+ .withKey (getFlinkConfFilename ())
106+ .withPath (getFlinkConfFilename ())
110107 .build ());
111108
112109 final Volume flinkConfVolume =
@@ -139,7 +136,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
139136
140137 final List <String > confData =
141138 getClusterSideConfData (kubernetesComponentConf .getFlinkConfiguration ());
142- data .put (GlobalConfiguration . getFlinkConfFilename (), getFlinkConfData (confData ));
139+ data .put (getFlinkConfFilename (), getFlinkConfData (confData ));
143140
144141 final ConfigMap flinkConfConfigMap =
145142 new ConfigMapBuilder ()
@@ -156,21 +153,26 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
156153
157154 /** Get properties map for the cluster-side after removal of some keys. */
158155 private List <String > getClusterSideConfData (Configuration flinkConfig ) {
159- final Configuration clusterSideConfig = flinkConfig .clone ();
156+ // For Flink versions that use the standard config we have to set the standardYaml flag in
157+ // the Configuration object manually instead of simply cloning, otherwise it would simply
158+ // inherit it from the base config (which would always be false currently).
159+ Configuration clusterSideConfig = new Configuration (useStandardYamlConfig ());
160+ clusterSideConfig .addAll (flinkConfig );
160161 // Remove some configuration options that should not be taken to cluster side.
161162 clusterSideConfig .removeConfig (KubernetesConfigOptions .KUBE_CONFIG_FILE );
162163 clusterSideConfig .removeConfig (DeploymentOptionsInternal .CONF_DIR );
163164 clusterSideConfig .removeConfig (RestOptions .BIND_ADDRESS );
164165 clusterSideConfig .removeConfig (JobManagerOptions .BIND_HOST );
165166 clusterSideConfig .removeConfig (TaskManagerOptions .BIND_HOST );
166167 clusterSideConfig .removeConfig (TaskManagerOptions .HOST );
168+
167169 return ConfigurationUtils .convertConfigToWritableLines (clusterSideConfig , false );
168170 }
169171
170172 @ VisibleForTesting
171173 String getFlinkConfData (List <String > confData ) throws IOException {
172174 try (StringWriter sw = new StringWriter ();
173- PrintWriter out = new PrintWriter (sw )) {
175+ PrintWriter out = new PrintWriter (sw )) {
174176 confData .forEach (out ::println );
175177
176178 return sw .toString ();
@@ -197,4 +199,28 @@ private List<File> getLocalLogConfFiles() {
197199 public static String getFlinkConfConfigMapName (String clusterId ) {
198200 return CONFIG_MAP_PREFIX + clusterId ;
199201 }
202+
203+ /**
204+ * We have to override the GlobalConfiguration#getFlinkConfFilename() logic to make sure we can
205+ * separate operator level (Global) and Flink deployment specific config handling.
206+ *
207+ * @return conf file name
208+ */
209+ public String getFlinkConfFilename () {
210+ return useStandardYamlConfig () ? "config.yaml" : "flink-conf.yaml" ;
211+ }
212+
213+ /**
214+ * Determine based on the Flink Version if we should use the new standard config.yaml vs the old
215+ * flink-conf.yaml. While technically 1.19+ could use this we don't want to change the behaviour
216+ * for already released Flink versions, so only switch to new yaml from Flink 2.0 onwards.
217+ *
218+ * @return True for Flink version 2.0 and above
219+ */
220+ boolean useStandardYamlConfig () {
221+ return kubernetesComponentConf
222+ .getFlinkConfiguration ()
223+ .get (FLINK_VERSION )
224+ .isEqualOrNewer (FlinkVersion .v2_0 );
225+ }
200226}
0 commit comments