diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 09baf21717..ab2bbcb283 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -201,16 +201,22 @@
Stabilization period in which no new scaling will be executed |
- job.autoscaler.target.utilization |
- 0.7 |
+ job.autoscaler.utilization.max |
+ (none) |
Double |
- Target vertex utilization |
+ Max vertex utilization |
- job.autoscaler.target.utilization.boundary |
- 0.3 |
+ job.autoscaler.utilization.min |
+ (none) |
Double |
- Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)] |
+ Min vertex utilization |
+
+
+ job.autoscaler.utilization.target |
+ 0.7 |
+ Double |
+ Target vertex utilization |
job.autoscaler.vertex.exclude.ids |
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 40f25c7782..41075b7f63 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -49,7 +49,7 @@
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
-import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
@@ -158,7 +158,7 @@ public ParallelismChange computeScaleTargetParallelism(
double targetCapacity =
AutoScalerUtils.getTargetProcessingCapacity(
- evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true, restartTime);
+ evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime);
if (Double.isNaN(targetCapacity)) {
LOG.warn(
"Target data rate is not available for {}, cannot compute new parallelism",
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 5bbc09a3e2..58c5dbe4a4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -44,8 +44,10 @@
import java.util.SortedMap;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
-import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
@@ -284,8 +286,8 @@ protected static void computeProcessingRateThresholds(
boolean processingBacklog,
Duration restartTime) {
- double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
- double targetUtilization = conf.get(TARGET_UTILIZATION);
+ double targetUtilization = conf.get(UTILIZATION_TARGET);
+ double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY);
double upperUtilization;
double lowerUtilization;
@@ -296,8 +298,12 @@ protected static void computeProcessingRateThresholds(
upperUtilization = 1.0;
lowerUtilization = 0.0;
} else {
- upperUtilization = targetUtilization + utilizationBoundary;
- lowerUtilization = targetUtilization - utilizationBoundary;
+ upperUtilization =
+ conf.getOptional(UTILIZATION_MAX)
+ .orElse(targetUtilization + utilizationBoundary);
+ lowerUtilization =
+ conf.getOptional(UTILIZATION_MIN)
+ .orElse(targetUtilization - utilizationBoundary);
}
double scaleUpThreshold =
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index a5ffb0f9d8..980db2f4cc 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -89,13 +89,17 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
+ "seconds suffix, daily expression's formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 9:30:30-10:50:20 in Monday and Thursday "
+ "we can express it as 9:30:30-10:50:20 && * * * ? * 2,5");
- public static final ConfigOption TARGET_UTILIZATION =
- autoScalerConfig("target.utilization")
+ public static final ConfigOption UTILIZATION_TARGET =
+ autoScalerConfig("utilization.target")
.doubleType()
.defaultValue(0.7)
- .withFallbackKeys(oldOperatorConfigKey("target.utilization"))
+ .withDeprecatedKeys(autoScalerConfigKey("target.utilization"))
+ .withFallbackKeys(
+ oldOperatorConfigKey("utilization.target"),
+ oldOperatorConfigKey("target.utilization"))
.withDescription("Target vertex utilization");
+ @Deprecated
public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
autoScalerConfig("target.utilization.boundary")
.doubleType()
@@ -104,6 +108,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
+ public static final ConfigOption UTILIZATION_MAX =
+ autoScalerConfig("utilization.max")
+ .doubleType()
+ .noDefaultValue()
+ .withFallbackKeys(oldOperatorConfigKey("utilization.max"))
+ .withDescription("Max vertex utilization");
+
+ public static final ConfigOption UTILIZATION_MIN =
+ autoScalerConfig("utilization.min")
+ .doubleType()
+ .noDefaultValue()
+ .withFallbackKeys(oldOperatorConfigKey("utilization.min"))
+ .withDescription("Min vertex utilization");
+
public static final ConfigOption SCALE_DOWN_INTERVAL =
autoScalerConfig("scale-down.interval")
.durationType()
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 0aae09c99d..4599d6345b 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -95,8 +95,9 @@ public void setup() {
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
- defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
- defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1));
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 3c557ab397..af01ce34fd 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -49,6 +49,7 @@
import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -98,7 +99,7 @@ public void setup() {
@MethodSource("adjustmentInputsProvider")
public void testParallelismScaling(Collection inputShipStrategies) {
var op = new JobVertexID();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var delayedScaleDown = new DelayedScaleDown();
@@ -113,7 +114,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ conf.set(UTILIZATION_TARGET, .8);
assertEquals(
ParallelismChange.build(8),
vertexScaler.computeScaleTargetParallelism(
@@ -125,7 +126,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ conf.set(UTILIZATION_TARGET, .8);
assertEquals(
ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
@@ -137,7 +138,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ conf.set(UTILIZATION_TARGET, .8);
assertEquals(
ParallelismChange.build(8),
vertexScaler.computeScaleTargetParallelism(
@@ -160,7 +161,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+ conf.set(UTILIZATION_TARGET, 0.5);
assertEquals(
ParallelismChange.build(10),
vertexScaler.computeScaleTargetParallelism(
@@ -172,7 +173,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+ conf.set(UTILIZATION_TARGET, 0.6);
assertEquals(
ParallelismChange.build(4),
vertexScaler.computeScaleTargetParallelism(
@@ -184,7 +185,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
ParallelismChange.build(5),
@@ -209,7 +210,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
restartTime,
delayedScaleDown));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
assertEquals(
ParallelismChange.build(15),
@@ -558,7 +559,7 @@ public void testMinParallelismLimitIsUsed() {
@Test
public void testMaxParallelismLimitIsUsed() {
conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
@@ -587,7 +588,7 @@ public void testMaxParallelismLimitIsUsed() {
@Test
public void testDisableScaleDownInterval() {
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));
var delayedScaleDown = new DelayedScaleDown();
@@ -597,7 +598,7 @@ public void testDisableScaleDownInterval() {
@Test
public void testScaleDownAfterInterval() {
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
var instant = Instant.now();
@@ -629,7 +630,7 @@ public void testScaleDownAfterInterval() {
@Test
public void testImmediateScaleUpWithinScaleDownInterval() {
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
var instant = Instant.now();
@@ -655,7 +656,7 @@ public void testImmediateScaleUpWithinScaleDownInterval() {
@Test
public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
var instant = Instant.now();
@@ -701,7 +702,7 @@ private void assertParallelismChange(
public void testIneffectiveScalingDetection() {
var op = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var evaluated = evaluated(5, 100, 50);
@@ -826,7 +827,7 @@ public void testIneffectiveScalingDetection() {
public void testSendingIneffectiveScalingEvents(Collection inputShipStrategies) {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
+ conf.set(UTILIZATION_TARGET, 1.0);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var evaluated = evaluated(5, 100, 50);
@@ -1082,7 +1083,7 @@ public void testNumPartitionsAdjustment() {
@Test
public void testSendingScalingLimitedEvents() {
var jobVertexID = new JobVertexID();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
+ conf.set(UTILIZATION_TARGET, 1.0);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
var evaluated = evaluated(10, 200, 100);
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 102586a3ef..ad2f5c7225 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -123,8 +123,9 @@ public void setup() {
@Test
public void testEndToEnd() throws Exception {
var conf = context.getConfiguration();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
setDefaultMetrics(metricsCollector);
@@ -344,8 +345,9 @@ public void testMetricCollectorWindow() throws Exception {
@Test
public void testClearHistoryOnTopoChange() throws Exception {
var conf = context.getConfiguration();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
setDefaultMetrics(metricsCollector);
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 4b35d72a1c..8637e126f7 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -86,8 +86,9 @@ public void setup() {
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
- defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
- defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
autoscaler =
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index d3fc167492..a3bc674611 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -128,15 +128,17 @@ public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
var op1 = new JobVertexID();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
var evaluated = Map.of(op1, evaluated(1, 70, 100));
assertFalse(
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
evaluated, evaluated.keySet()));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4);
evaluated = Map.of(op1, evaluated(1, 70, 100));
assertTrue(
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
@@ -178,8 +180,9 @@ public void testUtilizationBoundariesWithOptionalVertex() {
var op2 = new JobVertexID();
// All vertices are optional
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
var evaluated =
Map.of(
@@ -194,7 +197,8 @@ op1, evaluated(1, 70, 100),
// One vertex is required, and it's within the range.
// The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
evaluated =
Map.of(
op1, evaluated(1, 65, 100),
@@ -206,15 +210,17 @@ op1, evaluated(1, 65, 100),
@Test
public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
var conf = context.getConfiguration();
- // Target utilization and boundary are identical
+ // Utilization min max is set from 0 to 1
// which will set the scale down boundary to infinity
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.2);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.);
var vertex = new JobVertexID();
int parallelism = 100;
int expectedParallelism = 1;
int targetRate = 1000;
+
// Intentionally also set the true processing rate to infinity
// to test the boundaries of the scaling condition.
double trueProcessingRate = Double.POSITIVE_INFINITY;
@@ -249,6 +255,90 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
new DelayedScaleDown()));
}
+ @Test
+ public void testUtilizationBoundariesAndUtilizationMinMaxCompatibility() {
+ var conf = context.getConfiguration();
+ conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+ conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+ var op1 = new JobVertexID();
+ var op2 = new JobVertexID();
+
+ // All vertices are optional
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+ var evaluated =
+ Map.of(
+ op1, evaluated(1, 70, 100),
+ op2, evaluated(1, 85, 100));
+
+ // target boundary 0.1, target 0.6, max 0.7, min 0.5
+ boolean boundaryOp1 =
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
+ boolean boundaryOp2 =
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
+
+ // Remove target boundary and use min max, should get the same result
+ conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5);
+ boolean minMaxOp1 =
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
+ boolean minMaxOp2 =
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
+ assertEquals(boundaryOp1, minMaxOp1);
+ assertEquals(boundaryOp2, minMaxOp2);
+
+ // When the target boundary parameter is used,
+ // but the min max parameter is also set,
+ // the min max parameter shall prevail.
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
+ evaluated =
+ Map.of(
+ op1, evaluated(2, 150, 100),
+ op2, evaluated(1, 85, 100));
+ assertFalse(
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
+
+ // When the target boundary parameter is used,
+ // but the max parameter is also set,
+ conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+
+ evaluated =
+ Map.of(
+ op1, evaluated(2, 100, 99999),
+ op2, evaluated(1, 80, 99999));
+ assertTrue(
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
+
+ evaluated = Map.of(op2, evaluated(1, 85, 100));
+ assertFalse(
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
+
+ conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
+
+ evaluated =
+ Map.of(
+ op1, evaluated(2, 80, 81),
+ op2, evaluated(1, 100, 101));
+ assertTrue(
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
+
+ evaluated = Map.of(op1, evaluated(1, 80, 79));
+ assertFalse(
+ ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
+ evaluated, evaluated.keySet()));
+ }
+
@Test
public void testVertexesExclusionForScaling() throws Exception {
var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
@@ -266,7 +356,7 @@ public void testVertexesExclusionForScaling() throws Exception {
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8);
var metrics =
new EvaluatedMetrics(
Map.of(
@@ -720,7 +810,7 @@ public void testAdjustByMaxParallelism() throws Exception {
null));
var conf = context.getConfiguration();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.d);
// The expected new parallelism is 7 without adjustment by max parallelism.
var metrics =
@@ -774,8 +864,9 @@ public void testQuota(
conf.setString("taskmanager.numberOfTaskSlots", "2");
cpuQuota.ifPresent(v -> conf.set(AutoScalerOptions.CPU_QUOTA, v));
memoryQuota.ifPresent(v -> conf.set(AutoScalerOptions.MEMORY_QUOTA, MemorySize.parse(v)));
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
testQuotaReached(slotSharingGroupId1, slotSharingGroupId2, quotaReached, ctx);
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 969d70ee18..03e8fce587 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -41,8 +41,7 @@
import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PREFER_TRACKED_RESTART_TIME;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME;
-import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -265,8 +264,9 @@ public void testLagBasedSourceScaling() {
public void testUtilizationBoundaryComputation() {
var conf = new Configuration();
- conf.set(TARGET_UTILIZATION, 0.8);
- conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+ conf.set(UTILIZATION_TARGET, 0.8);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
conf.set(RESTART_TIME, Duration.ofSeconds(1));
conf.set(CATCH_UP_DURATION, Duration.ZERO);
@@ -287,8 +287,9 @@ public void testUtilizationBoundaryComputation() {
public void testUtilizationBoundaryComputationWithRestartTimesTracking() {
var conf = new Configuration();
- conf.set(TARGET_UTILIZATION, 0.8);
- conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+ conf.set(UTILIZATION_TARGET, 0.8);
+ conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
+ conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
conf.set(RESTART_TIME, Duration.ofMinutes(10));
conf.set(CATCH_UP_DURATION, Duration.ZERO);
conf.set(PREFER_TRACKED_RESTART_TIME, true);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index fad4553d6f..e2b5db8564 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -65,6 +65,10 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
+
/** Default validator implementation for {@link FlinkDeployment}. */
public class DefaultValidator implements FlinkResourceValidator {
@@ -605,9 +609,19 @@ public static Optional validateAutoScalerFlinkConfiguration(
return firstPresent(
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
- validateNumber(flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION, 0.0d),
+ validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
validateNumber(
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
+ validateNumber(
+ flinkConfiguration,
+ UTILIZATION_MAX,
+ flinkConfiguration.get(UTILIZATION_TARGET),
+ 1.0d),
+ validateNumber(
+ flinkConfiguration,
+ UTILIZATION_MIN,
+ 0.0d,
+ flinkConfiguration.get(UTILIZATION_TARGET)),
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5bf5033afb..8caa24eb28 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -802,22 +802,34 @@ public void testAutoScalerDeploymentWithInvalidNegativeUtilization() {
var result =
testAutoScalerConfiguration(
flinkConf ->
- flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6"));
assertErrorContains(
- result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+ result, getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0d));
}
@Test
public void testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() {
- var result =
+ var resultMaxUtilization =
testAutoScalerConfiguration(
flinkConf ->
- flinkConf.put(
- AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
- "-0.6"));
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6"));
+ assertErrorContains(
+ resultMaxUtilization,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MAX,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+ 1.0));
+
+ var resultMinUtilization =
+ testAutoScalerConfiguration(
+ flinkConf ->
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6"));
assertErrorContains(
- result,
- getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+ resultMinUtilization,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MIN,
+ 0.0d,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
}
@Test
@@ -838,9 +850,9 @@ public void testNonEnabledAutoScalerDeploymentJob() {
flinkConf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key());
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
- flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
- flinkConf.put(
- AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6");
});
assertErrorNotContains(result);
}
@@ -853,9 +865,9 @@ public void testDisabledEnabledAutoScalerDeploymentJob() {
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
- flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
- flinkConf.put(
- AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6");
});
assertErrorNotContains(result);
}
@@ -891,35 +903,59 @@ public void testValidateSessionJobWithInvalidNegativeUtilization() {
var result =
testSessionJobAutoScalerConfiguration(
flinkConf ->
- flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6"));
assertErrorContains(
- result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d));
+ result, getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0));
}
@Test
public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() {
- var result =
+ var resultMaxUtilization =
testSessionJobAutoScalerConfiguration(
flinkConf ->
- flinkConf.put(
- AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
- "-0.6"));
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6"));
assertErrorContains(
- result,
- getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+ resultMaxUtilization,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MAX,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+ 1.0d));
+
+ var resultMinUtilization =
+ testSessionJobAutoScalerConfiguration(
+ flinkConf ->
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6"));
+ assertErrorContains(
+ resultMinUtilization,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MIN,
+ 0.0d,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
}
@Test
public void testValidateSessionJobWithInvalidUtilizationBoundary() {
- var result =
+ var resultMaxUtilization =
testSessionJobAutoScalerConfiguration(
flinkConf ->
- flinkConf.put(
- AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(),
- "-1.6"));
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6"));
+ assertErrorContains(
+ resultMaxUtilization,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MAX,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+ 1.0d));
+
+ var resultMinUtilization =
+ testSessionJobAutoScalerConfiguration(
+ flinkConf ->
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6"));
assertErrorContains(
- result,
- getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d));
+ resultMinUtilization,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MIN,
+ 0.0,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
}
@Test
@@ -940,13 +976,100 @@ public void testNonEnabledAutoScalerSessionJob() {
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
- flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
- flinkConf.put(
- AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6");
});
assertErrorNotContains(result);
}
+ @Test
+ public void testAutoScalerUtilizationConfiguration() {
+ var deploymentResult =
+ testAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4");
+ });
+ assertErrorContains(
+ deploymentResult,
+ getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 0.5, 1.0));
+
+ deploymentResult =
+ testAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8");
+ });
+ assertErrorContains(
+ deploymentResult,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MIN,
+ 0.0,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue()));
+
+ deploymentResult =
+ testAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
+ });
+
+ assertErrorContains(
+ deploymentResult,
+ getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0, 1.0));
+
+ deploymentResult =
+ testAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
+ });
+ assertErrorNotContains(deploymentResult);
+
+ var sessionResult =
+ testSessionJobAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4");
+ });
+ assertErrorContains(
+ sessionResult,
+ getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 0.5, 1.0));
+
+ sessionResult =
+ testSessionJobAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
+ });
+ assertErrorContains(
+ sessionResult,
+ getFormattedErrorMessage(
+ AutoScalerOptions.UTILIZATION_MAX,
+ AutoScalerOptions.UTILIZATION_TARGET.defaultValue(),
+ 1.0));
+
+ sessionResult =
+ testSessionJobAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
+ });
+
+ assertErrorContains(
+ sessionResult,
+ getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0, 1.0));
+
+ sessionResult =
+ testSessionJobAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5");
+ flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
+ });
+ assertErrorNotContains(sessionResult);
+ }
+
private Optional testSessionJobAutoScalerConfiguration(
Consumer