Skip to content

Commit 89f7857

Browse files
committed
Draft PR for plugin based approach for custom evaluator for scaling metric evaluation
1 parent 9eb3c38 commit 89f7857

File tree

21 files changed

+1112
-32
lines changed

21 files changed

+1112
-32
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919

2020
import org.apache.flink.annotation.VisibleForTesting;
2121
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.api.java.tuple.Tuple2;
2223
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2324
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
2425
import org.apache.flink.autoscaler.exceptions.NotReadyException;
2526
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
27+
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
2628
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
2729
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
2830
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
2931
import org.apache.flink.autoscaler.tuning.ConfigChanges;
32+
import org.apache.flink.configuration.Configuration;
3033
import org.apache.flink.configuration.PipelineOptions;
3134
import org.apache.flink.util.Preconditions;
3235

@@ -36,11 +39,14 @@
3639
import java.time.Clock;
3740
import java.util.HashMap;
3841
import java.util.Map;
42+
import java.util.Optional;
3943
import java.util.concurrent.ConcurrentHashMap;
4044

4145
import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
46+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME;
4247
import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
4348
import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
49+
import static org.apache.flink.autoscaler.metrics.CustomEvaluatorOptions.CUSTOM_EVALUATOR_CLASS;
4450
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
4551
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingTracking;
4652

@@ -58,6 +64,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
5864
private final AutoScalerEventHandler<KEY, Context> eventHandler;
5965
private final ScalingRealizer<KEY, Context> scalingRealizer;
6066
private final AutoScalerStateStore<KEY, Context> stateStore;
67+
private final Map<String, CustomEvaluator> customEvaluators;
6168

6269
private Clock clock = Clock.systemDefaultZone();
6370

@@ -73,13 +80,15 @@ public JobAutoScalerImpl(
7380
ScalingExecutor<KEY, Context> scalingExecutor,
7481
AutoScalerEventHandler<KEY, Context> eventHandler,
7582
ScalingRealizer<KEY, Context> scalingRealizer,
76-
AutoScalerStateStore<KEY, Context> stateStore) {
83+
AutoScalerStateStore<KEY, Context> stateStore,
84+
Map<String, CustomEvaluator> customEvaluators) {
7785
this.metricsCollector = metricsCollector;
7886
this.evaluator = evaluator;
7987
this.scalingExecutor = scalingExecutor;
8088
this.eventHandler = eventHandler;
8189
this.scalingRealizer = scalingRealizer;
8290
this.stateStore = stateStore;
91+
this.customEvaluators = customEvaluators;
8392
}
8493

8594
@Override
@@ -203,8 +212,15 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
203212

204213
// Scaling tracking data contains previous restart times that are taken into account
205214
var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration());
215+
216+
var customEvaluatorWithConfig = getCustomEvaluatorIfRequired(ctx.getConfiguration());
217+
206218
var evaluatedMetrics =
207-
evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime);
219+
evaluator.evaluate(
220+
ctx.getConfiguration(),
221+
collectedMetrics,
222+
restartTime,
223+
customEvaluatorWithConfig);
208224
LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
209225
lastEvaluatedMetrics.put(ctx.getJobKey(), evaluatedMetrics);
210226

@@ -259,4 +275,17 @@ void setClock(Clock clock) {
259275
this.metricsCollector.setClock(clock);
260276
this.scalingExecutor.setClock(clock);
261277
}
278+
279+
@VisibleForTesting
280+
protected Tuple2<CustomEvaluator, Configuration> getCustomEvaluatorIfRequired(
281+
Configuration conf) {
282+
var customEvaluatorName = conf.get(CUSTOM_EVALUATOR_NAME);
283+
var customEvaluatorConfig = AutoScalerOptions.forCustomEvaluator(conf, customEvaluatorName);
284+
CustomEvaluator evaluator =
285+
Optional.ofNullable(customEvaluatorConfig.get(CUSTOM_EVALUATOR_CLASS))
286+
.map(this.customEvaluators::get)
287+
.orElse(null);
288+
289+
return evaluator != null ? new Tuple2<>(evaluator, customEvaluatorConfig) : null;
290+
}
262291
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818
package org.apache.flink.autoscaler;
1919

2020
import org.apache.flink.annotation.VisibleForTesting;
21+
import org.apache.flink.api.java.tuple.Tuple2;
2122
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2223
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
2324
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
25+
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
2426
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
2527
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
2628
import org.apache.flink.autoscaler.metrics.MetricAggregator;
2729
import org.apache.flink.autoscaler.metrics.ScalingMetric;
2830
import org.apache.flink.autoscaler.topology.JobTopology;
2931
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
3032
import org.apache.flink.configuration.Configuration;
33+
import org.apache.flink.configuration.UnmodifiableConfiguration;
3134
import org.apache.flink.runtime.jobgraph.JobVertexID;
3235

3336
import org.slf4j.Logger;
@@ -38,6 +41,7 @@
3841

3942
import java.time.Duration;
4043
import java.time.Instant;
44+
import java.util.Collections;
4145
import java.util.HashMap;
4246
import java.util.Map;
4347
import java.util.Optional;
@@ -72,14 +76,34 @@ public class ScalingMetricEvaluator {
7276
private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);
7377

7478
public EvaluatedMetrics evaluate(
75-
Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime) {
79+
Configuration conf,
80+
CollectedMetricHistory collectedMetrics,
81+
Duration restartTime,
82+
@Nullable Tuple2<CustomEvaluator, Configuration> customEvaluatorWithConfig) {
7683
LOG.debug("Restart time used in metrics evaluation: {}", restartTime);
7784
var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
7885
var metricsHistory = collectedMetrics.getMetricHistory();
7986
var topology = collectedMetrics.getJobTopology();
8087

8188
boolean processingBacklog = isProcessingBacklog(topology, metricsHistory, conf);
8289

90+
var customEvaluationSession =
91+
Optional.ofNullable(customEvaluatorWithConfig)
92+
.map(
93+
info ->
94+
Tuple2.of(
95+
info.f0,
96+
new CustomEvaluator.Context(
97+
new UnmodifiableConfiguration(conf),
98+
Collections.unmodifiableSortedMap(
99+
metricsHistory),
100+
Collections.unmodifiableMap(scalingOutput),
101+
topology,
102+
processingBacklog,
103+
restartTime,
104+
info.f1)))
105+
.orElse(null);
106+
83107
for (var vertex : topology.getVerticesInTopologicalOrder()) {
84108
scalingOutput.put(
85109
vertex,
@@ -90,7 +114,8 @@ public EvaluatedMetrics evaluate(
90114
topology,
91115
vertex,
92116
processingBacklog,
93-
restartTime));
117+
restartTime,
118+
customEvaluationSession));
94119
}
95120

96121
var globalMetrics = evaluateGlobalMetrics(metricsHistory);
@@ -132,7 +157,8 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
132157
JobTopology topology,
133158
JobVertexID vertex,
134159
boolean processingBacklog,
135-
Duration restartTime) {
160+
Duration restartTime,
161+
@Nullable Tuple2<CustomEvaluator, CustomEvaluator.Context> customEvaluationSession) {
136162

137163
var latestVertexMetrics =
138164
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex);
@@ -142,6 +168,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
142168
double inputRateAvg = getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
143169

144170
var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
171+
145172
computeTargetDataRate(
146173
topology,
147174
vertex,
@@ -175,6 +202,24 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
175202
EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions()));
176203

177204
computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
205+
206+
Optional.ofNullable(customEvaluationSession)
207+
.map(
208+
session ->
209+
runCustomEvaluator(
210+
vertex,
211+
Collections.unmodifiableMap(evaluatedMetrics),
212+
session))
213+
.filter(customEvaluatedMetrics -> !customEvaluatedMetrics.isEmpty())
214+
.ifPresent(
215+
customEvaluatedMetrics -> {
216+
LOG.info(
217+
"Merging custom evaluated metrics for vertex {}: {}",
218+
vertex,
219+
customEvaluatedMetrics);
220+
mergeEvaluatedMetricsMaps(evaluatedMetrics, customEvaluatedMetrics);
221+
});
222+
178223
return evaluatedMetrics;
179224
}
180225

@@ -585,4 +630,51 @@ protected static double computeEdgeDataRate(
585630
to);
586631
return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory);
587632
}
633+
634+
@VisibleForTesting
635+
protected static Map<ScalingMetric, EvaluatedScalingMetric> runCustomEvaluator(
636+
JobVertexID vertex,
637+
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
638+
Tuple2<CustomEvaluator, CustomEvaluator.Context> customEvaluationSession) {
639+
try {
640+
return customEvaluationSession.f0.evaluateVertexMetrics(
641+
vertex, evaluatedMetrics, customEvaluationSession.f1);
642+
} catch (UnsupportedOperationException e) {
643+
LOG.warn(
644+
"Custom evaluator {} tried accessing an un-modifiable view.",
645+
customEvaluationSession.f0.getClass(),
646+
e);
647+
} catch (Exception e) {
648+
LOG.warn(
649+
"Custom evaluator {} threw an exception.",
650+
customEvaluationSession.f0.getClass(),
651+
e);
652+
}
653+
654+
return Collections.emptyMap();
655+
}
656+
657+
@VisibleForTesting
658+
protected static void mergeEvaluatedMetricsMaps(
659+
Map<ScalingMetric, EvaluatedScalingMetric> actual,
660+
@Nullable Map<ScalingMetric, EvaluatedScalingMetric> incoming) {
661+
Optional.ofNullable(incoming)
662+
.ifPresent(
663+
customEvaluatedMetric ->
664+
customEvaluatedMetric.forEach(
665+
(scalingMetric, evaluatedScalingMetric) ->
666+
actual.merge(
667+
scalingMetric,
668+
evaluatedScalingMetric,
669+
ScalingMetricEvaluator
670+
::mergeEvaluatedScalingMetric)));
671+
}
672+
673+
@VisibleForTesting
674+
protected static EvaluatedScalingMetric mergeEvaluatedScalingMetric(
675+
EvaluatedScalingMetric actual, EvaluatedScalingMetric incoming) {
676+
return new EvaluatedScalingMetric(
677+
!Double.isNaN(incoming.getCurrent()) ? incoming.getCurrent() : actual.getCurrent(),
678+
!Double.isNaN(incoming.getAverage()) ? incoming.getAverage() : actual.getAverage());
679+
}
588680
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.flink.autoscaler.metrics.MetricAggregator;
2222
import org.apache.flink.configuration.ConfigOption;
2323
import org.apache.flink.configuration.ConfigOptions;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.DelegatingConfiguration;
2426
import org.apache.flink.configuration.MemorySize;
2527

2628
import java.time.Duration;
@@ -31,6 +33,7 @@ public class AutoScalerOptions {
3133

3234
public static final String OLD_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
3335
public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
36+
public static final String CUSTOM_EVALUATOR_CONF_PREFIX = "metrics.custom-evaluator.";
3437

3538
private static String oldOperatorConfigKey(String key) {
3639
return OLD_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
@@ -382,4 +385,19 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
382385
"scaling.key-group.partitions.adjust.mode"))
383386
.withDescription(
384387
"How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
388+
389+
public static final ConfigOption<String> CUSTOM_EVALUATOR_NAME =
390+
autoScalerConfig(CUSTOM_EVALUATOR_CONF_PREFIX + "name")
391+
.stringType()
392+
.defaultValue(null)
393+
.withFallbackKeys(oldOperatorConfigKey(CUSTOM_EVALUATOR_CONF_PREFIX + "name"))
394+
.withDescription("Name of the custom evaluator to be used.");
395+
396+
public static Configuration forCustomEvaluator(
397+
Configuration configuration, String customEvaluatorName) {
398+
399+
return new DelegatingConfiguration(
400+
configuration,
401+
AUTOSCALER_CONF_PREFIX + CUSTOM_EVALUATOR_CONF_PREFIX + customEvaluatorName + ".");
402+
}
385403
}

0 commit comments

Comments
 (0)