Skip to content

Commit d9e8cce

Browse files
committed
[FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency
1. Use the maximum recommended parallelism observed within the scale-down interval window, instead of the latest recommended parallelism, when scaling down. 2. Never scale down while (currentTime - firstTriggerTime) < scale-down.interval.
1 parent 9bab028 commit d9e8cce

File tree

9 files changed

+245
-161
lines changed

9 files changed

+245
-161
lines changed

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
237237
try {
238238
return deserializeDelayedScaleDown(delayedScaleDown.get());
239239
} catch (JacksonException e) {
240-
LOG.error(
240+
LOG.warn(
241241
"Could not deserialize delayed scale down, possibly the format changed. Discarding...",
242242
e);
243243
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)
330330

331331
private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
332332
throws JacksonException {
333-
return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
333+
return YAML_MAPPER.writeValueAsString(delayedScaleDown);
334334
}
335335

336336
private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
337337
throws JacksonException {
338-
Map<JobVertexID, Instant> firstTriggerTime =
339-
YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
340-
return new DelayedScaleDown(firstTriggerTime);
338+
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
341339
}
342340
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,81 @@
1919

2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
2121

22+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
23+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
25+
26+
import lombok.Data;
2227
import lombok.Getter;
2328

29+
import javax.annotation.Nonnull;
30+
2431
import java.time.Instant;
2532
import java.util.HashMap;
2633
import java.util.Map;
27-
import java.util.Optional;
2834

2935
/** All delayed scale down requests. */
3036
public class DelayedScaleDown {
3137

32-
@Getter private final Map<JobVertexID, Instant> firstTriggerTime;
38+
/** The delayed scale down info for a vertex. */
39+
@Data
40+
public static class VertexDelayedScaleDownInfo {
41+
private final Instant firstTriggerTime;
42+
private int maxRecommendedParallelism;
43+
44+
@JsonCreator
45+
public VertexDelayedScaleDownInfo(
46+
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
47+
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
48+
this.firstTriggerTime = firstTriggerTime;
49+
this.maxRecommendedParallelism = maxRecommendedParallelism;
50+
}
51+
}
52+
53+
@Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;
3354

3455
// Has any scale down request been updated? It doesn't need to be stored, it is only used to
3556
// determine whether DelayedScaleDown needs to be stored.
36-
@Getter private boolean isUpdated = false;
57+
@JsonIgnore @Getter private boolean updated = false;
3758

3859
public DelayedScaleDown() {
39-
this.firstTriggerTime = new HashMap<>();
40-
}
41-
42-
public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
43-
this.firstTriggerTime = firstTriggerTime;
60+
this.delayedVertices = new HashMap<>();
4461
}
4562

46-
Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
47-
return Optional.ofNullable(firstTriggerTime.get(vertex));
48-
}
63+
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
64+
@Nonnull
65+
public VertexDelayedScaleDownInfo triggerScaleDown(
66+
JobVertexID vertex, Instant triggerTime, int parallelism) {
67+
var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
68+
if (vertexDelayedScaleDownInfo == null) {
69+
// It's the first trigger
70+
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
71+
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
72+
updated = true;
73+
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
74+
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
75+
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
76+
updated = true;
77+
}
4978

50-
void updateTriggerTime(JobVertexID vertex, Instant instant) {
51-
firstTriggerTime.put(vertex, instant);
52-
isUpdated = true;
79+
return vertexDelayedScaleDownInfo;
5380
}
5481

82+
// Clear the delayed scale down for corresponding vertex when the recommended parallelism is
83+
// greater than or equal to the currentParallelism.
5584
void clearVertex(JobVertexID vertex) {
56-
Instant removed = firstTriggerTime.remove(vertex);
85+
VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
5786
if (removed != null) {
58-
isUpdated = true;
87+
updated = true;
5988
}
6089
}
6190

91+
// Clear all delayed scale down when rescale happens.
6292
void clearAll() {
63-
if (firstTriggerTime.isEmpty()) {
93+
if (delayedVertices.isEmpty()) {
6494
return;
6595
}
66-
firstTriggerTime.clear();
67-
isUpdated = true;
96+
delayedVertices.clear();
97+
updated = true;
6898
}
6999
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
8888
this.autoScalerEventHandler = autoScalerEventHandler;
8989
}
9090

91-
/** The parallelism change type of {@link ParallelismChange}. */
92-
public enum ParallelismChangeType {
93-
NO_CHANGE,
94-
REQUIRED_CHANGE,
95-
OPTIONAL_CHANGE;
96-
}
97-
98-
/**
99-
* The rescaling will be triggered if any vertex's ParallelismChange is required. This means
100-
* that if all vertices' ParallelismChange is optional, rescaling will be ignored.
101-
*/
91+
/** The rescaling will be triggered if any vertex's {@link ParallelismChange} is changed. */
10292
@Getter
10393
public static class ParallelismChange {
10494

105-
private static final ParallelismChange NO_CHANGE =
106-
new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
95+
private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
10796

108-
private final ParallelismChangeType changeType;
10997
private final int newParallelism;
11098

111-
private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
112-
this.changeType = changeType;
99+
private ParallelismChange(int newParallelism) {
113100
this.newParallelism = newParallelism;
114101
}
115102

103+
public boolean isNoChange() {
104+
return this == NO_CHANGE;
105+
}
106+
116107
@Override
117108
public boolean equals(Object o) {
118109
if (this == o) {
@@ -122,30 +113,24 @@ public boolean equals(Object o) {
122113
return false;
123114
}
124115
ParallelismChange that = (ParallelismChange) o;
125-
return changeType == that.changeType && newParallelism == that.newParallelism;
116+
return newParallelism == that.newParallelism;
126117
}
127118

128119
@Override
129120
public int hashCode() {
130-
return Objects.hash(changeType, newParallelism);
121+
return Objects.hash(newParallelism);
131122
}
132123

133124
@Override
134125
public String toString() {
135-
return "ParallelismChange{"
136-
+ "changeType="
137-
+ changeType
138-
+ ", newParallelism="
139-
+ newParallelism
140-
+ '}';
126+
return isNoChange()
127+
? "NoParallelismChange"
128+
: "ParallelismChange{newParallelism=" + newParallelism + '}';
141129
}
142130

143-
public static ParallelismChange required(int newParallelism) {
144-
return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
145-
}
146-
147-
public static ParallelismChange optional(int newParallelism) {
148-
return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
131+
public static ParallelismChange build(int newParallelism) {
132+
checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
133+
return new ParallelismChange(newParallelism);
149134
}
150135

151136
public static ParallelismChange noChange() {
@@ -263,7 +248,7 @@ private ParallelismChange detectBlockScaling(
263248

264249
// If we don't have past scaling actions for this vertex, don't block scale up.
265250
if (history.isEmpty()) {
266-
return ParallelismChange.required(newParallelism);
251+
return ParallelismChange.build(newParallelism);
267252
}
268253

269254
var lastSummary = history.get(history.lastKey());
@@ -275,7 +260,7 @@ && detectIneffectiveScaleUp(
275260
return ParallelismChange.noChange();
276261
}
277262

278-
return ParallelismChange.required(newParallelism);
263+
return ParallelismChange.build(newParallelism);
279264
} else {
280265
return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
281266
}
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
289274
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
290275
if (scaleDownInterval.toMillis() <= 0) {
291276
// The scale down interval is disabled, so don't block scaling.
292-
return ParallelismChange.required(newParallelism);
293-
}
294-
295-
var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
296-
if (firstTriggerTime.isEmpty()) {
297-
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
298-
delayedScaleDown.updateTriggerTime(vertex, clock.instant());
299-
return ParallelismChange.optional(newParallelism);
277+
return ParallelismChange.build(newParallelism);
300278
}
301279

302-
if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
303-
LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
304-
return ParallelismChange.optional(newParallelism);
280+
var now = clock.instant();
281+
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
282+
283+
// Never scale down within scale down interval
284+
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
285+
if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
286+
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
287+
} else {
288+
LOG.debug(
289+
"Try to skip immediate scale down within scale-down interval for {}",
290+
vertex);
291+
}
292+
return ParallelismChange.noChange();
305293
} else {
306-
return ParallelismChange.required(newParallelism);
294+
// Using the maximum parallelism within the scale down interval window instead of the
295+
// latest parallelism when scaling down
296+
return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
307297
}
308298
}
309299

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,10 @@
4545
import java.time.Instant;
4646
import java.util.Collections;
4747
import java.util.HashMap;
48-
import java.util.HashSet;
4948
import java.util.Map;
5049
import java.util.Set;
5150
import java.util.SortedMap;
5251

53-
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
54-
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
5552
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
5653
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
5754
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
181178
}
182179

183180
@VisibleForTesting
184-
static boolean allRequiredVerticesWithinUtilizationTarget(
181+
static boolean allChangedVerticesWithinUtilizationTarget(
185182
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
186-
Set<JobVertexID> requiredVertices) {
187-
// All vertices' ParallelismChange is optional, rescaling will be ignored.
188-
if (requiredVertices.isEmpty()) {
183+
Set<JobVertexID> changedVertices) {
184+
// No vertices with changed parallelism.
185+
if (changedVertices.isEmpty()) {
189186
return true;
190187
}
191188

192-
for (JobVertexID vertex : requiredVertices) {
189+
for (JobVertexID vertex : changedVertices) {
193190
var metrics = evaluatedMetrics.get(vertex);
194191

195192
double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
234231
}
235232

236233
var out = new HashMap<JobVertexID, ScalingSummary>();
237-
var requiredVertices = new HashSet<JobVertexID>();
238234

239235
var excludeVertexIdList =
240236
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -260,10 +256,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
260256
v, Collections.emptySortedMap()),
261257
restartTime,
262258
delayedScaleDown);
263-
if (NO_CHANGE == parallelismChange.getChangeType()) {
259+
if (parallelismChange.isNoChange()) {
264260
return;
265-
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
266-
requiredVertices.add(v);
267261
}
268262
out.put(
269263
v,
@@ -274,10 +268,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
274268
}
275269
});
276270

277-
// If the Utilization of all required tasks is within range, we can skip scaling.
278-
// It means that if only optional tasks are out of scope, we still need to ignore scale.
279-
if (allRequiredVerticesWithinUtilizationTarget(
280-
evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
271+
// If the Utilization of all tasks is within range, we can skip scaling.
272+
if (allChangedVerticesWithinUtilizationTarget(
273+
evaluatedMetrics.getVertexMetrics(), out.keySet())) {
281274
return Map.of();
282275
}
283276

0 commit comments

Comments
 (0)