
Commit b9c61ae

[FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency
1. Use the maximum parallelism within the window instead of the latest parallelism when scaling down.
2. Never scale down while (currentTime - triggerTime) < scale-down.interval.
1 parent: 5d29554
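In essence, the patch keeps two pieces of per-vertex state, the first trigger time and the maximum parallelism recommended since that trigger, and gates every scale down on them. A minimal stand-alone sketch of the rule (illustrative only; the helper below is hypothetical, not the patched Flink API):

import java.time.Duration;
import java.time.Instant;

class ScaleDownRuleSketch {
    // Decide the target parallelism for one vertex. Within the scale-down
    // interval we keep the current parallelism; once the interval has elapsed
    // we scale down to the maximum parallelism recommended during the window,
    // not the latest (possibly transiently low) recommendation.
    static int decide(
            Instant now,
            Instant firstTriggerTime,
            Duration scaleDownInterval,
            int currentParallelism,
            int maxRecommendedParallelism) {
        if (now.isBefore(firstTriggerTime.plus(scaleDownInterval))) {
            return currentParallelism; // still within the interval: no change
        }
        return maxRecommendedParallelism;
    }

    public static void main(String[] args) {
        var t0 = Instant.parse("2024-01-01T00:00:00Z");
        var interval = Duration.ofHours(1);
        System.out.println(decide(t0.plusSeconds(600), t0, interval, 10, 7));  // 10 (still delayed)
        System.out.println(decide(t0.plusSeconds(3600), t0, interval, 10, 7)); // 7 (window maximum)
    }
}

Scaling down to the window maximum rather than the latest recommendation avoids rescaling to a transiently low parallelism that the next load spike would immediately revert.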

9 files changed: +266 -185 lines changed

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 3 additions & 5 deletions
@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
         try {
             return deserializeDelayedScaleDown(delayedScaleDown.get());
         } catch (JacksonException e) {
-            LOG.error(
+            LOG.warn(
                     "Could not deserialize delayed scale down, possibly the format changed. Discarding...",
                     e);
             jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)

     private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
             throws JacksonException {
-        return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
+        return YAML_MAPPER.writeValueAsString(delayedScaleDown);
     }

     private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
             throws JacksonException {
-        Map<JobVertexID, Instant> firstTriggerTime =
-                YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
-        return new DelayedScaleDown(firstTriggerTime);
+        return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
 }
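This simplification works because DelayedScaleDown is now serialized and restored as a whole object rather than as a bare Map<JobVertexID, Instant>. A stand-alone round-trip sketch of the same Jackson pattern (plain jackson-databind plus jackson-datatype-jsr310 and a simplified stand-in class, not Flink's shaded mapper):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Instant;
import java.util.Map;

public class RoundTripSketch {
    // Simplified stand-in for DelayedScaleDown.VertexDelayedScaleDownInfo.
    public static class VertexInfo {
        public final Instant firstTriggerTime;
        public final int maxRecommendedParallelism;

        @JsonCreator
        public VertexInfo(
                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
            this.firstTriggerTime = firstTriggerTime;
            this.maxRecommendedParallelism = maxRecommendedParallelism;
        }
    }

    public static void main(String[] args) throws Exception {
        var mapper = new ObjectMapper().registerModule(new JavaTimeModule());
        var state = Map.of("vertex-1", new VertexInfo(Instant.parse("2024-01-01T00:00:00Z"), 8));

        // Serialize the whole structure, then restore it through the @JsonCreator constructor.
        String serialized = mapper.writeValueAsString(state);
        Map<String, VertexInfo> restored =
                mapper.readValue(serialized, new TypeReference<Map<String, VertexInfo>>() {});

        System.out.println(restored.get("vertex-1").maxRecommendedParallelism); // 8
    }
}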

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java

Lines changed: 49 additions & 19 deletions
@@ -19,51 +19,81 @@

 import org.apache.flink.runtime.jobgraph.JobVertexID;

+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
 import lombok.Getter;

+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;

 /** All delayed scale down requests. */
 public class DelayedScaleDown {

-    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+    /** The delayed scale down info for vertex. */
+    @Data
+    public static class VertexDelayedScaleDownInfo {
+        private final Instant firstTriggerTime;
+        private int maxRecommendedParallelism;
+
+        @JsonCreator
+        public VertexDelayedScaleDownInfo(
+                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
+                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.maxRecommendedParallelism = maxRecommendedParallelism;
+        }
+    }
+
+    @Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;

     // Have any scale down request been updated? It doesn't need to be stored, it is only used to
     // determine whether DelayedScaleDown needs to be stored.
-    @Getter private boolean isUpdated = false;
+    @JsonIgnore @Getter private boolean updated = false;

     public DelayedScaleDown() {
-        this.firstTriggerTime = new HashMap<>();
-    }
-
-    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
-        this.firstTriggerTime = firstTriggerTime;
+        this.delayedVertices = new HashMap<>();
     }

-    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
-        return Optional.ofNullable(firstTriggerTime.get(vertex));
-    }
+    /** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
+    @Nonnull
+    public VertexDelayedScaleDownInfo triggerScaleDown(
+            JobVertexID vertex, Instant triggerTime, int parallelism) {
+        var vertexInfo = delayedVertices.get(vertex);
+        if (vertexInfo == null) {
+            // It's the first trigger
+            vertexInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
+            delayedVertices.put(vertex, vertexInfo);
+            updated = true;
+        } else if (parallelism > vertexInfo.getMaxRecommendedParallelism()) {
+            // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
+            vertexInfo.setMaxRecommendedParallelism(parallelism);
+            updated = true;
+        }

-    void updateTriggerTime(JobVertexID vertex, Instant instant) {
-        firstTriggerTime.put(vertex, instant);
-        isUpdated = true;
+        return vertexInfo;
     }

+    // Clear the delayed scale down for corresponding vertex when the recommended parallelism is
+    // greater than or equal to the currentParallelism.
     void clearVertex(JobVertexID vertex) {
-        Instant removed = firstTriggerTime.remove(vertex);
+        VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
         if (removed != null) {
-            isUpdated = true;
+            updated = true;
         }
     }

+    // Clear all delayed scale down when rescale happens.
     void clearAll() {
-        if (firstTriggerTime.isEmpty()) {
+        if (delayedVertices.isEmpty()) {
             return;
         }
-        firstTriggerTime.clear();
-        isUpdated = true;
+        delayedVertices.clear();
+        updated = true;
     }
 }
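The effect of triggerScaleDown is easiest to see across consecutive evaluations: the first call opens the window, and later calls only raise the recorded maximum. A self-contained simulation of that behaviour (simplified stand-alone logic mirroring the method above, not the class itself):

import java.time.Instant;

class MaxTrackingSketch {
    private Instant firstTriggerTime; // null until the first trigger
    private int maxRecommendedParallelism;

    void trigger(Instant now, int recommendedParallelism) {
        if (firstTriggerTime == null) {
            firstTriggerTime = now;                             // first trigger opens the window
            maxRecommendedParallelism = recommendedParallelism;
        } else if (recommendedParallelism > maxRecommendedParallelism) {
            maxRecommendedParallelism = recommendedParallelism; // keep the window maximum
        }
    }

    public static void main(String[] args) {
        var sketch = new MaxTrackingSketch();
        var t0 = Instant.parse("2024-01-01T00:00:00Z");
        sketch.trigger(t0, 5);
        sketch.trigger(t0.plusSeconds(60), 7);
        sketch.trigger(t0.plusSeconds(120), 6);
        // A scale down after the interval would use 7 (the window maximum), not the latest 6.
        System.out.println(sketch.maxRecommendedParallelism); // 7
    }
}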

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 40 additions & 53 deletions
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
         this.autoScalerEventHandler = autoScalerEventHandler;
     }

-    /** The parallelism change type of {@link ParallelismChange}. */
-    public enum ParallelismChangeType {
-        NO_CHANGE,
-        REQUIRED_CHANGE,
-        OPTIONAL_CHANGE;
-    }
-
-    /**
-     * The rescaling will be triggered if any vertex's ParallelismChange is required. This means
-     * that if all vertices' ParallelismChange is optional, rescaling will be ignored.
-     */
+    /** The rescaling will be triggered if any vertex's {@link ParallelismResult} is changed. */
     @Getter
-    public static class ParallelismChange {
+    public static class ParallelismResult {

-        private static final ParallelismChange NO_CHANGE =
-                new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
+        private static final ParallelismResult NO_CHANGE = new ParallelismResult(-1);

-        private final ParallelismChangeType changeType;
         private final int newParallelism;

-        private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
-            this.changeType = changeType;
+        private ParallelismResult(int newParallelism) {
             this.newParallelism = newParallelism;
         }

+        public boolean isNoChange() {
+            return this == NO_CHANGE;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -121,39 +112,33 @@ public boolean equals(Object o) {
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            ParallelismChange that = (ParallelismChange) o;
-            return changeType == that.changeType && newParallelism == that.newParallelism;
+            ParallelismResult that = (ParallelismResult) o;
+            return newParallelism == that.newParallelism;
         }

         @Override
         public int hashCode() {
-            return Objects.hash(changeType, newParallelism);
+            return Objects.hash(newParallelism);
         }

         @Override
         public String toString() {
-            return "ParallelismChange{"
-                    + "changeType="
-                    + changeType
-                    + ", newParallelism="
-                    + newParallelism
-                    + '}';
-        }
-
-        public static ParallelismChange required(int newParallelism) {
-            return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
+            return isNoChange()
+                    ? "NoParallelismChange"
+                    : "ParallelismChange{newParallelism=" + newParallelism + '}';
         }

-        public static ParallelismChange optional(int newParallelism) {
-            return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
+        public static ParallelismResult build(int newParallelism) {
+            checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
+            return new ParallelismResult(newParallelism);
         }

-        public static ParallelismChange noChange() {
+        public static ParallelismResult noChange() {
             return NO_CHANGE;
         }
     }

-    public ParallelismChange computeScaleTargetParallelism(
+    public ParallelismResult computeScaleTargetParallelism(
             Context context,
             JobVertexID vertex,
             Collection<ShipStrategy> inputShipStrategies,
@@ -168,7 +153,7 @@ public ParallelismChange computeScaleTargetParallelism(
             LOG.warn(
                     "True processing rate is not available for {}, cannot compute new parallelism",
                     vertex);
-            return ParallelismChange.noChange();
+            return ParallelismResult.noChange();
         }

         double targetCapacity =
@@ -178,7 +163,7 @@ public ParallelismChange computeScaleTargetParallelism(
             LOG.warn(
                     "Target data rate is not available for {}, cannot compute new parallelism",
                     vertex);
-            return ParallelismChange.noChange();
+            return ParallelismResult.noChange();
         }

         LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
@@ -222,7 +207,7 @@ public ParallelismChange computeScaleTargetParallelism(
             // Clear delayed scale down request if the new parallelism is equal to
             // currentParallelism.
             delayedScaleDown.clearVertex(vertex);
-            return ParallelismChange.noChange();
+            return ParallelismResult.noChange();
         }

         // We record our expectations for this scaling operation
@@ -241,7 +226,7 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }

-    private ParallelismChange detectBlockScaling(
+    private ParallelismResult detectBlockScaling(
             Context context,
             JobVertexID vertex,
             Configuration conf,
@@ -263,7 +248,7 @@ private ParallelismChange detectBlockScaling(

         // If we don't have past scaling actions for this vertex, don't block scale up.
         if (history.isEmpty()) {
-            return ParallelismChange.required(newParallelism);
+            return ParallelismResult.build(newParallelism);
         }

         var lastSummary = history.get(history.lastKey());
@@ -272,38 +257,40 @@
                     && detectIneffectiveScaleUp(
                             context, vertex, conf, evaluatedMetrics, lastSummary)) {
                 // Block scale up when last rescale is ineffective.
-                return ParallelismChange.noChange();
+                return ParallelismResult.noChange();
             }

-            return ParallelismChange.required(newParallelism);
+            return ParallelismResult.build(newParallelism);
         } else {
             return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
         }
     }

-    private ParallelismChange applyScaleDownInterval(
+    private ParallelismResult applyScaleDownInterval(
             DelayedScaleDown delayedScaleDown,
             JobVertexID vertex,
             Configuration conf,
             int newParallelism) {
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
+            return ParallelismResult.build(newParallelism);
         }

-        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
-        }
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);

-        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down interval for {}",
+                        vertex);
+            }
+            return ParallelismResult.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            return ParallelismResult.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
         }
     }
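Each evaluation of applyScaleDownInterval now ends in one of three outcomes: the first trigger (INFO log, no change), a repeat evaluation inside the interval (DEBUG log, no change), or the interval having elapsed (scale down to the window maximum). A timeline sketch under an assumed 10-minute scale-down interval:

import java.time.Duration;
import java.time.Instant;

class IntervalTimelineSketch {
    public static void main(String[] args) {
        var interval = Duration.ofMinutes(10); // assumed scale-down interval
        var firstTrigger = Instant.parse("2024-01-01T00:00:00Z");

        Instant[] evaluations = {
            firstTrigger,                              // first trigger: delay the scale down
            firstTrigger.plus(Duration.ofMinutes(5)),  // inside the interval: still no change
            firstTrigger.plus(Duration.ofMinutes(10)), // interval elapsed: scale down
        };
        for (Instant now : evaluations) {
            boolean delayed = now.isBefore(firstTrigger.plus(interval));
            System.out.println(now + " -> " + (delayed ? "noChange" : "build(maxRecommended)"));
        }
    }
}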

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

Lines changed: 11 additions & 18 deletions
@@ -45,13 +45,10 @@
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;

-import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
-import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
     }

     @VisibleForTesting
-    static boolean allRequiredVerticesWithinUtilizationTarget(
+    static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
-            Set<JobVertexID> requiredVertices) {
-        // All vertices' ParallelismChange is optional, rescaling will be ignored.
-        if (requiredVertices.isEmpty()) {
+            Set<JobVertexID> changedVertices) {
+        // No any vertex is changed.
+        if (changedVertices.isEmpty()) {
             return true;
         }

-        for (JobVertexID vertex : requiredVertices) {
+        for (var vertex : changedVertices) {
             var metrics = evaluatedMetrics.get(vertex);

             double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         }

         var out = new HashMap<JobVertexID, ScalingSummary>();
-        var requiredVertices = new HashSet<JobVertexID>();

         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -250,7 +246,7 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                         var currentParallelism =
                                 (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();

-                        var parallelismChange =
+                        var parallelismResult =
                                 jobVertexScaler.computeScaleTargetParallelism(
                                         context,
                                         v,
@@ -260,24 +256,21 @@
                                                 v, Collections.emptySortedMap()),
                                         restartTime,
                                         delayedScaleDown);
-                        if (NO_CHANGE == parallelismChange.getChangeType()) {
+                        if (parallelismResult.isNoChange()) {
                             return;
-                        } else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
-                            requiredVertices.add(v);
                         }
                         out.put(
                                 v,
                                 new ScalingSummary(
                                         currentParallelism,
-                                        parallelismChange.getNewParallelism(),
+                                        parallelismResult.getNewParallelism(),
                                         metrics));
                     }
                 });

-        // If the Utilization of all required tasks is within range, we can skip scaling.
-        // It means that if only optional tasks are out of scope, we still need to ignore scale.
-        if (allRequiredVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
+        // If the Utilization of all tasks is within range, we can skip scaling.
+        if (allChangedVerticesWithinUtilizationTarget(
+                evaluatedMetrics.getVertexMetrics(), out.keySet())) {
             return Map.of();
         }
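With the required/optional distinction gone, the skip decision reduces to: did any vertex produce a scaling summary, and are all of those vertices already within their utilization target? A self-contained sketch of that predicate (the 0.6-0.8 bounds and the utilization numbers are hypothetical; the real check evaluates TRUE_PROCESSING_RATE metrics):

import java.util.Map;

class SkipScalingSketch {
    static boolean withinTarget(double utilization, double lower, double upper) {
        return utilization >= lower && utilization <= upper;
    }

    public static void main(String[] args) {
        // vertex -> current utilization, for every vertex that produced a ScalingSummary
        Map<String, Double> changedVertices = Map.of("source", 0.72, "sink", 0.68);
        double lower = 0.6;
        double upper = 0.8;

        // Mirrors allChangedVerticesWithinUtilizationTarget over out.keySet():
        // if every changed vertex is already within bounds, the rescale is skipped.
        boolean skip =
                changedVertices.values().stream().allMatch(u -> withinTarget(u, lower, upper));
        System.out.println(skip ? "skip rescale" : "apply rescale"); // skip rescale
    }
}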
