diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index f31fc7ba4b..32b96d729a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -77,6 +78,8 @@ /** Configuration manager for the Flink operator. */ public class FlinkConfigManager { + public static final String ENV_VAR_PREFIX = "FLINK_CONF_"; + private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class); private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -448,15 +451,36 @@ private static Configuration loadGlobalConfiguration() { @VisibleForTesting protected static Configuration loadGlobalConfiguration(Optional confOverrideDir) { + Configuration res; if (confOverrideDir.isPresent()) { Configuration configOverrides = GlobalConfiguration.loadConfiguration(confOverrideDir.get()); LOG.debug( "Loading default configuration with overrides from {}", confOverrideDir.get()); - return GlobalConfiguration.loadConfiguration(configOverrides); + res = GlobalConfiguration.loadConfiguration(configOverrides); + } else { + LOG.debug("Loading default configuration"); + res = GlobalConfiguration.loadConfiguration(); } - LOG.debug("Loading default configuration"); - return GlobalConfiguration.loadConfiguration(); + overriderConfigurationsFromEnvVariables(res, System::getenv); + return res; + } + + @VisibleForTesting + static void overriderConfigurationsFromEnvVariables( + Configuration res, Supplier> envVariables) { + var envVars = envVariables.get(); + envVars.forEach( + (k, v) -> { + if (k.startsWith(ENV_VAR_PREFIX)) { + res.setString(envVarToKey(k), v); + } + }); + } + + @VisibleForTesting + static String envVarToKey(String key) { + return key.replace(ENV_VAR_PREFIX, "").replace("__", "-").replace("_", ".").toLowerCase(); } private static void applyDefault( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java index 39414af44c..1f597cd700 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java @@ -53,6 +53,7 @@ import java.util.regex.Matcher; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -428,4 +429,12 @@ public void testConcurrentDefaultConfig() throws InterruptedException { assertTrue(completed2.get()); assertTrue(completed3.get()); } + + @Test + void envVarToFlinkConfig() { + assertThat( + FlinkConfigManager.envVarToKey( + "FLINK_CONF_RESTART__STRATEGY_FAILURE__RATE_DELAY")) + .isEqualTo("restart-strategy.failure-rate.delay"); + } }