diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 5ac870fe6a..dfadc9ac38 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
         try {
             return deserializeDelayedScaleDown(delayedScaleDown.get());
         } catch (JacksonException e) {
-            LOG.error(
+            LOG.warn(
                     "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
                     e);
             jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides) {
 
     private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
             throws JacksonException {
-        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown);
     }
 
     private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
             throws JacksonException {
-        Map<JobVertexID, Instant> firstTriggerTime =
-                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
-        return new DelayedScaleDown(firstTriggerTime);
+        return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
 }
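Review note: both state stores now serialize the whole DelayedScaleDown object instead of a bare trigger-time map, so the stored YAML gains a nested per-vertex structure. The sketch below illustrates the round trip with plain (unshaded) Jackson and a local stand-in for VertexDelayedScaleDownInfo; the production code uses Flink's shaded YAML_MAPPER and the real class introduced in DelayedScaleDown.java below, so the mapper setup and class here are illustrative only.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Instant;

public class YamlRoundTripSketch {

    // Local stand-in for DelayedScaleDown.VertexDelayedScaleDownInfo (illustrative only).
    static class VertexInfo {
        private final Instant firstTriggerTime;
        private final int maxRecommendedParallelism;

        @JsonCreator
        VertexInfo(
                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
            this.firstTriggerTime = firstTriggerTime;
            this.maxRecommendedParallelism = maxRecommendedParallelism;
        }

        public Instant getFirstTriggerTime() {
            return firstTriggerTime;
        }

        public int getMaxRecommendedParallelism() {
            return maxRecommendedParallelism;
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed mapper setup: a YAML-backed ObjectMapper with java.time support.
        ObjectMapper yamlMapper =
                new ObjectMapper(new YAMLFactory()).registerModule(new JavaTimeModule());

        // Serialize the whole info object rather than just the trigger time,
        // mirroring the state-store change above.
        String yaml = yamlMapper.writeValueAsString(new VertexInfo(Instant.now(), 8));
        VertexInfo restored = yamlMapper.readValue(yaml, VertexInfo.class);
        System.out.println(restored.getMaxRecommendedParallelism()); // 8
    }
}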
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
index c2402264dc..b5a0dba79e 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
@@ -19,51 +19,81 @@
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
 import lombok.Getter;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
-    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+    /** The delayed scale down info for a vertex. */
+    @Data
+    public static class VertexDelayedScaleDownInfo {
+        private final Instant firstTriggerTime;
+        private int maxRecommendedParallelism;
+
+        @JsonCreator
+        public VertexDelayedScaleDownInfo(
+                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
+                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.maxRecommendedParallelism = maxRecommendedParallelism;
+        }
+    }
+
+    @Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;
 
     // Has any scale down request been updated? It doesn't need to be stored, it is only used to
     // determine whether DelayedScaleDown needs to be stored.
-    @Getter private boolean isUpdated = false;
+    @JsonIgnore @Getter private boolean updated = false;
 
     public DelayedScaleDown() {
-        this.firstTriggerTime = new HashMap<>();
-    }
-
-    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
-        this.firstTriggerTime = firstTriggerTime;
+        this.delayedVertices = new HashMap<>();
     }
 
-    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
-        return Optional.ofNullable(firstTriggerTime.get(vertex));
-    }
+    /** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
+    @Nonnull
+    public VertexDelayedScaleDownInfo triggerScaleDown(
+            JobVertexID vertex, Instant triggerTime, int parallelism) {
+        var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
+        if (vertexDelayedScaleDownInfo == null) {
+            // It's the first trigger.
+            vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
+            delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
+            updated = true;
+        } else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
+            // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
+            vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
+            updated = true;
+        }
 
-    void updateTriggerTime(JobVertexID vertex, Instant instant) {
-        firstTriggerTime.put(vertex, instant);
-        isUpdated = true;
+        return vertexDelayedScaleDownInfo;
     }
 
+    // Clear the delayed scale down for the corresponding vertex when the recommended parallelism
+    // is greater than or equal to the currentParallelism.
     void clearVertex(JobVertexID vertex) {
-        Instant removed = firstTriggerTime.remove(vertex);
+        VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
         if (removed != null) {
-            isUpdated = true;
+            updated = true;
         }
     }
 
+    // Clear all delayed scale down requests when a rescale happens.
    void clearAll() {
-        if (firstTriggerTime.isEmpty()) {
+        if (delayedVertices.isEmpty()) {
             return;
         }
-        firstTriggerTime.clear();
-        isUpdated = true;
+        delayedVertices.clear();
+        updated = true;
    }
 }
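Review note: the new accumulation semantics in one short usage sketch (mirroring DelayedScaleDownTest further down). The first trigger pins firstTriggerTime; later triggers can only raise maxRecommendedParallelism, never lower it or move the window. The API calls below come straight from the class above; only the wrapper class and main method are invented for illustration.

import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.time.Instant;

class DelayedScaleDownUsageSketch {
    public static void main(String[] args) {
        var delayedScaleDown = new DelayedScaleDown();
        var vertex = new JobVertexID();
        var t0 = Instant.now();

        // First trigger: pins the window start at t0 with parallelism 5.
        delayedScaleDown.triggerScaleDown(vertex, t0, 5);

        // A lower recommendation later leaves both fields untouched.
        delayedScaleDown.triggerScaleDown(vertex, t0.plusSeconds(5), 3);

        // A higher recommendation raises the max; the trigger time stays t0.
        var info = delayedScaleDown.triggerScaleDown(vertex, t0.plusSeconds(10), 8);

        System.out.println(info.getFirstTriggerTime().equals(t0)); // true
        System.out.println(info.getMaxRecommendedParallelism()); // 8
    }
}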
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 84520d0fc2..40f25c7782 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler) {
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    /** The parallelism change type of {@link ParallelismChange}. */
-    public enum ParallelismChangeType {
-        NO_CHANGE,
-        REQUIRED_CHANGE,
-        OPTIONAL_CHANGE;
-    }
-
-    /**
-     * The rescaling will be triggered if any vertex's ParallelismChange is required. This means
-     * that if all vertices' ParallelismChange is optional, rescaling will be ignored.
-     */
+    /** Rescaling will be triggered if any vertex's {@link ParallelismChange} is an actual change. */
    @Getter
    public static class ParallelismChange {

-        private static final ParallelismChange NO_CHANGE =
-                new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
+        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
 
-        private final ParallelismChangeType changeType;
         private final int newParallelism;
 
-        private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
-            this.changeType = changeType;
+        private ParallelismChange(int newParallelism) {
             this.newParallelism = newParallelism;
         }
 
+        public boolean isNoChange() {
+            return this == NO_CHANGE;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -122,30 +113,24 @@ public boolean equals(Object o) {
                 return false;
             }
             ParallelismChange that = (ParallelismChange) o;
-            return changeType == that.changeType && newParallelism == that.newParallelism;
+            return newParallelism == that.newParallelism;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(changeType, newParallelism);
+            return Objects.hash(newParallelism);
         }
 
         @Override
         public String toString() {
-            return "ParallelismChange{"
-                    + "changeType="
-                    + changeType
-                    + ", newParallelism="
-                    + newParallelism
-                    + '}';
+            return isNoChange()
+                    ? "NoParallelismChange"
+                    : "ParallelismChange{newParallelism=" + newParallelism + '}';
         }
 
-        public static ParallelismChange required(int newParallelism) {
-            return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
-        }
-
-        public static ParallelismChange optional(int newParallelism) {
-            return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
+        public static ParallelismChange build(int newParallelism) {
+            checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
+            return new ParallelismChange(newParallelism);
         }
 
         public static ParallelismChange noChange() {
@@ -263,7 +248,7 @@ private ParallelismChange detectBlockScaling(
 
         // If we don't have past scaling actions for this vertex, don't block scale up.
         if (history.isEmpty()) {
-            return ParallelismChange.required(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
         var lastSummary = history.get(history.lastKey());
@@ -275,7 +260,7 @@ && detectIneffectiveScaleUp(
                 return ParallelismChange.noChange();
             }
 
-            return ParallelismChange.required(newParallelism);
+            return ParallelismChange.build(newParallelism);
         } else {
             return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
         }
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disabled, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+
+        // Never scale down within the scale down interval.
+        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            // Use the maximum parallelism within the scale down interval window instead of the
+            // latest parallelism when scaling down.
+            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
         }
     }
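Review note: the rewritten applyScaleDownInterval boils down to three cases, which the following compact, self-contained restatement captures. This is an assumed helper written for this review, not the production method; in the real code the max tracking lives in DelayedScaleDown.triggerScaleDown.

import java.time.Duration;
import java.time.Instant;

public class ScaleDownWindowSketch {

    /** Returns the parallelism to apply now, or -1 to signal "no change yet". */
    static int decide(
            Instant firstTriggerTime,
            int maxRecommendedParallelism,
            Instant now,
            Duration scaleDownInterval,
            int newParallelism) {
        if (scaleDownInterval.toMillis() <= 0) {
            // Interval disabled: never block the scale down.
            return newParallelism;
        }
        if (now.isBefore(firstTriggerTime.plus(scaleDownInterval))) {
            // Still inside the window: block, while the window keeps tracking the max.
            return -1;
        }
        // Window elapsed: apply the max recommendation seen within the window.
        return Math.max(maxRecommendedParallelism, newParallelism);
    }

    public static void main(String[] args) {
        var t0 = Instant.now();
        var interval = Duration.ofMinutes(1);
        System.out.println(decide(t0, 90, t0.plusSeconds(10), interval, 82)); // -1 (blocked)
        System.out.println(decide(t0, 90, t0.plusSeconds(60), interval, 70)); // 90
    }
}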
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 02e5ad4f15..248fec711b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -45,13 +45,10 @@
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 
-import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
-import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
     }
 
     @VisibleForTesting
-    static boolean allRequiredVerticesWithinUtilizationTarget(
+    static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
-            Set<JobVertexID> requiredVertices) {
-        // All vertices' ParallelismChange is optional, rescaling will be ignored.
-        if (requiredVertices.isEmpty()) {
+            Set<JobVertexID> changedVertices) {
+        // No vertices with changed parallelism.
+        if (changedVertices.isEmpty()) {
             return true;
         }
 
-        for (JobVertexID vertex : requiredVertices) {
+        for (JobVertexID vertex : changedVertices) {
             var metrics = evaluatedMetrics.get(vertex);
 
             double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         }
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
-        var requiredVertices = new HashSet<JobVertexID>();
 
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -260,10 +256,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                                                 v, Collections.emptySortedMap()),
                                 restartTime,
                                 delayedScaleDown);
-                if (NO_CHANGE == parallelismChange.getChangeType()) {
+                if (parallelismChange.isNoChange()) {
                     return;
-                } else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
-                    requiredVertices.add(v);
                 }
                 out.put(
                         v,
@@ -274,10 +268,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
             }
         });
 
-        // If the Utilization of all required tasks is within range, we can skip scaling.
-        // It means that if only optional tasks are out of scope, we still need to ignore scale.
-        if (allRequiredVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
+        // If the utilization of all tasks is within range, we can skip scaling.
+        if (allChangedVerticesWithinUtilizationTarget(
+                evaluatedMetrics.getVertexMetrics(), out.keySet())) {
             return Map.of();
         }
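Review note: the renamed band check now receives out.keySet(), i.e., every vertex whose parallelism would change. A minimal stand-alone sketch of the same predicate, with hypothetical record types standing in for the evaluated-metrics map (the real method reads EvaluatedScalingMetric objects); thresholds and values below are made up for illustration.

import java.util.Map;
import java.util.Set;

public class UtilizationBandSketch {

    // Hypothetical stand-in for the evaluated metrics of one vertex.
    record VertexRates(double trueProcessingRate, double scaleUpThreshold, double scaleDownThreshold) {}

    static boolean allChangedVerticesWithinUtilizationTarget(
            Map<String, VertexRates> evaluatedMetrics, Set<String> changedVertices) {
        for (String vertex : changedVertices) {
            VertexRates rates = evaluatedMetrics.get(vertex);
            if (rates.trueProcessingRate() < rates.scaleUpThreshold()
                    || rates.trueProcessingRate() > rates.scaleDownThreshold()) {
                // Out of the band: scaling must proceed.
                return false;
            }
        }
        return true; // Everything within the band (or nothing changed): skip scaling.
    }

    public static void main(String[] args) {
        var metrics = Map.of("v1", new VertexRates(80, 70, 100));
        System.out.println(allChangedVerticesWithinUtilizationTarget(metrics, Set.of("v1"))); // true
    }
}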
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
new file mode 100644
index 0000000000..f27ad5b0ab
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DelayedScaleDown}. */
+public class DelayedScaleDownTest {
+
+    private final JobVertexID vertex = new JobVertexID();
+
+    @Test
+    void testTriggerUpdateAndClean() {
+        var instant = Instant.now();
+        var delayedScaleDown = new DelayedScaleDown();
+        assertThat(delayedScaleDown.isUpdated()).isFalse();
+
+        // The first trigger time is recorded as the trigger time, and it won't be updated by
+        // later triggers.
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
+        assertThat(delayedScaleDown.isUpdated()).isTrue();
+
+        // A lower parallelism doesn't update the result.
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
+
+        // A higher parallelism updates the result.
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
+
+        // The scale down can be triggered again after clearVertex.
+        delayedScaleDown.clearVertex(vertex);
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
+                instant.plusSeconds(15),
+                4);
+
+        // The scale down can be triggered again after clearAll.
+        delayedScaleDown.clearAll();
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
+        assertVertexDelayedScaleDownInfo(
+                delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
+                instant.plusSeconds(15),
+                2);
+    }
+
+    void assertVertexDelayedScaleDownInfo(
+            DelayedScaleDown.VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo,
+            Instant expectedTriggerTime,
+            int expectedMaxRecommendedParallelism) {
+        assertThat(vertexDelayedScaleDownInfo.getFirstTriggerTime()).isEqualTo(expectedTriggerTime);
+        assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism())
+                .isEqualTo(expectedMaxRecommendedParallelism);
+    }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 6b6c3b15ee..3c557ab397 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -99,10 +99,11 @@ public void setup() {
     public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies) {
         var op = new JobVertexID();
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.optional(5),
+                ParallelismChange.build(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -114,7 +115,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                ParallelismChange.optional(8),
+                ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -138,7 +139,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
-                ParallelismChange.optional(8),
+                ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -149,7 +150,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
                         delayedScaleDown));
 
         assertEquals(
-                ParallelismChange.optional(8),
+                ParallelismChange.build(8),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -161,7 +162,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -173,7 +174,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
         assertEquals(
-                ParallelismChange.required(4),
+                ParallelismChange.build(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -186,7 +187,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.optional(5),
+                ParallelismChange.build(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -198,7 +199,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.optional(4),
+                ParallelismChange.build(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -211,7 +212,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.required(15),
+                ParallelismChange.build(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -223,7 +224,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
 
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.required(16),
+                ParallelismChange.build(16),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -527,10 +528,11 @@ public void ensureMinParallelismDoesNotExceedMax() {
 
     @Test
     public void testMinParallelismLimitIsUsed() {
         conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         var delayedScaleDown = new DelayedScaleDown();
         assertEquals(
-                ParallelismChange.optional(5),
+                ParallelismChange.build(5),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -590,46 +592,39 @@ public void testDisableScaleDownInterval() {
 
         var delayedScaleDown = new DelayedScaleDown();
-        assertParallelismChange(10, 50, 100, ParallelismChange.required(5), delayedScaleDown);
+        assertParallelismChange(10, 50, 100, ParallelismChange.build(5), delayedScaleDown);
     }
 
     @Test
-    public void testRequiredScaleDownAfterInterval() {
+    public void testScaleDownAfterInterval() {
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
 
         var instant = Instant.now();
         var delayedScaleDown = new DelayedScaleDown();
 
-        // The scale down shouldn't be required.
+        // The scale down never happens when the scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.optional(80), delayedScaleDown);
+        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
-        // Within scale down interval.
+        // The scale down never happens within the scale down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.optional(90), delayedScaleDown);
-
-        vertexScaler.setClock(
-                Clock.fixed(instant.plus(Duration.ofSeconds(20)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 820, 1000, ParallelismChange.optional(82), delayedScaleDown);
+        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(40)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 720, 1000, ParallelismChange.optional(72), delayedScaleDown);
-
-        vertexScaler.setClock(
-                Clock.fixed(instant.plus(Duration.ofSeconds(50)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 600, 1000, ParallelismChange.optional(60), delayedScaleDown);
+        assertParallelismChange(100, 720, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(59)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 640, 1000, ParallelismChange.optional(64), delayedScaleDown);
+        assertParallelismChange(100, 640, 1000, ParallelismChange.noChange(), delayedScaleDown);
 
-        // The scale down is required after the scale down interval ends.
+        // The parallelism result should be the max recommended parallelism within the scale down
+        // interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(60)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 700, 1000, ParallelismChange.required(70), delayedScaleDown);
+        assertParallelismChange(100, 700, 1000, ParallelismChange.build(90), delayedScaleDown);
     }
 
     @Test
@@ -640,22 +635,22 @@ public void testImmediateScaleUpWithinScaleDownInterval() {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        // The scale down shouldn't be required.
+        // The scale down never happens when the scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.optional(80), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
-        // Within scale down interval.
+        // The scale down never happens within the scale down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.optional(90), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // Allow immediate scale up
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 1700, 1000, ParallelismChange.required(170), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+        assertParallelismChange(100, 1700, 1000, ParallelismChange.build(170), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
     @Test
@@ -666,22 +661,22 @@ public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        // The scale down shouldn't be required.
+        // The scale down never happens when the scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.optional(80), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
-        // Within scale down interval.
+        // The scale down never happens within the scale down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.optional(90), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isNotEmpty();
+        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // The delayed scale down is canceled when the new parallelism equals the current parallelism.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), ZoneId.systemDefault()));
         assertParallelismChange(100, 1000, 1000, ParallelismChange.noChange(), delayedScaleDown);
-        assertThat(delayedScaleDown.getFirstTriggerTime()).isEmpty();
+        assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
     private void assertParallelismChange(
@@ -714,7 +709,7 @@ public void testIneffectiveScalingDetection() {
 
         var delayedScaleDown = new DelayedScaleDown();
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -729,7 +724,7 @@ public void testIneffectiveScalingDetection() {
         // Allow to scale higher if scaling was effective (80%)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.required(20),
+                ParallelismChange.build(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -771,7 +766,7 @@ public void testIneffectiveScalingDetection() {
         // Allow scale up if current parallelism doesn't match last (user rescaled manually)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.required(20),
+                ParallelismChange.build(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -784,7 +779,7 @@ public void testIneffectiveScalingDetection() {
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
-                ParallelismChange.required(36),
+                ParallelismChange.build(36),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -799,7 +794,7 @@ public void testIneffectiveScalingDetection() {
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         evaluated = evaluated(20, 180, 90);
         assertEquals(
-                ParallelismChange.required(40),
+                ParallelismChange.build(40),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -814,7 +809,7 @@ public void testIneffectiveScalingDetection() {
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -839,7 +834,7 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) {
 
         var delayedScaleDown = new DelayedScaleDown();
         assertEquals(
-                ParallelismChange.required(10),
+                ParallelismChange.build(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -854,7 +849,7 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) {
         // Effective scale, no events triggered
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.required(20),
+                ParallelismChange.build(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -949,7 +944,7 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) {
         // Test ineffective scaling switched off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         assertEquals(
-                ParallelismChange.required(40),
+                ParallelismChange.build(40),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1096,7 +1091,7 @@ public void testSendingScalingLimitedEvents() {
         var delayedScaleDown = new DelayedScaleDown();
         // partition limited
         assertEquals(
-                ParallelismChange.required(15),
+                ParallelismChange.build(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1119,7 +1114,7 @@ public void testSendingScalingLimitedEvents() {
         smallChangesForScaleFactor.put(
                 ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
         assertEquals(
-                ParallelismChange.required(15),
+                ParallelismChange.build(15),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 3eeac97b06..d3fc167492 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -133,13 +133,13 @@ public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
         var evaluated = Map.of(op1, evaluated(1, 70, 100));
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
         evaluated = Map.of(op1, evaluated(1, 70, 100));
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
         assertTrue(getScaledParallelism(stateStore, context).isEmpty());
 
@@ -150,7 +150,7 @@ op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 85, 100));
 
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
         evaluated =
@@ -158,13 +158,13 @@ op1, evaluated(1, 70, 100),
                         op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 70, 100));
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
 
         // Test with backlog based scaling
         evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated, evaluated.keySet()));
     }
 
@@ -186,11 +186,11 @@ public void testUtilizationBoundariesWithOptionalVertex() {
                         op1, evaluated(1, 70, 100),
                         op2, evaluated(1, 85, 100));
 
-        assertTrue(ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of()));
+        assertTrue(ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of()));
 
         // One vertex is required, and it's out of range.
         assertFalse(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
 
         // One vertex is required, and it's within the range.
         // The op2 is optional, so it shouldn't affect the scaling even if it is out of range,
@@ -200,7 +200,7 @@ op1, evaluated(1, 70, 100),
                         op1, evaluated(1, 65, 100),
                         op2, evaluated(1, 85, 100));
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
     }
 
     @Test
@@ -225,7 +225,7 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
                         dummyGlobalMetrics);
 
         assertTrue(
-                ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
+                ScalingExecutor.allChangedVerticesWithinUtilizationTarget(
                         evaluated.getVertexMetrics(), evaluated.getVertexMetrics().keySet()));
 
         // Execute the full scaling path
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index 3dff84060a..d4e0db9aa3 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -184,7 +184,7 @@ protected void testDiscardAllState() throws Exception {
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
         assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices()).isEmpty();
 
         stateStore.storeCollectedMetrics(
                 ctx, new TreeMap<>(Map.of(Instant.now(), new CollectedMetrics())));
@@ -206,16 +206,19 @@ protected void testDiscardAllState() throws Exception {
                 Instant.now().minus(Duration.ofHours(1)), new ScalingRecord());
         stateStore.storeScalingTracking(ctx, scalingTracking);
 
-        var firstTriggerTime = Map.of(new JobVertexID(), Instant.now());
-        stateStore.storeDelayedScaleDown(ctx, new DelayedScaleDown(firstTriggerTime));
+        var delayedScaleDown = new DelayedScaleDown();
+        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
+        delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
+
+        stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
         assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
-                .isEqualTo(firstTriggerTime);
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices())
+                .isEqualTo(delayedScaleDown.getDelayedVertices());
 
         stateStore.flush(ctx);
 
@@ -224,8 +227,8 @@ protected void testDiscardAllState() throws Exception {
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
         assertThat(stateStore.getScalingTracking(ctx)).isEqualTo(scalingTracking);
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime())
-                .isEqualTo(firstTriggerTime);
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices())
+                .isEqualTo(delayedScaleDown.getDelayedVertices());
 
         stateStore.clearAll(ctx);
 
@@ -234,6 +237,6 @@ protected void testDiscardAllState() throws Exception {
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
         assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
         assertThat(stateStore.getScalingTracking(ctx).getScalingRecords()).isEmpty();
-        assertThat(stateStore.getDelayedScaleDown(ctx).getFirstTriggerTime()).isEmpty();
+        assertThat(stateStore.getDelayedScaleDown(ctx).getDelayedVertices()).isEmpty();
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index afd247dc77..1059b96ab0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -247,7 +247,7 @@ public DelayedScaleDown getDelayedScaleDown(KubernetesJobAutoScalerContext jobContext) {
         try {
             return deserializeDelayedScaleDown(delayedScaleDown.get());
         } catch (JacksonException e) {
-            LOG.error(
+            LOG.warn(
                     "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
                     e);
             configMapStore.removeSerializedState(jobContext, DELAYED_SCALE_DOWN);
@@ -334,14 +334,12 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides) {
 
     private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
             throws JacksonException {
-        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown);
     }
 
     private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
             throws JacksonException {
-        Map<JobVertexID, Instant> firstTriggerTime =
-                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
-        return new DelayedScaleDown(firstTriggerTime);
+        return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
 
     @VisibleForTesting