diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index ab2bbcb283..3c12ee1087 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -98,6 +98,24 @@
job.autoscaler.observed-true-processing-rate.lag-threshold |
30 s |
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 4c185f89ea..492615f402 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -34,11 +34,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
@@ -46,6 +50,8 @@
import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
@@ -178,6 +184,13 @@ public ParallelismChange computeScaleTargetParallelism(
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
+
+ double scalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
+
+ scaleFactor = scaleFactor / scalingCoefficient;
+ }
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
if (scaleFactor < minScaleFactor) {
@@ -236,6 +249,83 @@ public ParallelismChange computeScaleTargetParallelism(
delayedScaleDown);
}
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using the least squares approach. If there are not
+     * enough observations, if the baseline processing rate or any historical true processing rate
+     * is unusable (missing, NaN, infinite or non-positive baseline), or if the computed
+     * coefficient is invalid, a default value of {@code 1.0} is returned, assuming linear
+     * scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param conf Deployment configuration.
+     * @return The computed scaling coefficient, rounded up to two decimal places.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, Configuration conf) {
+        /*
+         * The scaling coefficient is computed using the least squares approach
+         * to fit a linear model:
+         *
+         *      R_i = β * α * P_i
+         *
+         * where:
+         * - R_i = observed processing rate
+         * - P_i = parallelism
+         * - β   = baseline processing rate (rate per unit of parallelism)
+         * - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the sum of squared errors:
+         *
+         *      Loss = ∑ (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *      α = ∑ (P_i * R_i) / (∑ (P_i^2) * β)
+         *
+         * To stay conservative when the history suggests better-than-linear returns,
+         * the computed α is clamped to an upper bound of 1.0.
+         */
+
+        var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS);
+
+        // Not enough data to compute scaling coefficient; we assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        // A NaN, infinite, zero or negative baseline cannot anchor the fit: it would either
+        // produce a NaN coefficient (which BigDecimal.valueOf rejects with an exception) or
+        // silently force the coefficient to the lower bound.
+        if (!Double.isFinite(baselineProcessingRate) || baselineProcessingRate <= 0) {
+            return 1.0;
+        }
+
+        List<Double> parallelismList = new ArrayList<>(history.size());
+        List<Double> processingRateList = new ArrayList<>(history.size());
+
+        for (ScalingSummary summary : history.values()) {
+            double parallelism = summary.getCurrentParallelism();
+            var trueProcessingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE);
+            // A summary without the metric is treated like one reporting NaN.
+            double processingRate =
+                    trueProcessingRate == null ? Double.NaN : trueProcessingRate.getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            parallelismList.add(parallelism);
+            processingRateList.add(processingRate);
+        }
+
+        double lowerBound = conf.get(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN);
+
+        var coefficient =
+                AutoScalerUtils.optimizeLinearScalingCoefficient(
+                        parallelismList, processingRateList, baselineProcessingRate, 1, lowerBound);
+
+        // BigDecimal.valueOf(double) throws NumberFormatException for NaN and infinities.
+        if (!Double.isFinite(coefficient)) {
+            return 1.0;
+        }
+
+        // Round up so that the rounding never makes the scaling more aggressive.
+        return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue();
+    }
+
private ParallelismChange detectBlockScaling(
Context context,
JobVertexID vertex,
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 980db2f4cc..a67bfd505c 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -382,4 +382,40 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
"scaling.key-group.partitions.adjust.mode"))
.withDescription(
"How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
+
+ /**
+ * Boolean switch (default {@code false}) for using an observed scalability coefficient when
+ * computing the target parallelism.
+ */
+ public static final ConfigOption OBSERVED_SCALABILITY_ENABLED =
+ autoScalerConfig("observed-scalability.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled"))
+ .withDescription(
+ "Enables the use of an observed scalability coefficient when computing target parallelism. "
+ + "If enabled, the system will estimate the scalability coefficient based on historical scaling data "
+ + "instead of assuming perfect linear scaling. "
+ + "This helps account for real-world inefficiencies such as network overhead and coordination costs.");
+
+ /**
+ * Minimum number of scaling history entries (default {@code 3}) required before a
+ * scalability coefficient is estimated; the description ties this to
+ * {@code VERTEX_SCALING_HISTORY_COUNT}.
+ */
+ public static final ConfigOption OBSERVED_SCALABILITY_MIN_OBSERVATIONS =
+ autoScalerConfig("observed-scalability.min-observations")
+ .intType()
+ .defaultValue(3)
+ .withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations"))
+ .withDescription(
+ "Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. "
+ + "If the number of available observations is below this threshold, the system falls back to assuming linear scaling. "
+ + "Note: To effectively use a higher minimum observation count, you need to increase "
+ + VERTEX_SCALING_HISTORY_COUNT.key()
+ + ". Avoid setting "
+ + VERTEX_SCALING_HISTORY_COUNT.key()
+ + " to a very high value, as the number of retained data points is limited by the size of the state store—"
+ + "particularly when using Kubernetes-based state store.");
+
+ /**
+ * Lower clamp (default {@code 0.5}) applied to the estimated scalability coefficient.
+ */
+ public static final ConfigOption OBSERVED_SCALABILITY_COEFFICIENT_MIN =
+ autoScalerConfig("observed-scalability.coefficient-min")
+ .doubleType()
+ .defaultValue(0.5)
+ .withFallbackKeys(oldOperatorConfigKey("observed-scalability.coefficient-min"))
+ .withDescription(
+ "Minimum allowed value for the observed scalability coefficient. "
+ + "Prevents aggressive scaling by clamping low coefficient estimates. "
+ + "If the estimated coefficient falls below this value, it is capped at the configured minimum.");
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
index 411ab9b20d..837d429b42 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
@@ -17,6 +17,7 @@
package org.apache.flink.autoscaler.utils;
+import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -24,15 +25,19 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.SortedMap;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** AutoScaler utilities. */
public class AutoScalerUtils {
@@ -94,4 +99,89 @@ public static boolean excludeVerticesFromScaling(
conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
return anyAdded;
}
+
+    /**
+     * Computes the optimized linear scaling coefficient (α) by minimizing the least squares error.
+     *
+     * <p>This method estimates the scaling coefficient in a linear scaling model by fitting
+     * observed processing rates and parallelism levels.
+     *
+     * <p>The computed coefficient is clamped within the specified lower and upper bounds to ensure
+     * stability and prevent extreme scaling adjustments.
+     *
+     * <p>If the coefficient cannot be determined (all parallelism levels are zero, or the
+     * division yields NaN, e.g. because of a zero, infinite or NaN baseline), {@code 1.0} is
+     * returned, i.e. linear scaling is assumed. NaN is never returned.
+     *
+     * @param parallelismLevels List of parallelism levels.
+     * @param processingRates List of observed processing rates, index-aligned with {@code
+     *     parallelismLevels}.
+     * @param baselineProcessingRate Baseline processing rate.
+     * @param upperBound Maximum allowable value for the scaling coefficient.
+     * @param lowerBound Minimum allowable value for the scaling coefficient.
+     * @return The optimized scaling coefficient (α), constrained within {@code [lowerBound,
+     *     upperBound]}, or {@code 1.0} when no coefficient can be computed.
+     */
+    public static double optimizeLinearScalingCoefficient(
+            List<Double> parallelismLevels,
+            List<Double> processingRates,
+            double baselineProcessingRate,
+            double upperBound,
+            double lowerBound) {
+
+        double sum = 0.0;
+        double squaredSum = 0.0;
+
+        for (int i = 0; i < parallelismLevels.size(); i++) {
+            double parallelism = parallelismLevels.get(i);
+            double processingRate = processingRates.get(i);
+
+            sum += parallelism * processingRate;
+            squaredSum += parallelism * parallelism;
+        }
+
+        if (squaredSum == 0.0) {
+            return 1.0; // Fallback to linear scaling if denominator is zero
+        }
+
+        double alpha = sum / (squaredSum * baselineProcessingRate);
+
+        // 0/0, ∞/∞ or a NaN baseline yield NaN. Math.min/Math.max propagate NaN, so without
+        // this guard the clamp below would hand NaN back to the caller.
+        if (Double.isNaN(alpha)) {
+            return 1.0;
+        }
+
+        return Math.max(lowerBound, Math.min(upperBound, alpha));
+    }
+
+    /**
+     * Computes the baseline processing rate from historical scaling data.
+     *
+     * <p>The baseline processing rate represents the processing rate per unit of parallelism. It
+     * is determined using the smallest observed parallelism in the history; when several entries
+     * share that parallelism, the most recent one is used. An entry with parallelism 1 is used
+     * directly as soon as it is encountered while walking the history from newest to oldest.
+     *
+     * @param history A {@code SortedMap} where keys are timestamps ({@code Instant}), and values
+     *     are {@code ScalingSummary} objects.
+     * @return The computed baseline processing rate (processing rate per unit of parallelism), or
+     *     {@code NaN} if the history is empty or the selected entry has no true processing rate.
+     */
+    public static double computeBaselineProcessingRate(
+            SortedMap<Instant, ScalingSummary> history) {
+        // The parameter is only declared as SortedMap. Not every SortedMap is a NavigableMap
+        // (e.g. Collections.unmodifiableSortedMap), so copy instead of casting blindly.
+        NavigableMap<Instant, ScalingSummary> navigableHistory =
+                history instanceof NavigableMap
+                        ? (NavigableMap<Instant, ScalingSummary>) history
+                        : new java.util.TreeMap<Instant, ScalingSummary>(history);
+
+        ScalingSummary latestSmallestParallelismSummary = null;
+
+        // Newest first, so that among equal parallelisms the most recent summary wins.
+        for (ScalingSummary summary : navigableHistory.descendingMap().values()) {
+            double parallelism = summary.getCurrentParallelism();
+
+            if (parallelism == 1) {
+                return averageTrueProcessingRate(summary);
+            }
+
+            if (latestSmallestParallelismSummary == null
+                    || parallelism < latestSmallestParallelismSummary.getCurrentParallelism()) {
+                latestSmallestParallelismSummary = summary;
+            }
+        }
+
+        if (latestSmallestParallelismSummary == null) {
+            return Double.NaN;
+        }
+
+        double parallelism = latestSmallestParallelismSummary.getCurrentParallelism();
+        return averageTrueProcessingRate(latestSmallestParallelismSummary) / parallelism;
+    }
+
+    /** Returns the average true processing rate of a summary, or NaN if it was not recorded. */
+    private static double averageTrueProcessingRate(ScalingSummary summary) {
+        EvaluatedScalingMetric metric = summary.getMetrics().get(TRUE_PROCESSING_RATE);
+        return metric == null ? Double.NaN : metric.getAverage();
+    }
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 9cdc71596b..3d085e1718 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -49,6 +49,8 @@
import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -1156,4 +1158,213 @@ private Map evaluated(
ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, false, restartTime);
return metrics;
}
+
+ // Exercises JobVertexScaler.calculateObservedScalingCoefficient against several hand-built
+ // scaling histories. NOTE(review): the third argument of evaluated(...) appears to be the
+ // true processing rate used by the fit — confirm against the helper's definition.
+ @Test
+ public void testCalculateScalingCoefficient() {
+ var currentTime = Instant.now();
+
+ // Scenario 1: rate grows proportionally with parallelism -> coefficient is exactly 1.0.
+ var linearScalingHistory = new TreeMap();
+ var linearScalingEvaluatedData1 = evaluated(4, 100, 200);
+ var linearScalingEvaluatedData2 = evaluated(2, 400, 100);
+ var linearScalingEvaluatedData3 = evaluated(8, 800, 400);
+
+ linearScalingHistory.put(
+ currentTime.minusSeconds(20),
+ new ScalingSummary(4, 2, linearScalingEvaluatedData1));
+ linearScalingHistory.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 8, linearScalingEvaluatedData2));
+ linearScalingHistory.put(
+ currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3));
+
+ double linearScalingScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf);
+
+ assertEquals(1.0, linearScalingScalingCoefficient);
+
+ // Scenario 2: slightly sub-linear history -> coefficient just below 1.
+ var slightDiminishingReturnsScalingHistory = new TreeMap();
+ var slightDiminishingReturnsEvaluatedData1 = evaluated(4, 98, 196);
+ var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 396, 99);
+ var slightDiminishingReturnsEvaluatedData3 = evaluated(8, 780, 390);
+
+ slightDiminishingReturnsScalingHistory.put(
+ currentTime.minusSeconds(20),
+ new ScalingSummary(4, 2, slightDiminishingReturnsEvaluatedData1));
+ slightDiminishingReturnsScalingHistory.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 8, slightDiminishingReturnsEvaluatedData2));
+ slightDiminishingReturnsScalingHistory.put(
+ currentTime, new ScalingSummary(8, 16, slightDiminishingReturnsEvaluatedData3));
+
+ double slightDiminishingReturnsScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(
+ slightDiminishingReturnsScalingHistory, conf);
+
+ assertTrue(
+ slightDiminishingReturnsScalingCoefficient > 0.9
+ && slightDiminishingReturnsScalingCoefficient < 1);
+
+ // Scenario 3: strongly sub-linear history -> coefficient clearly below 0.9.
+ var sharpDiminishingReturnsScalingHistory = new TreeMap();
+ var sharpDiminishingReturnsEvaluatedData1 = evaluated(4, 80, 160);
+ var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 384, 96);
+ var sharpDiminishingReturnsEvaluatedData3 = evaluated(8, 480, 240);
+
+ sharpDiminishingReturnsScalingHistory.put(
+ currentTime.minusSeconds(20),
+ new ScalingSummary(4, 2, sharpDiminishingReturnsEvaluatedData1));
+ sharpDiminishingReturnsScalingHistory.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 8, sharpDiminishingReturnsEvaluatedData2));
+ sharpDiminishingReturnsScalingHistory.put(
+ currentTime, new ScalingSummary(8, 16, sharpDiminishingReturnsEvaluatedData3));
+
+ double sharpDiminishingReturnsScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(
+ sharpDiminishingReturnsScalingHistory, conf);
+
+ assertTrue(
+ sharpDiminishingReturnsScalingCoefficient < 0.9
+ && sharpDiminishingReturnsScalingCoefficient > 0.4);
+
+ // Scenario 4: same as above, but the history contains an entry with parallelism 1.
+ var sharpDiminishingReturnsWithOneParallelismScalingHistory =
+ new TreeMap();
+ var sharpDiminishingReturnsWithOneParallelismEvaluatedData1 = evaluated(1, 100, 50);
+ var sharpDiminishingReturnsWithOneParallelismEvaluatedData2 = evaluated(2, 160, 80);
+ var sharpDiminishingReturnsWithOneParallelismEvaluatedData3 = evaluated(4, 200, 100);
+
+ sharpDiminishingReturnsWithOneParallelismScalingHistory.put(
+ currentTime.minusSeconds(20),
+ new ScalingSummary(1, 2, sharpDiminishingReturnsWithOneParallelismEvaluatedData1));
+ sharpDiminishingReturnsWithOneParallelismScalingHistory.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 4, sharpDiminishingReturnsWithOneParallelismEvaluatedData2));
+ sharpDiminishingReturnsWithOneParallelismScalingHistory.put(
+ currentTime,
+ new ScalingSummary(4, 8, sharpDiminishingReturnsWithOneParallelismEvaluatedData3));
+
+ double sharpDiminishingReturnsWithOneParallelismScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(
+ sharpDiminishingReturnsWithOneParallelismScalingHistory, conf);
+
+ assertTrue(
+ sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 0.9
+ && sharpDiminishingReturnsWithOneParallelismScalingCoefficient > 0.4);
+
+ // Lower the observation threshold so the one- and two-entry histories below are evaluated
+ // instead of short-circuiting to 1.0. This mutates the shared conf for the rest of the test.
+ conf.set(OBSERVED_SCALABILITY_MIN_OBSERVATIONS, 1);
+
+ // Scenario 5: a single observation -> coefficient 1.
+ var withOneScalingHistoryRecord = new TreeMap();
+
+ var withOneScalingHistoryRecordEvaluatedData1 = evaluated(4, 200, 100);
+
+ withOneScalingHistoryRecord.put(
+ currentTime, new ScalingSummary(4, 8, withOneScalingHistoryRecordEvaluatedData1));
+
+ double withOneScalingHistoryRecordScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(
+ withOneScalingHistoryRecord, conf);
+
+ assertEquals(1, withOneScalingHistoryRecordScalingCoefficient);
+
+ // Scenario 6: two observations with diminishing returns.
+ var diminishingReturnWithTwoScalingHistoryRecord = new TreeMap();
+
+ var diminishingReturnWithTwoScalingHistoryRecordEvaluatedData1 = evaluated(2, 160, 80);
+ var diminishingReturnWithTwoScalingHistoryRecordEvaluatedData2 = evaluated(4, 200, 100);
+
+ diminishingReturnWithTwoScalingHistoryRecord.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(
+ 2, 4, diminishingReturnWithTwoScalingHistoryRecordEvaluatedData1));
+ diminishingReturnWithTwoScalingHistoryRecord.put(
+ currentTime,
+ new ScalingSummary(
+ 4, 8, diminishingReturnWithTwoScalingHistoryRecordEvaluatedData2));
+
+ double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(
+ diminishingReturnWithTwoScalingHistoryRecord, conf);
+
+ assertTrue(
+ diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient < 0.9
+ && diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient > 0.4);
+
+ // Scenario 7: two observations with linear returns -> coefficient 1.
+ var linearReturnWithTwoScalingHistoryRecord = new TreeMap();
+
+ var linearReturnWithTwoScalingHistoryRecordEvaluatedData1 = evaluated(2, 160, 80);
+ var linearReturnWithTwoScalingHistoryRecordEvaluatedData2 = evaluated(4, 320, 160);
+
+ linearReturnWithTwoScalingHistoryRecord.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 4, linearReturnWithTwoScalingHistoryRecordEvaluatedData1));
+ linearReturnWithTwoScalingHistoryRecord.put(
+ currentTime,
+ new ScalingSummary(4, 8, linearReturnWithTwoScalingHistoryRecordEvaluatedData2));
+
+ double linearReturnWithTwoScalingHistoryRecordScalingCoefficient =
+ JobVertexScaler.calculateObservedScalingCoefficient(
+ linearReturnWithTwoScalingHistoryRecord, conf);
+
+ assertEquals(1, linearReturnWithTwoScalingHistoryRecordScalingCoefficient);
+ }
+
+ // End-to-end check that computeScaleTargetParallelism takes the observed scaling coefficient
+ // into account once OBSERVED_SCALABILITY_ENABLED is set: the same current metrics yield a
+ // higher target parallelism when the history shows diminishing returns.
+ @ParameterizedTest
+ @MethodSource("adjustmentInputsProvider")
+ public void testParallelismScalingWithObservedScalingCoefficient(
+ Collection inputShipStrategies) {
+ var op = new JobVertexID();
+ var delayedScaleDown = new DelayedScaleDown();
+ var currentTime = Instant.now();
+
+ conf.set(UTILIZATION_TARGET, 0.5);
+ conf.set(OBSERVED_SCALABILITY_ENABLED, true);
+
+ // Linear history: the expected target parallelism is 10.
+ var linearScalingHistory = new TreeMap();
+ var linearScalingEvaluatedData1 = evaluated(4, 100, 200);
+ var linearScalingEvaluatedData2 = evaluated(2, 400, 100);
+ var linearScalingEvaluatedData3 = evaluated(8, 800, 400);
+
+ linearScalingHistory.put(
+ currentTime.minusSeconds(20),
+ new ScalingSummary(4, 2, linearScalingEvaluatedData1));
+ linearScalingHistory.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 8, linearScalingEvaluatedData2));
+ linearScalingHistory.put(
+ currentTime, new ScalingSummary(8, 16, linearScalingEvaluatedData3));
+
+ assertEquals(
+ ParallelismChange.build(10, true),
+ vertexScaler.computeScaleTargetParallelism(
+ context,
+ op,
+ inputShipStrategies,
+ evaluated(2, 100, 40),
+ linearScalingHistory,
+ restartTime,
+ delayedScaleDown));
+
+ // Diminishing-returns history: identical current metrics now require parallelism 15.
+ var diminishingReturnsScalingHistory = new TreeMap();
+ var diminishingReturnsEvaluatedData1 = evaluated(4, 80, 160);
+ var diminishingReturnsEvaluatedData2 = evaluated(2, 384, 96);
+ var diminishingReturnsEvaluatedData3 = evaluated(8, 480, 240);
+
+ diminishingReturnsScalingHistory.put(
+ currentTime.minusSeconds(20),
+ new ScalingSummary(4, 2, diminishingReturnsEvaluatedData1));
+ diminishingReturnsScalingHistory.put(
+ currentTime.minusSeconds(10),
+ new ScalingSummary(2, 8, diminishingReturnsEvaluatedData2));
+ diminishingReturnsScalingHistory.put(
+ currentTime, new ScalingSummary(8, 16, diminishingReturnsEvaluatedData3));
+
+ assertEquals(
+ ParallelismChange.build(15, true),
+ vertexScaler.computeScaleTargetParallelism(
+ context,
+ op,
+ inputShipStrategies,
+ evaluated(2, 100, 40),
+ diminishingReturnsScalingHistory,
+ restartTime,
+ delayedScaleDown));
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index e2b5db8564..42fd56e76c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -65,6 +65,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
@@ -622,6 +623,7 @@ public static Optional validateAutoScalerFlinkConfiguration(
UTILIZATION_MIN,
0.0d,
flinkConfiguration.get(UTILIZATION_TARGET)),
+ validateNumber(flinkConfiguration, OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 8caa24eb28..08388b79ea 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -842,6 +842,21 @@ public void testAutoScalerDeploymentWithInvalidExcludedPeriods() {
assertTrue(result.isPresent());
}
+    /**
+     * A coefficient minimum of 1.2 lies outside the accepted [0.01, 1] range and must be rejected
+     * with the formatted range error for that option.
+     */
+    @Test
+    public void testAutoScalerDeploymentWithInvalidScalingCoefficientMin() {
+        var coefficientMinOption = AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
+        var expectedError = getFormattedErrorMessage(coefficientMinOption, 0.01d, 1d);
+
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf -> flinkConf.put(coefficientMinOption.key(), "1.2"));
+
+        assertErrorContains(result, expectedError);
+    }
+
@Test
public void testNonEnabledAutoScalerDeploymentJob() {
var result =