diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 600f6289a5..34051837cb 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -117,6 +117,8 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | v1_18 | | | v1_19 | | | v1_20 | | +| majorVersion | int | The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. | +| minorVersion | int | The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. | ### IngressSpec **Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md index 6612376295..1839562999 100644 --- a/docs/content/docs/operations/configuration.md +++ b/docs/content/docs/operations/configuration.md @@ -52,21 +52,43 @@ To learn more about metrics and logging configuration please refer to the dedica ### Flink Version and Namespace specific defaults -The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc). +The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions, or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc.): ``` -# Flink Version specific defaults -kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1 -kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2 -kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3 - # Namespace specific defaults kubernetes.operator.default-configuration.namespace.ns1.k1: v1 kubernetes.operator.default-configuration.namespace.ns1.k2: v2 kubernetes.operator.default-configuration.namespace.ns2.k1: v1 ``` -Flink version specific defaults will have a higher precedence so namespace defaults would be overridden by the same key. +Flink version specific defaults have a higher precedence, so namespace defaults will be overridden by version defaults with the same key. + +Flink version defaults can also be suffixed by a `+` character after the version string. This indicates that the default applies to this Flink version and any higher version. + +For example, taking the configuration below: +``` +# Flink Version specific defaults +kubernetes.operator.default-configuration.flink-version.v1_16+.k4: v4 +kubernetes.operator.default-configuration.flink-version.v1_16+.k5: v5 +kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1 +kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2 +kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3 +kubernetes.operator.default-configuration.flink-version.v1_17.k5: v5.1 +``` +This would result in the defaults for Flink 1.17 being: +``` +k1: v1 +k2: v2 +k3: v3 +k4: v4 +k5: v5.1 +``` + +**Note**: The configuration above sets `k5: v5` for all versions >= 1.16. +However, this is overridden for Flink 1.17 to `v5.1`. +But if you ran a Flink 1.18 deployment with this configuration, then the value of `k5` would be `v5` not `v5.1`. The `k5` override only applies to Flink 1.17. +Adding a `+` to the Flink 1.17 `k5` default would apply the new value to all future versions. + ## Dynamic Operator Configuration diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java index 634fa6f8e7..1734e95b81 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java @@ -25,21 +25,38 @@ public enum FlinkVersion { /** No longer supported since 1.7 operator release. */ @Deprecated - v1_13, + v1_13(1, 13), /** No longer supported since 1.7 operator release. */ @Deprecated - v1_14, + v1_14(1, 14), /** Deprecated since 1.10 operator release. */ @Deprecated - v1_15, - v1_16, - v1_17, - v1_18, - v1_19, - v1_20; + v1_15(1, 15), + v1_16(1, 16), + v1_17(1, 17), + v1_18(1, 18), + v1_19(1, 19), + v1_20(1, 20); + + /** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */ + private final int majorVersion; + + /** The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. */ + private final int minorVersion; + + FlinkVersion(int major, int minor) { + this.majorVersion = major; + this.minorVersion = minor; + } public boolean isEqualOrNewer(FlinkVersion otherVersion) { - return this.ordinal() >= otherVersion.ordinal(); + if (this.majorVersion > otherVersion.majorVersion) { + return true; + } + if (this.majorVersion == otherVersion.majorVersion) { + return this.minorVersion >= otherVersion.minorVersion; + } + return false; } /** @@ -54,4 +71,22 @@ public static FlinkVersion current() { public static boolean isSupported(FlinkVersion version) { return version != null && version.isEqualOrNewer(FlinkVersion.v1_15); } + + /** + * Returns the FlinkVersion associated with the supplied major and minor version integers. + * + * @param major The major part of the Flink version (e.g. 1 for 1.18.1). + * @param minor The minor part of the Flink version (e.g. 18 for 1.18.1). + * @throws IllegalArgumentException If the supplied major and minor version do not correspond to + * a supported FlinkVersion. + * @return The FlinkVersion associated with the supplied major and minor version integers. + */ + public static FlinkVersion fromMajorMinor(int major, int minor) { + for (FlinkVersion version : values()) { + if (version.majorVersion == major && version.minorVersion == minor) { + return version; + } + } + throw new IllegalArgumentException("Unknown Flink version: " + major + "." + minor); + } } diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java new file mode 100644 index 0000000000..d830d7de7a --- /dev/null +++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.spec; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class FlinkVersionTest { + + @Test + void isEqualOrNewer() { + assertFalse(FlinkVersion.v1_16.isEqualOrNewer(FlinkVersion.v1_17)); + assertTrue(FlinkVersion.v1_17.isEqualOrNewer(FlinkVersion.v1_17)); + assertTrue(FlinkVersion.v1_18.isEqualOrNewer(FlinkVersion.v1_17)); + } + + @Test + void current() { + assertEquals(FlinkVersion.v1_20, FlinkVersion.current()); + } + + @Test + void isSupported() { + assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20)); + } + + @Test + void fromMajorMinor() { + assertEquals(FlinkVersion.fromMajorMinor(1, 17), FlinkVersion.v1_17); + assertEquals(FlinkVersion.fromMajorMinor(1, 18), FlinkVersion.v1_18); + assertEquals(FlinkVersion.fromMajorMinor(1, 19), FlinkVersion.v1_19); + assertEquals(FlinkVersion.fromMajorMinor(1, 20), FlinkVersion.v1_20); + assertThrows(IllegalArgumentException.class, () -> FlinkVersion.fromMajorMinor(0, 1)); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 6b5f16a34d..93d53fbe05 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -49,14 +49,21 @@ import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.applyJobConfig; @@ -80,6 +87,14 @@ public class FlinkConfigManager { private final AtomicLong defaultConfigVersion = new AtomicLong(0); private final LoadingCache cache; private final Consumer> namespaceListener; + private volatile Map> relevantFlinkVersionPrefixes; + + protected static final Pattern FLINK_VERSION_PATTERN = + Pattern.compile( + VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.") + + "v(?\\d+)_(?\\d+)(?\\" + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + ")?\\..*"); @VisibleForTesting public FlinkConfigManager(Configuration defaultConfig) { @@ -99,6 +114,7 @@ public FlinkConfigManager( this.namespaceListener = namespaceListener; Duration cacheTimeout = defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT); + this.relevantFlinkVersionPrefixes = new HashMap<>(); this.cache = CacheBuilder.newBuilder() .maximumSize( @@ -169,6 +185,11 @@ public void updateDefaultConfig(Configuration newConf) { // We do not invalidate the cache to avoid deleting currently used temp files, // simply bump the version this.defaultConfigVersion.incrementAndGet(); + + // We clear the cached relevant Flink version prefixes as the base config may include new + // version overrides. + // This will trigger a regeneration of the prefixes in the next call to getDefaultConfig. + relevantFlinkVersionPrefixes = new HashMap<>(); } /** @@ -202,6 +223,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration( getDefaultConfig(namespace, flinkVersion)); } + /** + * This method will search the keys of the supplied map and find any that contain a flink + * version string that is relevant to the supplied flink version. + * + *

Relevance is defined as any key with the {@link + * KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink + * version (with or without the {@link + * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version + * string followed by the {@link + * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}. + * + *

Prefixes are returned in ascending order of flink version. + * + * @param baseConfMap The configuration map that should be searched for relevant Flink version + * prefixes. + * @return A list of relevant Flink version prefixes in order of ascending Flink version. + */ + protected static List getRelevantVersionPrefixes( + Map baseConfMap, FlinkVersion flinkVersion) { + SortedMap greaterThanVersionPrefixes = new TreeMap<>(); + + for (Map.Entry entry : baseConfMap.entrySet()) { + Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey()); + if (versionMatcher.matches() && versionMatcher.group("gt") != null) { + try { + FlinkVersion keyFlinkVersion = + FlinkVersion.fromMajorMinor( + Integer.parseInt(versionMatcher.group("major")), + Integer.parseInt(versionMatcher.group("minor"))); + if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) { + greaterThanVersionPrefixes.put( + keyFlinkVersion, + VERSION_CONF_PREFIX + + keyFlinkVersion + + KubernetesOperatorConfigOptions + .FLINK_VERSION_GREATER_THAN_SUFFIX + + "."); + } + } catch (NumberFormatException numberFormatException) { + LOG.warn("Unable to parse version number in config key: {}", entry.getKey()); + } catch (IllegalArgumentException illegalArgumentException) { + LOG.warn("Unknown Flink version in config key: {}", entry.getKey()); + } + } + } + + // Extract the prefixes from the sorted map, these will be ascending Flink version order + List sortedRelevantVersionPrefixes = + new ArrayList<>(greaterThanVersionPrefixes.values()); + + // Add the current flink version prefix (without the greater than symbol) to the set. + // Any current flink version prefix with the greater than symbol would already have been + // added + // in the loop above. + sortedRelevantVersionPrefixes.add(VERSION_CONF_PREFIX + flinkVersion + "."); + + return sortedRelevantVersionPrefixes; + } + /** * Get the base configuration for the given namespace and flink version combination. This is * different from the platform level base config as it may contain namespaces or version @@ -220,7 +300,20 @@ public Configuration getDefaultConfig(String namespace, FlinkVersion flinkVersio } if (flinkVersion != null) { - applyDefault(VERSION_CONF_PREFIX + flinkVersion + ".", baseConfMap, conf); + // Fetch or create a list of Flink version configs that apply to this current + // FlinkVersion. That will include all versions that are equal to or lower than + // the current one that are suffixed by a `+` + List versionPrefixes = + relevantFlinkVersionPrefixes.computeIfAbsent( + flinkVersion, + fv -> getRelevantVersionPrefixes(baseConfMap, flinkVersion)); + + // The version prefixes are returned in ascending order of Flink version, so configs + // attached to newer versions will override older ones. For example v1_16+.conf1 will + // be overridden if a key containing v1_18+.conf1 is present. + for (String versionPrefix : versionPrefixes) { + applyDefault(versionPrefix, baseConfMap, conf); + } } return conf; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 76d3c4d7c6..f4203fb7be 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -40,6 +40,7 @@ public class KubernetesOperatorConfigOptions { private static final String DEFAULT_CONF_PREFIX = K8S_OP_CONF_PREFIX + "default-configuration."; public static final String VERSION_CONF_PREFIX = DEFAULT_CONF_PREFIX + "flink-version."; + public static final String FLINK_VERSION_GREATER_THAN_SUFFIX = "+"; public static final String NAMESPACE_CONF_PREFIX = DEFAULT_CONF_PREFIX + "namespace."; public static final String SECTION_SYSTEM = "system"; public static final String SECTION_ADVANCED = "system_advanced"; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java index c8e575cfea..fcd55157eb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java @@ -45,13 +45,16 @@ import java.time.Duration; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.regex.Matcher; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for FlinkConfigManager. */ @@ -196,7 +199,7 @@ public void testConfigOverrides(@TempDir Path confOverrideDir) throws IOExceptio Arrays.asList("foo: 1", "bar: 2")); var conf = FlinkConfigManager.loadGlobalConfiguration(Optional.of(confOverrideDir.toString())); - Assertions.assertEquals(Map.of("foo", "1", "bar", "2"), conf.toMap()); + assertEquals(Map.of("foo", "1", "bar", "2"), conf.toMap()); } @Test @@ -241,15 +244,102 @@ public void testWatchNamespaceOverrideWhenEmpty() { } } + @Test + public void testFlinkVersionRegex() { + String version116PlusMatch = + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_16 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + ".conf1"; + String version116PlusNoMatch = + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + "w2_15" + "-" + ".conf2"; + + Matcher match = FlinkConfigManager.FLINK_VERSION_PATTERN.matcher(version116PlusMatch); + assertTrue(match.matches()); + assertEquals(3, match.groupCount()); + assertEquals(1, Integer.parseInt(match.group("major"))); + assertEquals(16, Integer.parseInt(match.group("minor"))); + assertNotNull(match.group("gt")); + Matcher noMatch = FlinkConfigManager.FLINK_VERSION_PATTERN.matcher(version116PlusNoMatch); + assertFalse(noMatch.matches()); + } + + @Test + public void testGetRelevantVersionPrefixes() { + Map baseConfig = new HashMap<>(); + String version116Plus = + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_16 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + "."; + String version117 = + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + FlinkVersion.v1_17 + "."; + String version118Plus = + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_18 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + "."; + String version119 = + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + FlinkVersion.v1_19 + "."; + baseConfig.put(version116Plus + "conf1", "v1"); + baseConfig.put(version117 + "conf2", "v2"); + baseConfig.put(version118Plus + "conf3", "v3"); + baseConfig.put(version119 + "conf1", "v1.1"); + + List relevantPrefixes = + FlinkConfigManager.getRelevantVersionPrefixes(baseConfig, FlinkVersion.v1_19); + + System.out.println(relevantPrefixes); + + assertEquals( + 3, + relevantPrefixes.size(), + "Expected 3 version prefix entries, 1.16+, 1.18+ and 1.19"); + assertEquals(relevantPrefixes.get(0), version116Plus); + assertEquals(relevantPrefixes.get(1), version118Plus); + assertEquals(relevantPrefixes.get(2), version119); + } + @Test public void testVersionNamespaceDefaultConfs() { var opConf = new Configuration(); opConf.setString("conf0", "false"); - opConf.setString(KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + "v1_17.conf1", "v1"); opConf.setString( - KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + "v1_17.conf0", "true"); + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_16 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + ".conf4", + "v1_16 greater than"); + // This entry should not appear in the final config as it is overridden by the v1_17+ entry + opConf.setString( + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_16 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + ".conf5", + "v1_16 greater than"); + opConf.setString( + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + FlinkVersion.v1_17 + ".conf1", + "v1"); + opConf.setString( + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + FlinkVersion.v1_17 + ".conf0", + "true"); + // This overrides the v1_16+ entry. + opConf.setString( + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_17 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + ".conf5", + "v1_17 greater than"); - opConf.setString(KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + "v1_18.conf2", "v2"); + opConf.setString( + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + FlinkVersion.v1_18 + ".conf2", + "v2"); + opConf.setString( + KubernetesOperatorConfigOptions.VERSION_CONF_PREFIX + + FlinkVersion.v1_18 + + KubernetesOperatorConfigOptions.FLINK_VERSION_GREATER_THAN_SUFFIX + + ".conf6", + "v6"); opConf.setString(KubernetesOperatorConfigOptions.NAMESPACE_CONF_PREFIX + "ns1.conf1", "vn"); opConf.setString(KubernetesOperatorConfigOptions.NAMESPACE_CONF_PREFIX + "ns1.conf3", "v3"); @@ -262,7 +352,10 @@ public void testVersionNamespaceDefaultConfs() { assertEquals("v1", v17Conf.get("conf1")); assertEquals("true", v17Conf.get("conf0")); + assertEquals("v1_16 greater than", v18Conf.get("conf4")); + assertEquals("v1_17 greater than", v18Conf.get("conf5")); assertEquals("v2", v18Conf.get("conf2")); + assertEquals("v6", v18Conf.get("conf6")); assertEquals("false", v18Conf.get("conf0")); // Namespace defaults diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml b/helm/flink-kubernetes-operator/conf/flink-conf.yaml index 496034bb20..30e0207307 100644 --- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml +++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml @@ -25,8 +25,7 @@ parallelism.default: 1 # Flink 1.18 uses env.java.opts.all, if a user supplies their own version of these opts in their FlinkDeployment the options below will be overridden. # env.java.default-opts.all is used for 1.19 onwards so users can supply their own opts.all in their Job deployments and have these appended. kubernetes.operator.default-configuration.flink-version.v1_18.env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -kubernetes.operator.default-configuration.flink-version.v1_19.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -kubernetes.operator.default-configuration.flink-version.v1_20.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED +kubernetes.operator.default-configuration.flink-version.v1_19+.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED # Flink operator related configs # kubernetes.operator.reconcile.interval: 60 s