Skip to content

Commit 721e06c

Browse files
committed
[FLINK-37236] FlinkConfMountDecorator and HA Configmap related changes for 2.0 compatibility
1 parent d6e20e2 commit 721e06c

File tree

7 files changed

+403
-44
lines changed

7 files changed

+403
-44
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.kubernetes.kubeclient.decorators;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.ConfigurationUtils;
24+
import org.apache.flink.configuration.DeploymentOptionsInternal;
25+
import org.apache.flink.configuration.JobManagerOptions;
26+
import org.apache.flink.configuration.RestOptions;
27+
import org.apache.flink.configuration.TaskManagerOptions;
28+
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
29+
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
30+
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
31+
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
32+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap;
33+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMapBuilder;
34+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
35+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
36+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
37+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath;
38+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder;
39+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
40+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
41+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume;
42+
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder;
43+
import org.apache.flink.kubernetes.utils.Constants;
44+
45+
import org.apache.flink.shaded.guava31.com.google.common.io.Files;
46+
47+
import java.io.File;
48+
import java.io.IOException;
49+
import java.io.PrintWriter;
50+
import java.io.StringWriter;
51+
import java.nio.charset.StandardCharsets;
52+
import java.util.ArrayList;
53+
import java.util.Collections;
54+
import java.util.HashMap;
55+
import java.util.List;
56+
import java.util.Map;
57+
import java.util.stream.Collectors;
58+
59+
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
60+
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
61+
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
62+
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
63+
import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
64+
import static org.apache.flink.util.Preconditions.checkNotNull;
65+
66+
/** Copied from Flink core and modified to handle all Flink versions. */
67+
public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
68+
69+
private final AbstractKubernetesParameters kubernetesComponentConf;
70+
71+
public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
72+
this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
73+
}
74+
75+
@Override
76+
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
77+
final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
78+
79+
final Container mountedMainContainer =
80+
new ContainerBuilder(flinkPod.getMainContainer())
81+
.addNewVolumeMount()
82+
.withName(FLINK_CONF_VOLUME)
83+
.withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
84+
.endVolumeMount()
85+
.build();
86+
87+
return new FlinkPod.Builder(flinkPod)
88+
.withPod(mountedPod)
89+
.withMainContainer(mountedMainContainer)
90+
.build();
91+
}
92+
93+
private Pod decoratePod(Pod pod) {
94+
final List<KeyToPath> keyToPaths =
95+
getLocalLogConfFiles().stream()
96+
.map(
97+
file ->
98+
new KeyToPathBuilder()
99+
.withKey(file.getName())
100+
.withPath(file.getName())
101+
.build())
102+
.collect(Collectors.toList());
103+
keyToPaths.add(
104+
new KeyToPathBuilder()
105+
.withKey(getFlinkConfFilename())
106+
.withPath(getFlinkConfFilename())
107+
.build());
108+
109+
final Volume flinkConfVolume =
110+
new VolumeBuilder()
111+
.withName(FLINK_CONF_VOLUME)
112+
.withNewConfigMap()
113+
.withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
114+
.withItems(keyToPaths)
115+
.endConfigMap()
116+
.build();
117+
118+
return new PodBuilder(pod)
119+
.editSpec()
120+
.addNewVolumeLike(flinkConfVolume)
121+
.endVolume()
122+
.endSpec()
123+
.build();
124+
}
125+
126+
@Override
127+
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
128+
final String clusterId = kubernetesComponentConf.getClusterId();
129+
130+
final Map<String, String> data = new HashMap<>();
131+
132+
final List<File> localLogFiles = getLocalLogConfFiles();
133+
for (File file : localLogFiles) {
134+
data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
135+
}
136+
137+
final List<String> confData =
138+
getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration());
139+
data.put(getFlinkConfFilename(), getFlinkConfData(confData));
140+
141+
final ConfigMap flinkConfConfigMap =
142+
new ConfigMapBuilder()
143+
.withApiVersion(Constants.API_VERSION)
144+
.withNewMetadata()
145+
.withName(getFlinkConfConfigMapName(clusterId))
146+
.withLabels(kubernetesComponentConf.getCommonLabels())
147+
.endMetadata()
148+
.addToData(data)
149+
.build();
150+
151+
return Collections.singletonList(flinkConfConfigMap);
152+
}
153+
154+
/** Get properties map for the cluster-side after removal of some keys. */
155+
private List<String> getClusterSideConfData(Configuration flinkConfig) {
156+
// For Flink versions that use the standard config we have to set the standardYaml flag in
157+
// the Configuration object manually instead of simply cloning, otherwise it would simply
158+
// inherit it from the base config (which would always be false currently).
159+
Configuration clusterSideConfig = new Configuration(useStandardYamlConfig());
160+
clusterSideConfig.addAll(flinkConfig);
161+
// Remove some configuration options that should not be taken to cluster side.
162+
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
163+
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
164+
clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
165+
clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
166+
clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
167+
clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
168+
169+
return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
170+
}
171+
172+
@VisibleForTesting
173+
String getFlinkConfData(List<String> confData) throws IOException {
174+
try (StringWriter sw = new StringWriter();
175+
PrintWriter out = new PrintWriter(sw)) {
176+
confData.forEach(out::println);
177+
178+
return sw.toString();
179+
}
180+
}
181+
182+
private List<File> getLocalLogConfFiles() {
183+
final String confDir = kubernetesComponentConf.getConfigDirectory();
184+
final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
185+
final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
186+
187+
List<File> localLogConfFiles = new ArrayList<>();
188+
if (logbackFile.exists()) {
189+
localLogConfFiles.add(logbackFile);
190+
}
191+
if (log4jFile.exists()) {
192+
localLogConfFiles.add(log4jFile);
193+
}
194+
195+
return localLogConfFiles;
196+
}
197+
198+
@VisibleForTesting
199+
public static String getFlinkConfConfigMapName(String clusterId) {
200+
return CONFIG_MAP_PREFIX + clusterId;
201+
}
202+
203+
/**
204+
* We have to override the GlobalConfiguration#getFlinkConfFilename() logic to make sure we can
205+
* separate operator level (Global) and Flink deployment specific config handling.
206+
*
207+
* @return conf file name
208+
*/
209+
public String getFlinkConfFilename() {
210+
return useStandardYamlConfig() ? "config.yaml" : "flink-conf.yaml";
211+
}
212+
213+
/**
214+
* Determine based on the Flink Version if we should use the new standard config.yaml vs the old
215+
* flink-conf.yaml. While technically 1.19+ could use this we don't want to change the behaviour
216+
* for already released Flink versions, so only switch to new yaml from Flink 2.0 onwards.
217+
*
218+
* @return True for Flink version 2.0 and above
219+
*/
220+
boolean useStandardYamlConfig() {
221+
return kubernetesComponentConf
222+
.getFlinkConfiguration()
223+
.get(FLINK_VERSION)
224+
.isEqualOrNewer(FlinkVersion.v2_0);
225+
}
226+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class ConfigMapStore {
3939

4040
// Log under ConfigMapStore's own category rather than KubernetesAutoScalerStateStore's.
private static final Logger LOG = LoggerFactory.getLogger(ConfigMapStore.class);
4141

42-
private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
42+
public static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
4343

4444
private final KubernetesClient kubernetesClient;
4545

@@ -110,15 +110,19 @@ private ObjectMeta createCmObjectMeta(ResourceID uid) {
110110
var objectMeta = new ObjectMeta();
111111
objectMeta.setName("autoscaler-" + uid.getName());
112112
uid.getNamespace().ifPresent(objectMeta::setNamespace);
113-
objectMeta.setLabels(
114-
Map.of(
115-
Constants.LABEL_COMPONENT_KEY,
116-
LABEL_COMPONENT_AUTOSCALER,
117-
Constants.LABEL_APP_KEY,
118-
uid.getName()));
113+
objectMeta.setLabels(getAutoscalerCmLabels(uid));
119114
return objectMeta;
120115
}
121116

117+
@VisibleForTesting
118+
public static Map<String, String> getAutoscalerCmLabels(ResourceID uid) {
119+
return Map.of(
120+
Constants.LABEL_COMPONENT_KEY,
121+
LABEL_COMPONENT_AUTOSCALER,
122+
Constants.LABEL_APP_KEY,
123+
uid.getName());
124+
}
125+
122126
private ConfigMap buildConfigMap(HasMetadata cr, ObjectMeta meta) {
123127
var cm = new ConfigMap();
124128
cm.setMetadata(meta);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
301301
effectiveConfig.set(
302302
DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
303303

304-
if (jobSpec.getJarURI() != null) {
304+
if (deploymentMode == KubernetesDeploymentMode.NATIVE && jobSpec.getJarURI() != null) {
305305
effectiveConfig.set(
306306
PipelineOptions.JARS,
307307
Collections.singletonList(new URI(jobSpec.getJarURI()).toString()));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,7 @@ protected void runJar(
810810
JarRunHeaders headers = JarRunHeaders.getInstance();
811811
JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
812812
parameters.jarIdPathParameter.resolve(jarId);
813+
var flinkVersion = conf.get(FLINK_VERSION);
813814
JarRunRequestBody runRequestBody =
814815
new JarRunRequestBody(
815816
job.getEntryClass(),
@@ -819,7 +820,12 @@ protected void runJar(
819820
jobID,
820821
job.getAllowNonRestoredState(),
821822
savepoint,
822-
RestoreMode.DEFAULT,
823+
flinkVersion.isEqualOrNewer(FlinkVersion.v1_20)
824+
? null
825+
: RestoreMode.DEFAULT,
826+
flinkVersion.isEqualOrNewer(FlinkVersion.v1_20)
827+
? RestoreMode.DEFAULT
828+
: null,
823829
conf.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17)
824830
? conf.toMap()
825831
: null);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
2626
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
2727
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
28+
import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
2829
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2930
import org.apache.flink.kubernetes.utils.Constants;
3031
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -47,6 +48,8 @@
4748
import io.fabric8.kubernetes.api.model.apps.Deployment;
4849
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
4950
import io.fabric8.kubernetes.client.KubernetesClient;
51+
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
52+
import io.fabric8.kubernetes.client.dsl.Resource;
5053
import io.javaoperatorsdk.operator.api.reconciler.Context;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
@@ -60,8 +63,6 @@
6063
import java.util.Optional;
6164
import java.util.function.Predicate;
6265

63-
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
64-
6566
/** Flink Utility methods used by the operator. */
6667
public class FlinkUtils {
6768

@@ -214,13 +215,20 @@ public static void deleteZookeeperHAMetadata(Configuration conf) {
214215

215216
public static void deleteKubernetesHAMetadata(
216217
String clusterId, String namespace, KubernetesClient kubernetesClient) {
217-
kubernetesClient
218+
getFlinkKubernetesHaConfigmaps(clusterId, namespace, kubernetesClient).delete();
219+
}
220+
221+
private static FilterWatchListDeletable<ConfigMap, ConfigMapList, Resource<ConfigMap>>
222+
getFlinkKubernetesHaConfigmaps(
223+
String clusterId, String namespace, KubernetesClient kubernetesClient) {
224+
return kubernetesClient
218225
.configMaps()
219226
.inNamespace(namespace)
220-
.withLabels(
221-
KubernetesUtils.getConfigMapLabels(
222-
clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
223-
.delete();
227+
.withNewFilter()
228+
.withLabels(KubernetesUtils.getCommonLabels(clusterId))
229+
.withoutLabel(
230+
Constants.LABEL_COMPONENT_KEY, ConfigMapStore.LABEL_COMPONENT_AUTOSCALER)
231+
.endFilter();
224232
}
225233

226234
public static void deleteJobGraphInZookeeperHA(Configuration conf) throws Exception {
@@ -233,16 +241,8 @@ public static void deleteJobGraphInZookeeperHA(Configuration conf) throws Except
233241

234242
public static void deleteJobGraphInKubernetesHA(
235243
String clusterId, String namespace, KubernetesClient kubernetesClient) {
236-
// The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
237-
// them and delete job graph key
238-
final Map<String, String> haConfigMapLabels =
239-
KubernetesUtils.getConfigMapLabels(
240-
clusterId, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
241-
final ConfigMapList configMaps =
242-
kubernetesClient
243-
.configMaps()
244-
.inNamespace(namespace)
245-
.withLabels(haConfigMapLabels)
244+
var configMaps =
245+
FlinkUtils.getFlinkKubernetesHaConfigmaps(clusterId, namespace, kubernetesClient)
246246
.list();
247247

248248
boolean shouldUpdate = false;
@@ -303,18 +303,11 @@ private static boolean isKubernetesHaMetadataAvailable(
303303
KubernetesClient kubernetesClient,
304304
Predicate<ConfigMap> cmPredicate) {
305305

306-
String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
307-
String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
308-
309-
var haConfigMapLabels =
310-
KubernetesUtils.getConfigMapLabels(
311-
clusterId, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
312-
313306
var configMaps =
314-
kubernetesClient
315-
.configMaps()
316-
.inNamespace(namespace)
317-
.withLabels(haConfigMapLabels)
307+
FlinkUtils.getFlinkKubernetesHaConfigmaps(
308+
conf.get(KubernetesConfigOptions.CLUSTER_ID),
309+
conf.get(KubernetesConfigOptions.NAMESPACE),
310+
kubernetesClient)
318311
.list()
319312
.getItems();
320313

0 commit comments

Comments
 (0)