Skip to content

Commit 28f9c67

Browse files
committed
[FLINK-36529] Allow flink version configs to be set to greater than given version
Signed-off-by: Thomas Cooper <[email protected]>
1 parent 5d29554 commit 28f9c67

File tree

7 files changed

+292
-23
lines changed

7 files changed

+292
-23
lines changed

docs/content/docs/operations/configuration.md

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,43 @@ To learn more about metrics and logging configuration please refer to the dedica
5252

5353
### Flink Version and Namespace specific defaults
5454

55-
The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc).
55+
The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions, or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc.):
5656

5757
```
58-
# Flink Version specific defaults
59-
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
60-
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
61-
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
62-
6358
# Namespace specific defaults
6459
kubernetes.operator.default-configuration.namespace.ns1.k1: v1
6560
kubernetes.operator.default-configuration.namespace.ns1.k2: v2
6661
kubernetes.operator.default-configuration.namespace.ns2.k1: v1
6762
```
6863

69-
Flink version specific defaults will have a higher precedence so namespace defaults would be overridden by the same key.
64+
Flink version specific defaults have a higher precedence, so namespace defaults will be overridden by version defaults with the same key.
65+
66+
Flink version defaults can also be suffixed by a `+` character after the version string. This indicates that the default applies to this Flink version and any higher version.
67+
68+
For example, taking the configuration below:
69+
```
70+
# Flink Version specific defaults
71+
kubernetes.operator.default-configuration.flink-version.v1_16+.k4: v4
72+
kubernetes.operator.default-configuration.flink-version.v1_16+.k5: v5
73+
kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
74+
kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
75+
kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
76+
kubernetes.operator.default-configuration.flink-version.v1_17.k5: v5.1
77+
```
78+
This would result in the defaults for Flink 1.17 being:
79+
```
80+
k1: v1
81+
k2: v2
82+
k3: v3
83+
k4: v4
84+
k5: v5.1
85+
```
86+
87+
**Note**: The configuration above sets `k5: v5` for all versions >= 1.16.
88+
However, this is overridden for Flink 1.17 to `v5.1`.
89+
But if you ran a Flink 1.18 deployment with this configuration, then the value of `k5` would be `v5` not `v5.1`. The `k5` override only applies to Flink 1.17.
90+
Adding a `+` to the Flink 1.17 `k5` default would apply the new value to all future versions.
91+
7092

7193
## Dynamic Operator Configuration
7294

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,38 @@
2525
public enum FlinkVersion {
2626
/** No longer supported since 1.7 operator release. */
2727
@Deprecated
28-
v1_13,
28+
v1_13(1, 13),
2929
/** No longer supported since 1.7 operator release. */
3030
@Deprecated
31-
v1_14,
31+
v1_14(1, 14),
3232
/** Deprecated since 1.10 operator release. */
3333
@Deprecated
34-
v1_15,
35-
v1_16,
36-
v1_17,
37-
v1_18,
38-
v1_19,
39-
v1_20;
34+
v1_15(1, 15),
35+
v1_16(1, 16),
36+
v1_17(1, 17),
37+
v1_18(1, 18),
38+
v1_19(1, 19),
39+
v1_20(1, 20);
40+
41+
/** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */
42+
private final int majorVersion;
43+
44+
/** The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. */
45+
private final int minorVersion;
46+
47+
FlinkVersion(int major, int minor) {
48+
this.majorVersion = major;
49+
this.minorVersion = minor;
50+
}
4051

4152
public boolean isEqualOrNewer(FlinkVersion otherVersion) {
42-
return this.ordinal() >= otherVersion.ordinal();
53+
if (this.majorVersion > otherVersion.majorVersion) {
54+
return true;
55+
}
56+
if (this.majorVersion == otherVersion.majorVersion) {
57+
return this.minorVersion >= otherVersion.minorVersion;
58+
}
59+
return false;
4360
}
4461

4562
/**
@@ -54,4 +71,22 @@ public static FlinkVersion current() {
5471
public static boolean isSupported(FlinkVersion version) {
5572
return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
5673
}
74+
75+
/**
76+
* Returns the FlinkVersion associated with the supplied major and minor version integers.
77+
*
78+
* @param major The major part of the Flink version (e.g. 1 for 1.18.1).
79+
* @param minor The minor part of the Flink version (e.g. 18 for 1.18.1).
80+
* @throws IllegalArgumentException If the supplied major and minor version do not correspond to
81+
* a supported FlinkVersion.
82+
* @return The FlinkVersion associated with the supplied major and minor version integers.
83+
*/
84+
public static FlinkVersion fromMajorMinor(int major, int minor) {
85+
for (FlinkVersion version : values()) {
86+
if (version.majorVersion == major && version.minorVersion == minor) {
87+
return version;
88+
}
89+
}
90+
throw new IllegalArgumentException("Unknown Flink version: " + major + "." + minor);
91+
}
5792
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.apache.flink.kubernetes.operator.api.spec;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertFalse;
7+
import static org.junit.jupiter.api.Assertions.assertThrows;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
class FlinkVersionTest {
11+
12+
@Test
13+
void isEqualOrNewer() {
14+
assertFalse(FlinkVersion.v1_16.isEqualOrNewer(FlinkVersion.v1_17));
15+
assertTrue(FlinkVersion.v1_17.isEqualOrNewer(FlinkVersion.v1_17));
16+
assertTrue(FlinkVersion.v1_18.isEqualOrNewer(FlinkVersion.v1_17));
17+
}
18+
19+
@Test
20+
void current() {
21+
assertEquals(FlinkVersion.v1_20, FlinkVersion.current());
22+
}
23+
24+
@Test
25+
void isSupported() {
26+
assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20));
27+
}
28+
29+
@Test
30+
void fromMajorMinor() {
31+
assertEquals(FlinkVersion.fromMajorMinor(1, 17), FlinkVersion.v1_17);
32+
assertEquals(FlinkVersion.fromMajorMinor(1, 18), FlinkVersion.v1_18);
33+
assertEquals(FlinkVersion.fromMajorMinor(1, 19), FlinkVersion.v1_19);
34+
assertEquals(FlinkVersion.fromMajorMinor(1, 20), FlinkVersion.v1_20);
35+
assertThrows(IllegalArgumentException.class, () -> FlinkVersion.fromMajorMinor(0, 1));
36+
}
37+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,20 @@
4949
import org.slf4j.LoggerFactory;
5050

5151
import java.time.Duration;
52+
import java.util.ArrayList;
53+
import java.util.List;
5254
import java.util.Map;
5355
import java.util.Optional;
5456
import java.util.Set;
57+
import java.util.SortedMap;
58+
import java.util.TreeMap;
5559
import java.util.concurrent.Executors;
5660
import java.util.concurrent.ScheduledExecutorService;
5761
import java.util.concurrent.TimeUnit;
5862
import java.util.concurrent.atomic.AtomicLong;
5963
import java.util.function.Consumer;
64+
import java.util.regex.Matcher;
65+
import java.util.regex.Pattern;
6066

6167
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
6268
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.applyJobConfig;
@@ -81,6 +87,13 @@ public class FlinkConfigManager {
8187
private final LoadingCache<Key, Configuration> cache;
8288
private final Consumer<Set<String>> namespaceListener;
8389

90+
protected static final Pattern FLINK_VERSION_PATTERN =
91+
Pattern.compile(
92+
VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.")
93+
+ "v(?<major>\\d+)_(?<minor>\\d+)(?<gt>\\"
94+
+ KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX
95+
+ ")?\\..*");
96+
8497
@VisibleForTesting
8598
public FlinkConfigManager(Configuration defaultConfig) {
8699
this(defaultConfig, ns -> {}, true);
@@ -202,6 +215,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
202215
getDefaultConfig(namespace, flinkVersion));
203216
}
204217

218+
/**
219+
* This method will search the keys of the supplied map and find any that contain a flink
220+
* version string that is relevant to the supplied flink version.
221+
*
222+
* <p>Relevance is defined as any key with the {@link
223+
* KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink
224+
* version (with or without the {@link
225+
* KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version
226+
* string followed by the {@link
227+
* KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}.
228+
*
229+
* <p>Prefixes are returned in ascending order of flink version.
230+
*
231+
* @param baseConfMap The configuration map that should be searched for relevant Flink version
232+
* prefixes.
233+
* @return A list of relevant Flink version prefixes in order of ascending Flink version.
234+
*/
235+
protected static List<String> getRelevantVersionPrefixes(
236+
Map<String, String> baseConfMap, FlinkVersion flinkVersion) {
237+
SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>();
238+
239+
for (Map.Entry<String, String> entry : baseConfMap.entrySet()) {
240+
Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey());
241+
if (versionMatcher.matches() && versionMatcher.group("gt") != null) {
242+
try {
243+
FlinkVersion keyFlinkVersion =
244+
FlinkVersion.fromMajorMinor(
245+
Integer.parseInt(versionMatcher.group("major")),
246+
Integer.parseInt(versionMatcher.group("minor")));
247+
if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) {
248+
greaterThanVersionPrefixes.put(
249+
keyFlinkVersion,
250+
VERSION_CONF_PREFIX
251+
+ keyFlinkVersion
252+
+ KubernetesOperatorConfigOptions
253+
.FLINK_VERSION_GREATER_THAN_SUFFIX
254+
+ ".");
255+
}
256+
} catch (NumberFormatException numberFormatException) {
257+
LOG.warn("Unable to parse version number in config key: {}", entry.getKey());
258+
} catch (IllegalArgumentException illegalArgumentException) {
259+
LOG.warn("Unknown Flink version in config key: {}", entry.getKey());
260+
}
261+
}
262+
}
263+
264+
// Extract the prefixes from the sorted map, these will be ascending Flink version order
265+
List<String> sortedRelevantVersionPrefixes =
266+
new ArrayList<>(greaterThanVersionPrefixes.values());
267+
268+
// Add the current flink version prefix (without the greater than symbol) to the set.
269+
// Any current flink version prefix with the greater than symbol would already have been
270+
// added
271+
// in the loop above.
272+
sortedRelevantVersionPrefixes.add(VERSION_CONF_PREFIX + flinkVersion + ".");
273+
274+
return sortedRelevantVersionPrefixes;
275+
}
276+
205277
/**
206278
* Get the base configuration for the given namespace and flink version combination. This is
207279
* different from the platform level base config as it may contain namespaces or version
@@ -220,7 +292,17 @@ public Configuration getDefaultConfig(String namespace, FlinkVersion flinkVersio
220292
}
221293

222294
if (flinkVersion != null) {
223-
applyDefault(VERSION_CONF_PREFIX + flinkVersion + ".", baseConfMap, conf);
295+
// Get a list of flink version configs that apply to this current flink version
296+
// That will include all versions that are equal to or lower than the current one
297+
// that are suffixed by a `+`
298+
List<String> versionPrefixes = getRelevantVersionPrefixes(baseConfMap, flinkVersion);
299+
300+
// The version prefixes are returned in ascending order of Flink version, so configs
301+
// attached to newer versions will override older ones. For example v1_16+.conf1 will
302+
// be overridden if a key containing v1_18+.conf1 is present.
303+
for (String versionPrefix : versionPrefixes) {
304+
applyDefault(versionPrefix, baseConfMap, conf);
305+
}
224306
}
225307

226308
return conf;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class KubernetesOperatorConfigOptions {
4040

4141
private static final String DEFAULT_CONF_PREFIX = K8S_OP_CONF_PREFIX + "default-configuration.";
4242
public static final String VERSION_CONF_PREFIX = DEFAULT_CONF_PREFIX + "flink-version.";
43+
public static final String FLINK_VERSION_GREATER_THAN_SUFFIX = "+";
4344
public static final String NAMESPACE_CONF_PREFIX = DEFAULT_CONF_PREFIX + "namespace.";
4445
public static final String SECTION_SYSTEM = "system";
4546
public static final String SECTION_ADVANCED = "system_advanced";

0 commit comments

Comments
 (0)