From 4da1b3069f946fac104673e96f92eff3ce8fcee8 Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Wed, 4 Dec 2024 19:22:50 +0800 Subject: [PATCH 1/2] [FLINK-36836][Autoscaler] Supports config the upper and lower limits of target utilization --- .../generated/auto_scaler_configuration.html | 18 +- .../flink/autoscaler/JobVertexScaler.java | 4 +- .../autoscaler/ScalingMetricEvaluator.java | 19 +- .../autoscaler/config/AutoScalerOptions.java | 24 ++- .../autoscaler/BacklogBasedScalingTest.java | 5 +- .../flink/autoscaler/JobVertexScalerTest.java | 33 +-- .../MetricsCollectionAndEvaluationTest.java | 10 +- .../RecommendedParallelismTest.java | 5 +- .../flink/autoscaler/ScalingExecutorTest.java | 83 ++++++-- .../ScalingMetricEvaluatorTest.java | 13 +- .../operator/validation/DefaultValidator.java | 16 +- .../validation/DefaultValidatorTest.java | 201 +++++++++++++++--- 12 files changed, 338 insertions(+), 93 deletions(-) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index 09baf21717..cc7e810d51 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -201,16 +201,22 @@ Stabilization period in which no new scaling will be executed -
job.autoscaler.target.utilization
- 0.7 +
job.autoscaler.utilization.max
+ 1.0 Double - Target vertex utilization + Max vertex utilization + + +
job.autoscaler.utilization.min
+ 0.4 + Double + Min vertex utilization -
job.autoscaler.target.utilization.boundary
- 0.3 +
job.autoscaler.utilization.target
+ 0.7 Double - Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)] + Target vertex utilization
job.autoscaler.vertex.exclude.ids
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 40f25c7782..41075b7f63 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -49,7 +49,7 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; @@ -158,7 +158,7 @@ public ParallelismChange computeScaleTargetParallelism( double targetCapacity = AutoScalerUtils.getTargetProcessingCapacity( - evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true, restartTime); + evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime); if (Double.isNaN(targetCapacity)) { LOG.warn( "Target data rate is not available for {}, cannot compute new parallelism", diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index 5bbc09a3e2..91ea5fb638 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -44,8 +44,10 @@ import java.util.SortedMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO; @@ -284,8 +286,7 @@ protected static void computeProcessingRateThresholds( boolean processingBacklog, Duration restartTime) { - double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY); - double targetUtilization = conf.get(TARGET_UTILIZATION); + double targetUtilization = conf.get(UTILIZATION_TARGET); double upperUtilization; double lowerUtilization; @@ -296,8 +297,16 @@ protected static void computeProcessingRateThresholds( upperUtilization = 1.0; lowerUtilization = 0.0; } else { - upperUtilization = targetUtilization + utilizationBoundary; - lowerUtilization = targetUtilization - utilizationBoundary; + if (conf.getOptional(UTILIZATION_MAX).isPresent() + || conf.getOptional(UTILIZATION_MIN).isPresent() + || conf.getOptional(TARGET_UTILIZATION_BOUNDARY).isEmpty()) { + upperUtilization = conf.get(UTILIZATION_MAX); + lowerUtilization = conf.get(UTILIZATION_MIN); + } else { + Double boundary = conf.get(TARGET_UTILIZATION_BOUNDARY); + upperUtilization = targetUtilization + boundary; + lowerUtilization = targetUtilization - boundary; + } } double scaleUpThreshold = diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index a5ffb0f9d8..82904e3522 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -89,13 +89,17 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { + "seconds suffix, daily expression's formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 9:30:30-10:50:20 in Monday and Thursday " + "we can express it as 9:30:30-10:50:20 && * * * ? * 2,5"); - public static final ConfigOption TARGET_UTILIZATION = - autoScalerConfig("target.utilization") + public static final ConfigOption UTILIZATION_TARGET = + autoScalerConfig("utilization.target") .doubleType() .defaultValue(0.7) - .withFallbackKeys(oldOperatorConfigKey("target.utilization")) + .withDeprecatedKeys(autoScalerConfigKey("target.utilization")) + .withFallbackKeys( + oldOperatorConfigKey("utilization.target"), + oldOperatorConfigKey("target.utilization")) .withDescription("Target vertex utilization"); + @Deprecated public static final ConfigOption TARGET_UTILIZATION_BOUNDARY = autoScalerConfig("target.utilization.boundary") .doubleType() @@ -104,6 +108,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]"); + public static final ConfigOption UTILIZATION_MAX = + autoScalerConfig("utilization.max") + .doubleType() + .defaultValue(1.) + .withFallbackKeys(oldOperatorConfigKey("utilization.max")) + .withDescription("Max vertex utilization"); + + public static final ConfigOption UTILIZATION_MIN = + autoScalerConfig("utilization.min") + .doubleType() + .defaultValue(0.4) + .withFallbackKeys(oldOperatorConfigKey("utilization.min")) + .withDescription("Min vertex utilization"); + public static final ConfigOption SCALE_DOWN_INTERVAL = autoScalerConfig("scale-down.interval") .durationType() diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index 0aae09c99d..4599d6345b 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -95,8 +95,9 @@ public void setup() { defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8); + defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1)); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 3c557ab397..af01ce34fd 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -49,6 +49,7 @@ import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING; import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT; import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -98,7 +99,7 @@ public void setup() { @MethodSource("adjustmentInputsProvider") public void testParallelismScaling(Collection inputShipStrategies) { var op = new JobVertexID(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); var delayedScaleDown = new DelayedScaleDown(); @@ -113,7 +114,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(UTILIZATION_TARGET, .8); assertEquals( ParallelismChange.build(8), vertexScaler.computeScaleTargetParallelism( @@ -125,7 +126,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(UTILIZATION_TARGET, .8); assertEquals( ParallelismChange.noChange(), vertexScaler.computeScaleTargetParallelism( @@ -137,7 +138,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(UTILIZATION_TARGET, .8); assertEquals( ParallelismChange.build(8), vertexScaler.computeScaleTargetParallelism( @@ -160,7 +161,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5); + conf.set(UTILIZATION_TARGET, 0.5); assertEquals( ParallelismChange.build(10), vertexScaler.computeScaleTargetParallelism( @@ -172,7 +173,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); + conf.set(UTILIZATION_TARGET, 0.6); assertEquals( ParallelismChange.build(4), vertexScaler.computeScaleTargetParallelism( @@ -184,7 +185,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5); assertEquals( ParallelismChange.build(5), @@ -209,7 +210,7 @@ public void testParallelismScaling(Collection inputShipStrategies) restartTime, delayedScaleDown)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5); assertEquals( ParallelismChange.build(15), @@ -558,7 +559,7 @@ public void testMinParallelismLimitIsUsed() { @Test public void testMaxParallelismLimitIsUsed() { conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); var delayedScaleDown = new DelayedScaleDown(); assertEquals( @@ -587,7 +588,7 @@ public void testMaxParallelismLimitIsUsed() { @Test public void testDisableScaleDownInterval() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0)); var delayedScaleDown = new DelayedScaleDown(); @@ -597,7 +598,7 @@ public void testDisableScaleDownInterval() { @Test public void testScaleDownAfterInterval() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1)); var instant = Instant.now(); @@ -629,7 +630,7 @@ public void testScaleDownAfterInterval() { @Test public void testImmediateScaleUpWithinScaleDownInterval() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1)); var instant = Instant.now(); @@ -655,7 +656,7 @@ public void testImmediateScaleUpWithinScaleDownInterval() { @Test public void testCancelDelayedScaleDownAfterNewParallelismIsSame() { - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1)); var instant = Instant.now(); @@ -701,7 +702,7 @@ private void assertParallelismChange( public void testIneffectiveScalingDetection() { var op = new JobVertexID(); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(UTILIZATION_TARGET, 1.); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); var evaluated = evaluated(5, 100, 50); @@ -826,7 +827,7 @@ public void testIneffectiveScalingDetection() { public void testSendingIneffectiveScalingEvents(Collection inputShipStrategies) { var jobVertexID = new JobVertexID(); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(UTILIZATION_TARGET, 1.0); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); var evaluated = evaluated(5, 100, 50); @@ -1082,7 +1083,7 @@ public void testNumPartitionsAdjustment() { @Test public void testSendingScalingLimitedEvents() { var jobVertexID = new JobVertexID(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(UTILIZATION_TARGET, 1.0); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO); var evaluated = evaluated(10, 200, 100); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index 102586a3ef..ad2f5c7225 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -123,8 +123,9 @@ public void setup() { @Test public void testEndToEnd() throws Exception { var conf = context.getConfiguration(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.); setDefaultMetrics(metricsCollector); @@ -344,8 +345,9 @@ public void testMetricCollectorWindow() throws Exception { @Test public void testClearHistoryOnTopoChange() throws Exception { var conf = context.getConfiguration(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.); setDefaultMetrics(metricsCollector); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index 4b35d72a1c..8637e126f7 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -86,8 +86,9 @@ public void setup() { defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8); - defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8); + defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); autoscaler = diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index d3fc167492..5efd13c7e5 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -128,15 +128,17 @@ public void testUtilizationBoundariesForAllRequiredVertices() throws Exception { var op1 = new JobVertexID(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); var evaluated = Map.of(op1, evaluated(1, 70, 100)); assertFalse( ScalingExecutor.allChangedVerticesWithinUtilizationTarget( evaluated, evaluated.keySet())); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4); evaluated = Map.of(op1, evaluated(1, 70, 100)); assertTrue( ScalingExecutor.allChangedVerticesWithinUtilizationTarget( @@ -178,8 +180,9 @@ public void testUtilizationBoundariesWithOptionalVertex() { var op2 = new JobVertexID(); // All vertices are optional - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); var evaluated = Map.of( @@ -194,7 +197,8 @@ op1, evaluated(1, 70, 100), // One vertex is required, and it's within the range. // The op2 is optional, so it shouldn't affect the scaling even if it is out of range, - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); evaluated = Map.of( op1, evaluated(1, 65, 100), @@ -206,15 +210,17 @@ op1, evaluated(1, 65, 100), @Test public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception { var conf = context.getConfiguration(); - // Target utilization and boundary are identical + // Utilization min max is set from 0 to 1 // which will set the scale down boundary to infinity - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.2); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.); var vertex = new JobVertexID(); int parallelism = 100; int expectedParallelism = 1; int targetRate = 1000; + // Intentionally also set the true processing rate to infinity // to test the boundaries of the scaling condition. double trueProcessingRate = Double.POSITIVE_INFINITY; @@ -249,6 +255,56 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception { new DelayedScaleDown())); } + @Test + public void testUtilizationBoundariesAndUtilizationMinMaxCompatibility() { + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO); + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + var op1 = new JobVertexID(); + var op2 = new JobVertexID(); + + // All vertices are optional + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1); + var evaluated = + Map.of( + op1, evaluated(1, 70, 100), + op2, evaluated(1, 85, 100)); + + // target boundary 0.1, target 0.6, max 0.7, min 0.5 + boolean boundaryOp1 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)); + boolean boundaryOp2 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2)); + + // Remove target boundary and use min max, should get the same result + conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5); + boolean minMaxOp1 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)); + boolean minMaxOp2 = + ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2)); + assertEquals(boundaryOp1, minMaxOp1); + assertEquals(boundaryOp2, minMaxOp2); + + // When the target boundary parameter is used, + // but the min max parameter is also set, + // the min max parameter shall prevail. + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3); + + evaluated = + Map.of( + op1, evaluated(2, 150, 100), + op2, evaluated(1, 85, 100)); + + assertFalse( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + } + @Test public void testVertexesExclusionForScaling() throws Exception { var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; @@ -266,7 +322,7 @@ public void testVertexesExclusionForScaling() throws Exception { var conf = context.getConfiguration(); conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0)); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8); var metrics = new EvaluatedMetrics( Map.of( @@ -720,7 +776,7 @@ public void testAdjustByMaxParallelism() throws Exception { null)); var conf = context.getConfiguration(); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.d); // The expected new parallelism is 7 without adjustment by max parallelism. var metrics = @@ -774,8 +830,9 @@ public void testQuota( conf.setString("taskmanager.numberOfTaskSlots", "2"); cpuQuota.ifPresent(v -> conf.set(AutoScalerOptions.CPU_QUOTA, v)); memoryQuota.ifPresent(v -> conf.set(AutoScalerOptions.MEMORY_QUOTA, MemorySize.parse(v))); - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); - conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6); testQuotaReached(slotSharingGroupId1, slotSharingGroupId2, quotaReached, ctx); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 969d70ee18..03e8fce587 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -41,8 +41,7 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.PREFER_TRACKED_RESTART_TIME; import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -265,8 +264,9 @@ public void testLagBasedSourceScaling() { public void testUtilizationBoundaryComputation() { var conf = new Configuration(); - conf.set(TARGET_UTILIZATION, 0.8); - conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(UTILIZATION_TARGET, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); conf.set(RESTART_TIME, Duration.ofSeconds(1)); conf.set(CATCH_UP_DURATION, Duration.ZERO); @@ -287,8 +287,9 @@ public void testUtilizationBoundaryComputation() { public void testUtilizationBoundaryComputationWithRestartTimesTracking() { var conf = new Configuration(); - conf.set(TARGET_UTILIZATION, 0.8); - conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1); + conf.set(UTILIZATION_TARGET, 0.8); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); conf.set(RESTART_TIME, Duration.ofMinutes(10)); conf.set(CATCH_UP_DURATION, Duration.ZERO); conf.set(PREFER_TRACKED_RESTART_TIME, true); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index fad4553d6f..e2b5db8564 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -65,6 +65,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; + /** Default validator implementation for {@link FlinkDeployment}. */ public class DefaultValidator implements FlinkResourceValidator { @@ -605,9 +609,19 @@ public static Optional validateAutoScalerFlinkConfiguration( return firstPresent( validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d), validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d), - validateNumber(flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION, 0.0d), + validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d), validateNumber( flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d), + validateNumber( + flinkConfiguration, + UTILIZATION_MAX, + flinkConfiguration.get(UTILIZATION_TARGET), + 1.0d), + validateNumber( + flinkConfiguration, + UTILIZATION_MIN, + 0.0d, + flinkConfiguration.get(UTILIZATION_TARGET)), CalendarUtils.validateExcludedPeriods(flinkConfiguration)); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java index 5bf5033afb..8caa24eb28 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java @@ -802,22 +802,34 @@ public void testAutoScalerDeploymentWithInvalidNegativeUtilization() { var result = testAutoScalerConfiguration( flinkConf -> - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6")); assertErrorContains( - result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d)); + result, getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0d)); } @Test public void testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() { - var result = + var resultMaxUtilization = testAutoScalerConfiguration( flinkConf -> - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), - "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6")); + assertErrorContains( + resultMaxUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0)); + + var resultMinUtilization = + testAutoScalerConfiguration( + flinkConf -> + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6")); assertErrorContains( - result, - getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d)); + resultMinUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0d, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); } @Test @@ -838,9 +850,9 @@ public void testNonEnabledAutoScalerDeploymentJob() { flinkConf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key()); flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6"); flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6"); - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6"); - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6"); }); assertErrorNotContains(result); } @@ -853,9 +865,9 @@ public void testDisabledEnabledAutoScalerDeploymentJob() { flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false"); flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6"); flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6"); - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6"); - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6"); }); assertErrorNotContains(result); } @@ -891,35 +903,59 @@ public void testValidateSessionJobWithInvalidNegativeUtilization() { var result = testSessionJobAutoScalerConfiguration( flinkConf -> - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-0.6")); assertErrorContains( - result, getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d)); + result, getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0)); } @Test public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() { - var result = + var resultMaxUtilization = testSessionJobAutoScalerConfiguration( flinkConf -> - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), - "-0.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6")); assertErrorContains( - result, - getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d)); + resultMaxUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0d)); + + var resultMinUtilization = + testSessionJobAutoScalerConfiguration( + flinkConf -> + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6")); + assertErrorContains( + resultMinUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0d, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); } @Test public void testValidateSessionJobWithInvalidUtilizationBoundary() { - var result = + var resultMaxUtilization = testSessionJobAutoScalerConfiguration( flinkConf -> - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), - "-1.6")); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-0.6")); + assertErrorContains( + resultMaxUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0d)); + + var resultMinUtilization = + testSessionJobAutoScalerConfiguration( + flinkConf -> + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-0.6")); assertErrorContains( - result, - getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d)); + resultMinUtilization, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); } @Test @@ -940,13 +976,100 @@ public void testNonEnabledAutoScalerSessionJob() { flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false"); flinkConf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6"); flinkConf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6"); - flinkConf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6"); - flinkConf.put( - AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "-1.6"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "-1.6"); }); assertErrorNotContains(result); } + @Test + public void testAutoScalerUtilizationConfiguration() { + var deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4"); + }); + assertErrorContains( + deploymentResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 0.5, 1.0)); + + deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8"); + }); + assertErrorContains( + deploymentResult, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MIN, + 0.0, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue())); + + deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"); + }); + + assertErrorContains( + deploymentResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0, 1.0)); + + deploymentResult = + testAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"); + }); + assertErrorNotContains(deploymentResult); + + var sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.4"); + }); + assertErrorContains( + sessionResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_MAX, 0.5, 1.0)); + + sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"); + }); + assertErrorContains( + sessionResult, + getFormattedErrorMessage( + AutoScalerOptions.UTILIZATION_MAX, + AutoScalerOptions.UTILIZATION_TARGET.defaultValue(), + 1.0)); + + sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"); + }); + + assertErrorContains( + sessionResult, + getFormattedErrorMessage(AutoScalerOptions.UTILIZATION_TARGET, 0.0, 1.0)); + + sessionResult = + testSessionJobAutoScalerConfiguration( + flinkConf -> { + flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.2"); + flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.5"); + flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"); + }); + assertErrorNotContains(sessionResult); + } + private Optional testSessionJobAutoScalerConfiguration( Consumer> flinkConfigurationModifier) { var sessionCluster = TestUtils.buildSessionCluster(); @@ -972,8 +1095,9 @@ private Map getDefaultTestAutoScalerFlinkConfigurationMap() { conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0"); conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6"); conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), "0.1"); - conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "0.7"); - conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "0.4"); + conf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.7"); + conf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "1.0"); + conf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3"); return conf; } @@ -986,6 +1110,17 @@ private static String getFormattedErrorMessage( max != null ? max.toString() : "+Infinity"); } + private static String getFormattedNumberOrderErrorMessage( + ConfigOption configValueLeft, ConfigOption configValueRight) { + return String.format( + "The AutoScalerOption %s or %s is invalid, %s must be less than or equal to the value of " + + "%s", + configValueLeft.key(), + configValueRight.key(), + configValueLeft.key(), + configValueRight.key()); + } + private static String getFormattedErrorMessage(ConfigOption configValue, Double min) { return getFormattedErrorMessage(configValue, min, null); } From 08315cd89a6a8b677606efae0d4309d85b9e5346 Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Wed, 18 Dec 2024 18:00:52 +0800 Subject: [PATCH 2/2] [FLINK-36836][Autoscaler] Remove the default value of max min, calculated by `utilization.target.boundary` --- .../generated/auto_scaler_configuration.html | 4 +-- .../autoscaler/ScalingMetricEvaluator.java | 17 ++++----- .../autoscaler/config/AutoScalerOptions.java | 4 +-- .../flink/autoscaler/ScalingExecutorTest.java | 36 ++++++++++++++++++- 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index cc7e810d51..ab2bbcb283 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -202,13 +202,13 @@
job.autoscaler.utilization.max
- 1.0 + (none) Double Max vertex utilization
job.autoscaler.utilization.min
- 0.4 + (none) Double Min vertex utilization diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index 91ea5fb638..58c5dbe4a4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -287,6 +287,7 @@ protected static void computeProcessingRateThresholds( Duration restartTime) { double targetUtilization = conf.get(UTILIZATION_TARGET); + double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY); double upperUtilization; double lowerUtilization; @@ -297,16 +298,12 @@ protected static void computeProcessingRateThresholds( upperUtilization = 1.0; lowerUtilization = 0.0; } else { - if (conf.getOptional(UTILIZATION_MAX).isPresent() - || conf.getOptional(UTILIZATION_MIN).isPresent() - || conf.getOptional(TARGET_UTILIZATION_BOUNDARY).isEmpty()) { - upperUtilization = conf.get(UTILIZATION_MAX); - lowerUtilization = conf.get(UTILIZATION_MIN); - } else { - Double boundary = conf.get(TARGET_UTILIZATION_BOUNDARY); - upperUtilization = targetUtilization + boundary; - lowerUtilization = targetUtilization - boundary; - } + upperUtilization = + conf.getOptional(UTILIZATION_MAX) + .orElse(targetUtilization + utilizationBoundary); + lowerUtilization = + conf.getOptional(UTILIZATION_MIN) + .orElse(targetUtilization - utilizationBoundary); } double scaleUpThreshold = diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 82904e3522..980db2f4cc 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -111,14 +111,14 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { public static final ConfigOption UTILIZATION_MAX = autoScalerConfig("utilization.max") .doubleType() - .defaultValue(1.) + .noDefaultValue() .withFallbackKeys(oldOperatorConfigKey("utilization.max")) .withDescription("Max vertex utilization"); public static final ConfigOption UTILIZATION_MIN = autoScalerConfig("utilization.min") .doubleType() - .defaultValue(0.4) + .noDefaultValue() .withFallbackKeys(oldOperatorConfigKey("utilization.min")) .withDescription("Min vertex utilization"); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index 5efd13c7e5..a3bc674611 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -294,12 +294,46 @@ op1, evaluated(1, 70, 100), conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.); conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7); conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3); - evaluated = Map.of( op1, evaluated(2, 150, 100), op2, evaluated(1, 85, 100)); + assertFalse( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + // When the target boundary parameter is used, + // but the max parameter is also set, + conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN); + conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.); + conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5); + conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6); + + evaluated = + Map.of( + op1, evaluated(2, 100, 99999), + op2, evaluated(1, 80, 99999)); + assertTrue( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + evaluated = Map.of(op2, evaluated(1, 85, 100)); + assertFalse( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + + conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX); + conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3); + + evaluated = + Map.of( + op1, evaluated(2, 80, 81), + op2, evaluated(1, 100, 101)); + assertTrue( + ScalingExecutor.allChangedVerticesWithinUtilizationTarget( + evaluated, evaluated.keySet())); + evaluated = Map.of(op1, evaluated(1, 80, 79)); assertFalse( ScalingExecutor.allChangedVerticesWithinUtilizationTarget( evaluated, evaluated.keySet()));