Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.flink.autoscaler.realizer;

import org.apache.flink.core.plugin.Plugin;

/* Flink Autoscaler Scaling Realizer */
public interface FlinkAutoscalerScalingRealizer extends Plugin, ScalingRealizer {}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ void registerDeploymentController() {
var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
var clusterResourceManager =
ClusterResourceManager.of(configManager.getDefaultConfig(), client);
var autoscaler = AutoscalerFactory.create(client, eventRecorder, clusterResourceManager);
var autoscaler =
AutoscalerFactory.create(
client, eventRecorder, clusterResourceManager, configManager);
var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler);
var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder);
var canaryResourceManager = new CanaryResourceManager<FlinkDeployment>(configManager);
Expand All @@ -209,7 +211,7 @@ void registerSessionJobController() {
var metricManager =
MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
var autoscaler = AutoscalerFactory.create(client, eventRecorder, null);
var autoscaler = AutoscalerFactory.create(client, eventRecorder, null, configManager);
var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler);
var observer = new FlinkSessionJobObserver(eventRecorder);
var canaryResourceManager = new CanaryResourceManager<FlinkSessionJob>(configManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,59 @@
import org.apache.flink.autoscaler.RestApiMetricsCollector;
import org.apache.flink.autoscaler.ScalingExecutor;
import org.apache.flink.autoscaler.ScalingMetricEvaluator;
import org.apache.flink.autoscaler.realizer.FlinkAutoscalerScalingRealizer;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.PluginDiscoveryUtils;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

/** The factory of {@link JobAutoScaler}. */
public class AutoscalerFactory {

private static Logger LOG = LoggerFactory.getLogger(AutoscalerFactory.class);

public static JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> create(
KubernetesClient client,
EventRecorder eventRecorder,
ClusterResourceManager clusterResourceManager) {
ClusterResourceManager clusterResourceManager,
FlinkConfigManager configManager) {

var stateStore = new KubernetesAutoScalerStateStore(new ConfigMapStore(client));
var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder);

Set<FlinkAutoscalerScalingRealizer> flinkAutoscalerScalingRealizers =
PluginDiscoveryUtils.discoverResources(
configManager, FlinkAutoscalerScalingRealizer.class);

flinkAutoscalerScalingRealizers.forEach(
realizer -> {
LOG.info(
"Discovered resource from plugin directory {}",
realizer.getClass().getName());
});

ScalingRealizer scalingRealizer = new KubernetesScalingRealizer();

if (!flinkAutoscalerScalingRealizers.isEmpty()) {
scalingRealizer = flinkAutoscalerScalingRealizers.stream().findFirst().get();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses the scaling realizer from plugins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assumes 1 scaling realizer, I'll harden this

}

return new JobAutoScalerImpl<>(
new RestApiMetricsCollector<>(),
new ScalingMetricEvaluator(),
new ScalingExecutor<>(eventHandler, stateStore, clusterResourceManager),
eventHandler,
new KubernetesScalingRealizer(),
scalingRealizer,
stateStore);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.apache.flink.kubernetes.operator.utils;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

public class PluginDiscoveryUtils {

private static final Logger LOG = LoggerFactory.getLogger(PluginDiscoveryUtils.class);

public static <T> Set<T> discoverResources(
FlinkConfigManager configManager, Class<T> resourceClass) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think keeping this generic, will be beneficial for future changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open to ideas

var conf = configManager.getDefaultConfig();

Set<T> resources = new HashSet<>();

PluginUtils.createPluginManagerFromRootFolder(conf)
.load(resourceClass)
.forEachRemaining(
resource -> {
LOG.info(
"Discovered resource from plugin directory [{}]: {}.",
System.getenv()
.getOrDefault(
ConfigConstants.ENV_FLINK_PLUGINS_DIR,
ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS),
resource.getClass().getName());
resources.add(resource);
});

return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerImpl;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
Expand Down Expand Up @@ -47,7 +49,8 @@ void testLoadDefaultImplementation() {
new EventRecorder(
new FlinkResourceEventCollector(),
new FlinkStateSnapshotEventCollector()),
new ClusterResourceManager(Duration.ZERO, kubernetesClient));
new ClusterResourceManager(Duration.ZERO, kubernetesClient),
new FlinkConfigManager(new Configuration()));
Assertions.assertTrue(autoScaler instanceof JobAutoScalerImpl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public TestingFlinkDeploymentController(
flinkService.getKubernetesClient(),
eventRecorder,
new ClusterResourceManager(
Duration.ZERO, flinkService.getKubernetesClient())));
Duration.ZERO, flinkService.getKubernetesClient()),
configManager));
canaryResourceManager = new CanaryResourceManager<>(configManager);
flinkDeploymentController =
new FlinkDeploymentController(
Expand Down