diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/FlinkAutoscalerScalingRealizer.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/FlinkAutoscalerScalingRealizer.java new file mode 100644 index 0000000000..39bc621344 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/FlinkAutoscalerScalingRealizer.java @@ -0,0 +1,6 @@ +package org.apache.flink.autoscaler.realizer; + +import org.apache.flink.core.plugin.Plugin; + +/* Flink Autoscaler Scaling Realizer */ +public interface FlinkAutoscalerScalingRealizer extends Plugin, ScalingRealizer {} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index e2cca03988..0d0f493ba0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -185,7 +185,9 @@ void registerDeploymentController() { var statusRecorder = StatusRecorder.create(client, metricManager, listeners); var clusterResourceManager = ClusterResourceManager.of(configManager.getDefaultConfig(), client); - var autoscaler = AutoscalerFactory.create(client, eventRecorder, clusterResourceManager); + var autoscaler = + AutoscalerFactory.create( + client, eventRecorder, clusterResourceManager, configManager); var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler); var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder); var canaryResourceManager = new CanaryResourceManager(configManager); @@ -209,7 +211,7 @@ void registerSessionJobController() { var metricManager = MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup); var statusRecorder = StatusRecorder.create(client, metricManager, listeners); - var autoscaler = AutoscalerFactory.create(client, eventRecorder, null); + var autoscaler = AutoscalerFactory.create(client, eventRecorder, null, configManager); var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler); var observer = new FlinkSessionJobObserver(eventRecorder); var canaryResourceManager = new CanaryResourceManager(configManager); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java index 7bbb5492ec..7c2c82e774 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java @@ -22,31 +22,54 @@ import org.apache.flink.autoscaler.RestApiMetricsCollector; import org.apache.flink.autoscaler.ScalingExecutor; import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.realizer.FlinkAutoscalerScalingRealizer; +import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.PluginDiscoveryUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; /** The factory of {@link JobAutoScaler}. */ public class AutoscalerFactory { + private static Logger LOG = LoggerFactory.getLogger(AutoscalerFactory.class); + public static JobAutoScaler create( KubernetesClient client, EventRecorder eventRecorder, - ClusterResourceManager clusterResourceManager) { + ClusterResourceManager clusterResourceManager, + FlinkConfigManager configManager) { var stateStore = new KubernetesAutoScalerStateStore(new ConfigMapStore(client)); var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder); + Set discoveredFlinkAutoscalerRealizers = + PluginDiscoveryUtils.discoverResources( + configManager, FlinkAutoscalerScalingRealizer.class); + + ScalingRealizer scalingRealizer = new KubernetesScalingRealizer(); + + if (!discoveredFlinkAutoscalerRealizers.isEmpty()) { + scalingRealizer = discoveredFlinkAutoscalerRealizers.stream().findFirst().get(); + + LOG.info("Overriding ScalingRealizer to {}", scalingRealizer.getClass().getName()); + } + return new JobAutoScalerImpl<>( new RestApiMetricsCollector<>(), new ScalingMetricEvaluator(), new ScalingExecutor<>(eventHandler, stateStore, clusterResourceManager), eventHandler, - new KubernetesScalingRealizer(), + scalingRealizer, stateStore); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/PluginDiscoveryUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/PluginDiscoveryUtils.java new file mode 100644 index 0000000000..7966ca8e05 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/PluginDiscoveryUtils.java @@ -0,0 +1,39 @@ +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +public class PluginDiscoveryUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PluginDiscoveryUtils.class); + + public static Set discoverResources( + FlinkConfigManager configManager, Class resourceClass) { + var conf = configManager.getDefaultConfig(); + + Set resources = new HashSet<>(); + + PluginUtils.createPluginManagerFromRootFolder(conf) + .load(resourceClass) + .forEachRemaining( + resource -> { + LOG.info( + "Discovered resource from plugin directory [{}]: {}.", + System.getenv() + .getOrDefault( + ConfigConstants.ENV_FLINK_PLUGINS_DIR, + ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS), + resource.getClass().getName()); + resources.add(resource); + }); + + return resources; + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java index f848d491a6..6b884044b8 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java @@ -19,6 +19,8 @@ import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.autoscaler.JobAutoScalerImpl; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector; @@ -47,7 +49,8 @@ void testLoadDefaultImplementation() { new EventRecorder( new FlinkResourceEventCollector(), new FlinkStateSnapshotEventCollector()), - new ClusterResourceManager(Duration.ZERO, kubernetesClient)); + new ClusterResourceManager(Duration.ZERO, kubernetesClient), + new FlinkConfigManager(new Configuration())); Assertions.assertTrue(autoScaler instanceof JobAutoScalerImpl); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 5eca939457..e00cc7384b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -100,7 +100,8 @@ public TestingFlinkDeploymentController( flinkService.getKubernetesClient(), eventRecorder, new ClusterResourceManager( - Duration.ZERO, flinkService.getKubernetesClient()))); + Duration.ZERO, flinkService.getKubernetesClient()), + configManager)); canaryResourceManager = new CanaryResourceManager<>(configManager); flinkDeploymentController = new FlinkDeploymentController(