Skip to content

Commit 968a578

Browse files
authored
[FLINK-36018][autoscaler] Support lazy scale down to avoid frequent rescaling (#875)
1 parent d8568ae commit 968a578

File tree

18 files changed

+802
-230
lines changed

18 files changed

+802
-230
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,18 +146,18 @@
146146
<td>Duration</td>
147147
<td>Maximum cap for the observed restart time when 'job.autoscaler.restart.time-tracking.enabled' is set to true.</td>
148148
</tr>
149+
<tr>
150+
<td><h5>job.autoscaler.scale-down.interval</h5></td>
151+
<td style="word-wrap: break-word;">1 h</td>
152+
<td>Duration</td>
153+
<td>The delay time for scale down to be executed. If it is greater than 0, the scale down will be delayed. Delayed rescale can merge multiple scale downs within `scale-down.interval` into a single scale down, thereby reducing the number of rescales. Reducing the frequency of job restarts can improve job availability. Scale down is executed immediately if it is less than or equal to 0.</td>
154+
</tr>
149155
<tr>
150156
<td><h5>job.autoscaler.scale-down.max-factor</h5></td>
151157
<td style="word-wrap: break-word;">0.6</td>
152158
<td>Double</td>
153159
<td>Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.</td>
154160
</tr>
155-
<tr>
156-
<td><h5>job.autoscaler.scale-up.grace-period</h5></td>
157-
<td style="word-wrap: break-word;">1 h</td>
158-
<td>Duration</td>
159-
<td>Duration in which no scale down of a vertex is allowed after it has been scaled up.</td>
160-
</tr>
161161
<tr>
162162
<td><h5>job.autoscaler.scale-up.max-factor</h5></td>
163163
<td style="word-wrap: break-word;">100000.0</td>

e2e-tests/data/autoscaler.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ spec:
4040
job.autoscaler.scaling.enabled: "true"
4141
job.autoscaler.stabilization.interval: "5s"
4242
job.autoscaler.metrics.window: "1m"
43+
job.autoscaler.scale-down.interval: "0m"
4344

4445
# Invalid Validations for testing autoscaler configurations
4546
# kubernetes.operator.job.autoscaler.scale-down.max-factor: "-0.6"

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.annotation.Experimental;
2121
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.autoscaler.DelayedScaleDown;
2223
import org.apache.flink.autoscaler.JobAutoScalerContext;
2324
import org.apache.flink.autoscaler.ScalingSummary;
2425
import org.apache.flink.autoscaler.ScalingTracking;
@@ -49,6 +50,7 @@
4950

5051
import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
5152
import static org.apache.flink.autoscaler.jdbc.state.StateType.CONFIG_OVERRIDES;
53+
import static org.apache.flink.autoscaler.jdbc.state.StateType.DELAYED_SCALE_DOWN;
5254
import static org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
5355
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
5456
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
@@ -214,6 +216,35 @@ public void removeConfigChanges(Context jobContext) {
214216
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), CONFIG_OVERRIDES);
215217
}
216218

219+
@Override
220+
public void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown)
221+
throws Exception {
222+
jdbcStateStore.putSerializedState(
223+
getSerializeKey(jobContext),
224+
DELAYED_SCALE_DOWN,
225+
serializeDelayedScaleDown(delayedScaleDown));
226+
}
227+
228+
@Nonnull
229+
@Override
230+
public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
231+
Optional<String> delayedScaleDown =
232+
jdbcStateStore.getSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
233+
if (delayedScaleDown.isEmpty()) {
234+
return new DelayedScaleDown();
235+
}
236+
237+
try {
238+
return deserializeDelayedScaleDown(delayedScaleDown.get());
239+
} catch (JacksonException e) {
240+
LOG.error(
241+
"Could not deserialize delayed scale down, possibly the format changed. Discarding...",
242+
e);
243+
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
244+
return new DelayedScaleDown();
245+
}
246+
}
247+
217248
@Override
218249
public void clearAll(Context jobContext) {
219250
jdbcStateStore.clearAll(getSerializeKey(jobContext));
@@ -296,4 +327,16 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)
296327
return null;
297328
}
298329
}
330+
331+
private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
332+
throws JacksonException {
333+
return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
334+
}
335+
336+
private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
337+
throws JacksonException {
338+
Map<JobVertexID, Instant> firstTriggerTime =
339+
YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
340+
return new DelayedScaleDown(firstTriggerTime);
341+
}
299342
}

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public enum StateType {
2727
SCALING_TRACKING("scalingTracking"),
2828
COLLECTED_METRICS("collectedMetrics"),
2929
PARALLELISM_OVERRIDES("parallelismOverrides"),
30-
CONFIG_OVERRIDES("configOverrides");
30+
CONFIG_OVERRIDES("configOverrides"),
31+
DELAYED_SCALE_DOWN("delayedScaleDown");
3132

3233
/**
3334
* The identifier of each state type, it will be used to store. Please ensure the identifier is
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler;
19+
20+
import org.apache.flink.runtime.jobgraph.JobVertexID;
21+
22+
import lombok.Getter;
23+
24+
import java.time.Instant;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
29+
/** All delayed scale down requests. */
30+
public class DelayedScaleDown {
31+
32+
@Getter private final Map<JobVertexID, Instant> firstTriggerTime;
33+
34+
// Have any scale down request been updated? It doesn't need to be stored, it is only used to
35+
// determine whether DelayedScaleDown needs to be stored.
36+
@Getter private boolean isUpdated = false;
37+
38+
public DelayedScaleDown() {
39+
this.firstTriggerTime = new HashMap<>();
40+
}
41+
42+
public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
43+
this.firstTriggerTime = firstTriggerTime;
44+
}
45+
46+
Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
47+
return Optional.ofNullable(firstTriggerTime.get(vertex));
48+
}
49+
50+
void updateTriggerTime(JobVertexID vertex, Instant instant) {
51+
firstTriggerTime.put(vertex, instant);
52+
isUpdated = true;
53+
}
54+
55+
void clearVertex(JobVertexID vertex) {
56+
Instant removed = firstTriggerTime.remove(vertex);
57+
if (removed != null) {
58+
isUpdated = true;
59+
}
60+
}
61+
62+
void clearAll() {
63+
if (firstTriggerTime.isEmpty()) {
64+
return;
65+
}
66+
firstTriggerTime.clear();
67+
isUpdated = true;
68+
}
69+
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,20 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
214214
return;
215215
}
216216

217+
var delayedScaleDown = stateStore.getDelayedScaleDown(ctx);
217218
var parallelismChanged =
218219
scalingExecutor.scaleResource(
219-
ctx, evaluatedMetrics, scalingHistory, scalingTracking, now, jobTopology);
220+
ctx,
221+
evaluatedMetrics,
222+
scalingHistory,
223+
scalingTracking,
224+
now,
225+
jobTopology,
226+
delayedScaleDown);
227+
228+
if (delayedScaleDown.isUpdated()) {
229+
stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
230+
}
220231

221232
if (parallelismChanged) {
222233
autoscalerMetrics.incrementScaling();

0 commit comments

Comments
 (0)