Skip to content

Commit 4368ebe

Browse files
author
huyuanfeng
committed
[FLINK-36836][Autoscaler] Supports config the upper and lower limits of target utilization.
1 parent d9e8cce commit 4368ebe

File tree

11 files changed

+99
-80
lines changed

11 files changed

+99
-80
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
5050
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5151
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
52-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
52+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
5353
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
5454
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
5555
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
@@ -158,7 +158,7 @@ public ParallelismChange computeScaleTargetParallelism(
158158

159159
double targetCapacity =
160160
AutoScalerUtils.getTargetProcessingCapacity(
161-
evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true, restartTime);
161+
evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime);
162162
if (Double.isNaN(targetCapacity)) {
163163
LOG.warn(
164164
"Target data rate is not available for {}, cannot compute new parallelism",

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
import java.util.SortedMap;
4545

4646
import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
47-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
48-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
47+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_MAX;
48+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_MIN;
49+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
50+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY;
4951
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
5052
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
5153
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
@@ -284,8 +286,8 @@ protected static void computeProcessingRateThresholds(
284286
boolean processingBacklog,
285287
Duration restartTime) {
286288

287-
double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
288-
double targetUtilization = conf.get(TARGET_UTILIZATION);
289+
double utilizationBoundary = conf.getDouble(UTILIZATION_TARGET_BOUNDARY);
290+
double targetUtilization = conf.get(UTILIZATION_TARGET);
289291

290292
double upperUtilization;
291293
double lowerUtilization;
@@ -296,8 +298,17 @@ protected static void computeProcessingRateThresholds(
296298
upperUtilization = 1.0;
297299
lowerUtilization = 0.0;
298300
} else {
299-
upperUtilization = targetUtilization + utilizationBoundary;
300-
lowerUtilization = targetUtilization - utilizationBoundary;
301+
if (conf.get(TARGET_UTILIZATION_MAX) > 0) {
302+
upperUtilization = conf.get(TARGET_UTILIZATION_MAX);
303+
} else {
304+
upperUtilization = targetUtilization + utilizationBoundary;
305+
}
306+
307+
if (conf.get(TARGET_UTILIZATION_MIN) > 0) {
308+
lowerUtilization = conf.get(TARGET_UTILIZATION_MIN);
309+
} else {
310+
lowerUtilization = targetUtilization - utilizationBoundary;
311+
}
301312
}
302313

303314
double scaleUpThreshold =

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,35 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
8989
+ "seconds suffix, daily expression's formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 9:30:30-10:50:20 in Monday and Thursday "
9090
+ "we can express it as 9:30:30-10:50:20 && * * * ? * 2,5");
9191

92-
public static final ConfigOption<Double> TARGET_UTILIZATION =
93-
autoScalerConfig("target.utilization")
92+
public static final ConfigOption<Double> UTILIZATION_TARGET =
93+
autoScalerConfig("utilization.target")
9494
.doubleType()
9595
.defaultValue(0.7)
9696
.withFallbackKeys(oldOperatorConfigKey("target.utilization"))
9797
.withDescription("Target vertex utilization");
9898

99-
public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
100-
autoScalerConfig("target.utilization.boundary")
99+
public static final ConfigOption<Double> UTILIZATION_TARGET_BOUNDARY =
100+
autoScalerConfig("utilization.target.boundary")
101101
.doubleType()
102102
.defaultValue(0.3)
103103
.withFallbackKeys(oldOperatorConfigKey("target.utilization.boundary"))
104104
.withDescription(
105105
"Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
106106

107+
public static final ConfigOption<Double> TARGET_UTILIZATION_MAX =
108+
autoScalerConfig("utilization.target.max")
109+
.doubleType()
110+
.defaultValue(-1d)
111+
.withFallbackKeys(oldOperatorConfigKey("utilization.target.max"))
112+
.withDescription("Max vertex utilization");
113+
114+
public static final ConfigOption<Double> TARGET_UTILIZATION_MIN =
115+
autoScalerConfig("utilization.target.min")
116+
.doubleType()
117+
.defaultValue(-1d)
118+
.withFallbackKeys(oldOperatorConfigKey("utilization.target.min"))
119+
.withDescription("Min vertex utilization");
120+
107121
public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
108122
autoScalerConfig("scale-down.interval")
109123
.durationType()

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ public void setup() {
9595
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
9696
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
9797
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
98-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
99-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
98+
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
99+
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.1);
100100
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
101101
defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1));
102102

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void setup() {
9898
@MethodSource("adjustmentInputsProvider")
9999
public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies) {
100100
var op = new JobVertexID();
101-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
101+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
102102
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
103103
var delayedScaleDown = new DelayedScaleDown();
104104

@@ -113,7 +113,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
113113
restartTime,
114114
delayedScaleDown));
115115

116-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
116+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8);
117117
assertEquals(
118118
ParallelismChange.build(8),
119119
vertexScaler.computeScaleTargetParallelism(
@@ -125,7 +125,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
125125
restartTime,
126126
delayedScaleDown));
127127

128-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
128+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8);
129129
assertEquals(
130130
ParallelismChange.noChange(),
131131
vertexScaler.computeScaleTargetParallelism(
@@ -137,7 +137,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
137137
restartTime,
138138
delayedScaleDown));
139139

140-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
140+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8);
141141
assertEquals(
142142
ParallelismChange.build(8),
143143
vertexScaler.computeScaleTargetParallelism(
@@ -160,7 +160,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
160160
restartTime,
161161
delayedScaleDown));
162162

163-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
163+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
164164
assertEquals(
165165
ParallelismChange.build(10),
166166
vertexScaler.computeScaleTargetParallelism(
@@ -172,7 +172,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
172172
restartTime,
173173
delayedScaleDown));
174174

175-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
175+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
176176
assertEquals(
177177
ParallelismChange.build(4),
178178
vertexScaler.computeScaleTargetParallelism(
@@ -184,7 +184,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
184184
restartTime,
185185
delayedScaleDown));
186186

187-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
187+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
188188
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
189189
assertEquals(
190190
ParallelismChange.build(5),
@@ -209,7 +209,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
209209
restartTime,
210210
delayedScaleDown));
211211

212-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
212+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
213213
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
214214
assertEquals(
215215
ParallelismChange.build(15),
@@ -558,7 +558,7 @@ public void testMinParallelismLimitIsUsed() {
558558
@Test
559559
public void testMaxParallelismLimitIsUsed() {
560560
conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
561-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
561+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
562562
var delayedScaleDown = new DelayedScaleDown();
563563

564564
assertEquals(
@@ -587,7 +587,7 @@ public void testMaxParallelismLimitIsUsed() {
587587

588588
@Test
589589
public void testDisableScaleDownInterval() {
590-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
590+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
591591
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));
592592

593593
var delayedScaleDown = new DelayedScaleDown();
@@ -597,7 +597,7 @@ public void testDisableScaleDownInterval() {
597597

598598
@Test
599599
public void testScaleDownAfterInterval() {
600-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
600+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
601601
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
602602
var instant = Instant.now();
603603

@@ -629,7 +629,7 @@ public void testScaleDownAfterInterval() {
629629

630630
@Test
631631
public void testImmediateScaleUpWithinScaleDownInterval() {
632-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
632+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
633633
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
634634
var instant = Instant.now();
635635

@@ -655,7 +655,7 @@ public void testImmediateScaleUpWithinScaleDownInterval() {
655655

656656
@Test
657657
public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
658-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
658+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
659659
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
660660
var instant = Instant.now();
661661

@@ -701,7 +701,7 @@ private void assertParallelismChange(
701701
public void testIneffectiveScalingDetection() {
702702
var op = new JobVertexID();
703703
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
704-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
704+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
705705
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
706706

707707
var evaluated = evaluated(5, 100, 50);
@@ -826,7 +826,7 @@ public void testIneffectiveScalingDetection() {
826826
public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) {
827827
var jobVertexID = new JobVertexID();
828828
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
829-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
829+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.0);
830830
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
831831

832832
var evaluated = evaluated(5, 100, 50);
@@ -1082,7 +1082,7 @@ public void testNumPartitionsAdjustment() {
10821082
@Test
10831083
public void testSendingScalingLimitedEvents() {
10841084
var jobVertexID = new JobVertexID();
1085-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
1085+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.0);
10861086
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
10871087
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
10881088
var evaluated = evaluated(10, 200, 100);

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ public void setup() {
123123
@Test
124124
public void testEndToEnd() throws Exception {
125125
var conf = context.getConfiguration();
126-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
127-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
126+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
127+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.);
128128

129129
setDefaultMetrics(metricsCollector);
130130

@@ -344,8 +344,8 @@ public void testMetricCollectorWindow() throws Exception {
344344
@Test
345345
public void testClearHistoryOnTopoChange() throws Exception {
346346
var conf = context.getConfiguration();
347-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
348-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
347+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
348+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.);
349349

350350
setDefaultMetrics(metricsCollector);
351351

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ public void setup() {
8686
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
8787
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
8888
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
89-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
90-
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
89+
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
90+
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.1);
9191
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
9292

9393
autoscaler =

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
128128

129129
var op1 = new JobVertexID();
130130

131-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
132-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
131+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
132+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.);
133133

134134
var evaluated = Map.of(op1, evaluated(1, 70, 100));
135135
assertFalse(
136136
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
137137
evaluated, evaluated.keySet()));
138138

139-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
139+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.2);
140140
evaluated = Map.of(op1, evaluated(1, 70, 100));
141141
assertTrue(
142142
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
@@ -160,12 +160,6 @@ op1, evaluated(1, 70, 100),
160160
assertTrue(
161161
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
162162
evaluated, evaluated.keySet()));
163-
164-
// Test with backlog based scaling
165-
evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
166-
assertFalse(
167-
ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
168-
evaluated, evaluated.keySet()));
169163
}
170164

171165
@Test
@@ -178,8 +172,8 @@ public void testUtilizationBoundariesWithOptionalVertex() {
178172
var op2 = new JobVertexID();
179173

180174
// All vertices are optional
181-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
182-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
175+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
176+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.);
183177

184178
var evaluated =
185179
Map.of(
@@ -194,7 +188,7 @@ op1, evaluated(1, 70, 100),
194188

195189
// One vertex is required, and it's within the range.
196190
// The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
197-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
191+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.1);
198192
evaluated =
199193
Map.of(
200194
op1, evaluated(1, 65, 100),
@@ -208,8 +202,8 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
208202
var conf = context.getConfiguration();
209203
// Target utilization and boundary are identical
210204
// which will set the scale down boundary to infinity
211-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
212-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.6);
205+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
206+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.6);
213207

214208
var vertex = new JobVertexID();
215209
int parallelism = 100;
@@ -266,7 +260,7 @@ public void testVertexesExclusionForScaling() throws Exception {
266260

267261
var conf = context.getConfiguration();
268262
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofSeconds(0));
269-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
263+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, .8);
270264
var metrics =
271265
new EvaluatedMetrics(
272266
Map.of(
@@ -720,7 +714,7 @@ public void testAdjustByMaxParallelism() throws Exception {
720714
null));
721715

722716
var conf = context.getConfiguration();
723-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
717+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.d);
724718

725719
// The expected new parallelism is 7 without adjustment by max parallelism.
726720
var metrics =
@@ -774,8 +768,8 @@ public void testQuota(
774768
conf.setString("taskmanager.numberOfTaskSlots", "2");
775769
cpuQuota.ifPresent(v -> conf.set(AutoScalerOptions.CPU_QUOTA, v));
776770
memoryQuota.ifPresent(v -> conf.set(AutoScalerOptions.MEMORY_QUOTA, MemorySize.parse(v)));
777-
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
778-
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
771+
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
772+
conf.set(AutoScalerOptions.UTILIZATION_TARGET_BOUNDARY, 0.);
779773

780774
testQuotaReached(slotSharingGroupId1, slotSharingGroupId2, quotaReached, ctx);
781775
}

0 commit comments

Comments
 (0)