From 8fc87aeedd7667a787f159f53edf41fe76a34a72 Mon Sep 17 00:00:00 2001
From: Rui Fan
Date: Sun, 24 Nov 2024 16:16:08 +0800
Subject: [PATCH] [FLINK-36535][autoscaler] Optimize the scale down logic
 based on historical parallelism to reduce the rescale frequency

1. Use the maximum parallelism within the window instead of the latest
   parallelism when scaling down
2. Never scale down when (currentTime - triggerTime) < scale-down.interval
---
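
Editorial note (illustrative, not part of the diff): the change keys scale
downs off two pieces of per-vertex state, the first trigger time and the
maximum recommended parallelism seen since that trigger. The sketch below
models that rule outside the Flink codebase; it is an assumption-laden
illustration only. String stands in for JobVertexID, the class and method
names are invented for the sketch, and unlike the patched code (which clears
pending entries via clearVertex()/clearAll() once a rescale or scale up
actually happens) it does not manage cleanup.

    import java.time.Duration;
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    /** Minimal model of the delayed scale-down window. */
    final class ScaleDownWindow {
        private static final class Entry {
            final Instant firstTriggerTime;
            int maxRecommendedParallelism;

            Entry(Instant firstTriggerTime, int parallelism) {
                this.firstTriggerTime = firstTriggerTime;
                this.maxRecommendedParallelism = parallelism;
            }
        }

        private final Duration interval;
        private final Map<String, Entry> delayedVertices = new HashMap<>();

        ScaleDownWindow(Duration interval) {
            this.interval = interval;
        }

        /** Returns the parallelism to apply, or empty while the scale down is delayed. */
        Optional<Integer> onRecommendedScaleDown(String vertex, Instant now, int parallelism) {
            var entry = delayedVertices.computeIfAbsent(vertex, v -> new Entry(now, parallelism));
            // Track the maximum recommendation seen within the window, not the latest one.
            entry.maxRecommendedParallelism =
                    Math.max(entry.maxRecommendedParallelism, parallelism);
            if (now.isBefore(entry.firstTriggerTime.plus(interval))) {
                // Never scale down within the scale-down interval.
                return Optional.empty();
            }
            // Window elapsed: scale down to the windowed maximum. (The real code leaves
            // cleanup to clearVertex()/clearAll() after the rescale takes effect.)
            return Optional.of(entry.maxRecommendedParallelism);
        }

        /** A scale up, or an unchanged parallelism, cancels the pending scale down. */
        void cancel(String vertex) {
            delayedVertices.remove(vertex);
        }
    }

For example, with a one-minute interval, recommendations of 80, 90, 72 and 64
inside the window all return empty; the first recommendation after the window
closes applies 90, the windowed maximum. This mirrors
testScaleDownAfterInterval in the diff below.
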
 .../jdbc/state/JdbcAutoScalerStateStore.java  |   8 +-
 .../flink/autoscaler/DelayedScaleDown.java    |  68 ++++++++----
 .../flink/autoscaler/JobVertexScaler.java     |  78 ++++++-------
 .../flink/autoscaler/ScalingExecutor.java     |  25 ++---
 .../autoscaler/DelayedScaleDownTest.java      |  77 +++++++++++++
 .../flink/autoscaler/JobVertexScalerTest.java | 105 +++++++++---------
 .../flink/autoscaler/ScalingExecutorTest.java |  18 +--
 .../AbstractAutoScalerStateStoreTest.java     |  19 ++--
 .../state/KubernetesAutoScalerStateStore.java |   8 +-
 9 files changed, 245 insertions(+), 161 deletions(-)
 create mode 100644 flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java

diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 5ac870fe6a..dfadc9ac38 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
         try {
             return deserializeDelayedScaleDown(delayedScaleDown.get());
         } catch (JacksonException e) {
-            LOG.error(
+            LOG.warn(
                     "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
                     e);
             jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)
 
     private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
             throws JacksonException {
-        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown);
     }
 
     private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
             throws JacksonException {
-        Map<JobVertexID, Instant> firstTriggerTime =
-                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
-        return new DelayedScaleDown(firstTriggerTime);
+        return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
index c2402264dc..b5a0dba79e 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -19,51 +19,81 @@
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
 import lombok.Getter;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
-    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+    /** The delayed scale down info for a vertex. */
+    @Data
+    public static class VertexDelayedScaleDownInfo {
+        private final Instant firstTriggerTime;
+        private int maxRecommendedParallelism;
+
+        @JsonCreator
+        public VertexDelayedScaleDownInfo(
+                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
+                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.maxRecommendedParallelism = maxRecommendedParallelism;
+        }
+    }
+
+    @Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;
 
     // Has any scale down request been updated? This flag doesn't need to be stored; it is only
     // used to determine whether the DelayedScaleDown needs to be stored.
-    @Getter private boolean isUpdated = false;
+    @JsonIgnore @Getter private boolean updated = false;
 
     public DelayedScaleDown() {
-        this.firstTriggerTime = new HashMap<>();
-    }
-
-    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
-        this.firstTriggerTime = firstTriggerTime;
+        this.delayedVertices = new HashMap<>();
     }
 
-    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
-        return Optional.ofNullable(firstTriggerTime.get(vertex));
-    }
+    /**
+     * Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}.
+     */
+    @Nonnull
+    public VertexDelayedScaleDownInfo triggerScaleDown(
+            JobVertexID vertex, Instant triggerTime, int parallelism) {
+        var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
+        if (vertexDelayedScaleDownInfo == null) {
+            // It's the first trigger.
+            vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
+            delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
+            updated = true;
+        } else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
+            // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
+            vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
+            updated = true;
+        }
 
-    void updateTriggerTime(JobVertexID vertex, Instant instant) {
-        firstTriggerTime.put(vertex, instant);
-        isUpdated = true;
+        return vertexDelayedScaleDownInfo;
     }
 
+    // Clear the delayed scale down for the corresponding vertex when the recommended parallelism
+    // is greater than or equal to the current parallelism.
     void clearVertex(JobVertexID vertex) {
-        Instant removed = firstTriggerTime.remove(vertex);
+        VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
         if (removed != null) {
-            isUpdated = true;
+            updated = true;
         }
     }
 
+    // Clear all delayed scale downs when a rescale happens.
     void clearAll() {
-        if (firstTriggerTime.isEmpty()) {
+        if (delayedVertices.isEmpty()) {
             return;
         }
 
-        firstTriggerTime.clear();
-        isUpdated = true;
+        delayedVertices.clear();
+        updated = true;
     }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 84520d0fc2..40f25c7782 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    /** The parallelism change type of {@link ParallelismChange}. */
-    public enum ParallelismChangeType {
-        NO_CHANGE,
-        REQUIRED_CHANGE,
-        OPTIONAL_CHANGE;
-    }
-
-    /**
-     * The rescaling will be triggered if any vertex's ParallelismChange is required. This means
-     * that if all vertices' ParallelismChange is optional, rescaling will be ignored.
-     */
+    /**
+     * Rescaling will be triggered if any vertex's {@link ParallelismChange} reports a change.
+     */
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE =
-                new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
+        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
 
-        private final ParallelismChangeType changeType;
         private final int newParallelism;
 
-        private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
-            this.changeType = changeType;
+        private ParallelismChange(int newParallelism) {
             this.newParallelism = newParallelism;
         }
 
+        public boolean isNoChange() {
+            return this == NO_CHANGE;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -122,30 +113,24 @@ public boolean equals(Object o) {
                 return false;
             }
             ParallelismChange that = (ParallelismChange) o;
-            return changeType == that.changeType && newParallelism == that.newParallelism;
+            return newParallelism == that.newParallelism;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(changeType, newParallelism);
+            return Objects.hash(newParallelism);
         }
 
         @Override
         public String toString() {
-            return "ParallelismChange{"
-                    + "changeType="
-                    + changeType
-                    + ", newParallelism="
-                    + newParallelism
-                    + '}';
+            return isNoChange()
+                    ? "NoParallelismChange"
+                    : "ParallelismChange{newParallelism=" + newParallelism + '}';
         }
 
-        public static ParallelismChange required(int newParallelism) {
-            return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
-        }
-
-        public static ParallelismChange optional(int newParallelism) {
-            return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
+        public static ParallelismChange build(int newParallelism) {
+            checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
+            return new ParallelismChange(newParallelism);
         }
 
         public static ParallelismChange noChange() {
@@ -263,7 +248,7 @@ private ParallelismChange detectBlockScaling(
 
         // If we don't have past scaling actions for this vertex, don't block scale up.
         if (history.isEmpty()) {
-            return ParallelismChange.required(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
         var lastSummary = history.get(history.lastKey());
@@ -275,7 +260,7 @@ && detectIneffectiveScaleUp(
                 return ParallelismChange.noChange();
             }
 
-            return ParallelismChange.required(newParallelism);
+            return ParallelismChange.build(newParallelism);
         } else {
             return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
         }
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disabled, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+
+        // Never scale down within the scale-down interval.
+        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            // Use the maximum parallelism within the scale-down interval window instead of the
+            // latest parallelism when scaling down.
+            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
         }
     }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 02e5ad4f15..248fec711b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -45,13 +45,10 @@
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 
-import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
-import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
     }
 
     @VisibleForTesting
-    static boolean allRequiredVerticesWithinUtilizationTarget(
+    static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
-            Set<JobVertexID> requiredVertices) {
-        // All vertices' ParallelismChange is optional, rescaling will be ignored.
-        if (requiredVertices.isEmpty()) {
+            Set<JobVertexID> changedVertices) {
+        // No vertices with changed parallelism.
+        if (changedVertices.isEmpty()) {
             return true;
         }
 
-        for (JobVertexID vertex : requiredVertices) {
+        for (JobVertexID vertex : changedVertices) {
             var metrics = evaluatedMetrics.get(vertex);
 
             double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         }
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
-        var requiredVertices = new HashSet<JobVertexID>();
 
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -260,10 +256,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                                                 v, Collections.emptySortedMap()),
                                 restartTime,
                                 delayedScaleDown);
-                        if (NO_CHANGE == parallelismChange.getChangeType()) {
+                        if (parallelismChange.isNoChange()) {
                             return;
-                        } else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
-                            requiredVertices.add(v);
                         }
                         out.put(
                                 v,
@@ -274,10 +268,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                     }
                 });
 
-        // If the Utilization of all required tasks is within range, we can skip scaling.
-        // It means that if only optional tasks are out of scope, we still need to ignore scale.
-        if (allRequiredVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
+        // If the utilization of all tasks is within range, we can skip scaling.
+        if (allChangedVerticesWithinUtilizationTarget(
+                evaluatedMetrics.getVertexMetrics(), out.keySet())) {
             return Map.of();
         }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
new file mode 100644
index 0000000000..f27ad5b0ab
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DelayedScaleDown}. */
+public class DelayedScaleDownTest {
+
+    private final JobVertexID vertex = new JobVertexID();
+
+    @Test
+    void testTriggerUpdateAndClean() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+        assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+        // The first trigger time is recorded as the trigger time, and it won't be updated later.
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
+        assertThat(delayedScaleDown.isUpdated()).isTrue();
+
+        // The lower parallelism doesn't update the result.
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
+
+        // The higher parallelism will update the result.
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
+
+        // The scale down can be re-triggered after clearVertex.
+        delayedScaleDown.clearVertex(vertex);
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
+                instant.plusSeconds(15),
+                4);
+
+        // The scale down can be re-triggered after clearAll.
+        delayedScaleDown.clearAll();
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
+                instant.plusSeconds(15),
+                2);
+    }
+
+    void assertVertexDelayedScaleDownInfo(
+            DelayedScaleDown.VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo,
+            Instant expectedTriggerTime,
+            int expectedMaxRecommendedParallelism) {
+        assertThat(vertexDelayedScaleDownInfo.getFirstTriggerTime()).isEqualTo(expectedTriggerTime);
+        assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism())
+                .isEqualTo(expectedMaxRecommendedParallelism);
+    }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 6b6c3b15ee..3c557ab397 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -99,10 +99,11 @@ public void setup() {
     public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies) {
         var op = new JobVertexID();
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.optional(5),
+                ParallelismChange.build(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -114,7 +115,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                ParallelismChange.optional(8),
+                ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -138,7 +139,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                ParallelismChange.optional(8),
+                ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -149,7 +150,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         assertEquals(
-                ParallelismChange.optional(8),
+                ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -161,7 +162,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -173,7 +174,7 @@
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
         assertEquals(
-                ParallelismChange.required(4),
+                ParallelismChange.build(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -186,7 +187,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.optional(5),
+                ParallelismChange.build(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.optional(4),
+                ParallelismChange.build(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -211,7 +212,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.required(15),
+                ParallelismChange.build(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
 
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.required(16),
+                ParallelismChange.build(16),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -527,10 +528,11 @@ public void ensureMinParallelismDoesNotExceedMax() {
 
     @Test
     public void testMinParallelismLimitIsUsed() {
         conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.optional(5),
+                ParallelismChange.build(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -590,46 +592,39 @@ public void testDisableScaleDownInterval() {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        assertParallelismChange(10, 50, 100, ParallelismChange.required(5), delayedScaleDown);
+        assertParallelismChange(10, 50, 100, ParallelismChange.build(5), delayedScaleDown);
     }
 
     @Test
-    public void testRequiredScaleDownAfterInterval() {
+    public void testScaleDownAfterInterval() {
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
 
         var instant = Instant.now();
         var delayedScaleDown = new DelayedScaleDown();
 
-        // The scale down shouldn't be required.
+        // The scale down never happens when the scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.optional(80), delayedScaleDown);
+        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
-        // Within scale down interval.
+        // The scale down never happens within the scale-down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.optional(90), delayedScaleDown);
-
-        vertexScaler.setClock(
-                Clock.fixed(instant.plus(Duration.ofSeconds(20)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 820, 1000, ParallelismChange.optional(82), delayedScaleDown);
+        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(40)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 720, 1000, ParallelismChange.optional(72), delayedScaleDown);
-
-        vertexScaler.setClock(
-                Clock.fixed(instant.plus(Duration.ofSeconds(50)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 600, 1000, ParallelismChange.optional(60), delayedScaleDown);
+        assertParallelismChange(100, 720, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(59)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 640, 1000, ParallelismChange.optional(64), delayedScaleDown);
+        assertParallelismChange(100, 640, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
-        // The scale down is required after the scale down interval ends.
+        // The parallelism result should be the max recommended parallelism within the scale down
+        // interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(60)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 700, 1000, ParallelismChange.required(70), delayedScaleDown);
+        assertParallelismChange(100, 700, 1000, ParallelismChange.build(90), delayedScaleDown);
     }
 
     @Test
@@ -640,22 +635,22 @@ public void testImmediateScaleUpWithinScaleDownInterval() {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        // The scale down shouldn't be required.
+        // The scale down never happens when the scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.optional(80), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
-        // Within scale down interval.
+        // The scale down never happens within the scale-down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.optional(90), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // Allow immediate scale up
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 1700, 1000, ParallelismChange.required(170), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+        assertParallelismChange(100, 1700, 1000, ParallelismChange.build(170), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
     @Test
@@ -666,22 +661,22 @@ public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        // The scale down shouldn't be required.
+        // The scale down never happens when the scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.optional(80), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
-        // Within scale down interval.
+        // The scale down never happens within the scale-down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.optional(90), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // The delayed scale down is canceled when the new parallelism is the same as the current
         // parallelism.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
         assertParallelismChange(100, 1000, 1000, ParallelismChange.noChange(), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
     private void assertParallelismChange(
@@ -714,7 +709,7 @@ public void testIneffectiveScalingDetection() {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -729,7 +724,7 @@ public void testIneffectiveScalingDetection() {
         // Allow to scale higher if scaling was effective (80%)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.required(20),
+                ParallelismChange.build(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -771,7 +766,7 @@ public void testIneffectiveScalingDetection() {
         // Allow scale up if current parallelism doesn't match last (user rescaled manually)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.required(20),
+                ParallelismChange.build(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -784,7 +779,7 @@ public void testIneffectiveScalingDetection() {
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
-                ParallelismChange.required(36),
+                ParallelismChange.build(36),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -799,7 +794,7 @@ public void testIneffectiveScalingDetection() {
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         evaluated = evaluated(20, 180, 90);
         assertEquals(
-                ParallelismChange.required(40),
+                ParallelismChange.build(40),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -814,7 +809,7 @@ public void testIneffectiveScalingDetection() {
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -839,7 +834,7 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputSh
         var delayedScaleDown = new DelayedScaleDown();
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -854,7 +849,7 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputSh
         // Effective scale, no events triggered
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.required(20),
+                ParallelismChange.build(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -949,7 +944,7 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputSh
         // Test ineffective scaling switched off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         assertEquals(
-                ParallelismChange.required(40),
+                ParallelismChange.build(40),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1096,7 +1091,7 @@ public void testSendingScalingLimitedEvents() {
         var delayedScaleDown = new DelayedScaleDown();
         // partition limited
         assertEquals(
-                ParallelismChange.required(15),
+                ParallelismChange.build(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1119,7 +1114,7 @@ public void testSendingScalingLimitedEvents() {
         smallChangesForScaleFactor.put(
                 ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
         assertEquals(
-                ParallelismChange.required(15),
+                ParallelismChange.build(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 3eeac97b06..d3fc167492 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -133,13 +133,13 @@ public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
         var evaluated = Map.of(op1, evaluated(1, 70, 100));
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
         evaluated = Map.of(op1, evaluated(1, 70, 100));
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
         assertTrue(getScaledParallelism(stateStore, context).isEmpty());
 
@@ -150,7 +150,7 @@ op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 85, 100));
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
         evaluated =
@@ -158,13 +158,13 @@ op1, evaluated(1, 70, 100),
                         op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 70, 100));
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
         // Test with backlog based scaling
         evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
     }
 
@@ -186,11 +186,11 @@ public void testUtilizationBoundariesWithOptionalVertex() {
                         op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 85, 100));
 
-        assertTrue(ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of()));
+        assertTrue(ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of()));
 
         // One vertex is required, and it's out of range.
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
 
         // One vertex is required, and it's within the range.
         // The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
         evaluated =
@@ -200,7 +200,7 @@ op1, evaluated(1, 70, 100),
                         op1, evaluated(1, 65, 100),
                         op2, evaluated(1, 85, 100));
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
     }
 
     @Test
@@ -225,7 +225,7 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
                         dummyGlobalMetrics);
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated.getVertexMetrics(), evaluated.getVertexMetrics().keySet()));
 
         // Execute the full scaling path
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index 3dff84060a..d4e0db9aa3 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -184,7 +184,7 @@ protected void testDiscardAllState() throws Exception {
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
         assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices()).isEmpty();
 
         stateStore.storeCollectedMetrics(
                 ctx, new TreeMap<>(Map.of(Instant.now(), new CollectedMetrics())));
@@ -206,16 +206,19 @@ protected void testDiscardAllState() throws Exception {
                 Instant.now().minus(Duration.ofHours(1)), new ScalingRecord());
         stateStore.storeScalingTracking(ctx, scalingTracking);
 
-        var firstTriggerTime = Map.of(new JobVertexID(), Instant.now());
-        stateStore.storeDelayedScaleDown(ctx, new DelayedScaleDown(firstTriggerTime));
+        var delayedScaleDown = new DelayedScaleDown();
+        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
+        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
+
+        stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
         assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
-                .isEqualTo(firstTriggerTime);
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices())
+                .isEqualTo(delayedScaleDown.getDelayedVertices());
 
         stateStore.flush(ctx);
 
@@ -224,8 +227,8 @@ protected void testDiscardAllState() throws Exception {
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
         assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
-                .isEqualTo(firstTriggerTime);
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices())
+                .isEqualTo(delayedScaleDown.getDelayedVertices());
 
         stateStore.clearAll(ctx);
 
@@ -234,6 +237,6 @@ protected void testDiscardAllState() throws Exception {
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
         assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices()).isEmpty();
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index afd247dc77..1059b96ab0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -247,7 +247,7 @@ public DelayedScaleDown getDelayedScaleDown(KubernetesJobAutoScalerContext jobCo
         try {
             return deserializeDelayedScaleDown(delayedScaleDown.get());
         } catch (JacksonException e) {
-            LOG.error(
+            LOG.warn(
                     "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
                     e);
             configMapStore.removeSerializedState(jobContext, DELAYED_SCALE_DOWN);
@@ -334,14 +334,12 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)
 
     private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
             throws JacksonException {
-        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown);
     }
 
     private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
             throws JacksonException {
-        Map<JobVertexID, Instant> firstTriggerTime =
-                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
-        return new DelayedScaleDown(firstTriggerTime);
+        return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
 
     @VisibleForTesting
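
Editorial note (illustrative, not part of the diff): with both state stores now
serializing the whole DelayedScaleDown object instead of the bare trigger-time
map, a round trip can be sketched as below. This assumes a plain Jackson
ObjectMapper configured for YAML with JavaTimeModule and a JobVertexID key
deserializer registered, standing in for the operator's shaded YAML_MAPPER;
the variable names are invented for the sketch.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

    ObjectMapper yamlMapper =
            new ObjectMapper(new YAMLFactory()).registerModule(new JavaTimeModule());
    String yaml = yamlMapper.writeValueAsString(delayedScaleDown);
    DelayedScaleDown restored = yamlMapper.readValue(yaml, DelayedScaleDown.class);
    // @JsonIgnore on "updated" keeps the dirty flag out of the stored state, while
    // @JsonCreator/@JsonProperty on VertexDelayedScaleDownInfo rebuild each entry.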