Skip to content

Commit 1cd9f7b

Browse files
committed
Copy FlinkConfMountDecorator
1 parent cbb8c8d commit 1cd9f7b

File tree

1 file changed

+200
-0
lines changed

1 file changed

+200
-0
lines changed
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.kubernetes.kubeclient.decorators;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.ConfigurationUtils;
24+
import org.apache.flink.configuration.DeploymentOptionsInternal;
25+
import org.apache.flink.configuration.GlobalConfiguration;
26+
import org.apache.flink.configuration.JobManagerOptions;
27+
import org.apache.flink.configuration.RestOptions;
28+
import org.apache.flink.configuration.TaskManagerOptions;
29+
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
30+
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
31+
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
32+
import org.apache.flink.kubernetes.utils.Constants;
33+
34+
import org.apache.flink.shaded.guava31.com.google.common.io.Files;
35+
36+
import io.fabric8.kubernetes.api.model.ConfigMap;
37+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
38+
import io.fabric8.kubernetes.api.model.Container;
39+
import io.fabric8.kubernetes.api.model.ContainerBuilder;
40+
import io.fabric8.kubernetes.api.model.HasMetadata;
41+
import io.fabric8.kubernetes.api.model.KeyToPath;
42+
import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
43+
import io.fabric8.kubernetes.api.model.Pod;
44+
import io.fabric8.kubernetes.api.model.PodBuilder;
45+
import io.fabric8.kubernetes.api.model.Volume;
46+
import io.fabric8.kubernetes.api.model.VolumeBuilder;
47+
48+
import java.io.File;
49+
import java.io.IOException;
50+
import java.io.PrintWriter;
51+
import java.io.StringWriter;
52+
import java.nio.charset.StandardCharsets;
53+
import java.util.ArrayList;
54+
import java.util.Collections;
55+
import java.util.HashMap;
56+
import java.util.List;
57+
import java.util.Map;
58+
import java.util.stream.Collectors;
59+
60+
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
61+
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
62+
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
63+
import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
64+
import static org.apache.flink.util.Preconditions.checkNotNull;
65+
66+
/**
67+
* Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or
68+
* TaskManager pod.
69+
*/
70+
public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
71+
72+
private final AbstractKubernetesParameters kubernetesComponentConf;
73+
74+
public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
75+
this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
76+
}
77+
78+
@Override
79+
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
80+
final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
81+
82+
final Container mountedMainContainer =
83+
new ContainerBuilder(flinkPod.getMainContainer())
84+
.addNewVolumeMount()
85+
.withName(FLINK_CONF_VOLUME)
86+
.withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
87+
.endVolumeMount()
88+
.build();
89+
90+
return new FlinkPod.Builder(flinkPod)
91+
.withPod(mountedPod)
92+
.withMainContainer(mountedMainContainer)
93+
.build();
94+
}
95+
96+
private Pod decoratePod(Pod pod) {
97+
final List<KeyToPath> keyToPaths =
98+
getLocalLogConfFiles().stream()
99+
.map(
100+
file ->
101+
new KeyToPathBuilder()
102+
.withKey(file.getName())
103+
.withPath(file.getName())
104+
.build())
105+
.collect(Collectors.toList());
106+
keyToPaths.add(
107+
new KeyToPathBuilder()
108+
.withKey(GlobalConfiguration.getFlinkConfFilename())
109+
.withPath(GlobalConfiguration.getFlinkConfFilename())
110+
.build());
111+
112+
final Volume flinkConfVolume =
113+
new VolumeBuilder()
114+
.withName(FLINK_CONF_VOLUME)
115+
.withNewConfigMap()
116+
.withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
117+
.withItems(keyToPaths)
118+
.endConfigMap()
119+
.build();
120+
121+
return new PodBuilder(pod)
122+
.editSpec()
123+
.addNewVolumeLike(flinkConfVolume)
124+
.endVolume()
125+
.endSpec()
126+
.build();
127+
}
128+
129+
@Override
130+
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
131+
final String clusterId = kubernetesComponentConf.getClusterId();
132+
133+
final Map<String, String> data = new HashMap<>();
134+
135+
final List<File> localLogFiles = getLocalLogConfFiles();
136+
for (File file : localLogFiles) {
137+
data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
138+
}
139+
140+
final List<String> confData =
141+
getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration());
142+
data.put(GlobalConfiguration.getFlinkConfFilename(), getFlinkConfData(confData));
143+
144+
final ConfigMap flinkConfConfigMap =
145+
new ConfigMapBuilder()
146+
.withApiVersion(Constants.API_VERSION)
147+
.withNewMetadata()
148+
.withName(getFlinkConfConfigMapName(clusterId))
149+
.withLabels(kubernetesComponentConf.getCommonLabels())
150+
.endMetadata()
151+
.addToData(data)
152+
.build();
153+
154+
return Collections.singletonList(flinkConfConfigMap);
155+
}
156+
157+
/** Get properties map for the cluster-side after removal of some keys. */
158+
private List<String> getClusterSideConfData(Configuration flinkConfig) {
159+
final Configuration clusterSideConfig = flinkConfig.clone();
160+
// Remove some configuration options that should not be taken to cluster side.
161+
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
162+
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
163+
clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
164+
clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
165+
clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
166+
clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
167+
return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
168+
}
169+
170+
@VisibleForTesting
171+
String getFlinkConfData(List<String> confData) throws IOException {
172+
try (StringWriter sw = new StringWriter();
173+
PrintWriter out = new PrintWriter(sw)) {
174+
confData.forEach(out::println);
175+
176+
return sw.toString();
177+
}
178+
}
179+
180+
private List<File> getLocalLogConfFiles() {
181+
final String confDir = kubernetesComponentConf.getConfigDirectory();
182+
final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
183+
final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
184+
185+
List<File> localLogConfFiles = new ArrayList<>();
186+
if (logbackFile.exists()) {
187+
localLogConfFiles.add(logbackFile);
188+
}
189+
if (log4jFile.exists()) {
190+
localLogConfFiles.add(log4jFile);
191+
}
192+
193+
return localLogConfFiles;
194+
}
195+
196+
@VisibleForTesting
197+
public static String getFlinkConfConfigMapName(String clusterId) {
198+
return CONFIG_MAP_PREFIX + clusterId;
199+
}
200+
}

0 commit comments

Comments
 (0)