Skip to content

Commit 66f7b42

Browse files
[FLINK-36836][Autoscaler] Support configuring the upper and lower limits of target utilization (#921)
1 parent dfd66c2 commit 66f7b42

File tree

12 files changed

+369
-93
lines changed

12 files changed

+369
-93
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,16 +201,22 @@
201201
<td>Stabilization period in which no new scaling will be executed</td>
202202
</tr>
203203
<tr>
204-
<td><h5>job.autoscaler.target.utilization</h5></td>
205-
<td style="word-wrap: break-word;">0.7</td>
204+
<td><h5>job.autoscaler.utilization.max</h5></td>
205+
<td style="word-wrap: break-word;">(none)</td>
206206
<td>Double</td>
207-
<td>Target vertex utilization</td>
207+
<td>Max vertex utilization</td>
208208
</tr>
209209
<tr>
210-
<td><h5>job.autoscaler.target.utilization.boundary</h5></td>
211-
<td style="word-wrap: break-word;">0.3</td>
210+
<td><h5>job.autoscaler.utilization.min</h5></td>
211+
<td style="word-wrap: break-word;">(none)</td>
212212
<td>Double</td>
213-
<td>Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]</td>
213+
<td>Min vertex utilization</td>
214+
</tr>
215+
<tr>
216+
<td><h5>job.autoscaler.utilization.target</h5></td>
217+
<td style="word-wrap: break-word;">0.7</td>
218+
<td>Double</td>
219+
<td>Target vertex utilization</td>
214220
</tr>
215221
<tr>
216222
<td><h5>job.autoscaler.vertex.exclude.ids</h5></td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
5050
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5151
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
52-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
52+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
5353
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
5454
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
5555
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
@@ -158,7 +158,7 @@ public ParallelismChange computeScaleTargetParallelism(
158158

159159
double targetCapacity =
160160
AutoScalerUtils.getTargetProcessingCapacity(
161-
evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true, restartTime);
161+
evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime);
162162
if (Double.isNaN(targetCapacity)) {
163163
LOG.warn(
164164
"Target data rate is not available for {}, cannot compute new parallelism",

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
import java.util.SortedMap;
4545

4646
import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
47-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
4847
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
48+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
49+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
50+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
4951
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
5052
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
5153
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
@@ -284,8 +286,8 @@ protected static void computeProcessingRateThresholds(
284286
boolean processingBacklog,
285287
Duration restartTime) {
286288

287-
double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
288-
double targetUtilization = conf.get(TARGET_UTILIZATION);
289+
double targetUtilization = conf.get(UTILIZATION_TARGET);
290+
double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY);
289291

290292
double upperUtilization;
291293
double lowerUtilization;
@@ -296,8 +298,12 @@ protected static void computeProcessingRateThresholds(
296298
upperUtilization = 1.0;
297299
lowerUtilization = 0.0;
298300
} else {
299-
upperUtilization = targetUtilization + utilizationBoundary;
300-
lowerUtilization = targetUtilization - utilizationBoundary;
301+
upperUtilization =
302+
conf.getOptional(UTILIZATION_MAX)
303+
.orElse(targetUtilization + utilizationBoundary);
304+
lowerUtilization =
305+
conf.getOptional(UTILIZATION_MIN)
306+
.orElse(targetUtilization - utilizationBoundary);
301307
}
302308

303309
double scaleUpThreshold =

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,17 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
8989
+ "seconds suffix, daily expression's formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 9:30:30-10:50:20 in Monday and Thursday "
9090
+ "we can express it as 9:30:30-10:50:20 && * * * ? * 2,5");
9191

92-
public static final ConfigOption<Double> TARGET_UTILIZATION =
93-
autoScalerConfig("target.utilization")
92+
public static final ConfigOption<Double> UTILIZATION_TARGET =
93+
autoScalerConfig("utilization.target")
9494
.doubleType()
9595
.defaultValue(0.7)
96-
.withFallbackKeys(oldOperatorConfigKey("target.utilization"))
96+
.withDeprecatedKeys(autoScalerConfigKey("target.utilization"))
97+
.withFallbackKeys(
98+
oldOperatorConfigKey("utilization.target"),
99+
oldOperatorConfigKey("target.utilization"))
97100
.withDescription("Target vertex utilization");
98101

102+
@Deprecated
99103
public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
100104
autoScalerConfig("target.utilization.boundary")
101105
.doubleType()
@@ -104,6 +108,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
104108
.withDescription(
105109
"Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
106110

111+
public static final ConfigOption<Double> UTILIZATION_MAX =
112+
autoScalerConfig("utilization.max")
113+
.doubleType()
114+
.noDefaultValue()
115+
.withFallbackKeys(oldOperatorConfigKey("utilization.max"))
116+
.withDescription("Max vertex utilization");
117+
118+
public static final ConfigOption<Double> UTILIZATION_MIN =
119+
autoScalerConfig("utilization.min")
120+
.doubleType()
121+
.noDefaultValue()
122+
.withFallbackKeys(oldOperatorConfigKey("utilization.min"))
123+
.withDescription("Min vertex utilization");
124+
107125
public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
108126
autoScalerConfig("scale-down.interval")
109127
.durationType()

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ public void setup() {
9595
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
9696
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
9797
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
98-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
99-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
98+
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
99+
defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
100+
defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
100101
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
101102
defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1));
102103

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
5050
import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
5151
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
52+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
5253
import static org.assertj.core.api.Assertions.assertThat;
5354
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
5455
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -98,7 +99,7 @@ public void setup() {
9899
@MethodSource("adjustmentInputsProvider")
99100
public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies) {
100101
var op = new JobVertexID();
101-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
102+
conf.set(UTILIZATION_TARGET, 1.);
102103
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
103104
var delayedScaleDown = new DelayedScaleDown();
104105

@@ -113,7 +114,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
113114
restartTime,
114115
delayedScaleDown));
115116

116-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
117+
conf.set(UTILIZATION_TARGET, .8);
117118
assertEquals(
118119
ParallelismChange.build(8),
119120
vertexScaler.computeScaleTargetParallelism(
@@ -125,7 +126,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
125126
restartTime,
126127
delayedScaleDown));
127128

128-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
129+
conf.set(UTILIZATION_TARGET, .8);
129130
assertEquals(
130131
ParallelismChange.noChange(),
131132
vertexScaler.computeScaleTargetParallelism(
@@ -137,7 +138,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
137138
restartTime,
138139
delayedScaleDown));
139140

140-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
141+
conf.set(UTILIZATION_TARGET, .8);
141142
assertEquals(
142143
ParallelismChange.build(8),
143144
vertexScaler.computeScaleTargetParallelism(
@@ -160,7 +161,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
160161
restartTime,
161162
delayedScaleDown));
162163

163-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
164+
conf.set(UTILIZATION_TARGET, 0.5);
164165
assertEquals(
165166
ParallelismChange.build(10),
166167
vertexScaler.computeScaleTargetParallelism(
@@ -172,7 +173,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
172173
restartTime,
173174
delayedScaleDown));
174175

175-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
176+
conf.set(UTILIZATION_TARGET, 0.6);
176177
assertEquals(
177178
ParallelismChange.build(4),
178179
vertexScaler.computeScaleTargetParallelism(
@@ -184,7 +185,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
184185
restartTime,
185186
delayedScaleDown));
186187

187-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
188+
conf.set(UTILIZATION_TARGET, 1.);
188189
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
189190
assertEquals(
190191
ParallelismChange.build(5),
@@ -209,7 +210,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
209210
restartTime,
210211
delayedScaleDown));
211212

212-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
213+
conf.set(UTILIZATION_TARGET, 1.);
213214
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
214215
assertEquals(
215216
ParallelismChange.build(15),
@@ -558,7 +559,7 @@ public void testMinParallelismLimitIsUsed() {
558559
@Test
559560
public void testMaxParallelismLimitIsUsed() {
560561
conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
561-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
562+
conf.set(UTILIZATION_TARGET, 1.);
562563
var delayedScaleDown = new DelayedScaleDown();
563564

564565
assertEquals(
@@ -587,7 +588,7 @@ public void testMaxParallelismLimitIsUsed() {
587588

588589
@Test
589590
public void testDisableScaleDownInterval() {
590-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
591+
conf.set(UTILIZATION_TARGET, 1.);
591592
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));
592593

593594
var delayedScaleDown = new DelayedScaleDown();
@@ -597,7 +598,7 @@ public void testDisableScaleDownInterval() {
597598

598599
@Test
599600
public void testScaleDownAfterInterval() {
600-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
601+
conf.set(UTILIZATION_TARGET, 1.);
601602
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
602603
var instant = Instant.now();
603604

@@ -629,7 +630,7 @@ public void testScaleDownAfterInterval() {
629630

630631
@Test
631632
public void testImmediateScaleUpWithinScaleDownInterval() {
632-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
633+
conf.set(UTILIZATION_TARGET, 1.);
633634
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
634635
var instant = Instant.now();
635636

@@ -655,7 +656,7 @@ public void testImmediateScaleUpWithinScaleDownInterval() {
655656

656657
@Test
657658
public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
658-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
659+
conf.set(UTILIZATION_TARGET, 1.);
659660
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
660661
var instant = Instant.now();
661662

@@ -701,7 +702,7 @@ private void assertParallelismChange(
701702
public void testIneffectiveScalingDetection() {
702703
var op = new JobVertexID();
703704
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
704-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
705+
conf.set(UTILIZATION_TARGET, 1.);
705706
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
706707

707708
var evaluated = evaluated(5, 100, 50);
@@ -826,7 +827,7 @@ public void testIneffectiveScalingDetection() {
826827
public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) {
827828
var jobVertexID = new JobVertexID();
828829
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
829-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
830+
conf.set(UTILIZATION_TARGET, 1.0);
830831
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
831832

832833
var evaluated = evaluated(5, 100, 50);
@@ -1082,7 +1083,7 @@ public void testNumPartitionsAdjustment() {
10821083
@Test
10831084
public void testSendingScalingLimitedEvents() {
10841085
var jobVertexID = new JobVertexID();
1085-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
1086+
conf.set(UTILIZATION_TARGET, 1.0);
10861087
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
10871088
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
10881089
var evaluated = evaluated(10, 200, 100);

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ public void setup() {
123123
@Test
124124
public void testEndToEnd() throws Exception {
125125
var conf = context.getConfiguration();
126-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
127-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
126+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
127+
conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
128+
conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
128129

129130
setDefaultMetrics(metricsCollector);
130131

@@ -344,8 +345,9 @@ public void testMetricCollectorWindow() throws Exception {
344345
@Test
345346
public void testClearHistoryOnTopoChange() throws Exception {
346347
var conf = context.getConfiguration();
347-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
348-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
348+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
349+
conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
350+
conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
349351

350352
setDefaultMetrics(metricsCollector);
351353

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ public void setup() {
8686
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
8787
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
8888
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
89-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
90-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
89+
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
90+
defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
91+
defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
9192
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
9293

9394
autoscaler =

0 commit comments

Comments
 (0)