Skip to content

Commit 5a825f0

Browse files
committed
[FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency
1. Using the maximum parallelism within the window instead of the latest parallelism when scaling down 2. Never scale down when (currentTime - triggerTime) < scale-down.interval
1 parent 5d29554 commit 5a825f0

File tree

9 files changed

+261
-179
lines changed

9 files changed

+261
-179
lines changed

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
237237
try {
238238
return deserializeDelayedScaleDown(delayedScaleDown.get());
239239
} catch (JacksonException e) {
240-
LOG.error(
240+
LOG.warn(
241241
"Could not deserialize delayed scale down, possibly the format changed. Discarding...",
242242
e);
243243
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)
330330

331331
private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
332332
throws JacksonException {
333-
return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
333+
return YAML_MAPPER.writeValueAsString(delayedScaleDown);
334334
}
335335

336336
private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
337337
throws JacksonException {
338-
Map<JobVertexID, Instant> firstTriggerTime =
339-
YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
340-
return new DelayedScaleDown(firstTriggerTime);
338+
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
341339
}
342340
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,75 @@
1919

2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
2121

22+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
23+
24+
import lombok.AllArgsConstructor;
25+
import lombok.Data;
2226
import lombok.Getter;
27+
import lombok.NoArgsConstructor;
28+
29+
import javax.annotation.Nonnull;
2330

2431
import java.time.Instant;
2532
import java.util.HashMap;
2633
import java.util.Map;
27-
import java.util.Optional;
2834

2935
/** All delayed scale down requests. */
3036
public class DelayedScaleDown {
3137

32-
@Getter private final Map<JobVertexID, Instant> firstTriggerTime;
38+
/** The delayed scale down info for vertex. */
39+
@Data
40+
@NoArgsConstructor
41+
@AllArgsConstructor
42+
public static class VertexDelayedScaleDownInfo {
43+
private Instant firstTriggerTime;
44+
private int maxRecommendedParallelism;
45+
}
46+
47+
@Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;
3348

3449
// Have any scale down request been updated? It doesn't need to be stored, it is only used to
3550
// determine whether DelayedScaleDown needs to be stored.
36-
@Getter private boolean isUpdated = false;
51+
@JsonIgnore @Getter private boolean updated = false;
3752

3853
public DelayedScaleDown() {
39-
this.firstTriggerTime = new HashMap<>();
54+
this.delayedVertices = new HashMap<>();
4055
}
4156

42-
public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
43-
this.firstTriggerTime = firstTriggerTime;
44-
}
45-
46-
Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
47-
return Optional.ofNullable(firstTriggerTime.get(vertex));
48-
}
57+
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
58+
@Nonnull
59+
public VertexDelayedScaleDownInfo triggerScaleDown(
60+
JobVertexID vertex, Instant triggerTime, int parallelism) {
61+
var vertexInfo = delayedVertices.get(vertex);
62+
if (vertexInfo == null) {
63+
// It's the first trigger
64+
vertexInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
65+
delayedVertices.put(vertex, vertexInfo);
66+
updated = true;
67+
} else if (parallelism > vertexInfo.getMaxRecommendedParallelism()) {
68+
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
69+
vertexInfo.setMaxRecommendedParallelism(parallelism);
70+
updated = true;
71+
}
4972

50-
void updateTriggerTime(JobVertexID vertex, Instant instant) {
51-
firstTriggerTime.put(vertex, instant);
52-
isUpdated = true;
73+
return vertexInfo;
5374
}
5475

76+
// Clear the delayed scale down for corresponding vertex when the recommended parallelism is
77+
// greater than or equal to the currentParallelism.
5578
void clearVertex(JobVertexID vertex) {
56-
Instant removed = firstTriggerTime.remove(vertex);
79+
VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
5780
if (removed != null) {
58-
isUpdated = true;
81+
updated = true;
5982
}
6083
}
6184

85+
// Clear all delayed scale down when rescale happens.
6286
void clearAll() {
63-
if (firstTriggerTime.isEmpty()) {
87+
if (delayedVertices.isEmpty()) {
6488
return;
6589
}
66-
firstTriggerTime.clear();
67-
isUpdated = true;
90+
delayedVertices.clear();
91+
updated = true;
6892
}
6993
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 41 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -88,31 +88,29 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
8888
this.autoScalerEventHandler = autoScalerEventHandler;
8989
}
9090

91-
/** The parallelism change type of {@link ParallelismChange}. */
91+
/** The parallelism change type of {@link ParallelismResult}. */
9292
public enum ParallelismChangeType {
9393
NO_CHANGE,
9494
REQUIRED_CHANGE,
9595
OPTIONAL_CHANGE;
9696
}
9797

98-
/**
99-
* The rescaling will be triggered if any vertex's ParallelismChange is required. This means
100-
* that if all vertices' ParallelismChange is optional, rescaling will be ignored.
101-
*/
98+
/** The rescaling will be triggered if any vertex's {@link ParallelismResult} is changed. */
10299
@Getter
103-
public static class ParallelismChange {
100+
public static class ParallelismResult {
104101

105-
private static final ParallelismChange NO_CHANGE =
106-
new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
102+
private static final ParallelismResult NO_CHANGE = new ParallelismResult(-1);
107103

108-
private final ParallelismChangeType changeType;
109104
private final int newParallelism;
110105

111-
private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
112-
this.changeType = changeType;
106+
private ParallelismResult(int newParallelism) {
113107
this.newParallelism = newParallelism;
114108
}
115109

110+
public boolean isNoChange() {
111+
return this == NO_CHANGE;
112+
}
113+
116114
@Override
117115
public boolean equals(Object o) {
118116
if (this == o) {
@@ -121,39 +119,33 @@ public boolean equals(Object o) {
121119
if (o == null || getClass() != o.getClass()) {
122120
return false;
123121
}
124-
ParallelismChange that = (ParallelismChange) o;
125-
return changeType == that.changeType && newParallelism == that.newParallelism;
122+
ParallelismResult that = (ParallelismResult) o;
123+
return newParallelism == that.newParallelism;
126124
}
127125

128126
@Override
129127
public int hashCode() {
130-
return Objects.hash(changeType, newParallelism);
128+
return Objects.hash(newParallelism);
131129
}
132130

133131
@Override
134132
public String toString() {
135-
return "ParallelismChange{"
136-
+ "changeType="
137-
+ changeType
138-
+ ", newParallelism="
139-
+ newParallelism
140-
+ '}';
133+
return isNoChange()
134+
? "NoParallelismChange"
135+
: "ParallelismChange{newParallelism=" + newParallelism + '}';
141136
}
142137

143-
public static ParallelismChange required(int newParallelism) {
144-
return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
138+
public static ParallelismResult build(int newParallelism) {
139+
checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
140+
return new ParallelismResult(newParallelism);
145141
}
146142

147-
public static ParallelismChange optional(int newParallelism) {
148-
return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
149-
}
150-
151-
public static ParallelismChange noChange() {
143+
public static ParallelismResult noChange() {
152144
return NO_CHANGE;
153145
}
154146
}
155147

156-
public ParallelismChange computeScaleTargetParallelism(
148+
public ParallelismResult computeScaleTargetParallelism(
157149
Context context,
158150
JobVertexID vertex,
159151
Collection<ShipStrategy> inputShipStrategies,
@@ -168,7 +160,7 @@ public ParallelismChange computeScaleTargetParallelism(
168160
LOG.warn(
169161
"True processing rate is not available for {}, cannot compute new parallelism",
170162
vertex);
171-
return ParallelismChange.noChange();
163+
return ParallelismResult.noChange();
172164
}
173165

174166
double targetCapacity =
@@ -178,7 +170,7 @@ public ParallelismChange computeScaleTargetParallelism(
178170
LOG.warn(
179171
"Target data rate is not available for {}, cannot compute new parallelism",
180172
vertex);
181-
return ParallelismChange.noChange();
173+
return ParallelismResult.noChange();
182174
}
183175

184176
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
@@ -222,7 +214,7 @@ public ParallelismChange computeScaleTargetParallelism(
222214
// Clear delayed scale down request if the new parallelism is equal to
223215
// currentParallelism.
224216
delayedScaleDown.clearVertex(vertex);
225-
return ParallelismChange.noChange();
217+
return ParallelismResult.noChange();
226218
}
227219

228220
// We record our expectations for this scaling operation
@@ -241,7 +233,7 @@ public ParallelismChange computeScaleTargetParallelism(
241233
delayedScaleDown);
242234
}
243235

244-
private ParallelismChange detectBlockScaling(
236+
private ParallelismResult detectBlockScaling(
245237
Context context,
246238
JobVertexID vertex,
247239
Configuration conf,
@@ -263,7 +255,7 @@ private ParallelismChange detectBlockScaling(
263255

264256
// If we don't have past scaling actions for this vertex, don't block scale up.
265257
if (history.isEmpty()) {
266-
return ParallelismChange.required(newParallelism);
258+
return ParallelismResult.build(newParallelism);
267259
}
268260

269261
var lastSummary = history.get(history.lastKey());
@@ -272,38 +264,40 @@ private ParallelismChange detectBlockScaling(
272264
&& detectIneffectiveScaleUp(
273265
context, vertex, conf, evaluatedMetrics, lastSummary)) {
274266
// Block scale up when last rescale is ineffective.
275-
return ParallelismChange.noChange();
267+
return ParallelismResult.noChange();
276268
}
277269

278-
return ParallelismChange.required(newParallelism);
270+
return ParallelismResult.build(newParallelism);
279271
} else {
280272
return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
281273
}
282274
}
283275

284-
private ParallelismChange applyScaleDownInterval(
276+
private ParallelismResult applyScaleDownInterval(
285277
DelayedScaleDown delayedScaleDown,
286278
JobVertexID vertex,
287279
Configuration conf,
288280
int newParallelism) {
289281
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
290282
if (scaleDownInterval.toMillis() <= 0) {
291283
// The scale down interval is disable, so don't block scaling.
292-
return ParallelismChange.required(newParallelism);
284+
return ParallelismResult.build(newParallelism);
293285
}
294286

295-
var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
296-
if (firstTriggerTime.isEmpty()) {
297-
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
298-
delayedScaleDown.updateTriggerTime(vertex, clock.instant());
299-
return ParallelismChange.optional(newParallelism);
300-
}
287+
var now = clock.instant();
288+
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
301289

302-
if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
303-
LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
304-
return ParallelismChange.optional(newParallelism);
290+
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
291+
if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
292+
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
293+
} else {
294+
LOG.debug(
295+
"Try to skip immediate scale down within scale-down interval for {}",
296+
vertex);
297+
}
298+
return ParallelismResult.noChange();
305299
} else {
306-
return ParallelismChange.required(newParallelism);
300+
return ParallelismResult.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
307301
}
308302
}
309303

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,10 @@
4545
import java.time.Instant;
4646
import java.util.Collections;
4747
import java.util.HashMap;
48-
import java.util.HashSet;
4948
import java.util.Map;
5049
import java.util.Set;
5150
import java.util.SortedMap;
5251

53-
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
54-
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
5552
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
5653
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
5754
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
181178
}
182179

183180
@VisibleForTesting
184-
static boolean allRequiredVerticesWithinUtilizationTarget(
181+
static boolean allChangedVerticesWithinUtilizationTarget(
185182
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
186-
Set<JobVertexID> requiredVertices) {
187-
// All vertices' ParallelismChange is optional, rescaling will be ignored.
188-
if (requiredVertices.isEmpty()) {
183+
Set<JobVertexID> changedVertices) {
184+
// No any vertex is changed.
185+
if (changedVertices.isEmpty()) {
189186
return true;
190187
}
191188

192-
for (JobVertexID vertex : requiredVertices) {
189+
for (var vertex : changedVertices) {
193190
var metrics = evaluatedMetrics.get(vertex);
194191

195192
double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
234231
}
235232

236233
var out = new HashMap<JobVertexID, ScalingSummary>();
237-
var requiredVertices = new HashSet<JobVertexID>();
238234

239235
var excludeVertexIdList =
240236
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -250,7 +246,7 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
250246
var currentParallelism =
251247
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
252248

253-
var parallelismChange =
249+
var parallelismResult =
254250
jobVertexScaler.computeScaleTargetParallelism(
255251
context,
256252
v,
@@ -260,24 +256,21 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
260256
v, Collections.emptySortedMap()),
261257
restartTime,
262258
delayedScaleDown);
263-
if (NO_CHANGE == parallelismChange.getChangeType()) {
259+
if (parallelismResult.isNoChange()) {
264260
return;
265-
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
266-
requiredVertices.add(v);
267261
}
268262
out.put(
269263
v,
270264
new ScalingSummary(
271265
currentParallelism,
272-
parallelismChange.getNewParallelism(),
266+
parallelismResult.getNewParallelism(),
273267
metrics));
274268
}
275269
});
276270

277-
// If the Utilization of all required tasks is within range, we can skip scaling.
278-
// It means that if only optional tasks are out of scope, we still need to ignore scale.
279-
if (allRequiredVerticesWithinUtilizationTarget(
280-
evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
271+
// If the Utilization of all tasks is within range, we can skip scaling.
272+
if (allChangedVerticesWithinUtilizationTarget(
273+
evaluatedMetrics.getVertexMetrics(), out.keySet())) {
281274
return Map.of();
282275
}
283276

0 commit comments

Comments
 (0)