Skip to content

Commit 7aa4185

Browse files
committed
1. Adding getName method to CustomEvaluator to derive the name from the interface. 2. Updating javadoc’s for the added methods.
1 parent d1cbb11 commit 7aa4185

File tree

9 files changed

+133
-75
lines changed

9 files changed

+133
-75
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME;
4747
import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
4848
import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
49-
import static org.apache.flink.autoscaler.metrics.CustomEvaluatorOptions.CUSTOM_EVALUATOR_CLASS;
5049
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
5150
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingTracking;
5251

@@ -279,13 +278,16 @@ void setClock(Clock clock) {
279278
@VisibleForTesting
280279
protected Tuple2<CustomEvaluator, Configuration> getCustomEvaluatorIfRequired(
281280
Configuration conf) {
282-
var customEvaluatorName = conf.get(CUSTOM_EVALUATOR_NAME);
283-
var customEvaluatorConfig = AutoScalerOptions.forCustomEvaluator(conf, customEvaluatorName);
284-
CustomEvaluator evaluator =
285-
Optional.ofNullable(customEvaluatorConfig.get(CUSTOM_EVALUATOR_CLASS))
286-
.map(this.customEvaluators::get)
287-
.orElse(null);
288-
289-
return evaluator != null ? new Tuple2<>(evaluator, customEvaluatorConfig) : null;
281+
return Optional.ofNullable(conf.get(CUSTOM_EVALUATOR_NAME))
282+
.map(
283+
name -> {
284+
CustomEvaluator evaluator = customEvaluators.get(name);
285+
return evaluator != null
286+
? new Tuple2<>(
287+
evaluator,
288+
AutoScalerOptions.forCustomEvaluator(conf, name))
289+
: null;
290+
})
291+
.orElse(null);
290292
}
291293
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,16 @@ protected static double computeEdgeDataRate(
631631
return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory);
632632
}
633633

634+
/**
635+
* Executes the provided custom evaluator for the given job vertex. Calls {@link
636+
* CustomEvaluator#evaluateVertexMetrics} to evaluate scaling metrics.
637+
*
638+
* @param vertex The job vertex being evaluated.
639+
* @param evaluatedMetrics Current evaluated metrics.
640+
* @param customEvaluationSession A tuple containing the custom evaluator and evaluation
641+
* context.
642+
* @return A map of scaling metrics, with its corresponding evaluated scaling metric.
643+
*/
634644
@VisibleForTesting
635645
protected static Map<ScalingMetric, EvaluatedScalingMetric> runCustomEvaluator(
636646
JobVertexID vertex,
@@ -654,6 +664,13 @@ protected static Map<ScalingMetric, EvaluatedScalingMetric> runCustomEvaluator(
654664
return Collections.emptyMap();
655665
}
656666

667+
/**
668+
* Merges the incoming evaluated metrics into actual evaluated metrics.
669+
*
670+
* @param actual The target evaluated metrics map to merge into.
671+
* @param incoming The incoming map containing new evaluated metrics map to be merged
672+
* (nullable).
673+
*/
657674
@VisibleForTesting
658675
protected static void mergeEvaluatedMetricsMaps(
659676
Map<ScalingMetric, EvaluatedScalingMetric> actual,
@@ -670,6 +687,13 @@ protected static void mergeEvaluatedMetricsMaps(
670687
::mergeEvaluatedScalingMetric)));
671688
}
672689

690+
/**
691+
* Merges two {@link EvaluatedScalingMetric} instances.
692+
*
693+
* @param actual The existing evaluated scaling metric.
694+
* @param incoming The incoming evaluated scaling metric.
695+
* @return A new {@link EvaluatedScalingMetric} instance with merged values.
696+
*/
673697
@VisibleForTesting
674698
protected static EvaluatedScalingMetric mergeEvaluatedScalingMetric(
675699
EvaluatedScalingMetric actual, EvaluatedScalingMetric incoming) {

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@
3737
*/
3838
public interface CustomEvaluator extends Plugin {
3939

40+
/**
41+
* Returns the name of the custom evaluator.
42+
*
43+
* @return The name of the custom evaluator.
44+
*/
45+
String getName();
46+
4047
/**
4148
* Evaluates scaling metrics for a given job vertex based on the internally evaluated metrics
4249
* and context.

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -466,29 +466,15 @@ private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> getEven
466466

467467
@Test
468468
void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() {
469-
var topology = new JobTopology();
470-
471469
CustomEvaluator testCustomEvaluator = new TestCustomEvaluator();
472470
testCustomEvaluator.configure(new Configuration());
473-
var testCustomEvaluators =
474-
Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator);
471+
var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator);
475472

476-
String testCustomEvaluatorName = "testCustomEvaluator";
477-
String testCustomEvaluatorClassName = TestCustomEvaluator.class.getName();
473+
String testCustomEvaluatorName = "test-custom-evaluator";
478474

479475
var defaultConf = context.getConfiguration();
480476
defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName);
481477

482-
defaultConf.set(
483-
ConfigOptions.key(
484-
AutoScalerOptions.AUTOSCALER_CONF_PREFIX
485-
+ AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX
486-
+ testCustomEvaluatorName
487-
+ ".class")
488-
.stringType()
489-
.noDefaultValue(),
490-
testCustomEvaluatorClassName);
491-
492478
defaultConf.set(
493479
ConfigOptions.key(
494480
AutoScalerOptions.AUTOSCALER_CONF_PREFIX
@@ -525,17 +511,15 @@ void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() {
525511
assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0);
526512
var customEvaluatorConfig = customEvaluatorWithConfig.f1;
527513
assertNotNull(customEvaluatorConfig);
528-
int expectedKeyCount = 3;
514+
int expectedKeyCount = 2;
529515
assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size());
530516

531-
Set<String> expectedKeys = Set.of("class", "k1", "k2");
517+
Set<String> expectedKeys = Set.of("k1", "k2");
532518
assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys));
533519
}
534520

535521
@Test
536522
void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() {
537-
var topology = new JobTopology();
538-
539523
var autoscaler =
540524
new JobAutoScalerImpl<>(
541525
null,
@@ -555,8 +539,33 @@ void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() {
555539
void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNotConfigured() {
556540
CustomEvaluator testCustomEvaluator = new TestCustomEvaluator();
557541
testCustomEvaluator.configure(new Configuration());
558-
var testCustomEvaluators =
559-
Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator);
542+
var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator);
543+
544+
var autoscaler =
545+
new JobAutoScalerImpl<>(
546+
null,
547+
null,
548+
null,
549+
eventCollector,
550+
scalingRealizer,
551+
stateStore,
552+
testCustomEvaluators);
553+
554+
var customEvaluatorWithConfig =
555+
autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration());
556+
assertNull(customEvaluatorWithConfig);
557+
}
558+
559+
@Test
560+
void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNoMatches() {
561+
CustomEvaluator testCustomEvaluator = new TestCustomEvaluator();
562+
testCustomEvaluator.configure(new Configuration());
563+
var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator);
564+
565+
String testCustomEvaluatorName = "test-custom-evaluator-no-match";
566+
567+
var defaultConf = context.getConfiguration();
568+
defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName);
560569

561570
var autoscaler =
562571
new JobAutoScalerImpl<>(
@@ -572,4 +581,38 @@ void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNotConfigured() {
572581
autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration());
573582
assertNull(customEvaluatorWithConfig);
574583
}
584+
585+
@Test
586+
void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorNoConfig() {
587+
CustomEvaluator testCustomEvaluator = new TestCustomEvaluator();
588+
testCustomEvaluator.configure(new Configuration());
589+
var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator);
590+
591+
String testCustomEvaluatorName = "test-custom-evaluator";
592+
593+
var defaultConf = context.getConfiguration();
594+
defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName);
595+
596+
var autoscaler =
597+
new JobAutoScalerImpl<>(
598+
null,
599+
null,
600+
null,
601+
eventCollector,
602+
scalingRealizer,
603+
stateStore,
604+
testCustomEvaluators);
605+
606+
var customEvaluatorWithConfig =
607+
autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration());
608+
assertNotNull(customEvaluatorWithConfig);
609+
assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0);
610+
var customEvaluatorConfig = customEvaluatorWithConfig.f1;
611+
assertNotNull(customEvaluatorConfig);
612+
int expectedKeyCount = 0;
613+
assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size());
614+
615+
Set<String> expectedKeys = Set.of();
616+
assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys));
617+
}
575618
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
* target data rate for source job vertices, enabling more responsive scaling decisions.
3333
*/
3434
public class SimpleTrendAdjustor implements CustomEvaluator {
35+
@Override
36+
public String getName() {
37+
return "simple-trend-adjustor";
38+
}
39+
3540
@Override
3641
public Map<ScalingMetric, EvaluatedScalingMetric> evaluateVertexMetrics(
3742
JobVertexID vertex,

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
* metric evaluations for job vertices in a Flink job.
3030
*/
3131
public class TestCustomEvaluator implements CustomEvaluator {
32+
@Override
33+
public String getName() {
34+
return "test-custom-evaluator";
35+
}
36+
3237
@Override
3338
public Map<ScalingMetric, EvaluatedScalingMetric> evaluateVertexMetrics(
3439
JobVertexID vertex,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,37 @@ public class AutoscalerUtils {
3737
* discovers custom evaluator's for autoscaler.
3838
*
3939
* @param configManager Flink Config manager
40-
* @return A map of discovered custom evaluators, where the key is the fully qualified class
41-
* name of the custom evaluator and the value is the corresponding instance.
40+
* @return A map of discovered custom evaluators, where the key is the evaluator name provided
41+
* by {@link CustomEvaluator#getName()}) and the value is the corresponding instance.
4242
*/
4343
public static Map<String, CustomEvaluator> discoverCustomEvaluators(
4444
FlinkConfigManager configManager) {
4545
var conf = configManager.getDefaultConfig();
4646
Map<String, CustomEvaluator> customEvaluators = new HashMap<>();
47-
4847
PluginUtils.createPluginManagerFromRootFolder(conf)
4948
.load(CustomEvaluator.class)
5049
.forEachRemaining(
5150
customEvaluator -> {
52-
String customEvaluatorClass = customEvaluator.getClass().getName();
51+
String customEvaluatorName = customEvaluator.getName();
5352
LOG.info(
5453
"Discovered custom evaluator for autoscaler from plugin directory[{}]: {}.",
5554
System.getenv()
5655
.getOrDefault(
5756
ConfigConstants.ENV_FLINK_PLUGINS_DIR,
5857
ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS),
59-
customEvaluatorClass);
58+
customEvaluatorName);
6059
customEvaluator.configure(conf);
61-
customEvaluators.put(customEvaluatorClass, customEvaluator);
60+
if (customEvaluators.containsKey(customEvaluatorName)) {
61+
LOG.warn(
62+
"Duplicate custom evaluator name [{}] detected. Overwriting existing [{}] with [{}].",
63+
customEvaluatorName,
64+
customEvaluators
65+
.get(customEvaluatorName)
66+
.getClass()
67+
.getName(),
68+
customEvaluator.getClass().getName());
69+
}
70+
customEvaluators.put(customEvaluatorName, customEvaluator);
6271
});
6372
return customEvaluators;
6473
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.flink.kubernetes.operator.autoscaler;
2020

21-
import org.apache.flink.autoscaler.metrics.TestCustomEvaluator;
2221
import org.apache.flink.configuration.ConfigConstants;
2322
import org.apache.flink.configuration.Configuration;
2423
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -56,7 +55,7 @@ public void testDiscoverCustomEvaluators() throws IOException {
5655
new FlinkConfigManager(new Configuration()))
5756
.keySet();
5857
// Expected evaluators
59-
var expectedEvaluators = new HashSet<>(List.of(TestCustomEvaluator.class.getName()));
58+
var expectedEvaluators = new HashSet<>(List.of("test-custom-evaluator"));
6059

6160
assertEquals(expectedEvaluators, discoveredEvaluators);
6261
} finally {

0 commit comments

Comments
 (0)