diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
index b5a0dba79e..489bbc9f9e 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -30,23 +30,96 @@
import java.time.Instant;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** All delayed scale down requests. */
public class DelayedScaleDown {
+ /** Details of the recommended parallelism. */
+ @Data
+ public static class RecommendedParallelism {
+ @Nonnull private final Instant triggerTime;
+ private final int parallelism;
+ private final boolean outsideUtilizationBound;
+
+ @JsonCreator
+ public RecommendedParallelism(
+ @Nonnull @JsonProperty("triggerTime") Instant triggerTime,
+ @JsonProperty("parallelism") int parallelism,
+ @JsonProperty("outsideUtilizationBound") boolean outsideUtilizationBound) {
+ this.triggerTime = triggerTime;
+ this.parallelism = parallelism;
+ this.outsideUtilizationBound = outsideUtilizationBound;
+ }
+ }
+
/** The delayed scale down info for vertex. */
@Data
public static class VertexDelayedScaleDownInfo {
private final Instant firstTriggerTime;
- private int maxRecommendedParallelism;
+
+ /**
+ * In theory, it maintains all recommended parallelisms at each time within the past
+ * `scale-down.interval` window period, so all recommended parallelisms before the window
+ * start time will be evicted.
+ *
+ *
Also, if latest parallelism is greater than the past parallelism, all smaller
+ * parallelism in the past never be the max recommended parallelism, so we could evict all
+ * smaller parallelism in the past. It's a general optimization for calculating max value
+ * for sliding window. So we only need to maintain a list with monotonically decreasing
+ * parallelism within the past window, and the first parallelism will be the max recommended
+ * parallelism within the past `scale-down.interval` window period.
+ */
+ private final LinkedList recommendedParallelisms;
+
+ public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
+ this.firstTriggerTime = firstTriggerTime;
+ this.recommendedParallelisms = new LinkedList<>();
+ }
@JsonCreator
public VertexDelayedScaleDownInfo(
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
- @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+ @JsonProperty("recommendedParallelisms")
+ LinkedList recommendedParallelisms) {
this.firstTriggerTime = firstTriggerTime;
- this.maxRecommendedParallelism = maxRecommendedParallelism;
+ this.recommendedParallelisms = recommendedParallelisms;
+ }
+
+ /** Record current recommended parallelism. */
+ public void recordRecommendedParallelism(
+ Instant triggerTime, int parallelism, boolean outsideUtilizationBound) {
+ // Evict all recommended parallelisms that are lower than or equal to the latest
+ // parallelism. When the past parallelism is equal to the latest parallelism,
+ // triggerTime needs to be updated, so it also needs to be evicted.
+ while (!recommendedParallelisms.isEmpty()
+ && recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
+ recommendedParallelisms.pollLast();
+ }
+
+ recommendedParallelisms.addLast(
+ new RecommendedParallelism(triggerTime, parallelism, outsideUtilizationBound));
+ }
+
+ @JsonIgnore
+ public RecommendedParallelism getMaxRecommendedParallelism(Instant windowStartTime) {
+ // Evict all recommended parallelisms before the window start time.
+ while (!recommendedParallelisms.isEmpty()
+ && recommendedParallelisms
+ .peekFirst()
+ .getTriggerTime()
+ .isBefore(windowStartTime)) {
+ recommendedParallelisms.pollFirst();
+ }
+
+ var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
+ checkState(
+ maxRecommendedParallelism != null,
+ "The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug.");
+ return maxRecommendedParallelism;
}
}
@@ -63,18 +136,19 @@ public DelayedScaleDown() {
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
@Nonnull
public VertexDelayedScaleDownInfo triggerScaleDown(
- JobVertexID vertex, Instant triggerTime, int parallelism) {
- var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
- if (vertexDelayedScaleDownInfo == null) {
- // It's the first trigger
- vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
- delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
- updated = true;
- } else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
- // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
- vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
- updated = true;
- }
+ JobVertexID vertex,
+ Instant triggerTime,
+ int parallelism,
+ boolean outsideUtilizationBound) {
+ // The vertexDelayedScaleDownInfo is updated once scale down is triggered due to we need
+ // update the triggerTime each time.
+ updated = true;
+
+ var vertexDelayedScaleDownInfo =
+ delayedVertices.computeIfAbsent(
+ vertex, k -> new VertexDelayedScaleDownInfo(triggerTime));
+ vertexDelayedScaleDownInfo.recordRecommendedParallelism(
+ triggerTime, parallelism, outsideUtilizationBound);
return vertexDelayedScaleDownInfo;
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index d0e29e677b..d8ee54abfd 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -256,5 +256,7 @@ private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(Context ctx) {
@VisibleForTesting
void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
+ this.metricsCollector.setClock(clock);
+ this.scalingExecutor.setClock(clock);
}
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 41075b7f63..4c185f89ea 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -56,6 +56,8 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
import static org.apache.flink.configuration.description.TextElement.text;
@@ -92,12 +94,15 @@ public JobVertexScaler(AutoScalerEventHandler autoScalerEventHandl
@Getter
public static class ParallelismChange {
- private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
+ private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1, false);
private final int newParallelism;
- private ParallelismChange(int newParallelism) {
+ private final boolean outsideUtilizationBound;
+
+ private ParallelismChange(int newParallelism, boolean outsideUtilizationBound) {
this.newParallelism = newParallelism;
+ this.outsideUtilizationBound = outsideUtilizationBound;
}
public boolean isNoChange() {
@@ -113,24 +118,29 @@ public boolean equals(Object o) {
return false;
}
ParallelismChange that = (ParallelismChange) o;
- return newParallelism == that.newParallelism;
+ return newParallelism == that.newParallelism
+ && outsideUtilizationBound == that.outsideUtilizationBound;
}
@Override
public int hashCode() {
- return Objects.hash(newParallelism);
+ return Objects.hash(newParallelism, outsideUtilizationBound);
}
@Override
public String toString() {
return isNoChange()
? "NoParallelismChange"
- : "ParallelismChange{newParallelism=" + newParallelism + '}';
+ : "ParallelismChange{newParallelism="
+ + newParallelism
+ + ", outsideUtilizationBound="
+ + outsideUtilizationBound
+ + "}";
}
- public static ParallelismChange build(int newParallelism) {
+ public static ParallelismChange build(int newParallelism, boolean outsideUtilizationBound) {
checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
- return new ParallelismChange(newParallelism);
+ return new ParallelismChange(newParallelism, outsideUtilizationBound);
}
public static ParallelismChange noChange() {
@@ -239,6 +249,8 @@ private ParallelismChange detectBlockScaling(
currentParallelism != newParallelism,
"The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");
+ var outsideUtilizationBound = outsideUtilizationBound(vertex, evaluatedMetrics);
+
var scaledUp = currentParallelism < newParallelism;
if (scaledUp) {
@@ -248,7 +260,7 @@ private ParallelismChange detectBlockScaling(
// If we don't have past scaling actions for this vertex, don't block scale up.
if (history.isEmpty()) {
- return ParallelismChange.build(newParallelism);
+ return ParallelismChange.build(newParallelism, outsideUtilizationBound);
}
var lastSummary = history.get(history.lastKey());
@@ -260,28 +272,59 @@ && detectIneffectiveScaleUp(
return ParallelismChange.noChange();
}
- return ParallelismChange.build(newParallelism);
+ return ParallelismChange.build(newParallelism, outsideUtilizationBound);
+ } else {
+ return applyScaleDownInterval(
+ delayedScaleDown, vertex, conf, newParallelism, outsideUtilizationBound);
+ }
+ }
+
+ private static boolean outsideUtilizationBound(
+ JobVertexID vertex, Map metrics) {
+ double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+ double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+ double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
+ if (trueProcessingRate < scaleUpRateThreshold
+ || trueProcessingRate > scaleDownRateThreshold) {
+ LOG.debug(
+ "Vertex {} processing rate {} is outside ({}, {})",
+ vertex,
+ trueProcessingRate,
+ scaleUpRateThreshold,
+ scaleDownRateThreshold);
+ return true;
} else {
- return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
+ LOG.debug(
+ "Vertex {} processing rate {} is within target ({}, {})",
+ vertex,
+ trueProcessingRate,
+ scaleUpRateThreshold,
+ scaleDownRateThreshold);
}
+ return false;
}
private ParallelismChange applyScaleDownInterval(
DelayedScaleDown delayedScaleDown,
JobVertexID vertex,
Configuration conf,
- int newParallelism) {
+ int newParallelism,
+ boolean outsideUtilizationBound) {
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
if (scaleDownInterval.toMillis() <= 0) {
// The scale down interval is disable, so don't block scaling.
- return ParallelismChange.build(newParallelism);
+ return ParallelismChange.build(newParallelism, outsideUtilizationBound);
}
var now = clock.instant();
- var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+ var windowStartTime = now.minus(scaleDownInterval);
+ var delayedScaleDownInfo =
+ delayedScaleDown.triggerScaleDown(
+ vertex, now, newParallelism, outsideUtilizationBound);
// Never scale down within scale down interval
- if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+ if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
} else {
@@ -293,7 +336,11 @@ private ParallelismChange applyScaleDownInterval(
} else {
// Using the maximum parallelism within the scale down interval window instead of the
// latest parallelism when scaling down
- return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
+ var maxRecommendedParallelism =
+ delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
+ return ParallelismChange.build(
+ maxRecommendedParallelism.getParallelism(),
+ maxRecommendedParallelism.isOutsideUtilizationBound());
}
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 248fec711b..a9b04276a0 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -41,13 +41,14 @@
import javax.annotation.Nullable;
+import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
@@ -56,9 +57,6 @@
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Class responsible for executing scaling decisions. */
public class ScalingExecutor> {
@@ -177,44 +175,6 @@ private void updateRecommendedParallelism(
scalingSummary.getNewParallelism())));
}
- @VisibleForTesting
- static boolean allChangedVerticesWithinUtilizationTarget(
- Map> evaluatedMetrics,
- Set changedVertices) {
- // No vertices with changed parallelism.
- if (changedVertices.isEmpty()) {
- return true;
- }
-
- for (JobVertexID vertex : changedVertices) {
- var metrics = evaluatedMetrics.get(vertex);
-
- double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
- double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
- double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
-
- if (trueProcessingRate < scaleUpRateThreshold
- || trueProcessingRate > scaleDownRateThreshold) {
- LOG.debug(
- "Vertex {} processing rate {} is outside ({}, {})",
- vertex,
- trueProcessingRate,
- scaleUpRateThreshold,
- scaleDownRateThreshold);
- return false;
- } else {
- LOG.debug(
- "Vertex {} processing rate {} is within target ({}, {})",
- vertex,
- trueProcessingRate,
- scaleUpRateThreshold,
- scaleDownRateThreshold);
- }
- }
- LOG.info("All vertex processing rates are within target.");
- return true;
- }
-
@VisibleForTesting
Map computeScalingSummary(
Context context,
@@ -234,6 +194,7 @@ Map computeScalingSummary(
var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+ AtomicBoolean anyVertexOutsideBound = new AtomicBoolean(false);
evaluatedMetrics
.getVertexMetrics()
.forEach(
@@ -259,6 +220,9 @@ Map computeScalingSummary(
if (parallelismChange.isNoChange()) {
return;
}
+ if (parallelismChange.isOutsideUtilizationBound()) {
+ anyVertexOutsideBound.set(true);
+ }
out.put(
v,
new ScalingSummary(
@@ -269,8 +233,8 @@ Map computeScalingSummary(
});
// If the Utilization of all tasks is within range, we can skip scaling.
- if (allChangedVerticesWithinUtilizationTarget(
- evaluatedMetrics.getVertexMetrics(), out.keySet())) {
+ if (!anyVertexOutsideBound.get()) {
+ LOG.info("All vertex processing rates are within target.");
return Map.of();
}
@@ -494,4 +458,9 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+
+ @VisibleForTesting
+ void setClock(Clock clock) {
+ jobVertexScaler.setClock(clock);
+ }
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 4599d6345b..df950bb4c0 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -417,7 +417,6 @@ private void assertCollectedMetricsSize(int expectedSize) throws Exception {
private void setClocksTo(Instant time) {
var clock = Clock.fixed(time, ZoneId.systemDefault());
- metricsCollector.setClock(clock);
autoscaler.setClock(clock);
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java
new file mode 100644
index 0000000000..420d9df3a0
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.TestMetrics;
+import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.TestingAutoscalerUtils.getRestClusterClientSupplier;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end test for {@link DelayedScaleDown}. */
+public class DelayedScaleDownEndToEndTest {
+
+ private static final int INITIAL_SOURCE_PARALLELISM = 200;
+ private static final int INITIAL_SINK_PARALLELISM = 1000;
+ private static final double UTILIZATION_TARGET = 0.8;
+
+ private JobAutoScalerContext context;
+ private AutoScalerStateStore> stateStore;
+
+ TestingScalingRealizer> scalingRealizer;
+
+ private TestingMetricsCollector> metricsCollector;
+
+ private JobVertexID source, sink;
+
+ private JobAutoScalerImpl> autoscaler;
+ private Instant now;
+ private int expectedMetricSize;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ context = createDefaultJobAutoScalerContext();
+
+ TestingEventCollector> eventCollector =
+ new TestingEventCollector<>();
+ stateStore = new InMemoryAutoScalerStateStore<>();
+
+ source = new JobVertexID();
+ sink = new JobVertexID();
+
+ metricsCollector =
+ new TestingMetricsCollector<>(
+ new JobTopology(
+ new VertexInfo(source, Map.of(), INITIAL_SOURCE_PARALLELISM, 4000),
+ new VertexInfo(
+ sink,
+ Map.of(source, REBALANCE),
+ INITIAL_SINK_PARALLELISM,
+ 4000)));
+
+ var scaleDownInterval = Duration.ofMinutes(60).minus(Duration.ofSeconds(1));
+ // The metric window size is 9:59 to avoid other metrics are mixed.
+ var metricWindow = Duration.ofMinutes(10).minus(Duration.ofSeconds(1));
+
+ var defaultConf = context.getConfiguration();
+ defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+ defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(0));
+ defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(0));
+ defaultConf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10000);
+ defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, UTILIZATION_TARGET);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, UTILIZATION_TARGET + 0.1);
+ defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, UTILIZATION_TARGET - 0.1);
+ defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, scaleDownInterval);
+ defaultConf.set(AutoScalerOptions.METRICS_WINDOW, metricWindow);
+
+ scalingRealizer = new TestingScalingRealizer<>();
+ autoscaler =
+ new JobAutoScalerImpl<>(
+ metricsCollector,
+ new ScalingMetricEvaluator(),
+ new ScalingExecutor<>(eventCollector, stateStore),
+ eventCollector,
+ scalingRealizer,
+ stateStore);
+
+ // initially the last evaluated metrics are empty
+ assertThat(autoscaler.lastEvaluatedMetrics.get(context.getJobKey())).isNull();
+
+ now = Instant.ofEpochMilli(0);
+ setClocksTo(now);
+ running(now);
+
+ metricsCollector.updateMetrics(source, buildMetric(0, 800));
+ metricsCollector.updateMetrics(sink, buildMetric(0, 800));
+
+ // the recommended parallelism values are empty initially
+ autoscaler.scale(context);
+ expectedMetricSize = 1;
+ assertCollectedMetricsSize(expectedMetricSize);
+ }
+
+ /**
+ * The scale down won't be executed before scale down interval window is full, and it will use
+ * the max recommended parallelism in the past scale down interval window size when scale down
+ * is executed.
+ */
+ @Test
+ void testDelayedScaleDownHappenInLastMetricWindow() throws Exception {
+ var sourceBusyList = List.of(800, 800, 800, 800, 800, 800, 800);
+ var sinkBusyList = List.of(350, 300, 150, 200, 400, 250, 100);
+
+ var metricWindowSize = sourceBusyList.size();
+
+ assertThat(metricWindowSize).isGreaterThan(6);
+
+ var totalRecords = 0L;
+ int recordsPerMinutes = 4800000;
+
+ for (int windowIndex = 0; windowIndex < metricWindowSize; windowIndex++) {
+ for (int i = 1; i <= 10; i++) {
+ now = now.plus(Duration.ofMinutes(1));
+ setClocksTo(now);
+
+ metricsCollector.updateMetrics(
+ source, buildMetric(totalRecords, sourceBusyList.get(windowIndex)));
+ metricsCollector.updateMetrics(
+ sink, buildMetric(totalRecords, sinkBusyList.get(windowIndex)));
+
+ autoscaler.scale(context);
+ // Metric window is 10 minutes, so 10 is the maximal metric size.
+ expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+ assertCollectedMetricsSize(expectedMetricSize);
+
+ // Assert the recommended parallelism.
+ if (windowIndex == metricWindowSize - 1 && i == 10) {
+ // Last metric, we expect scale down is executed, and max recommended
+ // parallelism in the past window should be used.
+ // The max busy time needs more parallelism than others, so we could compute
+ // parallelism based on the max busy time.
+ var expectedSourceParallelism =
+ getExpectedParallelism(sourceBusyList, INITIAL_SOURCE_PARALLELISM);
+ var expectedSinkParallelism =
+ getExpectedParallelism(sinkBusyList, INITIAL_SINK_PARALLELISM);
+ pollAndAssertScalingRealizer(
+ expectedSourceParallelism, expectedSinkParallelism);
+ } else {
+ // Otherwise, scale down cannot be executed.
+ if (windowIndex == 0 && i <= 9) {
+ // Metric window is not full, so don't have recommended parallelism.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM)).isNull();
+ } else {
+ // Scale down won't be executed before scale down interval window is full.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SINK_PARALLELISM);
+ }
+ assertThat(scalingRealizer.events).isEmpty();
+ }
+
+ totalRecords += recordsPerMinutes;
+ }
+ }
+ }
+
+ private static List>
+ sinkParallelismMaxRecommendedParallelismWithinUtilizationBoundaryProvider() {
+ return List.of(
+ List.of(
+ 700, 690, 690, 690, 690, 690, 700, 690, 690, 690, 690, 690, 700, 690, 690,
+ 690, 690, 690, 700, 690, 690, 690, 690, 690, 700, 690, 690, 690, 690, 690,
+ 700),
+ List.of(700, 690, 700, 690, 700, 690, 700, 690, 700, 690, 700, 690, 700, 690, 700),
+ List.of(
+ 790, 200, 200, 200, 200, 200, 790, 200, 200, 200, 200, 200, 790, 200, 200,
+ 200, 200, 200, 790, 200, 200, 200, 200, 200, 790, 200, 200, 200, 200, 200,
+ 790),
+ List.of(790, 200, 790, 200, 790, 200, 790, 200, 790, 200, 790, 200, 790, 200, 790));
+ }
+
+ /**
+ * Job never scale down when the max recommended parallelism in the past scale down interval
+ * window is inside the utilization boundary.
+ */
+ @ParameterizedTest
+ @MethodSource("sinkParallelismMaxRecommendedParallelismWithinUtilizationBoundaryProvider")
+ void testScaleDownNeverHappenWhenMaxRecommendedParallelismWithinUtilizationBoundary(
+ List sinkBusyList) throws Exception {
+ var metricWindowSize = sinkBusyList.size();
+ var sourceBusyList = Collections.nCopies(metricWindowSize, 800);
+
+ assertThat(sourceBusyList).hasSameSizeAs(sinkBusyList);
+
+ var totalRecords = 0L;
+ int recordsPerMinutes = 4800000;
+
+ for (int windowIndex = 0; windowIndex < metricWindowSize; windowIndex++) {
+ for (int i = 1; i <= 10; i++) {
+ now = now.plus(Duration.ofMinutes(1));
+ setClocksTo(now);
+
+ metricsCollector.updateMetrics(
+ source, buildMetric(totalRecords, sourceBusyList.get(windowIndex)));
+ metricsCollector.updateMetrics(
+ sink, buildMetric(totalRecords, sinkBusyList.get(windowIndex)));
+
+ autoscaler.scale(context);
+ // Metric window is 10 minutes, so 10 is the maximal metric size.
+ expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+ assertCollectedMetricsSize(expectedMetricSize);
+
+ // Assert the recommended parallelism.
+ if (windowIndex == 0 && i <= 9) {
+ // Metric window is not full, so don't have recommended parallelism.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM)).isNull();
+ } else {
+ // Scale down won't be executed before scale down interval window is full.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SINK_PARALLELISM);
+ }
+ assertThat(scalingRealizer.events).isEmpty();
+
+ totalRecords += recordsPerMinutes;
+ }
+ }
+ }
+
+ private static Stream scaleDownUtilizationBoundaryFromWithinToOutsideProvider() {
+ return Stream.of(
+ Arguments.of(
+ List.of(
+ 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780,
+ 780, 780, 780, 780),
+ List.of(
+ 800, 790, 780, 770, 760, 750, 740, 730, 720, 710, 700, 690, 680,
+ 670, 660, 660, 660)),
+ Arguments.of(
+ List.of(
+ 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780,
+ 780, 780, 780, 780),
+ List.of(
+ 700, 700, 700, 700, 700, 700, 700, 700, 700, 700, 700, 690, 680,
+ 670, 660, 660, 660)),
+ Arguments.of(
+ List.of(
+ 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780,
+ 780, 780, 780, 780, 780),
+ List.of(
+ 800, 790, 790, 790, 790, 790, 790, 790, 790, 790, 790, 690, 680,
+ 670, 660, 660, 660, 660)),
+ Arguments.of(
+ List.of(
+ 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780,
+ 780, 780, 780, 780),
+ List.of(
+ 800, 790, 780, 770, 760, 750, 740, 730, 720, 710, 700, 100, 100,
+ 100, 100, 100, 100)),
+ Arguments.of(
+ List.of(
+ 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780, 780,
+ 780, 780, 780, 780, 780, 780, 780, 780, 780, 780),
+ // only 50 minutes are outside of bound in the first time
+ List.of(
+ 800, 790, 780, 770, 760, 750, 740, 730, 720, 710, 700, 200, 200,
+ 200, 200, 200, 700, 100, 100, 100, 100, 100, 100)));
+ }
+
+ /**
+ * Initially, all tasks are scaled down within the utilization bound, and scaling down is only
+ * executed when the max recommended parallelism in the past scale down interval for any task is
+ * outside the utilization bound.
+ */
+ @ParameterizedTest
+ @MethodSource("scaleDownUtilizationBoundaryFromWithinToOutsideProvider")
+ void testScaleDownUtilizationBoundaryFromWithinToOutside(
+ List sourceBusyList, List sinkBusyList) throws Exception {
+
+ assertThat(sourceBusyList).hasSameSizeAs(sinkBusyList);
+ var metricWindowSize = sourceBusyList.size();
+
+ assertThat(metricWindowSize).isGreaterThan(6);
+
+ var totalRecords = 0L;
+ int recordsPerMinutes = 4800000;
+ var sinkBusyWindow = new LinkedList();
+ var sinkRecommendationOutsideBound = new LinkedList();
+ var sourceBusyWindow = new LinkedList();
+ var sourceRecommendation = new LinkedList();
+
+ for (int windowIndex = 0; windowIndex < metricWindowSize; windowIndex++) {
+ for (int i = 1; i <= 10; i++) {
+ now = now.plus(Duration.ofMinutes(1));
+ setClocksTo(now);
+
+ var sourceBusyTimePerSec = sourceBusyList.get(windowIndex);
+ var sinkBusyTimePerSec = sinkBusyList.get(windowIndex);
+ sourceBusyWindow.add(sourceBusyTimePerSec);
+ sinkBusyWindow.add(sinkBusyTimePerSec);
+ // Poll the oldest metric.
+ if (sinkBusyWindow.size() > 10) {
+ sinkBusyWindow.pollFirst();
+ sourceBusyWindow.pollFirst();
+ }
+
+ metricsCollector.updateMetrics(
+ source, buildMetric(totalRecords, sourceBusyList.get(windowIndex)));
+ metricsCollector.updateMetrics(
+ sink, buildMetric(totalRecords, sinkBusyList.get(windowIndex)));
+
+ autoscaler.scale(context);
+ // Metric window is 10 minutes, so 10 is the maximal metric size.
+ expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+ assertCollectedMetricsSize(expectedMetricSize);
+
+ if (windowIndex == 0 && i <= 9) {
+ // Metric window is not full, so don't have recommended parallelism.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(scalingRealizer.events).isEmpty();
+ } else {
+ double sourceBusyAvg =
+ sourceBusyWindow.stream().mapToInt(e -> e).average().getAsDouble();
+ double sinkBusyAvg =
+ sinkBusyWindow.stream().mapToInt(e -> e).average().getAsDouble();
+ if (sinkBusyAvg < 700) {
+ var expectedSourceParallelism =
+ getExpectedParallelism(sourceBusyAvg, INITIAL_SOURCE_PARALLELISM);
+ var expectedSinkParallelism =
+ getExpectedParallelism(sinkBusyAvg, INITIAL_SINK_PARALLELISM);
+ sourceRecommendation.add(expectedSourceParallelism);
+ sinkRecommendationOutsideBound.add(expectedSinkParallelism);
+ } else {
+ sourceRecommendation.clear();
+ sinkRecommendationOutsideBound.clear();
+ }
+
+ // Assert the recommended parallelism.
+ // why it's 60 instead of 59
+ if (sinkRecommendationOutsideBound.size() >= 60) {
+ // Last metric, we expect scale down is executed, and max recommended
+ // parallelism in the past window should be used.
+ // The max busy time needs more parallelism than others, so we could compute
+ // parallelism based on the max busy time.
+ var expectedSourceParallelism =
+ sourceRecommendation.stream().mapToInt(e -> e).max().getAsInt();
+ var expectedSinkParallelism =
+ sinkRecommendationOutsideBound.stream()
+ .mapToInt(e -> e)
+ .max()
+ .getAsInt();
+ pollAndAssertScalingRealizer(
+ expectedSourceParallelism, expectedSinkParallelism);
+ break;
+ } else {
+ // Otherwise, scale down cannot be executed.
+ // Scale down won't be executed before scale down interval window is full.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SINK_PARALLELISM);
+ assertThat(scalingRealizer.events).isEmpty();
+ }
+ }
+
+ totalRecords += recordsPerMinutes;
+ }
+ }
+ }
+
+ /** The scale down trigger time will be reset, when other tasks scale up. */
+ @Test
+ void testDelayedScaleDownIsResetWhenAnotherTaskScaleUp() throws Exception {
+ // It expects delayed scale down of sink is reset when scale up is executed for source.
+ var sourceBusyList = List.of(800, 800, 800, 800, 950);
+ var sinkBusyList = List.of(200, 200, 200, 200, 200);
+
+ var sourceBusyWindow = new LinkedList();
+
+ var metricWindowSize = sourceBusyList.size();
+
+ var totalRecords = 0L;
+ int recordsPerMinutes = 4800000;
+
+ for (int windowIndex = 0; windowIndex < metricWindowSize; windowIndex++) {
+ for (int i = 1; i <= 10; i++) {
+ now = now.plus(Duration.ofMinutes(1));
+ setClocksTo(now);
+
+ var busyTimePerSec = sourceBusyList.get(windowIndex);
+ sourceBusyWindow.add(busyTimePerSec);
+ // Poll the oldest metric.
+ if (sourceBusyWindow.size() > 10) {
+ sourceBusyWindow.pollFirst();
+ }
+
+ metricsCollector.updateMetrics(source, buildMetric(totalRecords, busyTimePerSec));
+ metricsCollector.updateMetrics(
+ sink, buildMetric(totalRecords, sinkBusyList.get(windowIndex)));
+
+ autoscaler.scale(context);
+ // Metric window is 10 minutes, so 10 is the maximal metric size.
+ expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+ assertCollectedMetricsSize(expectedMetricSize);
+
+ // Assert the recommended parallelism.
+ if (windowIndex == 0 && i <= 9) {
+ // Metric window is not full, so don't have recommended parallelism.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(scalingRealizer.events).isEmpty();
+ } else {
+ double busyAvg =
+ sourceBusyWindow.stream().mapToInt(e -> e).average().getAsDouble();
+ if (busyAvg > 900) {
+ // Scaling up happens for source, and the scale down of sink cannot be
+ // executed since the scale down interval window is not full.
+ var sourceMaxBusyRatio = busyAvg / 1000;
+ var expectedSourceParallelism =
+ (int)
+ Math.ceil(
+ INITIAL_SOURCE_PARALLELISM
+ * sourceMaxBusyRatio
+ / UTILIZATION_TARGET);
+ pollAndAssertScalingRealizer(
+ expectedSourceParallelism, INITIAL_SINK_PARALLELISM);
+ // The delayed scale down should be cleaned up after source scales up.
+ assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+ .isEmpty();
+ break;
+ } else {
+ // Scale down won't be executed before source utilization within the
+ // utilization bound.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SINK_PARALLELISM);
+ assertThat(scalingRealizer.events).isEmpty();
+ // Delayed scale down is triggered
+ assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+ .isNotEmpty();
+ }
+ }
+ totalRecords += recordsPerMinutes;
+ }
+ }
+ }
+
+ /** The scale down trigger time will be reset, when other tasks scale down. */
+ @Test
+ void testDelayedScaleDownIsResetWhenAnotherTaskScaleDown() throws Exception {
+ // It expects delayed scale down of sink is reset when scale down is executed for source.
+ var sourceBusyList = List.of(200, 200, 200, 200, 200, 200, 200);
+ var sinkBusyList = List.of(800, 800, 200, 200, 200, 200, 200);
+
+ var metricWindowSize = sourceBusyList.size();
+
+ var totalRecords = 0L;
+ int recordsPerMinutes = 4800000;
+
+ for (int windowIndex = 0; windowIndex < metricWindowSize; windowIndex++) {
+ for (int i = 1; i <= 10; i++) {
+ now = now.plus(Duration.ofMinutes(1));
+ setClocksTo(now);
+
+ metricsCollector.updateMetrics(
+ source, buildMetric(totalRecords, sourceBusyList.get(windowIndex)));
+ metricsCollector.updateMetrics(
+ sink, buildMetric(totalRecords, sinkBusyList.get(windowIndex)));
+
+ autoscaler.scale(context);
+ // Metric window is 10 minutes, so 10 is the maximal metric size.
+ expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+ assertCollectedMetricsSize(expectedMetricSize);
+
+ // Assert the recommended parallelism.
+ if (windowIndex == 0 && i <= 9) {
+ // Metric window is not full, so don't have recommended parallelism.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(scalingRealizer.events).isEmpty();
+ } else {
+ if (windowIndex == metricWindowSize - 1 && i == 10) {
+ // Scaling up happens for source, and the scale down of sink cannot be
+ // executed since the scale down interval window is not full.
+ var expectedSourceParallelism =
+ getExpectedParallelism(sourceBusyList, INITIAL_SOURCE_PARALLELISM);
+ pollAndAssertScalingRealizer(
+ expectedSourceParallelism, INITIAL_SINK_PARALLELISM);
+ // The delayed scale down should be cleaned up after source scales down.
+ assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+ .isEmpty();
+ } else {
+ // Scale down won't be executed when scale down interval window of source is
+ // not full.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SINK_PARALLELISM);
+ assertThat(scalingRealizer.events).isEmpty();
+ // Delayed scale down is triggered
+ assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+ .isNotEmpty();
+ }
+ }
+ totalRecords += recordsPerMinutes;
+ }
+ }
+ }
+
+ private static List> sinkParallelismIsGreaterOrEqualProvider() {
+ return List.of(
+ // test for the recommended parallelism is equal to the current parallelism.
+ List.of(200, 200, 200, 200, 800),
+ List.of(700, 700, 700, 700, 800),
+ List.of(780, 780, 780, 780, 800),
+ List.of(790, 800),
+ List.of(760, 800),
+ List.of(750, 800),
+ List.of(350, 800),
+ // test for the recommended parallelism is greater than current parallelism.
+ List.of(200, 200, 200, 200, 850),
+ List.of(700, 700, 700, 700, 850),
+ List.of(780, 780, 780, 780, 850),
+ List.of(790, 850),
+ List.of(760, 850),
+ List.of(750, 850),
+ List.of(350, 850),
+ List.of(200, 200, 200, 200, 900),
+ List.of(700, 700, 700, 700, 900),
+ List.of(780, 780, 780, 780, 900),
+ List.of(790, 900),
+ List.of(760, 900),
+ List.of(750, 900),
+ List.of(350, 900));
+ }
+
+ /**
+ * The triggered scale down of sink will be canceled when the recommended parallelism is greater
+ * than or equal to the current parallelism.
+ */
+ @ParameterizedTest
+ @MethodSource("sinkParallelismIsGreaterOrEqualProvider")
+ void testDelayedScaleDownIsCanceledWhenRecommendedParallelismIsGreaterOrEqual(
+ List sinkBusyList) throws Exception {
+ var metricWindowSize = sinkBusyList.size();
+ var sourceBusyList = Collections.nCopies(metricWindowSize, 800);
+
+ var sinkBusyWindow = new LinkedList();
+
+ var totalRecords = 0L;
+ int recordsPerMinutes = 480000000;
+
+ for (int windowIndex = 0; windowIndex < metricWindowSize; windowIndex++) {
+ for (int i = 1; i <= 10; i++) {
+ now = now.plus(Duration.ofMinutes(1));
+ setClocksTo(now);
+
+ var busyTimePerSec = sinkBusyList.get(windowIndex);
+ sinkBusyWindow.add(busyTimePerSec);
+ // Poll the oldest metric.
+ if (sinkBusyWindow.size() > 10) {
+ sinkBusyWindow.pollFirst();
+ }
+
+ metricsCollector.updateMetrics(
+ source, buildMetric(totalRecords, sourceBusyList.get(windowIndex)));
+ metricsCollector.updateMetrics(sink, buildMetric(totalRecords, busyTimePerSec));
+
+ autoscaler.scale(context);
+ // Metric window is 10 minutes, so 10 is the maximal metric size.
+ expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+ assertCollectedMetricsSize(expectedMetricSize);
+
+ assertThat(scalingRealizer.events).isEmpty();
+
+ // Assert the recommended parallelism.
+ if (windowIndex == 0 && i <= 9) {
+ // Metric window is not full, so don't have recommended parallelism.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM)).isNull();
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM)).isNull();
+ } else {
+ // Scale down won't be executed before scale down interval window is full.
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(INITIAL_SINK_PARALLELISM);
+
+ double sinkBusyAvg =
+ sinkBusyWindow.stream().mapToInt(e -> e).average().getAsDouble();
+ if (sinkBusyAvg >= 800) {
+ // The delayed scale down should be cleaned up after the expected
+ // recommended parallelism is greater than or equal to the current
+ // parallelism.
+ assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+ .isEmpty();
+ break;
+ } else {
+ // Delayed scale down is triggered
+ assertThat(stateStore.getDelayedScaleDown(context).getDelayedVertices())
+ .isNotEmpty();
+ }
+ }
+ totalRecords += recordsPerMinutes;
+ }
+ }
+ }
+
+ private static int getExpectedParallelism(List taskBusyList, int currentParallelism) {
+ var maxBusyTime =
+ taskBusyList.stream()
+ .skip(taskBusyList.size() - 6)
+ .max(Comparator.naturalOrder())
+ .get();
+ return getExpectedParallelism(maxBusyTime, currentParallelism);
+ }
+
+ private static int getExpectedParallelism(double busyTime, int currentParallelism) {
+ var maxBusyRatio = busyTime / 1000;
+ return (int) Math.ceil(currentParallelism * maxBusyRatio / UTILIZATION_TARGET);
+ }
+
+ private void pollAndAssertScalingRealizer(
+ int expectedSourceParallelism, int expectedSinkParallelism) {
+ // Assert metric
+ assertThat(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM))
+ .isEqualTo(expectedSourceParallelism);
+ assertThat(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM))
+ .isEqualTo(expectedSinkParallelism);
+
+ // Check scaling realizer.
+ assertThat(scalingRealizer.events).hasSize(1);
+ var parallelismOverrides = scalingRealizer.events.poll().getParallelismOverrides();
+ assertThat(parallelismOverrides)
+ .containsEntry(source.toHexString(), Integer.toString(expectedSourceParallelism));
+ assertThat(parallelismOverrides)
+ .containsEntry(sink.toHexString(), Integer.toString(expectedSinkParallelism));
+ }
+
+ private void assertCollectedMetricsSize(int expectedSize) throws Exception {
+ assertThat(stateStore.getCollectedMetrics(context)).hasSize(expectedSize);
+ }
+
+ private Double getCurrentMetricValue(JobVertexID jobVertexID, ScalingMetric scalingMetric) {
+ var metric =
+ autoscaler
+ .lastEvaluatedMetrics
+ .get(context.getJobKey())
+ .getVertexMetrics()
+ .get(jobVertexID)
+ .get(scalingMetric);
+ return metric == null ? null : metric.getCurrent();
+ }
+
+ private void running(Instant now) {
+ metricsCollector.setJobUpdateTs(now);
+ context =
+ new JobAutoScalerContext<>(
+ context.getJobKey(),
+ context.getJobID(),
+ JobStatus.RUNNING,
+ context.getConfiguration(),
+ context.getMetricGroup(),
+ getRestClusterClientSupplier());
+ }
+
+ private void setClocksTo(Instant time) {
+ var clock = Clock.fixed(time, ZoneId.systemDefault());
+ metricsCollector.setClock(clock);
+ autoscaler.setClock(clock);
+ }
+
+ private TestMetrics buildMetric(long totalRecords, int busyTimePerSec) {
+ return TestMetrics.builder()
+ .numRecordsIn(totalRecords)
+ .numRecordsOut(totalRecords)
+ .maxBusyTimePerSec(busyTimePerSec)
+ .build();
+ }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
index f27ad5b0ab..8adc62bb94 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
@@ -24,12 +24,153 @@
import java.time.Instant;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link DelayedScaleDown}. */
public class DelayedScaleDownTest {
private final JobVertexID vertex = new JobVertexID();
+ @Test
+ void testWrongWindowStartTime() {
+ var instant = Instant.now();
+ var delayedScaleDown = new DelayedScaleDown();
+
+ // First trigger time as the trigger time, and it won't be updated.
+ var vertexDelayedScaleDownInfo =
+ delayedScaleDown.triggerScaleDown(vertex, instant, 5, true);
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+ instant);
+
+ // Get the max recommended parallelism from a wrong window, and no any recommended
+ // parallelism since the start window.
+ assertThatThrownBy(
+ () ->
+ vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(
+ instant.plusSeconds(1)))
+ .isInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ void testMaxRecommendedParallelismForInitialWindow() {
+ var instant = Instant.now();
+ var delayedScaleDown = new DelayedScaleDown();
+ assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+ delayedScaleDown.triggerScaleDown(vertex, instant, 5, true);
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 9, false);
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 4, true);
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 5, true);
+ var vertexDelayedScaleDownInfo =
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 6, true);
+
+ // [5, 9, 4, 5, 6] -> 9
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 9, false),
+ instant);
+ // 5, [9, 4, 5, 6] -> 9
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 9, false),
+ instant.plusSeconds(1));
+ // 5, 9, [4, 5, 6] -> 6
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+ instant.plusSeconds(2));
+ // 5, 9, 4, [5, 6] -> 6
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+ instant.plusSeconds(3));
+ // 5, 9, 4, 5, [6] -> 6
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(4), 6, true),
+ instant.plusSeconds(4));
+ }
+
+ @Test
+ void testMaxRecommendedParallelismForSlidingWindow() {
+ var instant = Instant.now();
+ var delayedScaleDown = new DelayedScaleDown();
+ assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+ // [5] -> 5
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant, 5, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+ instant);
+ // [5, 8] -> 8
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 8, false),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+ instant);
+ // [5, 8, 6] -> 8
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 6, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+ instant);
+ // [5, 8, 6, 4] -> 8
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 4, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+ instant);
+ // 5, [8, 6, 4, 3] -> 8
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 3, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(1), 8, false),
+ instant.plusSeconds(1));
+ // 5, 8, [6, 4, 3, 3] -> 6
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(2), 6, true),
+ instant.plusSeconds(2));
+ // 5, 8, 6, [4, 3, 3, 3] -> 4
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(6), 3, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(3), 4, true),
+ instant.plusSeconds(3));
+
+ // Check the timestamp of latest parallelism is maintained correctly when recommended
+ // parallelism is same.
+ // 5, 8, 6, 4, [3, 3, 3, 3] -> 3
+ // 5, 8, 6, 4, 3, [3, 3, 3] -> 3
+ // 5, 8, 6, 4, 3, 3, [3, 3] -> 3
+ // 5, 8, 6, 4, 3, 3, 3, [3] -> 3
+ var vertexDelayedScaleDownInfo =
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(7), 3, true);
+ for (int offset = 4; offset <= 7; offset++) {
+ assertVertexDelayedScaleDownInfo(
+ vertexDelayedScaleDownInfo,
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(7), 3, true),
+ instant.plusSeconds(offset));
+ }
+ // 5, 8, 6, 4, 3, 3, 3, [3, 9] -> 9
+ assertVertexDelayedScaleDownInfo(
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(9), 9, false),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(9), 9, false),
+ instant.plusSeconds(7));
+ }
+
@Test
void testTriggerUpdateAndClean() {
var instant = Instant.now();
@@ -38,40 +179,52 @@ void testTriggerUpdateAndClean() {
// First trigger time as the trigger time, and it won't be updated.
assertVertexDelayedScaleDownInfo(
- delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
+ delayedScaleDown.triggerScaleDown(vertex, instant, 5, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+ instant);
assertThat(delayedScaleDown.isUpdated()).isTrue();
// The lower parallelism doesn't update the result
assertVertexDelayedScaleDownInfo(
- delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant, 5, true),
+ instant);
// The higher parallelism will update the result
assertVertexDelayedScaleDownInfo(
- delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8, true),
+ instant,
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(10), 8, true),
+ instant);
// The scale down could be re-triggered again after clean
delayedScaleDown.clearVertex(vertex);
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
assertVertexDelayedScaleDownInfo(
- delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4, true),
instant.plusSeconds(15),
- 4);
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(15), 4, true),
+ instant);
// The scale down could be re-triggered again after cleanAll
delayedScaleDown.clearAll();
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
assertVertexDelayedScaleDownInfo(
- delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
- instant.plusSeconds(15),
- 2);
+ delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(16), 2, true),
+ instant.plusSeconds(16),
+ new DelayedScaleDown.RecommendedParallelism(instant.plusSeconds(16), 2, true),
+ instant);
}
void assertVertexDelayedScaleDownInfo(
DelayedScaleDown.VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo,
Instant expectedTriggerTime,
- int expectedMaxRecommendedParallelism) {
+ DelayedScaleDown.RecommendedParallelism expectedMaxRecommendedParallelism,
+ Instant windowStartTime) {
assertThat(vertexDelayedScaleDownInfo.getFirstTriggerTime()).isEqualTo(expectedTriggerTime);
- assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism())
+ assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime))
.isEqualTo(expectedMaxRecommendedParallelism);
}
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index af01ce34fd..9cdc71596b 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -104,7 +104,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(5),
+ ParallelismChange.build(5, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -116,7 +116,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
- ParallelismChange.build(8),
+ ParallelismChange.build(8, false),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -140,7 +140,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
- ParallelismChange.build(8),
+ ParallelismChange.build(8, false),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -151,7 +151,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
delayedScaleDown));
assertEquals(
- ParallelismChange.build(8),
+ ParallelismChange.build(8, false),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -163,7 +163,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(UTILIZATION_TARGET, 0.5);
assertEquals(
- ParallelismChange.build(10),
+ ParallelismChange.build(10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -175,7 +175,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(UTILIZATION_TARGET, 0.6);
assertEquals(
- ParallelismChange.build(4),
+ ParallelismChange.build(4, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -188,7 +188,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
- ParallelismChange.build(5),
+ ParallelismChange.build(5, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -200,7 +200,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
assertEquals(
- ParallelismChange.build(4),
+ ParallelismChange.build(4, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -213,7 +213,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
assertEquals(
- ParallelismChange.build(15),
+ ParallelismChange.build(15, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -225,7 +225,7 @@ public void testParallelismScaling(Collection inputShipStrategies)
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
assertEquals(
- ParallelismChange.build(16),
+ ParallelismChange.build(16, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -533,7 +533,7 @@ public void testMinParallelismLimitIsUsed() {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(5),
+ ParallelismChange.build(5, true),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
@@ -593,7 +593,7 @@ public void testDisableScaleDownInterval() {
var delayedScaleDown = new DelayedScaleDown();
- assertParallelismChange(10, 50, 100, ParallelismChange.build(5), delayedScaleDown);
+ assertParallelismChange(10, 50, 100, ParallelismChange.build(5, true), delayedScaleDown);
}
@Test
@@ -625,7 +625,8 @@ public void testScaleDownAfterInterval() {
// interval.
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(60)), ZoneId.systemDefault()));
- assertParallelismChange(100, 700, 1000, ParallelismChange.build(90), delayedScaleDown);
+ assertParallelismChange(
+ 100, 700, 1000, ParallelismChange.build(90, false), delayedScaleDown);
}
@Test
@@ -650,7 +651,8 @@ public void testImmediateScaleUpWithinScaleDownInterval() {
// Allow immediate scale up
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
- assertParallelismChange(100, 1700, 1000, ParallelismChange.build(170), delayedScaleDown);
+ assertParallelismChange(
+ 100, 1700, 1000, ParallelismChange.build(170, true), delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
}
@@ -710,7 +712,7 @@ public void testIneffectiveScalingDetection() {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(10),
+ ParallelismChange.build(10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -725,7 +727,7 @@ public void testIneffectiveScalingDetection() {
// Allow to scale higher if scaling was effective (80%)
evaluated = evaluated(10, 180, 90);
assertEquals(
- ParallelismChange.build(20),
+ ParallelismChange.build(20, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -767,7 +769,7 @@ public void testIneffectiveScalingDetection() {
// Allow scale up if current parallelism doesnt match last (user rescaled manually)
evaluated = evaluated(10, 180, 90);
assertEquals(
- ParallelismChange.build(20),
+ ParallelismChange.build(20, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -780,7 +782,7 @@ public void testIneffectiveScalingDetection() {
// Over 10%, effective
evaluated = evaluated(20, 180, 100);
assertEquals(
- ParallelismChange.build(36),
+ ParallelismChange.build(36, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -795,7 +797,7 @@ public void testIneffectiveScalingDetection() {
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
evaluated = evaluated(20, 180, 90);
assertEquals(
- ParallelismChange.build(40),
+ ParallelismChange.build(40, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -810,7 +812,7 @@ public void testIneffectiveScalingDetection() {
// Allow scale down even if ineffective
evaluated = evaluated(20, 45, 90);
assertEquals(
- ParallelismChange.build(10),
+ ParallelismChange.build(10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -835,7 +837,7 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(10),
+ ParallelismChange.build(10, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -850,7 +852,7 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh
// Effective scale, no events triggered
evaluated = evaluated(10, 180, 90);
assertEquals(
- ParallelismChange.build(20),
+ ParallelismChange.build(20, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -945,7 +947,7 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh
// Test ineffective scaling switched off
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
assertEquals(
- ParallelismChange.build(40),
+ ParallelismChange.build(40, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -1092,7 +1094,7 @@ public void testSendingScalingLimitedEvents() {
var delayedScaleDown = new DelayedScaleDown();
// partition limited
assertEquals(
- ParallelismChange.build(15),
+ ParallelismChange.build(15, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -1115,7 +1117,7 @@ public void testSendingScalingLimitedEvents() {
smallChangesForScaleFactor.put(
ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
assertEquals(
- ParallelismChange.build(15),
+ ParallelismChange.build(15, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 8637e126f7..f34aa1fc09 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -279,7 +279,6 @@ private void running(Instant now) {
private void setClocksTo(Instant time) {
var clock = Clock.fixed(time, ZoneId.systemDefault());
- metricsCollector.setClock(clock);
autoscaler.setClock(clock);
}
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index a3bc674611..1b1a10c14f 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -50,7 +50,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -120,91 +119,104 @@ protected boolean scalingWouldExceedMaxResources(
}
@Test
- public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
+ public void testUtilizationBoundaries() {
// Restart time should not affect utilization boundary
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
- var op1 = new JobVertexID();
+ var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+ var source = JobVertexID.fromHexString(sourceHexString);
+ var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+ var sink = JobVertexID.fromHexString(sinkHexString);
+ var jobTopology =
+ new JobTopology(
+ new VertexInfo(source, Map.of(), 10, 1000, false, null),
+ new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));
+ // 0.7 is outside of the utilization bound [0.6 - 0.0, 0.6 + 0.0], so scaling happens
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
+ var metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 60, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .hasSize(1);
- var evaluated = Map.of(op1, evaluated(1, 70, 100));
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
-
+ // Both of 0.7 and 0.6 are within the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling
+ // could be ignored.
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.4);
- evaluated = Map.of(op1, evaluated(1, 70, 100));
- assertTrue(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
- assertTrue(getScaledParallelism(stateStore, context).isEmpty());
-
- var op2 = new JobVertexID();
- evaluated =
- Map.of(
- op1, evaluated(1, 70, 100),
- op2, evaluated(1, 85, 100));
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 60, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .isEmpty();
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ // 0.85 is outside of the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling happens
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 85, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .hasSize(2);
- evaluated =
- Map.of(
- op1, evaluated(1, 70, 100),
- op2, evaluated(1, 70, 100));
- assertTrue(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ // Both of 0.7 are within the utilization bound [0.6 - 0.2, 0.6 + 0.2], so scaling could be
+ // ignored.
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 70, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .isEmpty();
// Test with backlog based scaling
- evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
- }
-
- @Test
- public void testUtilizationBoundariesWithOptionalVertex() {
- // Restart time should not affect utilization boundary
- var conf = context.getConfiguration();
- conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
- conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
- var op1 = new JobVertexID();
- var op2 = new JobVertexID();
-
- // All vertices are optional
- conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
- conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
- conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
-
- var evaluated =
- Map.of(
- op1, evaluated(1, 70, 100),
- op2, evaluated(1, 85, 100));
-
- assertTrue(ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of()));
-
- // One vertex is required, and it's out of range.
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
-
- // One vertex is required, and it's within the range.
- // The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
- conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.8);
- conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.6);
- evaluated =
- Map.of(
- op1, evaluated(1, 65, 100),
- op2, evaluated(1, 85, 100));
- assertTrue(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 70, 100, 15), sink, evaluated(10, 70, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .hasSize(2);
}
@Test
@@ -230,10 +242,6 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
Map.of(vertex, evaluated(parallelism, targetRate, trueProcessingRate)),
dummyGlobalMetrics);
- assertTrue(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated.getVertexMetrics(), evaluated.getVertexMetrics().keySet()));
-
// Execute the full scaling path
var now = Instant.now();
var jobTopology =
@@ -260,83 +268,142 @@ public void testUtilizationBoundariesAndUtilizationMinMaxCompatibility() {
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
- var op1 = new JobVertexID();
- var op2 = new JobVertexID();
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
+
+ var source = new JobVertexID();
+ var sink = new JobVertexID();
+ var jobTopology =
+ new JobTopology(
+ new VertexInfo(source, Map.of(), 10, 1000, false, null),
+ new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));
- // All vertices are optional
+ // target 0.6, target boundary 0.1 -> max 0.7, min 0.5
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
- var evaluated =
- Map.of(
- op1, evaluated(1, 70, 100),
- op2, evaluated(1, 85, 100));
- // target boundary 0.1, target 0.6, max 0.7, min 0.5
- boolean boundaryOp1 =
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
- boolean boundaryOp2 =
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
+ var metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 70, 100), sink, evaluated(10, 85, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .containsOnlyKeys(source, sink);
- // Remove target boundary and use min max, should get the same result
+ // target 0.6, max 0.7, min 0.5
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
conf.removeConfig(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.5);
- boolean minMaxOp1 =
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1));
- boolean minMaxOp2 =
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op2));
- assertEquals(boundaryOp1, minMaxOp1);
- assertEquals(boundaryOp2, minMaxOp2);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .containsOnlyKeys(source, sink);
// When the target boundary parameter is used,
// but the min max parameter is also set,
// the min max parameter shall prevail.
+ // target 0.6, max 0.7, min 0.3
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.6);
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.7);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
- evaluated =
- Map.of(
- op1, evaluated(2, 150, 100),
- op2, evaluated(1, 85, 100));
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 60, 100), sink, evaluated(10, 85, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .containsOnlyKeys(sink);
- // When the target boundary parameter is used,
- // but the max parameter is also set,
- conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
+ // When the target boundary parameter and max parameter are set,
+ // we expect the min is derived from boundary.
+ // target 0.5, max 0.6, min 0
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 0.6);
+ conf.removeConfig(AutoScalerOptions.UTILIZATION_MIN);
- evaluated =
- Map.of(
- op1, evaluated(2, 100, 99999),
- op2, evaluated(1, 80, 99999));
- assertTrue(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 100, 99999), sink, evaluated(10, 80, 99999)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .isEmpty();
- evaluated = Map.of(op2, evaluated(1, 85, 100));
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 50, 100), sink, evaluated(10, 85, 100)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .containsOnlyKeys(sink);
+ // When the target boundary parameter and min parameter are set,
+ // we expect the max is derived from boundary.
+ // target 0.5, max 1.0, min 0.3
+ conf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.5);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 1.);
conf.removeConfig(AutoScalerOptions.UTILIZATION_MAX);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 0.3);
- evaluated =
- Map.of(
- op1, evaluated(2, 80, 81),
- op2, evaluated(1, 100, 101));
- assertTrue(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 80, 81), sink, evaluated(10, 100, 101)),
+ dummyGlobalMetrics);
- evaluated = Map.of(op1, evaluated(1, 80, 79));
- assertFalse(
- ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
- evaluated, evaluated.keySet()));
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .isEmpty();
+
+ metrics =
+ new EvaluatedMetrics(
+ Map.of(source, evaluated(10, 80, 79), sink, evaluated(10, 100, 101)),
+ dummyGlobalMetrics);
+ assertThat(
+ scalingExecutor.computeScalingSummary(
+ context,
+ metrics,
+ Map.of(),
+ Duration.ZERO,
+ jobTopology,
+ new DelayedScaleDown()))
+ .containsOnlyKeys(source, sink);
}
@Test
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index d4e0db9aa3..da2fa09735 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -207,8 +207,9 @@ protected void testDiscardAllState() throws Exception {
stateStore.storeScalingTracking(ctx, scalingTracking);
var delayedScaleDown = new DelayedScaleDown();
- delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
- delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
+ delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10, true);
+ delayedScaleDown.triggerScaleDown(
+ new JobVertexID(), Instant.now().plusSeconds(10), 12, true);
stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);