Skip to content

Commit ee9a953

Browse files
committed
1. Updated the scaling coefficient computation to remove the weighting. 2. Check the scaling coefficient against a threshold before returning it. 3. Refactored tests for points [1] and [2].
1 parent 6f01454 commit ee9a953

File tree

4 files changed

+153
-225
lines changed

4 files changed

+153
-225
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 27 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -185,9 +185,9 @@ public ParallelismChange computeScaleTargetParallelism(
185185
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
186186
double scaleFactor = targetCapacity / averageTrueProcessingRate;
187187
if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
188+
188189
double scalingCoefficient =
189-
JobVertexScaler.calculateObservedScalingCoefficient(
190-
history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
190+
JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
191191
scaleFactor = scaleFactor / scalingCoefficient;
192192
}
193193
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
@@ -251,22 +251,19 @@ public ParallelismChange computeScaleTargetParallelism(
251251
/**
252252
* Calculates the scaling coefficient based on historical scaling data.
253253
*
254-
* <p>The scaling coefficient is computed using a weighted least squares approach, where more
255-
* recent data points and those with higher parallelism are given higher weights. If there are
256-
* not enough observations, or if the computed coefficient is invalid, a default value of {@code
254+
* <p>The scaling coefficient is computed using the least squares approach. If there are not
255+
* enough observations, or if the computed coefficient is invalid, a default value of {@code
257256
* 1.0} is returned, assuming linear scaling.
258257
*
259258
* @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
260-
* @param minObservations The minimum number of observations required to compute the scaling
261-
* coefficient. If the number of historical entries is less than this threshold, a default
262-
* coefficient of {@code 1.0} is returned.
259+
* @param conf Deployment configuration.
263260
* @return The computed scaling coefficient.
264261
*/
265262
@VisibleForTesting
266263
protected static double calculateObservedScalingCoefficient(
267-
SortedMap<Instant, ScalingSummary> history, int minObservations) {
264+
SortedMap<Instant, ScalingSummary> history, Configuration conf) {
268265
/*
269-
* The scaling coefficient is computed using a **weighted least squares** approach
266+
* The scaling coefficient is computed using the least squares approach
270267
* to fit a linear model:
271268
*
272269
* R_i = β * P_i * α
@@ -277,18 +274,21 @@ protected static double calculateObservedScalingCoefficient(
277274
* - β = baseline processing rate
278275
* - α = scaling coefficient to optimize
279276
*
280-
* The optimization minimizes the **weighted sum of squared errors**:
277+
* The optimization minimizes the **sum of squared errors**:
281278
*
282-
* Loss = ∑ w_i * (R_i - β * α * P_i)^2
279+
* Loss = ∑ (R_i - β * α * P_i)^2
283280
*
284281
* Differentiating w.r.t. α and solving for α:
285282
*
286-
* α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
283+
* α = ∑ (P_i * R_i) / (∑ (P_i^2) * β)
287284
*
288-
* We keep the system conservative for higher returns scenario by clamping computed α within 1.0.
285+
* We keep the system conservative in the increasing-returns scenario (computed α > 1.0) by clamping the computed α to an upper bound of 1.0.
286+
* If the computed coefficient falls below the threshold, the system falls back to assuming linear scaling (α = 1.0).
289287
*/
290288

291-
// not enough data to compute scaling coefficient. we assume linear scaling.
289+
var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS);
290+
291+
// not enough data to compute scaling coefficient; we assume linear scaling.
292292
if (history.isEmpty() || history.size() < minObservations) {
293293
return 1.0;
294294
}
@@ -299,14 +299,10 @@ protected static double calculateObservedScalingCoefficient(
299299
return 1.0;
300300
}
301301

302-
Instant latestTimestamp = history.lastKey();
303-
304302
List<Double> parallelismList = new ArrayList<>();
305303
List<Double> processingRateList = new ArrayList<>();
306-
List<Double> weightList = new ArrayList<>();
307304

308305
for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
309-
Instant timestamp = entry.getKey();
310306
ScalingSummary summary = entry.getValue();
311307
double parallelism = summary.getCurrentParallelism();
312308
double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
@@ -317,25 +313,24 @@ protected static double calculateObservedScalingCoefficient(
317313
return 1.0;
318314
}
319315

320-
// Compute weight based on recency & parallelism
321-
double timeDiff =
322-
Duration.between(timestamp, latestTimestamp).getSeconds()
323-
+ 1; // Avoid division by zero
324-
double weight = parallelism / timeDiff;
325-
326316
parallelismList.add(parallelism);
327317
processingRateList.add(processingRate);
328-
weightList.add(weight);
329318
}
330319

331320
var coefficient =
332321
AutoScalerUtils.optimizeLinearScalingCoefficient(
333-
parallelismList,
334-
processingRateList,
335-
weightList,
336-
baselineProcessingRate,
337-
1,
338-
0.01);
322+
parallelismList, processingRateList, baselineProcessingRate, 1, 0.01);
323+
324+
double threshold =
325+
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)
326+
? conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD)
327+
: 0.5;
328+
329+
if (coefficient < threshold) {
330+
LOG.warn("Scaling coefficient is below threshold. Falling back to linear scaling.");
331+
return 1.0;
332+
}
333+
339334
return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue();
340335
}
341336

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -394,8 +394,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
394394
public static final ConfigOption<Integer> OBSERVED_SCALABILITY_MIN_OBSERVATIONS =
395395
autoScalerConfig("observed-scalability.min-observations")
396396
.intType()
397-
.defaultValue(5)
397+
.defaultValue(3)
398398
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations"))
399399
.withDescription(
400-
"Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. If the number of available observations is below this threshold, the system falls back to assuming linear scaling.");
400+
"Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. "
401+
+ "If the number of available observations is below this threshold, the system falls back to assuming linear scaling. "
402+
+ "Note: To effectively use a higher minimum observation count, you need to increase "
403+
+ VERTEX_SCALING_HISTORY_COUNT.key()
404+
+ ". Avoid setting "
405+
+ VERTEX_SCALING_HISTORY_COUNT.key()
406+
+ " to a very high value, as the number of retained data points is limited by the size of the state store—"
407+
+ "particularly when using Kubernetes-based state store.");
401408
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java

Lines changed: 8 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -101,19 +101,16 @@ public static boolean excludeVerticesFromScaling(
101101
}
102102

103103
/**
104-
* Computes the optimized linear scaling coefficient (α) by minimizing the weighted least
105-
* squares error.
104+
* Computes the optimized linear scaling coefficient (α) by minimizing the least squares error.
106105
*
107106
* <p>This method estimates the scaling coefficient in a linear scaling model by fitting
108-
* observed processing rates and parallelism levels while applying weights to account for
109-
* recency and significance.
107+
* observed processing rates and parallelism levels.
110108
*
111109
* <p>The computed coefficient is clamped within the specified lower and upper bounds to ensure
112110
* stability and prevent extreme scaling adjustments.
113111
*
114112
* @param parallelismLevels List of parallelism levels.
115113
* @param processingRates List of observed processing rates.
116-
* @param weights List of weights for each observation.
117114
* @param baselineProcessingRate Baseline processing rate.
118115
* @param upperBound Maximum allowable value for the scaling coefficient.
119116
* @param lowerBound Minimum allowable value for the scaling coefficient.
@@ -123,28 +120,26 @@ public static boolean excludeVerticesFromScaling(
123120
public static double optimizeLinearScalingCoefficient(
124121
List<Double> parallelismLevels,
125122
List<Double> processingRates,
126-
List<Double> weights,
127123
double baselineProcessingRate,
128124
double upperBound,
129125
double lowerBound) {
130126

131-
double weightedSum = 0.0;
132-
double weightedSquaredSum = 0.0;
127+
double sum = 0.0;
128+
double squaredSum = 0.0;
133129

134130
for (int i = 0; i < parallelismLevels.size(); i++) {
135131
double parallelism = parallelismLevels.get(i);
136132
double processingRate = processingRates.get(i);
137-
double weight = weights.get(i);
138133

139-
weightedSum += weight * parallelism * processingRate;
140-
weightedSquaredSum += weight * parallelism * parallelism;
134+
sum += parallelism * processingRate;
135+
squaredSum += parallelism * parallelism;
141136
}
142137

143-
if (weightedSquaredSum == 0.0) {
138+
if (squaredSum == 0.0) {
144139
return 1.0; // Fallback to linear scaling if denominator is zero
145140
}
146141

147-
double alpha = weightedSum / (weightedSquaredSum * baselineProcessingRate);
142+
double alpha = sum / (squaredSum * baselineProcessingRate);
148143

149144
return Math.max(lowerBound, Math.min(upperBound, alpha));
150145
}

0 commit comments

Comments
 (0)