Skip to content

Commit 6385389

Browse files
committed
1. Clamped lowerBound for scaling coefficient to 0.5
1 parent ee9a953 commit 6385389

File tree

2 files changed

+15
-26
lines changed

2 files changed

+15
-26
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@ public ParallelismChange computeScaleTargetParallelism(
187187
if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
188188

189189
double scalingCoefficient =
190-
JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
190+
JobVertexScaler.calculateObservedScalingCoefficient(
191+
history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
192+
191193
scaleFactor = scaleFactor / scalingCoefficient;
192194
}
193195
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
@@ -256,12 +258,14 @@ public ParallelismChange computeScaleTargetParallelism(
256258
* 1.0} is returned, assuming linear scaling.
257259
*
258260
* @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
259-
* @param conf Deployment configuration.
261+
* @param minObservations The minimum number of observations required to compute the scaling
262+
* coefficient. If the number of historical entries is less than this threshold, a default
263+
* coefficient of {@code 1.0} is returned.
260264
* @return The computed scaling coefficient.
261265
*/
262266
@VisibleForTesting
263267
protected static double calculateObservedScalingCoefficient(
264-
SortedMap<Instant, ScalingSummary> history, Configuration conf) {
268+
SortedMap<Instant, ScalingSummary> history, int minObservations) {
265269
/*
266270
* The scaling coefficient is computed using the least squares approach
267271
* to fit a linear model:
@@ -283,11 +287,8 @@ protected static double calculateObservedScalingCoefficient(
283287
* α = ∑ (P_i * R_i) / (∑ (P_i^2) * β)
284288
*
285289
* We keep the system conservative for higher returns scenario by clamping computed α to an upper bound of 1.0.
286-
* If the computed coefficient falls below threshold, the system falls back to assuming linear scaling (α = 1.0).
287290
*/
288291

289-
var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS);
290-
291292
// not enough data to compute scaling coefficient; we assume linear scaling.
292293
if (history.isEmpty() || history.size() < minObservations) {
293294
return 1.0;
@@ -319,17 +320,7 @@ protected static double calculateObservedScalingCoefficient(
319320

320321
var coefficient =
321322
AutoScalerUtils.optimizeLinearScalingCoefficient(
322-
parallelismList, processingRateList, baselineProcessingRate, 1, 0.01);
323-
324-
double threshold =
325-
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)
326-
? conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD)
327-
: 0.5;
328-
329-
if (coefficient < threshold) {
330-
LOG.warn("Scaling coefficient is below threshold. Falling back to linear scaling.");
331-
return 1.0;
332-
}
323+
parallelismList, processingRateList, baselineProcessingRate, 1, 0.5);
333324

334325
return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue();
335326
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
5353
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
5454
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
55-
5655
import static org.assertj.core.api.Assertions.assertThat;
5756
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
5857
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1179,7 +1178,7 @@ public void testCalculateScalingCoefficient() {
11791178
currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3));
11801179

11811180
double linearScalingScalingCoefficient =
1182-
JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf);
1181+
JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, 3);
11831182

11841183
assertEquals(1.0, linearScalingScalingCoefficient);
11851184

@@ -1199,7 +1198,7 @@ public void testCalculateScalingCoefficient() {
11991198

12001199
double slightDiminishingReturnsScalingCoefficient =
12011200
JobVertexScaler.calculateObservedScalingCoefficient(
1202-
slightDiminishingReturnsScalingHistory, conf);
1201+
slightDiminishingReturnsScalingHistory, 3);
12031202

12041203
assertTrue(
12051204
slightDiminishingReturnsScalingCoefficient > 0.9
@@ -1221,7 +1220,7 @@ public void testCalculateScalingCoefficient() {
12211220

12221221
double sharpDiminishingReturnsScalingCoefficient =
12231222
JobVertexScaler.calculateObservedScalingCoefficient(
1224-
sharpDiminishingReturnsScalingHistory, conf);
1223+
sharpDiminishingReturnsScalingHistory, 3);
12251224

12261225
assertTrue(
12271226
sharpDiminishingReturnsScalingCoefficient < 0.9
@@ -1245,7 +1244,7 @@ public void testCalculateScalingCoefficient() {
12451244

12461245
double sharpDiminishingReturnsWithOneParallelismScalingCoefficient =
12471246
JobVertexScaler.calculateObservedScalingCoefficient(
1248-
sharpDiminishingReturnsWithOneParallelismScalingHistory, conf);
1247+
sharpDiminishingReturnsWithOneParallelismScalingHistory, 3);
12491248

12501249
assertTrue(
12511250
sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 0.9
@@ -1261,8 +1260,7 @@ public void testCalculateScalingCoefficient() {
12611260
currentTime, new ScalingSummary(4, 8, withOneScalingHistoryRecordEvaluatedData1));
12621261

12631262
double withOneScalingHistoryRecordScalingCoefficient =
1264-
JobVertexScaler.calculateObservedScalingCoefficient(
1265-
withOneScalingHistoryRecord, conf);
1263+
JobVertexScaler.calculateObservedScalingCoefficient(withOneScalingHistoryRecord, 1);
12661264

12671265
assertEquals(1, withOneScalingHistoryRecordScalingCoefficient);
12681266

@@ -1282,7 +1280,7 @@ public void testCalculateScalingCoefficient() {
12821280

12831281
double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient =
12841282
JobVertexScaler.calculateObservedScalingCoefficient(
1285-
diminishingReturnWithTwoScalingHistoryRecord, conf);
1283+
diminishingReturnWithTwoScalingHistoryRecord, 1);
12861284

12871285
assertTrue(
12881286
diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient < 0.9
@@ -1302,7 +1300,7 @@ public void testCalculateScalingCoefficient() {
13021300

13031301
double linearReturnWithTwoScalingHistoryRecordScalingCoefficient =
13041302
JobVertexScaler.calculateObservedScalingCoefficient(
1305-
linearReturnWithTwoScalingHistoryRecord, conf);
1303+
linearReturnWithTwoScalingHistoryRecord, 1);
13061304

13071305
assertEquals(1, linearReturnWithTwoScalingHistoryRecordScalingCoefficient);
13081306
}

0 commit comments

Comments
 (0)