From 6f01454437acc8c15933e08bed5e239b94c7edf7 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Wed, 2 Apr 2025 17:46:10 +0530 Subject: [PATCH 1/5] [FLINK-30571] Estimate scalability coefficient from past scaling history using linear regression --- .../flink/autoscaler/JobVertexScaler.java | 103 +++++++ .../autoscaler/config/AutoScalerOptions.java | 16 + .../autoscaler/utils/AutoScalerUtils.java | 95 ++++++ .../flink/autoscaler/JobVertexScalerTest.java | 281 ++++++++++++++++++ 4 files changed, 495 insertions(+) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 4c185f89ea..14bf9322c3 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -34,11 +34,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.SortedMap; @@ -46,6 +50,8 @@ import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE; @@ -178,6 +184,12 @@ public ParallelismChange computeScaleTargetParallelism( LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity); double scaleFactor = targetCapacity / averageTrueProcessingRate; + if (conf.get(OBSERVED_SCALABILITY_ENABLED)) { + double scalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS)); + scaleFactor = scaleFactor / scalingCoefficient; + } double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR); double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR); if (scaleFactor < minScaleFactor) { @@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism( delayedScaleDown); } + /** + * Calculates the scaling coefficient based on historical scaling data. + * + *
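+     * <p>Callers divide the raw scale factor by the returned coefficient; for example
+     * (hypothetical numbers), a scale factor of 2.0 combined with a coefficient of 0.8 becomes
+     * 2.5, compensating for observed sub-linear scaling.
+     *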
<p>
The scaling coefficient is computed using a weighted least squares approach, where more + * recent data points and those with higher parallelism are given higher weights. If there are + * not enough observations, or if the computed coefficient is invalid, a default value of {@code + * 1.0} is returned, assuming linear scaling. + * + * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary} + * @param minObservations The minimum number of observations required to compute the scaling + * coefficient. If the number of historical entries is less than this threshold, a default + * coefficient of {@code 1.0} is returned. + * @return The computed scaling coefficient. + */ + @VisibleForTesting + protected static double calculateObservedScalingCoefficient( + SortedMap history, int minObservations) { + /* + * The scaling coefficient is computed using a **weighted least squares** approach + * to fit a linear model: + * + * R_i = β * P_i * α + * + * where: + * - R_i = observed processing rate + * - P_i = parallelism + * - β = baseline processing rate + * - α = scaling coefficient to optimize + * + * The optimization minimizes the **weighted sum of squared errors**: + * + * Loss = ∑ w_i * (R_i - β * α * P_i)^2 + * + * Differentiating w.r.t. α and solving for α: + * + * α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β) + * + * We keep the system conservative for higher returns scenario by clamping computed α within 1.0. + */ + + // not enough data to compute scaling coefficient. we assume linear scaling. + if (history.isEmpty() || history.size() < minObservations) { + return 1.0; + } + + var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history); + + if (Double.isNaN(baselineProcessingRate)) { + return 1.0; + } + + Instant latestTimestamp = history.lastKey(); + + List parallelismList = new ArrayList<>(); + List processingRateList = new ArrayList<>(); + List weightList = new ArrayList<>(); + + for (Map.Entry entry : history.entrySet()) { + Instant timestamp = entry.getKey(); + ScalingSummary summary = entry.getValue(); + double parallelism = summary.getCurrentParallelism(); + double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); + + if (Double.isNaN(processingRate)) { + LOG.warn( + "True processing rate is not available in scaling history. 
Cannot compute scaling coefficient."); + return 1.0; + } + + // Compute weight based on recency & parallelism + double timeDiff = + Duration.between(timestamp, latestTimestamp).getSeconds() + + 1; // Avoid division by zero + double weight = parallelism / timeDiff; + + parallelismList.add(parallelism); + processingRateList.add(processingRate); + weightList.add(weight); + } + + var coefficient = + AutoScalerUtils.optimizeLinearScalingCoefficient( + parallelismList, + processingRateList, + weightList, + baselineProcessingRate, + 1, + 0.01); + return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue(); + } + private ParallelismChange detectBlockScaling( Context context, JobVertexID vertex, diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 980db2f4cc..bd55a59bd7 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -382,4 +382,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { "scaling.key-group.partitions.adjust.mode")) .withDescription( "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy"); + + public static final ConfigOption OBSERVED_SCALABILITY_ENABLED = + autoScalerConfig("observed-scalability.enabled") + .booleanType() + .defaultValue(false) + .withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled")) + .withDescription( + "Enables the use of an observed scalability coefficient when computing target parallelism. If enabled, the system will estimate the scalability coefficient based on historical scaling data instead of assuming perfect linear scaling. This helps account for real-world inefficiencies such as network overhead and coordination costs."); + + public static final ConfigOption OBSERVED_SCALABILITY_MIN_OBSERVATIONS = + autoScalerConfig("observed-scalability.min-observations") + .intType() + .defaultValue(5) + .withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations")) + .withDescription( + "Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. 
If the number of available observations is below this threshold, the system falls back to assuming linear scaling."); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java index 411ab9b20d..da23df3386 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.ScalingSummary; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; @@ -24,15 +25,19 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Set; +import java.util.SortedMap; import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** AutoScaler utilities. */ public class AutoScalerUtils { @@ -94,4 +99,94 @@ public static boolean excludeVerticesFromScaling( conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds)); return anyAdded; } + + /** + * Computes the optimized linear scaling coefficient (α) by minimizing the weighted least + * squares error. + * + *
<p>
This method estimates the scaling coefficient in a linear scaling model by fitting + * observed processing rates and parallelism levels while applying weights to account for + * recency and significance. + * + *
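+     * <p>Worked example with hypothetical numbers: for observations (P=2, R=200, w=1) and
+     * (P=4, R=400, w=2) with baseline rate β = 100, α = (1*2*200 + 2*4*400) /
+     * ((1*2^2 + 2*4^2) * 100) = 3600 / 3600 = 1.0, i.e. the history indicates perfectly
+     * linear scaling.
+     *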
<p>
The computed coefficient is clamped within the specified lower and upper bounds to ensure + * stability and prevent extreme scaling adjustments. + * + * @param parallelismLevels List of parallelism levels. + * @param processingRates List of observed processing rates. + * @param weights List of weights for each observation. + * @param baselineProcessingRate Baseline processing rate. + * @param upperBound Maximum allowable value for the scaling coefficient. + * @param lowerBound Minimum allowable value for the scaling coefficient. + * @return The optimized scaling coefficient (α), constrained within {@code [lowerBound, + * upperBound]}. + */ + public static double optimizeLinearScalingCoefficient( + List parallelismLevels, + List processingRates, + List weights, + double baselineProcessingRate, + double upperBound, + double lowerBound) { + + double weightedSum = 0.0; + double weightedSquaredSum = 0.0; + + for (int i = 0; i < parallelismLevels.size(); i++) { + double parallelism = parallelismLevels.get(i); + double processingRate = processingRates.get(i); + double weight = weights.get(i); + + weightedSum += weight * parallelism * processingRate; + weightedSquaredSum += weight * parallelism * parallelism; + } + + if (weightedSquaredSum == 0.0) { + return 1.0; // Fallback to linear scaling if denominator is zero + } + + double alpha = weightedSum / (weightedSquaredSum * baselineProcessingRate); + + return Math.max(lowerBound, Math.min(upperBound, alpha)); + } + + /** + * Computes the baseline processing rate from historical scaling data. + * + *
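+     * <p>For instance (hypothetical numbers): if the smallest parallelism in the history is 2
+     * with an average true processing rate of 100 records/s, the baseline is 100 / 2 = 50
+     * records/s per unit of parallelism.
+     *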
<p>
The baseline processing rate represents the **processing rate per unit of parallelism**. + * It is determined using the smallest observed parallelism in the history. + * + * @param history A {@code SortedMap} where keys are timestamps ({@code Instant}), and values + * are {@code ScalingSummary} objects. + * @return The computed baseline processing rate (processing rate per unit of parallelism). + */ + public static double computeBaselineProcessingRate(SortedMap history) { + ScalingSummary latestSmallestParallelismSummary = null; + + for (Map.Entry entry : + ((NavigableMap) history).descendingMap().entrySet()) { + ScalingSummary summary = entry.getValue(); + double parallelism = summary.getCurrentParallelism(); + + if (parallelism == 1) { + return summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); + } + + if (latestSmallestParallelismSummary == null + || parallelism < latestSmallestParallelismSummary.getCurrentParallelism()) { + latestSmallestParallelismSummary = entry.getValue(); + } + } + + if (latestSmallestParallelismSummary == null) { + return Double.NaN; + } + + double parallelism = latestSmallestParallelismSummary.getCurrentParallelism(); + double processingRate = + latestSmallestParallelismSummary + .getMetrics() + .get(TRUE_PROCESSING_RATE) + .getAverage(); + return processingRate / parallelism; + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 9cdc71596b..a8108b2ce2 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -49,6 +49,7 @@ import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING; import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT; import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -1156,4 +1157,284 @@ private Map evaluated( ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, false, restartTime); return metrics; } + + @Test + public void testCalculateScalingCoefficient() { + var currentTime = Instant.now(); + + var linearScalingHistory = new TreeMap(); + var linearScalingEvaluatedData1 = evaluated(1, 100, 50); + var linearScalingEvaluatedData2 = evaluated(2, 200, 100); + var linearScalingEvaluatedData3 = evaluated(4, 100, 200); + var linearScalingEvaluatedData4 = evaluated(2, 400, 100); + var linearScalingEvaluatedData5 = evaluated(8, 800, 400); + + linearScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary(1, 2, linearScalingEvaluatedData1)); + linearScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary(2, 4, linearScalingEvaluatedData2)); + linearScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary(4, 2, linearScalingEvaluatedData3)); + linearScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 8, linearScalingEvaluatedData4)); + linearScalingHistory.put( + currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData5)); + + double linearScalingScalingCoefficient = + 
JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, 5); + + assertEquals(1.0, linearScalingScalingCoefficient); + + var slightDiminishingReturnsScalingHistory = new TreeMap(); + var slightDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); + var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 196, 98); + var slightDiminishingReturnsEvaluatedData3 = evaluated(4, 98, 196); + var slightDiminishingReturnsEvaluatedData4 = evaluated(2, 396, 99); + var slightDiminishingReturnsEvaluatedData5 = evaluated(8, 780, 390); + + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary(1, 2, slightDiminishingReturnsEvaluatedData1)); + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary(2, 4, slightDiminishingReturnsEvaluatedData2)); + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary(4, 2, slightDiminishingReturnsEvaluatedData3)); + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 8, slightDiminishingReturnsEvaluatedData4)); + slightDiminishingReturnsScalingHistory.put( + currentTime, new ScalingSummary(8, 16, slightDiminishingReturnsEvaluatedData5)); + + double slightDiminishingReturnsScalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + slightDiminishingReturnsScalingHistory, 5); + + assertTrue( + slightDiminishingReturnsScalingCoefficient > 0.9 + && slightDiminishingReturnsScalingCoefficient < 1); + + var sharpDiminishingReturnsScalingHistory = new TreeMap(); + var sharpDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); + var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 192, 96); + var sharpDiminishingReturnsEvaluatedData3 = evaluated(4, 80, 160); + var sharpDiminishingReturnsEvaluatedData4 = evaluated(2, 384, 96); + var sharpDiminishingReturnsEvaluatedData5 = evaluated(8, 480, 240); + + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary(1, 2, sharpDiminishingReturnsEvaluatedData1)); + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary(2, 4, sharpDiminishingReturnsEvaluatedData2)); + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary(4, 2, sharpDiminishingReturnsEvaluatedData3)); + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 8, sharpDiminishingReturnsEvaluatedData4)); + sharpDiminishingReturnsScalingHistory.put( + currentTime, new ScalingSummary(8, 16, sharpDiminishingReturnsEvaluatedData5)); + + double sharpDiminishingReturnsScalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + sharpDiminishingReturnsScalingHistory, 5); + + assertTrue( + sharpDiminishingReturnsScalingCoefficient < 0.9 + && sharpDiminishingReturnsScalingCoefficient > 0.4); + + var sharpDiminishingReturnsWithoutOneParallelismScalingHistory = + new TreeMap(); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1 = evaluated(4, 80, 160); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2 = evaluated(8, 480, 240); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3 = evaluated(16, 140, 280); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4 = evaluated(8, 480, 240); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5 = evaluated(16, 560, 280); + + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + 
currentTime.minusSeconds(40), + new ScalingSummary( + 4, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary( + 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary( + 16, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary( + 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime, + new ScalingSummary( + 16, 32, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5)); + + double sharpDiminishingReturnsWithoutOneParallelismScalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + sharpDiminishingReturnsWithoutOneParallelismScalingHistory, 5); + + assertTrue( + sharpDiminishingReturnsWithoutOneParallelismScalingCoefficient < 0.9 + && sharpDiminishingReturnsWithoutOneParallelismScalingCoefficient > 0.4); + } + + @ParameterizedTest + @MethodSource("adjustmentInputsProvider") + public void testParallelismScalingWithObservedScalingCoefficient( + Collection inputShipStrategies) { + var op = new JobVertexID(); + var delayedScaleDown = new DelayedScaleDown(); + var currentTime = Instant.now(); + + conf.set(UTILIZATION_TARGET, 0.5); + conf.set(OBSERVED_SCALABILITY_ENABLED, true); + + var linearScalingHistory = new TreeMap(); + var linearScalingEvaluatedData1 = evaluated(1, 100, 50); + var linearScalingEvaluatedData2 = evaluated(2, 200, 100); + var linearScalingEvaluatedData3 = evaluated(4, 100, 200); + var linearScalingEvaluatedData4 = evaluated(2, 400, 100); + var linearScalingEvaluatedData5 = evaluated(8, 800, 400); + + linearScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary(1, 2, linearScalingEvaluatedData1)); + linearScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary(2, 4, linearScalingEvaluatedData2)); + linearScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary(4, 2, linearScalingEvaluatedData3)); + linearScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 8, linearScalingEvaluatedData4)); + linearScalingHistory.put( + currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData5)); + + assertEquals( + ParallelismChange.build(10, true), + vertexScaler.computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(2, 100, 40), + linearScalingHistory, + restartTime, + delayedScaleDown)); + + var slightDiminishingReturnsScalingHistory = new TreeMap(); + var slightDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); + var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 196, 98); + var slightDiminishingReturnsEvaluatedData3 = evaluated(4, 98, 196); + var slightDiminishingReturnsEvaluatedData4 = evaluated(2, 396, 99); + var slightDiminishingReturnsEvaluatedData5 = evaluated(8, 780, 390); + + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary(1, 2, slightDiminishingReturnsEvaluatedData1)); + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary(2, 4, slightDiminishingReturnsEvaluatedData2)); + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(20), + new 
ScalingSummary(4, 2, slightDiminishingReturnsEvaluatedData3)); + slightDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 8, slightDiminishingReturnsEvaluatedData4)); + slightDiminishingReturnsScalingHistory.put( + currentTime, new ScalingSummary(8, 16, slightDiminishingReturnsEvaluatedData5)); + + assertEquals( + ParallelismChange.build(12, true), + vertexScaler.computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(2, 100, 40), + slightDiminishingReturnsScalingHistory, + restartTime, + delayedScaleDown)); + + var sharpDiminishingReturnsScalingHistory = new TreeMap(); + var sharpDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); + var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 192, 96); + var sharpDiminishingReturnsEvaluatedData3 = evaluated(4, 80, 160); + var sharpDiminishingReturnsEvaluatedData4 = evaluated(2, 384, 96); + var sharpDiminishingReturnsEvaluatedData5 = evaluated(8, 480, 240); + + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary(1, 2, sharpDiminishingReturnsEvaluatedData1)); + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary(2, 4, sharpDiminishingReturnsEvaluatedData2)); + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary(4, 2, sharpDiminishingReturnsEvaluatedData3)); + sharpDiminishingReturnsScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 8, sharpDiminishingReturnsEvaluatedData4)); + sharpDiminishingReturnsScalingHistory.put( + currentTime, new ScalingSummary(8, 16, sharpDiminishingReturnsEvaluatedData5)); + + assertEquals( + ParallelismChange.build(18, true), + vertexScaler.computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(2, 100, 40), + sharpDiminishingReturnsScalingHistory, + restartTime, + delayedScaleDown)); + + var sharpDiminishingReturnsWithoutOneParallelismScalingHistory = + new TreeMap(); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1 = evaluated(4, 80, 160); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2 = evaluated(8, 480, 240); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3 = evaluated(16, 140, 280); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4 = evaluated(8, 480, 240); + var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5 = evaluated(16, 560, 280); + + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(40), + new ScalingSummary( + 4, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(30), + new ScalingSummary( + 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(20), + new ScalingSummary( + 16, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary( + 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4)); + sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + currentTime, + new ScalingSummary( + 16, 32, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5)); + + assertEquals( + ParallelismChange.build(24, true), + vertexScaler.computeScaleTargetParallelism( + context, + op, + 
inputShipStrategies, + evaluated(2, 100, 40), + sharpDiminishingReturnsWithoutOneParallelismScalingHistory, + restartTime, + delayedScaleDown)); + } } From ee9a953e869fa6fa08246564c288f96387be0cb7 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Wed, 30 Apr 2025 14:47:02 +0530 Subject: [PATCH 2/5] 1. Updated scaling coefficient compute logic to remove the weighting. 2. Check scaling coefficient with threshold before returning. 3. Refactored tests for point [1] and [2]. --- .../flink/autoscaler/JobVertexScaler.java | 59 ++-- .../autoscaler/config/AutoScalerOptions.java | 11 +- .../autoscaler/utils/AutoScalerUtils.java | 21 +- .../flink/autoscaler/JobVertexScalerTest.java | 287 +++++++----------- 4 files changed, 153 insertions(+), 225 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 14bf9322c3..2d8c9b8c9b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -185,9 +185,9 @@ public ParallelismChange computeScaleTargetParallelism( LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity); double scaleFactor = targetCapacity / averageTrueProcessingRate; if (conf.get(OBSERVED_SCALABILITY_ENABLED)) { + double scalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient( - history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS)); + JobVertexScaler.calculateObservedScalingCoefficient(history, conf); scaleFactor = scaleFactor / scalingCoefficient; } double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR); @@ -251,22 +251,19 @@ public ParallelismChange computeScaleTargetParallelism( /** * Calculates the scaling coefficient based on historical scaling data. * - *
<p>
The scaling coefficient is computed using a weighted least squares approach, where more - * recent data points and those with higher parallelism are given higher weights. If there are - * not enough observations, or if the computed coefficient is invalid, a default value of {@code + *
<p>
The scaling coefficient is computed using the least squares approach. If there are not + * enough observations, or if the computed coefficient is invalid, a default value of {@code * 1.0} is returned, assuming linear scaling. * * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary} - * @param minObservations The minimum number of observations required to compute the scaling - * coefficient. If the number of historical entries is less than this threshold, a default - * coefficient of {@code 1.0} is returned. + * @param conf Deployment configuration. * @return The computed scaling coefficient. */ @VisibleForTesting protected static double calculateObservedScalingCoefficient( - SortedMap history, int minObservations) { + SortedMap history, Configuration conf) { /* - * The scaling coefficient is computed using a **weighted least squares** approach + * The scaling coefficient is computed using the least squares approach * to fit a linear model: * * R_i = β * P_i * α @@ -277,18 +274,21 @@ protected static double calculateObservedScalingCoefficient( * - β = baseline processing rate * - α = scaling coefficient to optimize * - * The optimization minimizes the **weighted sum of squared errors**: + * The optimization minimizes the **sum of squared errors**: * - * Loss = ∑ w_i * (R_i - β * α * P_i)^2 + * Loss = ∑ (R_i - β * α * P_i)^2 * * Differentiating w.r.t. α and solving for α: * - * α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β) + * α = ∑ (P_i * R_i) / (∑ (P_i^2) * β) * - * We keep the system conservative for higher returns scenario by clamping computed α within 1.0. + * We keep the system conservative for higher returns scenario by clamping computed α to an upper bound of 1.0. + * If the computed coefficient falls below threshold, the system falls back to assuming linear scaling (α = 1.0). */ - // not enough data to compute scaling coefficient. we assume linear scaling. + var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS); + + // not enough data to compute scaling coefficient; we assume linear scaling. if (history.isEmpty() || history.size() < minObservations) { return 1.0; } @@ -299,14 +299,10 @@ protected static double calculateObservedScalingCoefficient( return 1.0; } - Instant latestTimestamp = history.lastKey(); - List parallelismList = new ArrayList<>(); List processingRateList = new ArrayList<>(); - List weightList = new ArrayList<>(); for (Map.Entry entry : history.entrySet()) { - Instant timestamp = entry.getKey(); ScalingSummary summary = entry.getValue(); double parallelism = summary.getCurrentParallelism(); double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); @@ -317,25 +313,24 @@ protected static double calculateObservedScalingCoefficient( return 1.0; } - // Compute weight based on recency & parallelism - double timeDiff = - Duration.between(timestamp, latestTimestamp).getSeconds() - + 1; // Avoid division by zero - double weight = parallelism / timeDiff; - parallelismList.add(parallelism); processingRateList.add(processingRate); - weightList.add(weight); } var coefficient = AutoScalerUtils.optimizeLinearScalingCoefficient( - parallelismList, - processingRateList, - weightList, - baselineProcessingRate, - 1, - 0.01); + parallelismList, processingRateList, baselineProcessingRate, 1, 0.01); + + double threshold = + conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED) + ? 
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD) + : 0.5; + + if (coefficient < threshold) { + LOG.warn("Scaling coefficient is below threshold. Falling back to linear scaling."); + return 1.0; + } + return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue(); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index bd55a59bd7..c62616fc0f 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -394,8 +394,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { public static final ConfigOption OBSERVED_SCALABILITY_MIN_OBSERVATIONS = autoScalerConfig("observed-scalability.min-observations") .intType() - .defaultValue(5) + .defaultValue(3) .withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations")) .withDescription( - "Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. If the number of available observations is below this threshold, the system falls back to assuming linear scaling."); + "Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. " + + "If the number of available observations is below this threshold, the system falls back to assuming linear scaling. " + + "Note: To effectively use a higher minimum observation count, you need to increase " + + VERTEX_SCALING_HISTORY_COUNT.key() + + ". Avoid setting " + + VERTEX_SCALING_HISTORY_COUNT.key() + + " to a very high value, as the number of retained data points is limited by the size of the state store—" + + "particularly when using Kubernetes-based state store."); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java index da23df3386..837d429b42 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java @@ -101,19 +101,16 @@ public static boolean excludeVerticesFromScaling( } /** - * Computes the optimized linear scaling coefficient (α) by minimizing the weighted least - * squares error. + * Computes the optimized linear scaling coefficient (α) by minimizing the least squares error. * *
<p>
This method estimates the scaling coefficient in a linear scaling model by fitting - * observed processing rates and parallelism levels while applying weights to account for - * recency and significance. + * observed processing rates and parallelism levels. * *
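+     * <p>A minimal usage sketch (hypothetical inputs):
+     *
+     * <pre>{@code
+     * // α = ∑(P_i * R_i) / (∑(P_i^2) * β), then clamped to [lowerBound, upperBound]
+     * double alpha =
+     *         AutoScalerUtils.optimizeLinearScalingCoefficient(
+     *                 List.of(2.0, 4.0), // parallelism levels
+     *                 List.of(180.0, 320.0), // observed processing rates
+     *                 100.0, // baseline processing rate (β)
+     *                 1.0, // upper bound
+     *                 0.01); // lower bound
+     * // (2*180 + 4*320) / ((4 + 16) * 100) = 1640 / 2000 => alpha = 0.82
+     * }</pre>
+     *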
<p>
The computed coefficient is clamped within the specified lower and upper bounds to ensure * stability and prevent extreme scaling adjustments. * * @param parallelismLevels List of parallelism levels. * @param processingRates List of observed processing rates. - * @param weights List of weights for each observation. * @param baselineProcessingRate Baseline processing rate. * @param upperBound Maximum allowable value for the scaling coefficient. * @param lowerBound Minimum allowable value for the scaling coefficient. @@ -123,28 +120,26 @@ public static boolean excludeVerticesFromScaling( public static double optimizeLinearScalingCoefficient( List parallelismLevels, List processingRates, - List weights, double baselineProcessingRate, double upperBound, double lowerBound) { - double weightedSum = 0.0; - double weightedSquaredSum = 0.0; + double sum = 0.0; + double squaredSum = 0.0; for (int i = 0; i < parallelismLevels.size(); i++) { double parallelism = parallelismLevels.get(i); double processingRate = processingRates.get(i); - double weight = weights.get(i); - weightedSum += weight * parallelism * processingRate; - weightedSquaredSum += weight * parallelism * parallelism; + sum += parallelism * processingRate; + squaredSum += parallelism * parallelism; } - if (weightedSquaredSum == 0.0) { + if (squaredSum == 0.0) { return 1.0; // Fallback to linear scaling if denominator is zero } - double alpha = weightedSum / (weightedSquaredSum * baselineProcessingRate); + double alpha = sum / (squaredSum * baselineProcessingRate); return Math.max(lowerBound, Math.min(upperBound, alpha)); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index a8108b2ce2..c475095936 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -50,7 +50,9 @@ import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT; import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS; import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -1163,128 +1165,146 @@ public void testCalculateScalingCoefficient() { var currentTime = Instant.now(); var linearScalingHistory = new TreeMap(); - var linearScalingEvaluatedData1 = evaluated(1, 100, 50); - var linearScalingEvaluatedData2 = evaluated(2, 200, 100); - var linearScalingEvaluatedData3 = evaluated(4, 100, 200); - var linearScalingEvaluatedData4 = evaluated(2, 400, 100); - var linearScalingEvaluatedData5 = evaluated(8, 800, 400); + var linearScalingEvaluatedData1 = evaluated(4, 100, 200); + var linearScalingEvaluatedData2 = evaluated(2, 400, 100); + var linearScalingEvaluatedData3 = evaluated(8, 800, 400); - linearScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary(1, 2, linearScalingEvaluatedData1)); - linearScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary(2, 4, linearScalingEvaluatedData2)); 
linearScalingHistory.put( currentTime.minusSeconds(20), - new ScalingSummary(4, 2, linearScalingEvaluatedData3)); + new ScalingSummary(4, 2, linearScalingEvaluatedData1)); linearScalingHistory.put( currentTime.minusSeconds(10), - new ScalingSummary(2, 8, linearScalingEvaluatedData4)); + new ScalingSummary(2, 8, linearScalingEvaluatedData2)); linearScalingHistory.put( - currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData5)); + currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3)); double linearScalingScalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, 5); + JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf); assertEquals(1.0, linearScalingScalingCoefficient); var slightDiminishingReturnsScalingHistory = new TreeMap(); - var slightDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); - var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 196, 98); - var slightDiminishingReturnsEvaluatedData3 = evaluated(4, 98, 196); - var slightDiminishingReturnsEvaluatedData4 = evaluated(2, 396, 99); - var slightDiminishingReturnsEvaluatedData5 = evaluated(8, 780, 390); + var slightDiminishingReturnsEvaluatedData1 = evaluated(4, 98, 196); + var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 396, 99); + var slightDiminishingReturnsEvaluatedData3 = evaluated(8, 780, 390); - slightDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary(1, 2, slightDiminishingReturnsEvaluatedData1)); - slightDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary(2, 4, slightDiminishingReturnsEvaluatedData2)); slightDiminishingReturnsScalingHistory.put( currentTime.minusSeconds(20), - new ScalingSummary(4, 2, slightDiminishingReturnsEvaluatedData3)); + new ScalingSummary(4, 2, slightDiminishingReturnsEvaluatedData1)); slightDiminishingReturnsScalingHistory.put( currentTime.minusSeconds(10), - new ScalingSummary(2, 8, slightDiminishingReturnsEvaluatedData4)); + new ScalingSummary(2, 8, slightDiminishingReturnsEvaluatedData2)); slightDiminishingReturnsScalingHistory.put( - currentTime, new ScalingSummary(8, 16, slightDiminishingReturnsEvaluatedData5)); + currentTime, new ScalingSummary(8, 16, slightDiminishingReturnsEvaluatedData3)); double slightDiminishingReturnsScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - slightDiminishingReturnsScalingHistory, 5); + slightDiminishingReturnsScalingHistory, conf); assertTrue( slightDiminishingReturnsScalingCoefficient > 0.9 && slightDiminishingReturnsScalingCoefficient < 1); var sharpDiminishingReturnsScalingHistory = new TreeMap(); - var sharpDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); - var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 192, 96); - var sharpDiminishingReturnsEvaluatedData3 = evaluated(4, 80, 160); - var sharpDiminishingReturnsEvaluatedData4 = evaluated(2, 384, 96); - var sharpDiminishingReturnsEvaluatedData5 = evaluated(8, 480, 240); + var sharpDiminishingReturnsEvaluatedData1 = evaluated(4, 80, 160); + var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 384, 96); + var sharpDiminishingReturnsEvaluatedData3 = evaluated(8, 480, 240); - sharpDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary(1, 2, sharpDiminishingReturnsEvaluatedData1)); - sharpDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary(2, 4, sharpDiminishingReturnsEvaluatedData2)); 
sharpDiminishingReturnsScalingHistory.put( currentTime.minusSeconds(20), - new ScalingSummary(4, 2, sharpDiminishingReturnsEvaluatedData3)); + new ScalingSummary(4, 2, sharpDiminishingReturnsEvaluatedData1)); sharpDiminishingReturnsScalingHistory.put( currentTime.minusSeconds(10), - new ScalingSummary(2, 8, sharpDiminishingReturnsEvaluatedData4)); + new ScalingSummary(2, 8, sharpDiminishingReturnsEvaluatedData2)); sharpDiminishingReturnsScalingHistory.put( - currentTime, new ScalingSummary(8, 16, sharpDiminishingReturnsEvaluatedData5)); + currentTime, new ScalingSummary(8, 16, sharpDiminishingReturnsEvaluatedData3)); double sharpDiminishingReturnsScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - sharpDiminishingReturnsScalingHistory, 5); + sharpDiminishingReturnsScalingHistory, conf); assertTrue( sharpDiminishingReturnsScalingCoefficient < 0.9 && sharpDiminishingReturnsScalingCoefficient > 0.4); - var sharpDiminishingReturnsWithoutOneParallelismScalingHistory = + var sharpDiminishingReturnsWithOneParallelismScalingHistory = new TreeMap(); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1 = evaluated(4, 80, 160); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2 = evaluated(8, 480, 240); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3 = evaluated(16, 140, 280); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4 = evaluated(8, 480, 240); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5 = evaluated(16, 560, 280); - - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary( - 4, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary( - 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + var sharpDiminishingReturnsWithOneParallelismEvaluatedData1 = evaluated(1, 100, 50); + var sharpDiminishingReturnsWithOneParallelismEvaluatedData2 = evaluated(2, 160, 80); + var sharpDiminishingReturnsWithOneParallelismEvaluatedData3 = evaluated(4, 200, 100); + + sharpDiminishingReturnsWithOneParallelismScalingHistory.put( currentTime.minusSeconds(20), - new ScalingSummary( - 16, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + new ScalingSummary(1, 2, sharpDiminishingReturnsWithOneParallelismEvaluatedData1)); + sharpDiminishingReturnsWithOneParallelismScalingHistory.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 4, sharpDiminishingReturnsWithOneParallelismEvaluatedData2)); + sharpDiminishingReturnsWithOneParallelismScalingHistory.put( + currentTime, + new ScalingSummary(4, 8, sharpDiminishingReturnsWithOneParallelismEvaluatedData3)); + + double sharpDiminishingReturnsWithOneParallelismScalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + sharpDiminishingReturnsWithOneParallelismScalingHistory, conf); + + assertTrue( + sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 0.9 + && sharpDiminishingReturnsWithOneParallelismScalingCoefficient > 0.4); + + conf.set(OBSERVED_SCALABILITY_MIN_OBSERVATIONS, 1); + + var withOneScalingHistoryRecord = new TreeMap(); + + var withOneScalingHistoryRecordEvaluatedData1 = evaluated(4, 200, 100); + + withOneScalingHistoryRecord.put( + currentTime, new ScalingSummary(4, 8, 
withOneScalingHistoryRecordEvaluatedData1)); + + double withOneScalingHistoryRecordScalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + withOneScalingHistoryRecord, conf); + + assertEquals(1, withOneScalingHistoryRecordScalingCoefficient); + + var diminishingReturnWithTwoScalingHistoryRecord = new TreeMap(); + + var diminishingReturnWithTwoScalingHistoryRecordEvaluatedData1 = evaluated(2, 160, 80); + var diminishingReturnWithTwoScalingHistoryRecordEvaluatedData2 = evaluated(4, 200, 100); + + diminishingReturnWithTwoScalingHistoryRecord.put( currentTime.minusSeconds(10), new ScalingSummary( - 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( + 2, 4, diminishingReturnWithTwoScalingHistoryRecordEvaluatedData1)); + diminishingReturnWithTwoScalingHistoryRecord.put( currentTime, new ScalingSummary( - 16, 32, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5)); + 4, 8, diminishingReturnWithTwoScalingHistoryRecordEvaluatedData2)); - double sharpDiminishingReturnsWithoutOneParallelismScalingCoefficient = + double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - sharpDiminishingReturnsWithoutOneParallelismScalingHistory, 5); + diminishingReturnWithTwoScalingHistoryRecord, conf); assertTrue( - sharpDiminishingReturnsWithoutOneParallelismScalingCoefficient < 0.9 - && sharpDiminishingReturnsWithoutOneParallelismScalingCoefficient > 0.4); + diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient < 0.9 + && diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient > 0.4); + + var linearReturnWithTwoScalingHistoryRecord = new TreeMap(); + + var linearReturnWithTwoScalingHistoryRecordEvaluatedData1 = evaluated(2, 160, 80); + var linearReturnWithTwoScalingHistoryRecordEvaluatedData2 = evaluated(4, 320, 160); + + linearReturnWithTwoScalingHistoryRecord.put( + currentTime.minusSeconds(10), + new ScalingSummary(2, 4, linearReturnWithTwoScalingHistoryRecordEvaluatedData1)); + linearReturnWithTwoScalingHistoryRecord.put( + currentTime, + new ScalingSummary(4, 8, linearReturnWithTwoScalingHistoryRecordEvaluatedData2)); + + double linearReturnWithTwoScalingHistoryRecordScalingCoefficient = + JobVertexScaler.calculateObservedScalingCoefficient( + linearReturnWithTwoScalingHistoryRecord, conf); + + assertEquals(1, linearReturnWithTwoScalingHistoryRecordScalingCoefficient); } @ParameterizedTest @@ -1299,26 +1319,18 @@ public void testParallelismScalingWithObservedScalingCoefficient( conf.set(OBSERVED_SCALABILITY_ENABLED, true); var linearScalingHistory = new TreeMap(); - var linearScalingEvaluatedData1 = evaluated(1, 100, 50); - var linearScalingEvaluatedData2 = evaluated(2, 200, 100); - var linearScalingEvaluatedData3 = evaluated(4, 100, 200); - var linearScalingEvaluatedData4 = evaluated(2, 400, 100); - var linearScalingEvaluatedData5 = evaluated(8, 800, 400); + var linearScalingEvaluatedData1 = evaluated(4, 100, 200); + var linearScalingEvaluatedData2 = evaluated(2, 400, 100); + var linearScalingEvaluatedData3 = evaluated(8, 800, 400); - linearScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary(1, 2, linearScalingEvaluatedData1)); - linearScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary(2, 4, linearScalingEvaluatedData2)); linearScalingHistory.put( currentTime.minusSeconds(20), - new ScalingSummary(4, 2, linearScalingEvaluatedData3)); + new ScalingSummary(4, 2, 
linearScalingEvaluatedData1)); linearScalingHistory.put( currentTime.minusSeconds(10), - new ScalingSummary(2, 8, linearScalingEvaluatedData4)); + new ScalingSummary(2, 8, linearScalingEvaluatedData2)); linearScalingHistory.put( - currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData5)); + currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3)); assertEquals( ParallelismChange.build(10, true), @@ -1331,109 +1343,28 @@ public void testParallelismScalingWithObservedScalingCoefficient( restartTime, delayedScaleDown)); - var slightDiminishingReturnsScalingHistory = new TreeMap(); - var slightDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); - var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 196, 98); - var slightDiminishingReturnsEvaluatedData3 = evaluated(4, 98, 196); - var slightDiminishingReturnsEvaluatedData4 = evaluated(2, 396, 99); - var slightDiminishingReturnsEvaluatedData5 = evaluated(8, 780, 390); - - slightDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary(1, 2, slightDiminishingReturnsEvaluatedData1)); - slightDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary(2, 4, slightDiminishingReturnsEvaluatedData2)); - slightDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(20), - new ScalingSummary(4, 2, slightDiminishingReturnsEvaluatedData3)); - slightDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(10), - new ScalingSummary(2, 8, slightDiminishingReturnsEvaluatedData4)); - slightDiminishingReturnsScalingHistory.put( - currentTime, new ScalingSummary(8, 16, slightDiminishingReturnsEvaluatedData5)); - - assertEquals( - ParallelismChange.build(12, true), - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(2, 100, 40), - slightDiminishingReturnsScalingHistory, - restartTime, - delayedScaleDown)); - - var sharpDiminishingReturnsScalingHistory = new TreeMap(); - var sharpDiminishingReturnsEvaluatedData1 = evaluated(1, 100, 50); - var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 192, 96); - var sharpDiminishingReturnsEvaluatedData3 = evaluated(4, 80, 160); - var sharpDiminishingReturnsEvaluatedData4 = evaluated(2, 384, 96); - var sharpDiminishingReturnsEvaluatedData5 = evaluated(8, 480, 240); + var diminishingReturnsScalingHistory = new TreeMap(); + var diminishingReturnsEvaluatedData1 = evaluated(4, 80, 160); + var diminishingReturnsEvaluatedData2 = evaluated(2, 384, 96); + var diminishingReturnsEvaluatedData3 = evaluated(8, 480, 240); - sharpDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary(1, 2, sharpDiminishingReturnsEvaluatedData1)); - sharpDiminishingReturnsScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary(2, 4, sharpDiminishingReturnsEvaluatedData2)); - sharpDiminishingReturnsScalingHistory.put( + diminishingReturnsScalingHistory.put( currentTime.minusSeconds(20), - new ScalingSummary(4, 2, sharpDiminishingReturnsEvaluatedData3)); - sharpDiminishingReturnsScalingHistory.put( + new ScalingSummary(4, 2, diminishingReturnsEvaluatedData1)); + diminishingReturnsScalingHistory.put( currentTime.minusSeconds(10), - new ScalingSummary(2, 8, sharpDiminishingReturnsEvaluatedData4)); - sharpDiminishingReturnsScalingHistory.put( - currentTime, new ScalingSummary(8, 16, sharpDiminishingReturnsEvaluatedData5)); + new ScalingSummary(2, 8, diminishingReturnsEvaluatedData2)); + diminishingReturnsScalingHistory.put( + currentTime, new 
ScalingSummary(8, 16, diminishingReturnsEvaluatedData3)); assertEquals( - ParallelismChange.build(18, true), - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(2, 100, 40), - sharpDiminishingReturnsScalingHistory, - restartTime, - delayedScaleDown)); - - var sharpDiminishingReturnsWithoutOneParallelismScalingHistory = - new TreeMap(); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1 = evaluated(4, 80, 160); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2 = evaluated(8, 480, 240); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3 = evaluated(16, 140, 280); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4 = evaluated(8, 480, 240); - var sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5 = evaluated(16, 560, 280); - - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime.minusSeconds(40), - new ScalingSummary( - 4, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData1)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime.minusSeconds(30), - new ScalingSummary( - 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData2)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime.minusSeconds(20), - new ScalingSummary( - 16, 8, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData3)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime.minusSeconds(10), - new ScalingSummary( - 8, 16, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData4)); - sharpDiminishingReturnsWithoutOneParallelismScalingHistory.put( - currentTime, - new ScalingSummary( - 16, 32, sharpDiminishingReturnsWithoutOneParallelismEvaluatedData5)); - - assertEquals( - ParallelismChange.build(24, true), + ParallelismChange.build(15, true), vertexScaler.computeScaleTargetParallelism( context, op, inputShipStrategies, evaluated(2, 100, 40), - sharpDiminishingReturnsWithoutOneParallelismScalingHistory, + diminishingReturnsScalingHistory, restartTime, delayedScaleDown)); } From 638538968a5bf5b8b9b456954cc17462495a16c0 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Wed, 30 Apr 2025 15:58:03 +0530 Subject: [PATCH 3/5] 1. Clamped lowerBound for scaling coefficient to 0.5 --- .../flink/autoscaler/JobVertexScaler.java | 25 ++++++------------- .../flink/autoscaler/JobVertexScalerTest.java | 16 ++++++------ 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 2d8c9b8c9b..b09af50836 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -187,7 +187,9 @@ public ParallelismChange computeScaleTargetParallelism( if (conf.get(OBSERVED_SCALABILITY_ENABLED)) { double scalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient(history, conf); + JobVertexScaler.calculateObservedScalingCoefficient( + history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS)); + scaleFactor = scaleFactor / scalingCoefficient; } double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR); @@ -256,12 +258,14 @@ public ParallelismChange computeScaleTargetParallelism( * 1.0} is returned, assuming linear scaling. 
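+     * <p>With this change the computed coefficient is clamped to the range [0.5, 1.0] instead of
+     * being checked against an effectiveness threshold; e.g. a raw estimate of 0.35
+     * (hypothetical) is applied as 0.5.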
* * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary} - * @param conf Deployment configuration. + * @param minObservations The minimum number of observations required to compute the scaling + * coefficient. If the number of historical entries is less than this threshold, a default + * coefficient of {@code 1.0} is returned. * @return The computed scaling coefficient. */ @VisibleForTesting protected static double calculateObservedScalingCoefficient( - SortedMap history, Configuration conf) { + SortedMap history, int minObservations) { /* * The scaling coefficient is computed using the least squares approach * to fit a linear model: @@ -283,11 +287,8 @@ protected static double calculateObservedScalingCoefficient( * α = ∑ (P_i * R_i) / (∑ (P_i^2) * β) * * We keep the system conservative for higher returns scenario by clamping computed α to an upper bound of 1.0. - * If the computed coefficient falls below threshold, the system falls back to assuming linear scaling (α = 1.0). */ - var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS); - // not enough data to compute scaling coefficient; we assume linear scaling. if (history.isEmpty() || history.size() < minObservations) { return 1.0; @@ -319,17 +320,7 @@ protected static double calculateObservedScalingCoefficient( var coefficient = AutoScalerUtils.optimizeLinearScalingCoefficient( - parallelismList, processingRateList, baselineProcessingRate, 1, 0.01); - - double threshold = - conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED) - ? conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD) - : 0.5; - - if (coefficient < threshold) { - LOG.warn("Scaling coefficient is below threshold. Falling back to linear scaling."); - return 1.0; - } + parallelismList, processingRateList, baselineProcessingRate, 1, 0.5); return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue(); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index c475095936..12454c625c 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -52,7 +52,6 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS; import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; - import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -1179,7 +1178,7 @@ public void testCalculateScalingCoefficient() { currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3)); double linearScalingScalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf); + JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, 3); assertEquals(1.0, linearScalingScalingCoefficient); @@ -1199,7 +1198,7 @@ public void testCalculateScalingCoefficient() { double slightDiminishingReturnsScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - slightDiminishingReturnsScalingHistory, conf); + slightDiminishingReturnsScalingHistory, 3); assertTrue( 
slightDiminishingReturnsScalingCoefficient > 0.9 @@ -1221,7 +1220,7 @@ public void testCalculateScalingCoefficient() { double sharpDiminishingReturnsScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - sharpDiminishingReturnsScalingHistory, conf); + sharpDiminishingReturnsScalingHistory, 3); assertTrue( sharpDiminishingReturnsScalingCoefficient < 0.9 @@ -1245,7 +1244,7 @@ public void testCalculateScalingCoefficient() { double sharpDiminishingReturnsWithOneParallelismScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - sharpDiminishingReturnsWithOneParallelismScalingHistory, conf); + sharpDiminishingReturnsWithOneParallelismScalingHistory, 3); assertTrue( sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 0.9 @@ -1261,8 +1260,7 @@ public void testCalculateScalingCoefficient() { currentTime, new ScalingSummary(4, 8, withOneScalingHistoryRecordEvaluatedData1)); double withOneScalingHistoryRecordScalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient( - withOneScalingHistoryRecord, conf); + JobVertexScaler.calculateObservedScalingCoefficient(withOneScalingHistoryRecord, 1); assertEquals(1, withOneScalingHistoryRecordScalingCoefficient); @@ -1282,7 +1280,7 @@ public void testCalculateScalingCoefficient() { double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - diminishingReturnWithTwoScalingHistoryRecord, conf); + diminishingReturnWithTwoScalingHistoryRecord, 1); assertTrue( diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient < 0.9 @@ -1302,7 +1300,7 @@ public void testCalculateScalingCoefficient() { double linearReturnWithTwoScalingHistoryRecordScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - linearReturnWithTwoScalingHistoryRecord, conf); + linearReturnWithTwoScalingHistoryRecord, 1); assertEquals(1, linearReturnWithTwoScalingHistoryRecordScalingCoefficient); } From 3adc6b30311c5eb807a1fad01bc643773500b17a Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Thu, 1 May 2025 11:58:50 +0530 Subject: [PATCH 4/5] 1. Adding config for min value of Scaling coefficient. 2. Updated validator to validate the min scaling coefficient config. --- .../apache/flink/autoscaler/JobVertexScaler.java | 15 ++++++++------- .../autoscaler/config/AutoScalerOptions.java | 15 ++++++++++++++- .../flink/autoscaler/JobVertexScalerTest.java | 15 ++++++++------- .../operator/validation/DefaultValidator.java | 2 ++ .../operator/validation/DefaultValidatorTest.java | 15 +++++++++++++++ 5 files changed, 47 insertions(+), 15 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index b09af50836..492615f402 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -187,8 +187,7 @@ public ParallelismChange computeScaleTargetParallelism( if (conf.get(OBSERVED_SCALABILITY_ENABLED)) { double scalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient( - history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS)); + JobVertexScaler.calculateObservedScalingCoefficient(history, conf); scaleFactor = scaleFactor / scalingCoefficient; } @@ -258,14 +257,12 @@ public ParallelismChange computeScaleTargetParallelism( * 1.0} is returned, assuming linear scaling. 
* * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary} - * @param minObservations The minimum number of observations required to compute the scaling - * coefficient. If the number of historical entries is less than this threshold, a default - * coefficient of {@code 1.0} is returned. + * @param conf Deployment configuration. * @return The computed scaling coefficient. */ @VisibleForTesting protected static double calculateObservedScalingCoefficient( - SortedMap history, int minObservations) { + SortedMap history, Configuration conf) { /* * The scaling coefficient is computed using the least squares approach * to fit a linear model: @@ -289,6 +286,8 @@ protected static double calculateObservedScalingCoefficient( * We keep the system conservative for higher returns scenario by clamping computed α to an upper bound of 1.0. */ + var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS); + // not enough data to compute scaling coefficient; we assume linear scaling. if (history.isEmpty() || history.size() < minObservations) { return 1.0; @@ -318,9 +317,11 @@ protected static double calculateObservedScalingCoefficient( processingRateList.add(processingRate); } + double lowerBound = conf.get(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN); + var coefficient = AutoScalerUtils.optimizeLinearScalingCoefficient( - parallelismList, processingRateList, baselineProcessingRate, 1, 0.5); + parallelismList, processingRateList, baselineProcessingRate, 1, lowerBound); return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue(); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index c62616fc0f..a67bfd505c 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -389,7 +389,10 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .defaultValue(false) .withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled")) .withDescription( - "Enables the use of an observed scalability coefficient when computing target parallelism. If enabled, the system will estimate the scalability coefficient based on historical scaling data instead of assuming perfect linear scaling. This helps account for real-world inefficiencies such as network overhead and coordination costs."); + "Enables the use of an observed scalability coefficient when computing target parallelism. " + + "If enabled, the system will estimate the scalability coefficient based on historical scaling data " + + "instead of assuming perfect linear scaling. 
" + + "This helps account for real-world inefficiencies such as network overhead and coordination costs."); public static final ConfigOption OBSERVED_SCALABILITY_MIN_OBSERVATIONS = autoScalerConfig("observed-scalability.min-observations") @@ -405,4 +408,14 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { + VERTEX_SCALING_HISTORY_COUNT.key() + " to a very high value, as the number of retained data points is limited by the size of the state store—" + "particularly when using Kubernetes-based state store."); + + public static final ConfigOption OBSERVED_SCALABILITY_COEFFICIENT_MIN = + autoScalerConfig("observed-scalability.coefficient-min") + .doubleType() + .defaultValue(0.5) + .withFallbackKeys(oldOperatorConfigKey("observed-scalability.coefficient-min")) + .withDescription( + "Minimum allowed value for the observed scalability coefficient. " + + "Prevents aggressive scaling by clamping low coefficient estimates. " + + "If the estimated coefficient falls below this value, it is capped at the configured minimum."); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 12454c625c..3d085e1718 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -1178,7 +1178,7 @@ public void testCalculateScalingCoefficient() { currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3)); double linearScalingScalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, 3); + JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf); assertEquals(1.0, linearScalingScalingCoefficient); @@ -1198,7 +1198,7 @@ public void testCalculateScalingCoefficient() { double slightDiminishingReturnsScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - slightDiminishingReturnsScalingHistory, 3); + slightDiminishingReturnsScalingHistory, conf); assertTrue( slightDiminishingReturnsScalingCoefficient > 0.9 @@ -1220,7 +1220,7 @@ public void testCalculateScalingCoefficient() { double sharpDiminishingReturnsScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - sharpDiminishingReturnsScalingHistory, 3); + sharpDiminishingReturnsScalingHistory, conf); assertTrue( sharpDiminishingReturnsScalingCoefficient < 0.9 @@ -1244,7 +1244,7 @@ public void testCalculateScalingCoefficient() { double sharpDiminishingReturnsWithOneParallelismScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - sharpDiminishingReturnsWithOneParallelismScalingHistory, 3); + sharpDiminishingReturnsWithOneParallelismScalingHistory, conf); assertTrue( sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 0.9 @@ -1260,7 +1260,8 @@ public void testCalculateScalingCoefficient() { currentTime, new ScalingSummary(4, 8, withOneScalingHistoryRecordEvaluatedData1)); double withOneScalingHistoryRecordScalingCoefficient = - JobVertexScaler.calculateObservedScalingCoefficient(withOneScalingHistoryRecord, 1); + JobVertexScaler.calculateObservedScalingCoefficient( + withOneScalingHistoryRecord, conf); assertEquals(1, withOneScalingHistoryRecordScalingCoefficient); @@ -1280,7 +1281,7 @@ public void testCalculateScalingCoefficient() { double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient = 
JobVertexScaler.calculateObservedScalingCoefficient( - diminishingReturnWithTwoScalingHistoryRecord, 1); + diminishingReturnWithTwoScalingHistoryRecord, conf); assertTrue( diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient < 0.9 @@ -1300,7 +1301,7 @@ public void testCalculateScalingCoefficient() { double linearReturnWithTwoScalingHistoryRecordScalingCoefficient = JobVertexScaler.calculateObservedScalingCoefficient( - linearReturnWithTwoScalingHistoryRecord, 1); + linearReturnWithTwoScalingHistoryRecord, conf); assertEquals(1, linearReturnWithTwoScalingHistoryRecordScalingCoefficient); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index e2b5db8564..42fd56e76c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -65,6 +65,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN; import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; @@ -622,6 +623,7 @@ public static Optional validateAutoScalerFlinkConfiguration( UTILIZATION_MIN, 0.0d, flinkConfiguration.get(UTILIZATION_TARGET)), + validateNumber(flinkConfiguration, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d), CalendarUtils.validateExcludedPeriods(flinkConfiguration)); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java index 8caa24eb28..08388b79ea 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java @@ -842,6 +842,21 @@ public void testAutoScalerDeploymentWithInvalidExcludedPeriods() { assertTrue(result.isPresent()); } + @Test + public void testAutoScalerDeploymentWithInvalidScalingCoefficientMin() { + var result = + testAutoScalerConfiguration( + flinkConf -> + flinkConf.put( + AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN + .key(), + "1.2")); + assertErrorContains( + result, + getFormattedErrorMessage( + AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d)); + } + @Test public void testNonEnabledAutoScalerDeploymentJob() { var result = From dcd815b5ab493d83bedab98b7863dae508a5e576 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Mon, 5 May 2025 14:11:40 +0530 Subject: [PATCH 5/5] Updating auto scaler configuration doc --- .../generated/auto_scaler_configuration.html | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index ab2bbcb283..3c12ee1087 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -98,6 +98,24 @@
             <td>Duration</td>
             <td>Scaling metrics aggregation window size.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.coefficient-min</h5></td>
+            <td style="word-wrap: break-word;">0.5</td>
+            <td>Double</td>
+            <td>Minimum allowed value for the observed scalability coefficient. Prevents aggressive scaling by clamping low coefficient estimates. If the estimated coefficient falls below this value, it is capped at the configured minimum.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enables the use of an observed scalability coefficient when computing target parallelism. If enabled, the system will estimate the scalability coefficient based on historical scaling data instead of assuming perfect linear scaling. This helps account for real-world inefficiencies such as network overhead and coordination costs.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.min-observations</h5></td>
+            <td style="word-wrap: break-word;">3</td>
+            <td>Integer</td>
+            <td>Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. If the number of available observations is below this threshold, the system falls back to assuming linear scaling. Note: To effectively use a higher minimum observation count, you need to increase job.autoscaler.history.max.count. Avoid setting job.autoscaler.history.max.count to a very high value, as the number of retained data points is limited by the size of the state store—particularly when using Kubernetes-based state store.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.observed-true-processing-rate.lag-threshold</h5></td>
             <td style="word-wrap: break-word;">30 s</td>
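
The least-squares comment in the JobVertexScaler javadoc compresses a short derivation: minimizing Loss = ∑ (R_i - β·α·P_i)^2 and setting dLoss/dα = -2β·∑ P_i·(R_i - β·α·P_i) = 0 yields α = ∑ (P_i·R_i) / (β·∑ P_i^2), which is then clamped to [coefficient-min, 1.0]. Below is a minimal, self-contained Java sketch of that closed form. The class name, method name, and the sample numbers (β = 40 per unit of parallelism, rates taken from the sharp-diminishing-returns test fixtures above) are illustrative assumptions, not part of the patch; the production logic lives in AutoScalerUtils.optimizeLinearScalingCoefficient and may differ in detail.

import java.util.List;

/** Editorial sketch of the closed-form coefficient estimate; illustrative only. */
public final class ScalingCoefficientSketch {

    /**
     * Fits R_i ≈ β * α * P_i by least squares:
     * α = ∑(P_i * R_i) / (β * ∑ P_i^2), clamped to [lowerBound, upperBound].
     */
    static double estimateCoefficient(
            List<Double> parallelisms,
            List<Double> rates,
            double baselineRate, // β: processing rate per unit of parallelism
            double upperBound,   // 1.0 keeps super-linear observations conservative
            double lowerBound) { // cf. job.autoscaler.observed-scalability.coefficient-min
        double sumPR = 0.0;
        double sumPP = 0.0;
        for (int i = 0; i < parallelisms.size(); i++) {
            sumPR += parallelisms.get(i) * rates.get(i);
            sumPP += parallelisms.get(i) * parallelisms.get(i);
        }
        double alpha = sumPR / (sumPP * baselineRate);
        return Math.max(lowerBound, Math.min(upperBound, alpha));
    }

    public static void main(String[] args) {
        // Sharp diminishing returns: doubling parallelism barely raises throughput.
        List<Double> p = List.of(4.0, 8.0, 16.0);
        List<Double> r = List.of(160.0, 240.0, 280.0);
        double alpha = estimateCoefficient(p, r, 40.0, 1.0, 0.5);
        // computeScaleTargetParallelism divides the scale factor by α, so
        // α < 1.0 requests proportionally more parallelism than linear scaling would.
        System.out.printf("alpha=%.2f, adjusted scale factor=%.2f%n", alpha, 2.5 / alpha);
        // Prints alpha=0.52; the patch additionally rounds α up to two
        // decimals via BigDecimal with RoundingMode.CEILING.
    }
}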
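
And a brief usage sketch for the knobs introduced in patches 4 and 5, again with an illustrative class name and made-up values; only the option constants and their validated range come from the patch itself.

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

public final class ObservedScalabilityConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Opt in; the default is false, which preserves plain linear-scaling behavior.
        conf.set(AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED, true);
        // Demand more history before trusting the fit (default: 3 observations).
        conf.set(AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS, 5);
        // Raise the clamp from the 0.5 default; DefaultValidator accepts
        // values between 0.01 and 1.0.
        conf.set(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.7);
    }
}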