|
34 | 34 | import org.slf4j.Logger; |
35 | 35 | import org.slf4j.LoggerFactory; |
36 | 36 |
|
| 37 | +import java.math.BigDecimal; |
| 38 | +import java.math.RoundingMode; |
37 | 39 | import java.time.Clock; |
38 | 40 | import java.time.Duration; |
39 | 41 | import java.time.Instant; |
40 | 42 | import java.time.ZoneId; |
| 43 | +import java.util.ArrayList; |
41 | 44 | import java.util.Collection; |
| 45 | +import java.util.List; |
42 | 46 | import java.util.Map; |
43 | 47 | import java.util.Objects; |
44 | 48 | import java.util.SortedMap; |
45 | 49 |
|
46 | 50 | import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION; |
47 | 51 | import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; |
48 | 52 | import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; |
| 53 | +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED; |
| 54 | +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS; |
49 | 55 | import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL; |
50 | 56 | import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; |
51 | 57 | import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE; |
@@ -178,6 +184,12 @@ public ParallelismChange computeScaleTargetParallelism( |
178 | 184 |
|
179 | 185 | LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity); |
180 | 186 | double scaleFactor = targetCapacity / averageTrueProcessingRate; |
| 187 | + if (conf.get(OBSERVED_SCALABILITY_ENABLED)) { |
| 188 | + double scalingCoefficient = |
| 189 | + JobVertexScaler.calculateObservedScalingCoefficient( |
| 190 | + history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS)); |
| 191 | + scaleFactor = scaleFactor / scalingCoefficient; |
| 192 | + } |
181 | 193 | double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR); |
182 | 194 | double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR); |
183 | 195 | if (scaleFactor < minScaleFactor) { |
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism( |
236 | 248 | delayedScaleDown); |
237 | 249 | } |
238 | 250 |
|
| 251 | + /** |
| 252 | + * Calculates the scaling coefficient based on historical scaling data. |
| 253 | + * |
| 254 | + * <p>The scaling coefficient is computed using a weighted least squares approach, where more |
| 255 | + * recent data points and those with higher parallelism are given higher weights. If there are |
| 256 | + * not enough observations, or if the baseline or any observed true processing rate is NaN, a |
| 257 | + * default value of {@code 1.0} is returned, assuming linear scaling. |
| 258 | + * |
| 259 | + * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary} |
| 260 | + * @param minObservations The minimum number of observations required to compute the scaling |
| 261 | + * coefficient. If the number of historical entries is less than this threshold, a default |
| 262 | + * coefficient of {@code 1.0} is returned. |
| 263 | + * @return The computed scaling coefficient, rounded up to two decimal places. |
| 264 | + */ |
| 265 | + @VisibleForTesting |
| 266 | + protected static double calculateObservedScalingCoefficient( |
| 267 | + SortedMap<Instant, ScalingSummary> history, int minObservations) { |
| 268 | + /* |
| 269 | + * The scaling coefficient is computed using a **weighted least squares** approach |
| 270 | + * to fit a linear model: |
| 271 | + * |
| 272 | + * R_i = β * P_i * α |
| 273 | + * |
| 274 | + * where: |
| 275 | + * - R_i = observed processing rate |
| 276 | + * - P_i = parallelism |
| 277 | + * - β = baseline processing rate |
| 278 | + * - α = scaling coefficient to optimize |
| 279 | + * |
| 280 | + * The optimization minimizes the **weighted sum of squared errors**: |
| 281 | + * |
| 282 | + * Loss = ∑ w_i * (R_i - β * α * P_i)^2 |
| 283 | + * |
| 284 | + * Differentiating w.r.t. α, setting the derivative to zero and solving for α: |
| 285 | + * |
| 286 | + * α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β) |
| 287 | + * |
| 288 | + * To stay conservative when scaling looks better than linear, the computed α is capped at 1.0. |
| 289 | + */ |
| 290 | + |
| 291 | + // Not enough data to compute the scaling coefficient; assume linear scaling. |
| 292 | + if (history.isEmpty() || history.size() < minObservations) { |
| 293 | + return 1.0; |
| 294 | + } |
| 295 | + |
| 296 | + var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history); |
| 297 | + |
| 298 | + if (Double.isNaN(baselineProcessingRate)) { |
| 299 | + return 1.0; |
| 300 | + } |
| 301 | + |
| 302 | + Instant latestTimestamp = history.lastKey(); |
| 303 | + |
| 304 | + List<Double> parallelismList = new ArrayList<>(); |
| 305 | + List<Double> processingRateList = new ArrayList<>(); |
| 306 | + List<Double> weightList = new ArrayList<>(); |
| 307 | + |
| 308 | + for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) { |
| 309 | + Instant timestamp = entry.getKey(); |
| 310 | + ScalingSummary summary = entry.getValue(); |
| 311 | + double parallelism = summary.getCurrentParallelism(); |
| 312 | + double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); |
| 313 | + |
| 314 | + if (Double.isNaN(processingRate)) { |
| 315 | + LOG.warn( |
| 316 | + "True processing rate is not available in scaling history. Cannot compute scaling coefficient."); |
| 317 | + return 1.0; |
| 318 | + } |
| 319 | + |
| 320 | + // Weight = parallelism / age in seconds, where age is measured from the latest entry |
| 321 | + double timeDiff = |
| 322 | + Duration.between(timestamp, latestTimestamp).getSeconds() |
| 323 | + + 1; // +1 so the latest entry (age 0) does not cause a division by zero |
| 324 | + double weight = parallelism / timeDiff; |
| 325 | + |
| 326 | + parallelismList.add(parallelism); |
| 327 | + processingRateList.add(processingRate); |
| 328 | + weightList.add(weight); |
| 329 | + } |
| 330 | + |
| 331 | + var coefficient = |
| 332 | + AutoScalerUtils.optimizeLinearScalingCoefficient( |
| 333 | + parallelismList, |
| 334 | + processingRateList, |
| 335 | + weightList, |
| 336 | + baselineProcessingRate, |
| 337 | + 1, |
| 338 | + 0.01); |
| 339 | + return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue(); |
| 340 | + } |
| 341 | + |
239 | 342 | private ParallelismChange detectBlockScaling( |
240 | 343 | Context context, |
241 | 344 | JobVertexID vertex, |
|
0 commit comments