Skip to content

Commit 21bc13c

Browse files
author
huyuanfeng
committed
[FLINK-36836][Autoscaler] Remove the default value of max min, calculated by utilization.target.boundary
1 parent 65c53e7 commit 21bc13c

File tree

4 files changed

+46
-16
lines changed

4 files changed

+46
-16
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,13 @@
202202
</tr>
203203
<tr>
204204
<td><h5>job.autoscaler.utilization.max</h5></td>
205-
<td style="word-wrap: break-word;">1.0</td>
205+
<td style="word-wrap: break-word;">(none)</td>
206206
<td>Double</td>
207207
<td>Max vertex utilization</td>
208208
</tr>
209209
<tr>
210210
<td><h5>job.autoscaler.utilization.min</h5></td>
211-
<td style="word-wrap: break-word;">0.4</td>
211+
<td style="word-wrap: break-word;">(none)</td>
212212
<td>Double</td>
213213
<td>Min vertex utilization</td>
214214
</tr>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ protected static void computeProcessingRateThresholds(
287287
Duration restartTime) {
288288

289289
double targetUtilization = conf.get(UTILIZATION_TARGET);
290+
double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY);
290291

291292
double upperUtilization;
292293
double lowerUtilization;
@@ -297,16 +298,12 @@ protected static void computeProcessingRateThresholds(
297298
upperUtilization = 1.0;
298299
lowerUtilization = 0.0;
299300
} else {
300-
if (conf.getOptional(UTILIZATION_MAX).isPresent()
301-
|| conf.getOptional(UTILIZATION_MIN).isPresent()
302-
|| conf.getOptional(TARGET_UTILIZATION_BOUNDARY).isEmpty()) {
303-
upperUtilization = conf.get(UTILIZATION_MAX);
304-
lowerUtilization = conf.get(UTILIZATION_MIN);
305-
} else {
306-
Double boundary = conf.get(TARGET_UTILIZATION_BOUNDARY);
307-
upperUtilization = targetUtilization + boundary;
308-
lowerUtilization = targetUtilization - boundary;
309-
}
301+
upperUtilization =
302+
conf.getOptional(UTILIZATION_MAX)
303+
.orElse(targetUtilization + utilizationBoundary);
304+
lowerUtilization =
305+
conf.getOptional(UTILIZATION_MIN)
306+
.orElse(targetUtilization - utilizationBoundary);
310307
}
311308

312309
double scaleUpThreshold =

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
104104
autoScalerConfig("target.utilization.boundary")
105105
.doubleType()
106106
.defaultValue(0.3)
107-
.withFallbackKeys(oldOperatorConfigKey("target.utilization.boundary"))
108107
.withDescription(
109108
"Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
110109

111110
public static final ConfigOption<Double> UTILIZATION_MAX =
112111
autoScalerConfig("utilization.max")
113112
.doubleType()
114-
.defaultValue(1.)
113+
.noDefaultValue()
115114
.withFallbackKeys(oldOperatorConfigKey("utilization.max"))
116115
.withDescription("Max vertex utilization");
117116

118117
public static final ConfigOption<Double> UTILIZATION_MIN =
119118
autoScalerConfig("utilization.min")
120119
.doubleType()
121-
.defaultValue(0.4)
120+
.noDefaultValue()
122121
.withFallbackKeys(oldOperatorConfigKey("utilization.min"))
123122
.withDescription("Min vertex utilization");
124123

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,12 +294,46 @@ op1, evaluated(1, 70, 100),
294294
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
295295
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
296296
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
297-
298297
evaluated =
299298
Map.of(
300299
op1, evaluated(2, 150, 100),
301300
op2, evaluated(1, 85, 100));
301+
assertFalse(
302+
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
303+
evaluated, evaluated.keySet()));
304+
305+
// When the target boundary parameter is used,
306+
// but the max parameter is also set,
307+
conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
308+
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
309+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
310+
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
311+
312+
evaluated =
313+
Map.of(
314+
op1, evaluated(2, 100, 99999),
315+
op2, evaluated(1, 80, 99999));
316+
assertTrue(
317+
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
318+
evaluated, evaluated.keySet()));
319+
320+
evaluated = Map.of(op2, evaluated(1, 85, 100));
321+
assertFalse(
322+
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
323+
evaluated, evaluated.keySet()));
324+
325+
conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX);
326+
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
327+
328+
evaluated =
329+
Map.of(
330+
op1, evaluated(2, 80, 81),
331+
op2, evaluated(1, 100, 101));
332+
assertTrue(
333+
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
334+
evaluated, evaluated.keySet()));
302335

336+
evaluated = Map.of(op1, evaluated(1, 80, 79));
303337
assertFalse(
304338
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
305339
evaluated, evaluated.keySet()));

0 commit comments

Comments
 (0)