Skip to content

Commit 9bab028

Browse files
authored
[FLINK-36529] Allow Flink version configs to be set to greater than given version
1 parent 4683333 commit 9bab028

File tree

8 files changed

+322
-23
lines changed

8 files changed

+322
-23
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
117117
| v1_18 | |
118118
| v1_19 | |
119119
| v1_20 | |
120+
| majorVersion | int | The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. |
121+
| minorVersion | int | The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. |
120122

121123
### IngressSpec
122124
**Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec

docs/content/docs/operations/configuration.md

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,43 @@ To learn more about metrics and logging configuration please refer to the dedica
5252

5353
### Flink Version and Namespace specific defaults
5454

55-
The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc).
55+
The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions, or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc.):
5656

5757
```
58-
# Flink Version specific defaults
59-
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
60-
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
61-
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
62-
6358
# Namespace specific defaults
6459
kubernetes.operator.default-configuration.namespace.ns1.k1: v1
6560
kubernetes.operator.default-configuration.namespace.ns1.k2: v2
6661
kubernetes.operator.default-configuration.namespace.ns2.k1: v1
6762
```
6863

69-
Flink version specific defaults will have a higher precedence so namespace defaults would be overridden by the same key.
64+
Flink version specific defaults have a higher precedence, so namespace defaults will be overridden by version defaults with the same key.
65+
66+
Flink version defaults can also be suffixed by a `+` character after the version string. This indicates that the default applies to this Flink version and any higher version.
67+
68+
For example, taking the configuration below:
69+
```
70+
# Flink Version specific defaults
71+
kubernetes.operator.default-configuration.flink-version.v1_16+.k4: v4
72+
kubernetes.operator.default-configuration.flink-version.v1_16+.k5: v5
73+
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
74+
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
75+
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
76+
kubernetes.operator.default-configuration.flink-version.v1_17.k5: v5.1
77+
```
78+
This would result in the defaults for Flink 1.17 being:
79+
```
80+
k1: v1
81+
k2: v2
82+
k3: v3
83+
k4: v4
84+
k5: v5.1
85+
```
86+
87+
**Note**: The configuration above sets `k5: v5` for all versions >= 1.16.
88+
However, this is overridden for Flink 1.17 to `v5.1`.
89+
But if you ran a Flink 1.18 deployment with this configuration, then the value of `k5` would be `v5` not `v5.1`. The `k5` override only applies to Flink 1.17.
90+
Adding a `+` to the Flink 1.17 `k5` default would apply the new value to all future versions.
91+
7092

7193
## Dynamic Operator Configuration
7294

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,38 @@
2525
public enum FlinkVersion {
2626
/** No longer supported since 1.7 operator release. */
2727
@Deprecated
28-
v1_13,
28+
v1_13(1, 13),
2929
/** No longer supported since 1.7 operator release. */
3030
@Deprecated
31-
v1_14,
31+
v1_14(1, 14),
3232
/** Deprecated since 1.10 operator release. */
3333
@Deprecated
34-
v1_15,
35-
v1_16,
36-
v1_17,
37-
v1_18,
38-
v1_19,
39-
v1_20;
34+
v1_15(1, 15),
35+
v1_16(1, 16),
36+
v1_17(1, 17),
37+
v1_18(1, 18),
38+
v1_19(1, 19),
39+
v1_20(1, 20);
40+
41+
/** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */
42+
private final int majorVersion;
43+
44+
/** The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. */
45+
private final int minorVersion;
46+
47+
FlinkVersion(int major, int minor) {
48+
this.majorVersion = major;
49+
this.minorVersion = minor;
50+
}
4051

4152
public boolean isEqualOrNewer(FlinkVersion otherVersion) {
42-
return this.ordinal() >= otherVersion.ordinal();
53+
if (this.majorVersion > otherVersion.majorVersion) {
54+
return true;
55+
}
56+
if (this.majorVersion == otherVersion.majorVersion) {
57+
return this.minorVersion >= otherVersion.minorVersion;
58+
}
59+
return false;
4360
}
4461

4562
/**
@@ -54,4 +71,22 @@ public static FlinkVersion current() {
5471
public static boolean isSupported(FlinkVersion version) {
5572
return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
5673
}
74+
75+
/**
76+
* Returns the FlinkVersion associated with the supplied major and minor version integers.
77+
*
78+
* @param major The major part of the Flink version (e.g. 1 for 1.18.1).
79+
* @param minor The minor part of the Flink version (e.g. 18 for 1.18.1).
80+
* @throws IllegalArgumentException If the supplied major and minor version do not correspond to
81+
* a supported FlinkVersion.
82+
* @return The FlinkVersion associated with the supplied major and minor version integers.
83+
*/
84+
public static FlinkVersion fromMajorMinor(int major, int minor) {
85+
for (FlinkVersion version : values()) {
86+
if (version.majorVersion == major && version.minorVersion == minor) {
87+
return version;
88+
}
89+
}
90+
throw new IllegalArgumentException("Unknown Flink version: " + major + "." + minor);
91+
}
5792
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertFalse;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
27+
class FlinkVersionTest {
28+
29+
@Test
30+
void isEqualOrNewer() {
31+
assertFalse(FlinkVersion.v1_16.isEqualOrNewer(FlinkVersion.v1_17));
32+
assertTrue(FlinkVersion.v1_17.isEqualOrNewer(FlinkVersion.v1_17));
33+
assertTrue(FlinkVersion.v1_18.isEqualOrNewer(FlinkVersion.v1_17));
34+
}
35+
36+
@Test
37+
void current() {
38+
assertEquals(FlinkVersion.v1_20, FlinkVersion.current());
39+
}
40+
41+
@Test
42+
void isSupported() {
43+
assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20));
44+
}
45+
46+
@Test
47+
void fromMajorMinor() {
48+
assertEquals(FlinkVersion.fromMajorMinor(1, 17), FlinkVersion.v1_17);
49+
assertEquals(FlinkVersion.fromMajorMinor(1, 18), FlinkVersion.v1_18);
50+
assertEquals(FlinkVersion.fromMajorMinor(1, 19), FlinkVersion.v1_19);
51+
assertEquals(FlinkVersion.fromMajorMinor(1, 20), FlinkVersion.v1_20);
52+
assertThrows(IllegalArgumentException.class, () -> FlinkVersion.fromMajorMinor(0, 1));
53+
}
54+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,21 @@
4949
import org.slf4j.LoggerFactory;
5050

5151
import java.time.Duration;
52+
import java.util.ArrayList;
53+
import java.util.HashMap;
54+
import java.util.List;
5255
import java.util.Map;
5356
import java.util.Optional;
5457
import java.util.Set;
58+
import java.util.SortedMap;
59+
import java.util.TreeMap;
5560
import java.util.concurrent.Executors;
5661
import java.util.concurrent.ScheduledExecutorService;
5762
import java.util.concurrent.TimeUnit;
5863
import java.util.concurrent.atomic.AtomicLong;
5964
import java.util.function.Consumer;
65+
import java.util.regex.Matcher;
66+
import java.util.regex.Pattern;
6067

6168
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
6269
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.applyJobConfig;
@@ -80,6 +87,14 @@ public class FlinkConfigManager {
8087
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
8188
private final LoadingCache<Key, Configuration> cache;
8289
private final Consumer<Set<String>> namespaceListener;
90+
private volatile Map<FlinkVersion, List<String>> relevantFlinkVersionPrefixes;
91+
92+
protected static final Pattern FLINK_VERSION_PATTERN =
93+
Pattern.compile(
94+
VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.")
95+
+ "v(?<major>\\d+)_(?<minor>\\d+)(?<gt>\\"
96+
+ KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX
97+
+ ")?\\..*");
8398

8499
@VisibleForTesting
85100
public FlinkConfigManager(Configuration defaultConfig) {
@@ -99,6 +114,7 @@ public FlinkConfigManager(
99114
this.namespaceListener = namespaceListener;
100115
Duration cacheTimeout =
101116
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
117+
this.relevantFlinkVersionPrefixes = new HashMap<>();
102118
this.cache =
103119
CacheBuilder.newBuilder()
104120
.maximumSize(
@@ -169,6 +185,11 @@ public void updateDefaultConfig(Configuration newConf) {
169185
// We do not invalidate the cache to avoid deleting currently used temp files,
170186
// simply bump the version
171187
this.defaultConfigVersion.incrementAndGet();
188+
189+
// We clear the cached relevant Flink version prefixes as the base config may include new
190+
// version overrides.
191+
// This will trigger a regeneration of the prefixes in the next call to getDefaultConfig.
192+
relevantFlinkVersionPrefixes = new HashMap<>();
172193
}
173194

174195
/**
@@ -202,6 +223,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
202223
getDefaultConfig(namespace, flinkVersion));
203224
}
204225

226+
/**
227+
* This method will search the keys of the supplied map and find any that contain a flink
228+
* version string that is relevant to the supplied flink version.
229+
*
230+
* <p>Relevance is defined as any key with the {@link
231+
* KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink
232+
* version (with or without the {@link
233+
* KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version
234+
* string followed by the {@link
235+
* KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}.
236+
*
237+
* <p>Prefixes are returned in ascending order of flink version.
238+
*
239+
* @param baseConfMap The configuration map that should be searched for relevant Flink version
240+
* prefixes.
241+
* @return A list of relevant Flink version prefixes in order of ascending Flink version.
242+
*/
243+
protected static List<String> getRelevantVersionPrefixes(
244+
Map<String, String> baseConfMap, FlinkVersion flinkVersion) {
245+
SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>();
246+
247+
for (Map.Entry<String, String> entry : baseConfMap.entrySet()) {
248+
Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey());
249+
if (versionMatcher.matches() && versionMatcher.group("gt") != null) {
250+
try {
251+
FlinkVersion keyFlinkVersion =
252+
FlinkVersion.fromMajorMinor(
253+
Integer.parseInt(versionMatcher.group("major")),
254+
Integer.parseInt(versionMatcher.group("minor")));
255+
if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) {
256+
greaterThanVersionPrefixes.put(
257+
keyFlinkVersion,
258+
VERSION_CONF_PREFIX
259+
+ keyFlinkVersion
260+
+ KubernetesOperatorConfigOptions
261+
.FLINK_VERSION_GREATER_THAN_SUFFIX
262+
+ ".");
263+
}
264+
} catch (NumberFormatException numberFormatException) {
265+
LOG.warn("Unable to parse version number in config key: {}", entry.getKey());
266+
} catch (IllegalArgumentException illegalArgumentException) {
267+
LOG.warn("Unknown Flink version in config key: {}", entry.getKey());
268+
}
269+
}
270+
}
271+
272+
// Extract the prefixes from the sorted map, these will be ascending Flink version order
273+
List<String> sortedRelevantVersionPrefixes =
274+
new ArrayList<>(greaterThanVersionPrefixes.values());
275+
276+
// Add the current flink version prefix (without the greater than symbol) to the set.
277+
// Any current flink version prefix with the greater than symbol would already have been
278+
// added
279+
// in the loop above.
280+
sortedRelevantVersionPrefixes.add(VERSION_CONF_PREFIX + flinkVersion + ".");
281+
282+
return sortedRelevantVersionPrefixes;
283+
}
284+
205285
/**
206286
* Get the base configuration for the given namespace and flink version combination. This is
207287
* different from the platform level base config as it may contain namespaces or version
@@ -220,7 +300,20 @@ public Configuration getDefaultConfig(String namespace, FlinkVersion flinkVersio
220300
}
221301

222302
if (flinkVersion != null) {
223-
applyDefault(VERSION_CONF_PREFIX + flinkVersion + ".", baseConfMap, conf);
303+
// Fetch or create a list of Flink version configs that apply to this current
304+
// FlinkVersion. That will include all versions that are equal to or lower than
305+
// the current one that are suffixed by a `+`
306+
List<String> versionPrefixes =
307+
relevantFlinkVersionPrefixes.computeIfAbsent(
308+
flinkVersion,
309+
fv -> getRelevantVersionPrefixes(baseConfMap, flinkVersion));
310+
311+
// The version prefixes are returned in ascending order of Flink version, so configs
312+
// attached to newer versions will override older ones. For example v1_16+.conf1 will
313+
// be overridden if a key containing v1_18+.conf1 is present.
314+
for (String versionPrefix : versionPrefixes) {
315+
applyDefault(versionPrefix, baseConfMap, conf);
316+
}
224317
}
225318

226319
return conf;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class KubernetesOperatorConfigOptions {
4040

4141
private static final String DEFAULT_CONF_PREFIX = K8S_OP_CONF_PREFIX + "default-configuration.";
4242
public static final String VERSION_CONF_PREFIX = DEFAULT_CONF_PREFIX + "flink-version.";
43+
public static final String FLINK_VERSION_GREATER_THAN_SUFFIX = "+";
4344
public static final String NAMESPACE_CONF_PREFIX = DEFAULT_CONF_PREFIX + "namespace.";
4445
public static final String SECTION_SYSTEM = "system";
4546
public static final String SECTION_ADVANCED = "system_advanced";

0 commit comments

Comments
 (0)