Commit 27daa5a
[FLINK-30571] Estimate scalability coefficient from past scaling history using linear regression (#966)
Currently, target parallelism computation assumes perfect linear scaling. However, real-time workloads often exhibit nonlinear scalability due to factors like network overhead and coordination costs. This change introduces an observed scalability coefficient, estimated using linear regression on past (parallelism, processing rate) data, to improve the accuracy of scaling decisions.
1 parent 9619ae6 commit 27daa5a
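To make the mechanism concrete, here is a worked example with hypothetical numbers (none of these figures come from the commit itself). Suppose the scaling history holds true processing rates of 1800, 3200 and 5600 records/s at parallelism 2, 4 and 8. The baseline rate per unit of parallelism, taken from the smallest observed parallelism, is β = 1800 / 2 = 900, and the least-squares fit of the model R_i = β * P_i * α gives

    α = ∑(P_i * R_i) / (β * ∑P_i²)
      = (2*1800 + 4*3200 + 8*5600) / (900 * (4 + 16 + 64))
      = 61200 / 75600
      ≈ 0.81

The scale factor is then divided by α: if the target rate were 10000 records/s against a current 5600 records/s, the raw factor of about 1.79 becomes about 2.21, giving the sublinearly scaling job the extra parallelism it needs. Estimates below job.autoscaler.observed-scalability.coefficient-min are clamped up to that floor, and estimates above 1.0 are clamped down to 1.0.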

File tree

7 files changed: +462 -0


docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 18 additions & 0 deletions
@@ -98,6 +98,24 @@
             <td>Duration</td>
             <td>Scaling metrics aggregation window size.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.coefficient-min</h5></td>
+            <td style="word-wrap: break-word;">0.5</td>
+            <td>Double</td>
+            <td>Minimum allowed value for the observed scalability coefficient. Prevents aggressive scaling by clamping low coefficient estimates. If the estimated coefficient falls below this value, it is clamped to the configured minimum.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enables the use of an observed scalability coefficient when computing target parallelism. If enabled, the system will estimate the scalability coefficient based on historical scaling data instead of assuming perfect linear scaling. This helps account for real-world inefficiencies such as network overhead and coordination costs.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.min-observations</h5></td>
+            <td style="word-wrap: break-word;">3</td>
+            <td>Integer</td>
+            <td>Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. If the number of available observations is below this threshold, the system falls back to assuming linear scaling. Note: To effectively use a higher minimum observation count, you need to increase job.autoscaler.history.max.count. Avoid setting job.autoscaler.history.max.count to a very high value, as the number of retained data points is limited by the size of the state store, particularly when using the Kubernetes-based state store.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.observed-true-processing-rate.lag-threshold</h5></td>
             <td style="word-wrap: break-word;">30 s</td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 90 additions & 0 deletions
@@ -34,18 +34,24 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
@@ -178,6 +184,13 @@ public ParallelismChange computeScaleTargetParallelism(
 
         LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
+
+            double scalingCoefficient =
+                    JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
+
+            scaleFactor = scaleFactor / scalingCoefficient;
+        }
         double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
         double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
         if (scaleFactor < minScaleFactor) {
@@ -236,6 +249,83 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using the least squares approach. If there are not
+     * enough observations, or if the computed coefficient is invalid, a default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}.
+     * @param conf Deployment configuration.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, Configuration conf) {
+        /*
+         * The scaling coefficient is computed using the least squares approach
+         * to fit a linear model:
+         *
+         *     R_i = β * P_i * α
+         *
+         * where:
+         *  - R_i = observed processing rate
+         *  - P_i = parallelism
+         *  - β   = baseline processing rate
+         *  - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the sum of squared errors:
+         *
+         *     Loss = ∑ (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *     α = ∑ (P_i * R_i) / (∑ (P_i^2) * β)
+         *
+         * To keep the system conservative in scenarios with better-than-linear returns, the
+         * computed α is clamped to an upper bound of 1.0.
+         */
+
+        var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS);
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            parallelismList.add(parallelism);
+            processingRateList.add(processingRate);
+        }
+
+        double lowerBound = conf.get(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN);
+
+        var coefficient =
+                AutoScalerUtils.optimizeLinearScalingCoefficient(
+                        parallelismList, processingRateList, baselineProcessingRate, 1, lowerBound);
+
+        return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue();
+    }
+
     private ParallelismChange detectBlockScaling(
             Context context,
             JobVertexID vertex,
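For reference, the closed form used in the comment above follows from a single differentiation step that the commit states but does not spell out. Setting the derivative of the loss to zero:

    d(Loss)/dα = -2β * ∑ P_i * (R_i - β * α * P_i) = 0
    ⇒ ∑ (P_i * R_i) = β * α * ∑ (P_i^2)
    ⇒ α = ∑ (P_i * R_i) / (∑ (P_i^2) * β)

This is the expression evaluated in AutoScalerUtils.optimizeLinearScalingCoefficient; the result is clamped to [OBSERVED_SCALABILITY_COEFFICIENT_MIN, 1.0] there and then rounded up to two decimal places in calculateObservedScalingCoefficient.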

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 36 additions & 0 deletions
@@ -382,4 +382,40 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                                     "scaling.key-group.partitions.adjust.mode"))
                     .withDescription(
                             "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
+
+    public static final ConfigOption<Boolean> OBSERVED_SCALABILITY_ENABLED =
+            autoScalerConfig("observed-scalability.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled"))
+                    .withDescription(
+                            "Enables the use of an observed scalability coefficient when computing target parallelism. "
+                                    + "If enabled, the system will estimate the scalability coefficient based on historical scaling data "
+                                    + "instead of assuming perfect linear scaling. "
+                                    + "This helps account for real-world inefficiencies such as network overhead and coordination costs.");
+
+    public static final ConfigOption<Integer> OBSERVED_SCALABILITY_MIN_OBSERVATIONS =
+            autoScalerConfig("observed-scalability.min-observations")
+                    .intType()
+                    .defaultValue(3)
+                    .withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations"))
+                    .withDescription(
+                            "Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. "
+                                    + "If the number of available observations is below this threshold, the system falls back to assuming linear scaling. "
+                                    + "Note: To effectively use a higher minimum observation count, you need to increase "
+                                    + VERTEX_SCALING_HISTORY_COUNT.key()
+                                    + ". Avoid setting "
+                                    + VERTEX_SCALING_HISTORY_COUNT.key()
+                                    + " to a very high value, as the number of retained data points is limited by the size of the state store, "
+                                    + "particularly when using the Kubernetes-based state store.");
+
+    public static final ConfigOption<Double> OBSERVED_SCALABILITY_COEFFICIENT_MIN =
+            autoScalerConfig("observed-scalability.coefficient-min")
+                    .doubleType()
+                    .defaultValue(0.5)
+                    .withFallbackKeys(oldOperatorConfigKey("observed-scalability.coefficient-min"))
+                    .withDescription(
+                            "Minimum allowed value for the observed scalability coefficient. "
+                                    + "Prevents aggressive scaling by clamping low coefficient estimates. "
+                                    + "If the estimated coefficient falls below this value, it is clamped to the configured minimum.");
 }
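As a usage sketch (not part of the commit): programmatic callers, for example tests, can switch the feature on through the new options. The option names and defaults come from this commit; the specific values chosen below are purely illustrative, as is the class name.

```java
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

class ObservedScalabilityConfigExample {
    static Configuration observedScalabilityConf() {
        Configuration conf = new Configuration();
        // Estimate the coefficient from history instead of assuming linear scaling (default: false).
        conf.set(AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED, true);
        // Require at least 5 history entries before trusting the regression (default: 3).
        conf.set(AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS, 5);
        // Never let the estimated coefficient drop below 0.6 (default: 0.5).
        conf.set(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.6);
        return conf;
    }
}
```

Operator users would set the same values under the documented keys (job.autoscaler.observed-scalability.*) shown in the generated configuration table above.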

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java

Lines changed: 90 additions & 0 deletions
@@ -17,22 +17,27 @@
 
 package org.apache.flink.autoscaler.utils;
 
+import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** AutoScaler utilities. */
 public class AutoScalerUtils {
@@ -94,4 +99,89 @@ public static boolean excludeVerticesFromScaling(
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
         return anyAdded;
     }
+
+    /**
+     * Computes the optimized linear scaling coefficient (α) by minimizing the least squares error.
+     *
+     * <p>This method estimates the scaling coefficient in a linear scaling model by fitting
+     * observed processing rates and parallelism levels.
+     *
+     * <p>The computed coefficient is clamped within the specified lower and upper bounds to ensure
+     * stability and prevent extreme scaling adjustments.
+     *
+     * @param parallelismLevels List of parallelism levels.
+     * @param processingRates List of observed processing rates.
+     * @param baselineProcessingRate Baseline processing rate.
+     * @param upperBound Maximum allowable value for the scaling coefficient.
+     * @param lowerBound Minimum allowable value for the scaling coefficient.
+     * @return The optimized scaling coefficient (α), constrained within {@code [lowerBound,
+     *     upperBound]}.
+     */
+    public static double optimizeLinearScalingCoefficient(
+            List<Double> parallelismLevels,
+            List<Double> processingRates,
+            double baselineProcessingRate,
+            double upperBound,
+            double lowerBound) {
+
+        double sum = 0.0;
+        double squaredSum = 0.0;
+
+        for (int i = 0; i < parallelismLevels.size(); i++) {
+            double parallelism = parallelismLevels.get(i);
+            double processingRate = processingRates.get(i);
+
+            sum += parallelism * processingRate;
+            squaredSum += parallelism * parallelism;
+        }
+
+        if (squaredSum == 0.0) {
+            return 1.0; // Fall back to linear scaling if the denominator is zero.
+        }
+
+        double alpha = sum / (squaredSum * baselineProcessingRate);
+
+        return Math.max(lowerBound, Math.min(upperBound, alpha));
+    }
+
+    /**
+     * Computes the baseline processing rate from historical scaling data.
+     *
+     * <p>The baseline processing rate represents the processing rate per unit of parallelism. It
+     * is determined using the smallest observed parallelism in the history.
+     *
+     * @param history A {@code SortedMap} where keys are timestamps ({@code Instant}), and values
+     *     are {@code ScalingSummary} objects.
+     * @return The computed baseline processing rate (processing rate per unit of parallelism).
+     */
+    public static double computeBaselineProcessingRate(SortedMap<Instant, ScalingSummary> history) {
+        ScalingSummary latestSmallestParallelismSummary = null;
+
+        for (Map.Entry<Instant, ScalingSummary> entry :
+                ((NavigableMap<Instant, ScalingSummary>) history).descendingMap().entrySet()) {
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+
+            if (parallelism == 1) {
+                return summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+            }
+
+            if (latestSmallestParallelismSummary == null
+                    || parallelism < latestSmallestParallelismSummary.getCurrentParallelism()) {
+                latestSmallestParallelismSummary = entry.getValue();
+            }
+        }
+
+        if (latestSmallestParallelismSummary == null) {
+            return Double.NaN;
+        }
+
+        double parallelism = latestSmallestParallelismSummary.getCurrentParallelism();
+        double processingRate =
+                latestSmallestParallelismSummary
+                        .getMetrics()
+                        .get(TRUE_PROCESSING_RATE)
+                        .getAverage();
+        return processingRate / parallelism;
+    }
 }
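Because both helpers are plain static methods, the regression can be exercised in isolation. Below is a minimal sketch that calls the method added above; the class name and the observation values are illustrative, not taken from the commit.

```java
import org.apache.flink.autoscaler.utils.AutoScalerUtils;

import java.util.List;

class ScalingCoefficientDemo {
    public static void main(String[] args) {
        // Hypothetical history: processing rate grows sublinearly with parallelism.
        List<Double> parallelism = List.of(3.0, 6.0, 12.0);
        List<Double> rates = List.of(2400.0, 4200.0, 7200.0);

        // Baseline rate per parallelism unit, from the smallest observed parallelism (3 -> 2400 rec/s).
        double baseline = 2400.0 / 3.0;

        double alpha =
                AutoScalerUtils.optimizeLinearScalingCoefficient(
                        parallelism, rates, baseline, 1.0, 0.5);

        // ~0.79 for these numbers: the job scales at roughly 79% of linear.
        System.out.println("alpha = " + alpha);
    }
}
```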
