Skip to content

Commit ce3d191

Browse files
committed
[FLINK-36529] Allow flink version configs to be set to greater than given version
Signed-off-by: Thomas Cooper <[email protected]>
1 parent 5d29554 commit ce3d191

File tree

7 files changed

+277
-33
lines changed

7 files changed

+277
-33
lines changed

docs/content/docs/operations/configuration.md

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,43 @@ To learn more about metrics and logging configuration please refer to the dedica
5252

5353
### Flink Version and Namespace specific defaults
5454

55-
The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc).
55+
The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions, or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc.):
5656

5757
```
58-
# Flink Version specific defaults
59-
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
60-
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
61-
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
62-
6358
# Namespace specific defaults
6459
kubernetes.operator.default-configuration.namespace.ns1.k1: v1
6560
kubernetes.operator.default-configuration.namespace.ns1.k2: v2
6661
kubernetes.operator.default-configuration.namespace.ns2.k1: v1
6762
```
6863

69-
Flink version specific defaults will have a higher precedence so namespace defaults would be overridden by the same key.
64+
Flink version specific defaults have a higher precedence, so namespace defaults will be overridden by version defaults with the same key.
65+
66+
Flink version defaults can also be suffixed by a `+` character after the version string. This indicates that the default applies to this Flink version and any higher version.
67+
68+
For example, taking the configuration below:
69+
```
70+
# Flink Version specific defaults
71+
kubernetes.operator.default-configuration.flink-version.v1_16+.k4: v4
72+
kubernetes.operator.default-configuration.flink-version.v1_16+.k5: v5
73+
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
74+
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
75+
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
76+
kubernetes.operator.default-configuration.flink-version.v1_17.k5: v5.1
77+
```
78+
This would result in the defaults for Flink 1.17 being:
79+
```
80+
k1: v1
81+
k2: v2
82+
k3: v3
83+
k4: v4
84+
k5: v5.1
85+
```
86+
87+
**Note**: The configuration above sets `k5: v5` for all versions >= 1.16.
88+
However, this is overridden for Flink 1.17 to `v5.1`.
89+
But if you ran a Flink 1.18 deployment with this configuration, then the value of `k5` would be `v5` not `v5.1`. The `k5` override only applies to Flink 1.17.
90+
Adding a `+` to the Flink 1.17 `k5` default would apply the new value to all future versions.
91+
7092

7193
## Dynamic Operator Configuration
7294

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,37 @@
2525
public enum FlinkVersion {
2626
/** No longer supported since 1.7 operator release. */
2727
@Deprecated
28-
v1_13,
28+
v1_13(1,13),
2929
/** No longer supported since 1.7 operator release. */
3030
@Deprecated
31-
v1_14,
31+
v1_14(1,14),
3232
/** Deprecated since 1.10 operator release. */
3333
@Deprecated
34-
v1_15,
35-
v1_16,
36-
v1_17,
37-
v1_18,
38-
v1_19,
39-
v1_20;
34+
v1_15(1,15),
35+
v1_16(1,16),
36+
v1_17(1,17),
37+
v1_18(1,18),
38+
v1_19(1,19),
39+
v1_20(1,20);
40+
41+
/** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */
42+
private final int majorVersion;
43+
/** The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18.*/
44+
private final int minorVersion;
45+
46+
FlinkVersion(int major, int minor) {
47+
this.majorVersion = major;
48+
this.minorVersion = minor;
49+
}
4050

4151
public boolean isEqualOrNewer(FlinkVersion otherVersion) {
42-
return this.ordinal() >= otherVersion.ordinal();
52+
if (this.majorVersion > otherVersion.majorVersion) {
53+
return true;
54+
}
55+
if (this.majorVersion == otherVersion.majorVersion) {
56+
return this.minorVersion >= otherVersion.minorVersion;
57+
}
58+
return false;
4359
}
4460

4561
/**
@@ -54,4 +70,23 @@ public static FlinkVersion current() {
5470
public static boolean isSupported(FlinkVersion version) {
5571
return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
5672
}
73+
74+
/**
75+
* Returns the FlinkVersion associated with the supplied major and minor version integers.
76+
*
77+
* @param major The major part of the Flink version (e.g. 1 for 1.18.1).
78+
* @param minor The minor part of the Flink version (e.g. 18 for 1.18.1).
79+
* @throws IllegalArgumentException If the supplied major and minor version do not correspond
80+
* to a supported FlinkVersion.
81+
* @return The FlinkVersion associated with the supplied major and minor version integers.
82+
*/
83+
public static FlinkVersion fromMajorMinor(int major, int minor) {
84+
for (FlinkVersion version : values()) {
85+
if (version.majorVersion == major && version.minorVersion == minor) {
86+
return version;
87+
}
88+
}
89+
throw new IllegalArgumentException("Unknown Flink version: " + major + "." + minor);
90+
}
91+
5792
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.apache.flink.kubernetes.operator.api.spec;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
class FlinkVersionTest {
8+
9+
@Test
10+
void isEqualOrNewer() {
11+
assertFalse(FlinkVersion.v1_16.isEqualOrNewer(FlinkVersion.v1_17));
12+
assertTrue(FlinkVersion.v1_17.isEqualOrNewer(FlinkVersion.v1_17));
13+
assertTrue(FlinkVersion.v1_18.isEqualOrNewer(FlinkVersion.v1_17));
14+
}
15+
16+
@Test
17+
void current() {
18+
assertEquals(FlinkVersion.v1_20, FlinkVersion.current());
19+
}
20+
21+
@Test
22+
void isSupported() {
23+
assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20));
24+
}
25+
26+
@Test
27+
void fromMajorMinor() {
28+
assertEquals(FlinkVersion.fromMajorMinor(1,17), FlinkVersion.v1_17);
29+
assertEquals(FlinkVersion.fromMajorMinor(1,18), FlinkVersion.v1_18);
30+
assertEquals(FlinkVersion.fromMajorMinor(1,19), FlinkVersion.v1_19);
31+
assertEquals(FlinkVersion.fromMajorMinor(1,20), FlinkVersion.v1_20);
32+
assertThrows(IllegalArgumentException.class, () -> FlinkVersion.fromMajorMinor(0,1));
33+
}
34+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@
4949
import org.slf4j.LoggerFactory;
5050

5151
import java.time.Duration;
52-
import java.util.Map;
53-
import java.util.Optional;
54-
import java.util.Set;
52+
import java.util.*;
5553
import java.util.concurrent.Executors;
5654
import java.util.concurrent.ScheduledExecutorService;
5755
import java.util.concurrent.TimeUnit;
5856
import java.util.concurrent.atomic.AtomicLong;
5957
import java.util.function.Consumer;
58+
import java.util.regex.Matcher;
59+
import java.util.regex.Pattern;
6060

6161
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
6262
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.applyJobConfig;
@@ -81,6 +81,14 @@ public class FlinkConfigManager {
8181
private final LoadingCache<Key, Configuration> cache;
8282
private final Consumer<Set<String>> namespaceListener;
8383

84+
protected static final Pattern FLINK_VERSION_PATTERN =
85+
Pattern.compile(
86+
VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.") +
87+
"v(?<major>\\d+)_(?<minor>\\d+)(?<gt>\\" +
88+
KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX +
89+
")?\\..*"
90+
);
91+
8492
@VisibleForTesting
8593
public FlinkConfigManager(Configuration defaultConfig) {
8694
this(defaultConfig, ns -> {}, true);
@@ -202,6 +210,58 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
202210
getDefaultConfig(namespace, flinkVersion));
203211
}
204212

213+
/**
214+
* This method will search the keys of the supplied map and find any that contain a flink version string that is
215+
* relevant to the supplied flink version.
216+
* <p>
217+
* Relevance is defined as any key with the {@link KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by
218+
* either the supplied flink version (with or without the {@link KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX})
219+
* or a lower flink version string followed by the {@link KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}.
220+
* <p>
221+
* Prefixes are returned in ascending order of flink version.
222+
*
223+
* @param baseConfMap The configuration map that should be searched for relevant Flink version prefixes.
224+
* @return A list of relevant Flink version prefixes in order of ascending Flink version.
225+
*/
226+
protected static List<String> getRelevantVersionPrefixes(Map<String, String> baseConfMap, FlinkVersion flinkVersion) {
227+
SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>();
228+
229+
for (Map.Entry<String, String> entry : baseConfMap.entrySet()) {
230+
Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey());
231+
if (versionMatcher.matches() && versionMatcher.group("gt") != null) {
232+
try {
233+
FlinkVersion keyFlinkVersion = FlinkVersion.fromMajorMinor(
234+
Integer.parseInt(versionMatcher.group("major")),
235+
Integer.parseInt(versionMatcher.group("minor"))
236+
);
237+
if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) {
238+
greaterThanVersionPrefixes.put(
239+
keyFlinkVersion,
240+
VERSION_CONF_PREFIX +
241+
keyFlinkVersion +
242+
KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX +
243+
"."
244+
);
245+
}
246+
} catch (NumberFormatException numberFormatException) {
247+
LOG.warn("Unable to parse version number in config key: {}", entry.getKey());
248+
} catch (IllegalArgumentException illegalArgumentException) {
249+
LOG.warn("Unknown Flink version in config key: {}", entry.getKey());
250+
}
251+
}
252+
}
253+
254+
// Extract the prefixes from the sorted map, these will be ascending Flink version order
255+
List<String> sortedRelevantVersionPrefixes = new ArrayList<>(greaterThanVersionPrefixes.values());
256+
257+
// Add the current flink version prefix (without the greater than symbol) to the set.
258+
// Any current flink version prefix with the greater than symbol would already have been added
259+
// in the loop above.
260+
sortedRelevantVersionPrefixes.add(VERSION_CONF_PREFIX + flinkVersion + ".");
261+
262+
return sortedRelevantVersionPrefixes;
263+
}
264+
205265
/**
206266
* Get the base configuration for the given namespace and flink version combination. This is
207267
* different from the platform level base config as it may contain namespaces or version
@@ -220,7 +280,17 @@ public Configuration getDefaultConfig(String namespace, FlinkVersion flinkVersio
220280
}
221281

222282
if (flinkVersion != null) {
223-
applyDefault(VERSION_CONF_PREFIX + flinkVersion + ".", baseConfMap, conf);
283+
// Get a list of flink version configs that apply to this current flink version
284+
// That will include all versions that are equal to or lower than the current one
285+
// that are suffixed by a `+`
286+
List<String> versionPrefixes = getRelevantVersionPrefixes(baseConfMap, flinkVersion);
287+
288+
// The version prefixes are returned in ascending order of Flink version, so configs
289+
// attached to newer versions will override older ones. For example v1_16+.conf1 will
290+
// be overridden if a key containing v1_18+.conf1 is present.
291+
for (String versionPrefix : versionPrefixes) {
292+
applyDefault(versionPrefix, baseConfMap, conf);
293+
}
224294
}
225295

226296
return conf;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class KubernetesOperatorConfigOptions {
4040

4141
private static final String DEFAULT_CONF_PREFIX = K8S_OP_CONF_PREFIX + "default-configuration.";
4242
public static final String VERSION_CONF_PREFIX = DEFAULT_CONF_PREFIX + "flink-version.";
43+
public static final String FLINK_VERSION_GREATER_THAN_SUFFIX = "+";
4344
public static final String NAMESPACE_CONF_PREFIX = DEFAULT_CONF_PREFIX + "namespace.";
4445
public static final String SECTION_SYSTEM = "system";
4546
public static final String SECTION_ADVANCED = "system_advanced";

0 commit comments

Comments
 (0)