Skip to content

Commit 3adc6b3

Browse files
committed
1. Adding config for min value of Scaling coefficient. 2. Updated validator to validate the min scaling coefficient config.
1 parent 6385389 commit 3adc6b3

File tree

5 files changed

+47
-15
lines changed

5 files changed

+47
-15
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,7 @@ public ParallelismChange computeScaleTargetParallelism(
187187
if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
188188

189189
double scalingCoefficient =
190-
JobVertexScaler.calculateObservedScalingCoefficient(
191-
history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
190+
JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
192191

193192
scaleFactor = scaleFactor / scalingCoefficient;
194193
}
@@ -258,14 +257,12 @@ public ParallelismChange computeScaleTargetParallelism(
258257
* 1.0} is returned, assuming linear scaling.
259258
*
260259
* @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
261-
* @param minObservations The minimum number of observations required to compute the scaling
262-
* coefficient. If the number of historical entries is less than this threshold, a default
263-
* coefficient of {@code 1.0} is returned.
260+
* @param conf Deployment configuration.
264261
* @return The computed scaling coefficient.
265262
*/
266263
@VisibleForTesting
267264
protected static double calculateObservedScalingCoefficient(
268-
SortedMap<Instant, ScalingSummary> history, int minObservations) {
265+
SortedMap<Instant, ScalingSummary> history, Configuration conf) {
269266
/*
270267
* The scaling coefficient is computed using the least squares approach
271268
* to fit a linear model:
@@ -289,6 +286,8 @@ protected static double calculateObservedScalingCoefficient(
289286
* We keep the system conservative for higher returns scenario by clamping computed α to an upper bound of 1.0.
290287
*/
291288

289+
var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS);
290+
292291
// not enough data to compute scaling coefficient; we assume linear scaling.
293292
if (history.isEmpty() || history.size() < minObservations) {
294293
return 1.0;
@@ -318,9 +317,11 @@ protected static double calculateObservedScalingCoefficient(
318317
processingRateList.add(processingRate);
319318
}
320319

320+
double lowerBound = conf.get(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN);
321+
321322
var coefficient =
322323
AutoScalerUtils.optimizeLinearScalingCoefficient(
323-
parallelismList, processingRateList, baselineProcessingRate, 1, 0.5);
324+
parallelismList, processingRateList, baselineProcessingRate, 1, lowerBound);
324325

325326
return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue();
326327
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,10 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
389389
.defaultValue(false)
390390
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled"))
391391
.withDescription(
392-
"Enables the use of an observed scalability coefficient when computing target parallelism. If enabled, the system will estimate the scalability coefficient based on historical scaling data instead of assuming perfect linear scaling. This helps account for real-world inefficiencies such as network overhead and coordination costs.");
392+
"Enables the use of an observed scalability coefficient when computing target parallelism. "
393+
+ "If enabled, the system will estimate the scalability coefficient based on historical scaling data "
394+
+ "instead of assuming perfect linear scaling. "
395+
+ "This helps account for real-world inefficiencies such as network overhead and coordination costs.");
393396

394397
public static final ConfigOption<Integer> OBSERVED_SCALABILITY_MIN_OBSERVATIONS =
395398
autoScalerConfig("observed-scalability.min-observations")
@@ -405,4 +408,14 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
405408
+ VERTEX_SCALING_HISTORY_COUNT.key()
406409
+ " to a very high value, as the number of retained data points is limited by the size of the state store—"
407410
+ "particularly when using Kubernetes-based state store.");
411+
412+
public static final ConfigOption<Double> OBSERVED_SCALABILITY_COEFFICIENT_MIN =
413+
autoScalerConfig("observed-scalability.coefficient-min")
414+
.doubleType()
415+
.defaultValue(0.5)
416+
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.coefficient-min"))
417+
.withDescription(
418+
"Minimum allowed value for the observed scalability coefficient. "
419+
+ "Prevents aggressive scaling by clamping low coefficient estimates. "
420+
+ "If the estimated coefficient falls below this value, it is capped at the configured minimum.");
408421
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,7 +1178,7 @@ public void testCalculateScalingCoefficient() {
11781178
currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3));
11791179

11801180
double linearScalingScalingCoefficient =
1181-
JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, 3);
1181+
JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf);
11821182

11831183
assertEquals(1.0, linearScalingScalingCoefficient);
11841184

@@ -1198,7 +1198,7 @@ public void testCalculateScalingCoefficient() {
11981198

11991199
double slightDiminishingReturnsScalingCoefficient =
12001200
JobVertexScaler.calculateObservedScalingCoefficient(
1201-
slightDiminishingReturnsScalingHistory, 3);
1201+
slightDiminishingReturnsScalingHistory, conf);
12021202

12031203
assertTrue(
12041204
slightDiminishingReturnsScalingCoefficient > 0.9
@@ -1220,7 +1220,7 @@ public void testCalculateScalingCoefficient() {
12201220

12211221
double sharpDiminishingReturnsScalingCoefficient =
12221222
JobVertexScaler.calculateObservedScalingCoefficient(
1223-
sharpDiminishingReturnsScalingHistory, 3);
1223+
sharpDiminishingReturnsScalingHistory, conf);
12241224

12251225
assertTrue(
12261226
sharpDiminishingReturnsScalingCoefficient < 0.9
@@ -1244,7 +1244,7 @@ public void testCalculateScalingCoefficient() {
12441244

12451245
double sharpDiminishingReturnsWithOneParallelismScalingCoefficient =
12461246
JobVertexScaler.calculateObservedScalingCoefficient(
1247-
sharpDiminishingReturnsWithOneParallelismScalingHistory, 3);
1247+
sharpDiminishingReturnsWithOneParallelismScalingHistory, conf);
12481248

12491249
assertTrue(
12501250
sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 0.9
@@ -1260,7 +1260,8 @@ public void testCalculateScalingCoefficient() {
12601260
currentTime, new ScalingSummary(4, 8, withOneScalingHistoryRecordEvaluatedData1));
12611261

12621262
double withOneScalingHistoryRecordScalingCoefficient =
1263-
JobVertexScaler.calculateObservedScalingCoefficient(withOneScalingHistoryRecord, 1);
1263+
JobVertexScaler.calculateObservedScalingCoefficient(
1264+
withOneScalingHistoryRecord, conf);
12641265

12651266
assertEquals(1, withOneScalingHistoryRecordScalingCoefficient);
12661267

@@ -1280,7 +1281,7 @@ public void testCalculateScalingCoefficient() {
12801281

12811282
double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient =
12821283
JobVertexScaler.calculateObservedScalingCoefficient(
1283-
diminishingReturnWithTwoScalingHistoryRecord, 1);
1284+
diminishingReturnWithTwoScalingHistoryRecord, conf);
12841285

12851286
assertTrue(
12861287
diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient < 0.9
@@ -1300,7 +1301,7 @@ public void testCalculateScalingCoefficient() {
13001301

13011302
double linearReturnWithTwoScalingHistoryRecordScalingCoefficient =
13021303
JobVertexScaler.calculateObservedScalingCoefficient(
1303-
linearReturnWithTwoScalingHistoryRecord, 1);
1304+
linearReturnWithTwoScalingHistoryRecord, conf);
13041305

13051306
assertEquals(1, linearReturnWithTwoScalingHistoryRecordScalingCoefficient);
13061307
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.regex.Matcher;
6666
import java.util.regex.Pattern;
6767

68+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
6869
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
6970
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
7071
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
@@ -622,6 +623,7 @@ public static Optional<String> validateAutoScalerFlinkConfiguration(
622623
UTILIZATION_MIN,
623624
0.0d,
624625
flinkConfiguration.get(UTILIZATION_TARGET)),
626+
validateNumber(flinkConfiguration, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
625627
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
626628
}
627629

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,21 @@ public void testAutoScalerDeploymentWithInvalidExcludedPeriods() {
842842
assertTrue(result.isPresent());
843843
}
844844

845+
@Test
846+
public void testAutoScalerDeploymentWithInvalidScalingCoefficientMin() {
847+
var result =
848+
testAutoScalerConfiguration(
849+
flinkConf ->
850+
flinkConf.put(
851+
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN
852+
.key(),
853+
"1.2"));
854+
assertErrorContains(
855+
result,
856+
getFormattedErrorMessage(
857+
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d));
858+
}
859+
845860
@Test
846861
public void testNonEnabledAutoScalerDeploymentJob() {
847862
var result =

0 commit comments

Comments
 (0)