Skip to content

Commit 84c7683

Browse files
[FLINK-37126] Add Validator for Autoscaler
1 parent 3e3cb58 commit 84c7683

File tree

4 files changed

+230
-62
lines changed

4 files changed

+230
-62
lines changed

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.autoscaler.JobAutoScaler;
2222
import org.apache.flink.autoscaler.JobAutoScalerContext;
2323
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
24+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
2425
import org.apache.flink.configuration.Configuration;
2526
import org.apache.flink.configuration.UnmodifiableConfiguration;
2627
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -40,6 +41,7 @@
4041
import java.util.HashSet;
4142
import java.util.List;
4243
import java.util.Map;
44+
import java.util.Optional;
4345
import java.util.Set;
4446
import java.util.concurrent.CompletableFuture;
4547
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +52,7 @@
5052
import java.util.function.Function;
5153
import java.util.stream.Collectors;
5254

55+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5356
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
5457
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
5558

@@ -68,6 +71,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
6871
private final ScheduledExecutorService scheduledExecutorService;
6972
private final ExecutorService scalingThreadPool;
7073
private final UnmodifiableConfiguration baseConf;
74+
private final AutoscalerValidator autoscalerValidator;
7175

7276
/**
7377
* Maintain a set of job keys that during scaling, it should be accessed at {@link
@@ -103,6 +107,7 @@ public StandaloneAutoscalerExecutor(
103107
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
104108
this.scalingJobKeys = new HashSet<>();
105109
this.baseConf = new UnmodifiableConfiguration(conf);
110+
this.autoscalerValidator = new AutoscalerValidator();
106111
}
107112

108113
public void start() {
@@ -188,7 +193,19 @@ private void cleanupStoppedJob(Collection<Context> jobList) {
188193
protected void scalingSingleJob(Context jobContext) {
189194
try {
190195
MDC.put("job.key", jobContext.getJobKey().toString());
191-
autoScaler.scale(jobContext);
196+
Optional<String> validationError =
197+
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
198+
if (validationError.isPresent()) {
199+
eventHandler.handleEvent(
200+
jobContext,
201+
AutoScalerEventHandler.Type.Warning,
202+
"AutoScaler Options Validation",
203+
validationError.get(),
204+
null,
205+
baseConf.get(SCALING_EVENT_INTERVAL));
206+
} else {
207+
autoScaler.scale(jobContext);
208+
}
192209
} catch (Throwable e) {
193210
LOG.error("Error while scaling job", e);
194211
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.standalone;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.autoscaler.JobAutoScaler;
23+
import org.apache.flink.autoscaler.JobAutoScalerContext;
24+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
25+
import org.apache.flink.autoscaler.event.TestingEventCollector;
26+
import org.apache.flink.configuration.Configuration;
27+
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class StandaloneAutoscalerValidatorTest {
39+
private List<JobAutoScalerContext<JobID>> jobList;
40+
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
41+
private ConcurrentHashMap<JobID, Integer> scaleCounter;
42+
private Configuration correctConfiguration;
43+
private Configuration invalidConfiguration;
44+
45+
@BeforeEach
46+
void setUp() {
47+
jobList = new ArrayList<>();
48+
eventCollector = new TestingEventCollector<>();
49+
scaleCounter = new ConcurrentHashMap<>();
50+
51+
correctConfiguration = new Configuration();
52+
correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
53+
54+
invalidConfiguration = new Configuration();
55+
invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
56+
invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.0);
57+
}
58+
59+
@Test
60+
void testAutoScalerWithInvalidConfig() throws Exception {
61+
JobAutoScalerContext<JobID> validJob = createJobAutoScalerContext(correctConfiguration);
62+
JobAutoScalerContext<JobID> invalidJob = createJobAutoScalerContext(invalidConfiguration);
63+
64+
jobList.add(validJob);
65+
jobList.add(invalidJob);
66+
67+
final var jobAutoScaler =
68+
new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
69+
@Override
70+
public void scale(JobAutoScalerContext<JobID> context)
71+
throws InterruptedException {
72+
scaleCounter.merge(context.getJobKey(), 1, Integer::sum);
73+
}
74+
75+
@Override
76+
public void cleanup(JobAutoScalerContext<JobID> context) {
77+
// No cleanup required for the test
78+
}
79+
};
80+
81+
try (var autoscalerExecutor =
82+
new StandaloneAutoscalerExecutor<>(
83+
new Configuration(), baseConf -> jobList, eventCollector, jobAutoScaler)) {
84+
85+
List<CompletableFuture<Void>> scaledFutures = autoscalerExecutor.scaling();
86+
87+
for (CompletableFuture<Void> scaledFuture : scaledFutures) {
88+
scaledFuture.get();
89+
}
90+
91+
// Verification triggers two scaling tasks
92+
assertThat(scaledFutures).hasSize(2);
93+
94+
// Only legally configured tasks are scaled
95+
assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey());
96+
97+
// Verification Event Collector captures an event
98+
assertThat(eventCollector.events).hasSize(1);
99+
assertThat(eventCollector.events)
100+
.allMatch(event -> event.getContext().equals(invalidJob));
101+
}
102+
}
103+
104+
private JobAutoScalerContext<JobID> createJobAutoScalerContext(Configuration configuration) {
105+
JobID jobID = new JobID();
106+
return new JobAutoScalerContext<>(
107+
jobID, jobID, JobStatus.RUNNING, configuration, null, null);
108+
}
109+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
22+
import org.apache.flink.autoscaler.utils.CalendarUtils;
23+
import org.apache.flink.configuration.ConfigOption;
24+
import org.apache.flink.configuration.Configuration;
25+
26+
import java.util.Optional;
27+
28+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
29+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
30+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
31+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
32+
33+
/** Validator for Autoscaler. */
34+
public class AutoscalerValidator {
35+
36+
/**
37+
* Validate autoscaler config and return optional error.
38+
*
39+
* @param flinkConf autoscaler config
40+
* @return Optional error string, should be present iff validation resulted in an error
41+
*/
42+
public Optional<String> validateAutoscalerOptions(Configuration flinkConf) {
43+
44+
if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
45+
return Optional.empty();
46+
}
47+
return firstPresent(
48+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
49+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
50+
validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
51+
validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
52+
validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d),
53+
validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)),
54+
validateNumber(flinkConf, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
55+
CalendarUtils.validateExcludedPeriods(flinkConf));
56+
}
57+
58+
@SafeVarargs
59+
private static Optional<String> firstPresent(Optional<String>... errOpts) {
60+
for (Optional<String> opt : errOpts) {
61+
if (opt.isPresent()) {
62+
return opt;
63+
}
64+
}
65+
return Optional.empty();
66+
}
67+
68+
private static <T extends Number> Optional<String> validateNumber(
69+
Configuration flinkConfiguration,
70+
ConfigOption<T> autoScalerConfig,
71+
Double min,
72+
Double max) {
73+
try {
74+
var configValue = flinkConfiguration.get(autoScalerConfig);
75+
if (configValue != null) {
76+
double value = configValue.doubleValue();
77+
if ((min != null && value < min) || (max != null && value > max)) {
78+
return Optional.of(
79+
String.format(
80+
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
81+
autoScalerConfig.key(),
82+
min != null ? min.toString() : "-Infinity",
83+
max != null ? max.toString() : "+Infinity"));
84+
}
85+
}
86+
return Optional.empty();
87+
} catch (IllegalArgumentException e) {
88+
return Optional.of(
89+
String.format(
90+
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
91+
}
92+
}
93+
94+
private static <T extends Number> Optional<String> validateNumber(
95+
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
96+
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
97+
}
98+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

Lines changed: 5 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717

1818
package org.apache.flink.kubernetes.operator.validation;
1919

20-
import org.apache.flink.autoscaler.config.AutoScalerOptions;
21-
import org.apache.flink.autoscaler.utils.CalendarUtils;
20+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
2221
import org.apache.flink.configuration.CheckpointingOptions;
23-
import org.apache.flink.configuration.ConfigOption;
2422
import org.apache.flink.configuration.Configuration;
2523
import org.apache.flink.configuration.HighAvailabilityOptions;
2624
import org.apache.flink.configuration.JobManagerOptions;
@@ -65,11 +63,6 @@
6563
import java.util.regex.Matcher;
6664
import java.util.regex.Pattern;
6765

68-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
69-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
70-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
71-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
72-
7366
/** Default validator implementation for {@link FlinkDeployment}. */
7467
public class DefaultValidator implements FlinkResourceValidator {
7568

@@ -88,9 +81,11 @@ public class DefaultValidator implements FlinkResourceValidator {
8881
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
8982

9083
private final FlinkConfigManager configManager;
84+
private final AutoscalerValidator autoscalerValidator;
9185

9286
public DefaultValidator(FlinkConfigManager configManager) {
9387
this.configManager = configManager;
88+
this.autoscalerValidator = new AutoscalerValidator();
9489
}
9590

9691
@Override
@@ -598,63 +593,12 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
598593
return Optional.empty();
599594
}
600595

601-
public static Optional<String> validateAutoScalerFlinkConfiguration(
596+
public Optional<String> validateAutoScalerFlinkConfiguration(
602597
Map<String, String> effectiveConfig) {
603598
if (effectiveConfig == null) {
604599
return Optional.empty();
605600
}
606601
Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
607-
if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
608-
return Optional.empty();
609-
}
610-
return firstPresent(
611-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
612-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
613-
validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
614-
validateNumber(
615-
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
616-
validateNumber(
617-
flinkConfiguration,
618-
UTILIZATION_MAX,
619-
flinkConfiguration.get(UTILIZATION_TARGET),
620-
1.0d),
621-
validateNumber(
622-
flinkConfiguration,
623-
UTILIZATION_MIN,
624-
0.0d,
625-
flinkConfiguration.get(UTILIZATION_TARGET)),
626-
validateNumber(flinkConfiguration, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
627-
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
628-
}
629-
630-
private static <T extends Number> Optional<String> validateNumber(
631-
Configuration flinkConfiguration,
632-
ConfigOption<T> autoScalerConfig,
633-
Double min,
634-
Double max) {
635-
try {
636-
var configValue = flinkConfiguration.get(autoScalerConfig);
637-
if (configValue != null) {
638-
double value = configValue.doubleValue();
639-
if ((min != null && value < min) || (max != null && value > max)) {
640-
return Optional.of(
641-
String.format(
642-
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
643-
autoScalerConfig.key(),
644-
min != null ? min.toString() : "-Infinity",
645-
max != null ? max.toString() : "+Infinity"));
646-
}
647-
}
648-
return Optional.empty();
649-
} catch (IllegalArgumentException e) {
650-
return Optional.of(
651-
String.format(
652-
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
653-
}
654-
}
655-
656-
private static <T extends Number> Optional<String> validateNumber(
657-
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
658-
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
602+
return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
659603
}
660604
}

0 commit comments

Comments
 (0)