Skip to content

Commit 67d0799

Browse files
committed
updating name for evaluator plugin to match naming convention for plugins.
1 parent 7a57e05 commit 67d0799

File tree

11 files changed

+40
-144
lines changed

11 files changed

+40
-144
lines changed

docs/content.zh/docs/operations/plugins.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,5 @@ The following steps demonstrate how to develop and use a custom mutator.
188188
```text
189189
2023-12-12 06:26:56,667 o.a.f.k.o.u.MutatorUtils [INFO ] Discovered mutator from plugin directory[/opt/flink/plugins]: org.apache.flink.mutator.CustomFlinkMutator.
190190
```
191+
192+
## Flink Autoscaler Custom Evaluator

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
2525
import org.apache.flink.autoscaler.exceptions.NotReadyException;
2626
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
27-
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
2827
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
28+
import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
2929
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
3030
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
3131
import org.apache.flink.autoscaler.tuning.ConfigChanges;
@@ -63,7 +63,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
6363
private final AutoScalerEventHandler<KEY, Context> eventHandler;
6464
private final ScalingRealizer<KEY, Context> scalingRealizer;
6565
private final AutoScalerStateStore<KEY, Context> stateStore;
66-
private final Map<String, CustomEvaluator> customEvaluators;
66+
private final Map<String, FlinkAutoscalerEvaluator> customEvaluators;
6767

6868
private Clock clock = Clock.systemDefaultZone();
6969

@@ -80,7 +80,7 @@ public JobAutoScalerImpl(
8080
AutoScalerEventHandler<KEY, Context> eventHandler,
8181
ScalingRealizer<KEY, Context> scalingRealizer,
8282
AutoScalerStateStore<KEY, Context> stateStore,
83-
Map<String, CustomEvaluator> customEvaluators) {
83+
Map<String, FlinkAutoscalerEvaluator> customEvaluators) {
8484
this.metricsCollector = metricsCollector;
8585
this.evaluator = evaluator;
8686
this.scalingExecutor = scalingExecutor;
@@ -276,12 +276,12 @@ void setClock(Clock clock) {
276276
}
277277

278278
@VisibleForTesting
279-
protected Tuple2<CustomEvaluator, Configuration> getCustomEvaluatorIfRequired(
279+
protected Tuple2<FlinkAutoscalerEvaluator, Configuration> getCustomEvaluatorIfRequired(
280280
Configuration conf) {
281281
return Optional.ofNullable(conf.get(CUSTOM_EVALUATOR_NAME))
282282
.map(
283283
name -> {
284-
CustomEvaluator evaluator = customEvaluators.get(name);
284+
FlinkAutoscalerEvaluator evaluator = customEvaluators.get(name);
285285
return evaluator != null
286286
? new Tuple2<>(
287287
evaluator,

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2323
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
2424
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
25-
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
2625
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
2726
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
27+
import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
2828
import org.apache.flink.autoscaler.metrics.MetricAggregator;
2929
import org.apache.flink.autoscaler.metrics.ScalingMetric;
3030
import org.apache.flink.autoscaler.topology.JobTopology;
@@ -79,7 +79,9 @@ public EvaluatedMetrics evaluate(
7979
Configuration conf,
8080
CollectedMetricHistory collectedMetrics,
8181
Duration restartTime,
82-
@Nullable Tuple2<CustomEvaluator, Configuration> customEvaluatorWithConfig) {
82+
@Nullable
83+
Tuple2<FlinkAutoscalerEvaluator, Configuration>
84+
customEvaluatorWithConfig) {
8385
LOG.debug("Restart time used in metrics evaluation: {}", restartTime);
8486
var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
8587
var metricsHistory = collectedMetrics.getMetricHistory();
@@ -93,7 +95,7 @@ public EvaluatedMetrics evaluate(
9395
info ->
9496
Tuple2.of(
9597
info.f0,
96-
new CustomEvaluator.Context(
98+
new FlinkAutoscalerEvaluator.Context(
9799
new UnmodifiableConfiguration(conf),
98100
Collections.unmodifiableSortedMap(
99101
metricsHistory),
@@ -158,7 +160,9 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
158160
JobVertexID vertex,
159161
boolean processingBacklog,
160162
Duration restartTime,
161-
@Nullable Tuple2<CustomEvaluator, CustomEvaluator.Context> customEvaluationSession) {
163+
@Nullable
164+
Tuple2<FlinkAutoscalerEvaluator, FlinkAutoscalerEvaluator.Context>
165+
customEvaluationSession) {
162166

163167
var latestVertexMetrics =
164168
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex);
@@ -633,7 +637,7 @@ protected static double computeEdgeDataRate(
633637

634638
/**
635639
* Executes the provided custom evaluator for the given job vertex. Calls {@link
636-
* CustomEvaluator#evaluateVertexMetrics} to evaluate scaling metrics.
640+
* FlinkAutoscalerEvaluator#evaluateVertexMetrics} to evaluate scaling metrics.
637641
*
638642
* @param vertex The job vertex being evaluated.
639643
* @param evaluatedMetrics Current evaluated metrics.
@@ -645,7 +649,8 @@ protected static double computeEdgeDataRate(
645649
protected static Map<ScalingMetric, EvaluatedScalingMetric> runCustomEvaluator(
646650
JobVertexID vertex,
647651
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
648-
Tuple2<CustomEvaluator, CustomEvaluator.Context> customEvaluationSession) {
652+
Tuple2<FlinkAutoscalerEvaluator, FlinkAutoscalerEvaluator.Context>
653+
customEvaluationSession) {
649654
try {
650655
return customEvaluationSession.f0.evaluateVertexMetrics(
651656
vertex, evaluatedMetrics, customEvaluationSession.f1);

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java renamed to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkAutoscalerEvaluator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@
3131
import java.util.SortedMap;
3232

3333
/**
34-
* Interface for custom evaluators that allow custom scaling metric evaluations. Implementations of
34+
* Interface for custom evaluators that allow tailored scaling metric evaluations. Implementations of
3535
* this interface can provide custom logic to evaluate vertex metrics and merge them with internally
3636
* evaluated metrics.
3737
*/
38-
public interface CustomEvaluator extends Plugin {
38+
public interface FlinkAutoscalerEvaluator extends Plugin {
3939

4040
/**
4141
* Returns the name of the custom evaluator.

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2323
import org.apache.flink.autoscaler.event.TestingEventCollector;
2424
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
25-
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
25+
import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
2626
import org.apache.flink.autoscaler.metrics.TestCustomEvaluator;
2727
import org.apache.flink.autoscaler.metrics.TestMetrics;
2828
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
@@ -165,7 +165,7 @@ private void assertFlinkMetricsCount(int scalingCount, int balancedCount) {
165165
assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalancedCount());
166166
}
167167

168-
private Map<String, CustomEvaluator> createTestCustomEvaluator() {
168+
private Map<String, FlinkAutoscalerEvaluator> createTestCustomEvaluator() {
169169
var testCustomEvaluator = new TestCustomEvaluator();
170170
testCustomEvaluator.configure(new Configuration());
171171
return Map.of(testCustomEvaluator.getName(), testCustomEvaluator);

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.flink.autoscaler.exceptions.NotReadyException;
2424
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
2525
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
26-
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
26+
import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
2727
import org.apache.flink.autoscaler.metrics.ScalingMetric;
2828
import org.apache.flink.autoscaler.metrics.TestCustomEvaluator;
2929
import org.apache.flink.autoscaler.metrics.TestMetrics;
@@ -466,7 +466,7 @@ private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> getEven
466466

467467
@Test
468468
void testGetCustomEvaluatorIfRequired() {
469-
CustomEvaluator testCustomEvaluator = new TestCustomEvaluator();
469+
FlinkAutoscalerEvaluator testCustomEvaluator = new TestCustomEvaluator();
470470
testCustomEvaluator.configure(new Configuration());
471471
var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator);
472472

@@ -491,7 +491,7 @@ void testGetCustomEvaluatorIfRequired() {
491491
autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired(
492492
context.getConfiguration());
493493
assertNotNull(customEvaluatorWithConfig);
494-
assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0);
494+
assertInstanceOf(FlinkAutoscalerEvaluator.class, customEvaluatorWithConfig.f0);
495495
var customEvaluatorConfig = customEvaluatorWithConfig.f1;
496496
assertNotNull(customEvaluatorConfig);
497497
assertEquals(0, customEvaluatorConfig.keySet().size());
@@ -525,7 +525,8 @@ void testGetCustomEvaluatorIfRequired() {
525525
context.getConfiguration());
526526
assertNotNull(customEvaluatorWithConfigContainingAdditionalKeys);
527527
assertInstanceOf(
528-
CustomEvaluator.class, customEvaluatorWithConfigContainingAdditionalKeys.f0);
528+
FlinkAutoscalerEvaluator.class,
529+
customEvaluatorWithConfigContainingAdditionalKeys.f0);
529530
var customEvaluatorConfigContainingAdditionalKeys =
530531
customEvaluatorWithConfigContainingAdditionalKeys.f1;
531532
assertNotNull(customEvaluatorConfigContainingAdditionalKeys);

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2222
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
2323
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
24-
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
2524
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
25+
import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
2626
import org.apache.flink.autoscaler.metrics.MetricAggregator;
2727
import org.apache.flink.autoscaler.metrics.ScalingMetric;
2828
import org.apache.flink.autoscaler.metrics.TestCustomEvaluator;
@@ -319,7 +319,7 @@ public void testEvaluateWithCustomEvaluator() {
319319
var conf = new Configuration();
320320

321321
conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
322-
CustomEvaluator customEvaluator = new TestCustomEvaluator();
322+
FlinkAutoscalerEvaluator customEvaluator = new TestCustomEvaluator();
323323
var customEvaluatorWithConfig = new Tuple2<>(customEvaluator, new Configuration());
324324

325325
var evaluatedMetrics =
@@ -985,13 +985,13 @@ public void testRunCustomEvaluator() {
985985
Map.of()));
986986

987987
var conf = new Configuration();
988-
CustomEvaluator customEvaluator = new TestCustomEvaluator();
988+
FlinkAutoscalerEvaluator customEvaluator = new TestCustomEvaluator();
989989
var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
990990

991991
var testCustomEvaluationSession =
992992
Tuple2.of(
993993
customEvaluator,
994-
new CustomEvaluator.Context(
994+
new FlinkAutoscalerEvaluator.Context(
995995
new UnmodifiableConfiguration(conf),
996996
Collections.unmodifiableSortedMap(metricHistory),
997997
Collections.unmodifiableMap(new HashMap<>()),

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java

Lines changed: 0 additions & 113 deletions
This file was deleted.

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import java.util.Map;
2626

2727
/**
28-
* A test implementation of the {@link CustomEvaluator} interface that provides custom scaling
29-
* metric evaluations for job vertices in a Flink job.
28+
* A test implementation of the {@link FlinkAutoscalerEvaluator} interface that provides
29+
* custom scaling metric evaluations for job vertices in a Flink job.
3030
*/
31-
public class TestCustomEvaluator implements CustomEvaluator {
31+
public class TestCustomEvaluator implements FlinkAutoscalerEvaluator {
3232
@Override
3333
public String getName() {
3434
return "test-custom-evaluator";

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.flink.kubernetes.operator.autoscaler;
2020

21-
import org.apache.flink.autoscaler.metrics.CustomEvaluator;
21+
import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator;
2222
import org.apache.flink.configuration.ConfigConstants;
2323
import org.apache.flink.core.plugin.PluginUtils;
2424
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -38,14 +38,15 @@ public class AutoscalerUtils {
3838
*
3939
* @param configManager Flink Config manager
4040
* @return A map of discovered custom evaluators, where the key is the evaluator name provided
41-
* by {@link CustomEvaluator#getName()}) and the value is the corresponding instance.
41+
* by {@link FlinkAutoscalerEvaluator#getName()}) and the value is the corresponding
42+
* instance.
4243
*/
43-
public static Map<String, CustomEvaluator> discoverCustomEvaluators(
44+
public static Map<String, FlinkAutoscalerEvaluator> discoverCustomEvaluators(
4445
FlinkConfigManager configManager) {
4546
var conf = configManager.getDefaultConfig();
46-
Map<String, CustomEvaluator> customEvaluators = new HashMap<>();
47+
Map<String, FlinkAutoscalerEvaluator> customEvaluators = new HashMap<>();
4748
PluginUtils.createPluginManagerFromRootFolder(conf)
48-
.load(CustomEvaluator.class)
49+
.load(FlinkAutoscalerEvaluator.class)
4950
.forEachRemaining(
5051
customEvaluator -> {
5152
String customEvaluatorName = customEvaluator.getName();

0 commit comments

Comments
 (0)