
Commit 5d89009

[FLINK-36863][autoscaler] Use the maximum parallelism in the past scale-down.interval window when scaling down
1 parent: c515429

File tree

5 files changed: +354 -87 lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java

Lines changed: 94 additions & 15 deletions
@@ -30,23 +30,95 @@
 
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
+    @Data
+    public static class RecommendedParallelism {
+        @Nonnull private final Instant triggerTime;
+        private final int parallelism;
+        private final boolean outsideUtilizationBound;
+
+        @JsonCreator
+        public RecommendedParallelism(
+                @Nonnull @JsonProperty("triggerTime") Instant triggerTime,
+                @JsonProperty("parallelism") int parallelism,
+                @JsonProperty("outsideUtilizationBound") boolean outsideUtilizationBound) {
+            this.triggerTime = triggerTime;
+            this.parallelism = parallelism;
+            this.outsideUtilizationBound = outsideUtilizationBound;
+        }
+    }
+
     /** The delayed scale down info for vertex. */
     @Data
     public static class VertexDelayedScaleDownInfo {
         private final Instant firstTriggerTime;
-        private int maxRecommendedParallelism;
+
+        /**
+         * In theory, this maintains every recommended parallelism seen within the past
+         * `scale-down.interval` window, so all recommended parallelisms before the window start
+         * time are evicted.
+         *
+         * <p>In addition, once the latest parallelism is greater than or equal to a past
+         * parallelism, that past parallelism can never be the max recommended parallelism again,
+         * so all such entries can be evicted as well. This is the standard optimization for
+         * computing the maximum of a sliding window: we only need to maintain a list of
+         * monotonically decreasing parallelisms within the past window, and its first entry is
+         * always the max recommended parallelism within the past `scale-down.interval` window.
+         */
+        private final LinkedList<RecommendedParallelism> recommendedParallelisms;
+
+        public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.recommendedParallelisms = new LinkedList<>();
+        }
 
         @JsonCreator
         public VertexDelayedScaleDownInfo(
                 @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
-                @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
+                @JsonProperty("recommendedParallelisms")
+                        LinkedList<RecommendedParallelism> recommendedParallelisms) {
             this.firstTriggerTime = firstTriggerTime;
-            this.maxRecommendedParallelism = maxRecommendedParallelism;
+            this.recommendedParallelisms = recommendedParallelisms;
+        }
+
+        /** Record the current recommended parallelism. */
+        public void recordRecommendedParallelism(
+                Instant triggerTime, int parallelism, boolean outsideUtilizationBound) {
+            // Evict all recommended parallelisms that are lower than or equal to the latest
+            // parallelism. When a past parallelism equals the latest one, its triggerTime needs
+            // to be refreshed, so it is evicted as well.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
+                recommendedParallelisms.pollLast();
+            }
+
+            recommendedParallelisms.addLast(
+                    new RecommendedParallelism(triggerTime, parallelism, outsideUtilizationBound));
+        }
+
+        @JsonIgnore
+        public RecommendedParallelism getMaxRecommendedParallelism(Instant windowStartTime) {
+            // Evict all recommended parallelisms before the window start time.
+            while (!recommendedParallelisms.isEmpty()
+                    && recommendedParallelisms
+                            .peekFirst()
+                            .getTriggerTime()
+                            .isBefore(windowStartTime)) {
+                recommendedParallelisms.pollFirst();
+            }
+
+            var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
+            checkState(
+                    maxRecommendedParallelism != null,
+                    "getMaxRecommendedParallelism should only be called after a scale down has been triggered; this is probably a bug.");
+            return maxRecommendedParallelism;
         }
     }
 
@@ -60,21 +132,28 @@ public DelayedScaleDown() {
         this.delayedVertices = new HashMap<>();
     }
 
+    // TODO: remove this and refactor tests.
+    public VertexDelayedScaleDownInfo triggerScaleDown(
+            JobVertexID vertex, Instant triggerTime, int parallelism) {
+        return triggerScaleDown(vertex, triggerTime, parallelism, false);
+    }
+
     /** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
     @Nonnull
     public VertexDelayedScaleDownInfo triggerScaleDown(
-            JobVertexID vertex, Instant triggerTime, int parallelism) {
-        var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
-        if (vertexDelayedScaleDownInfo == null) {
-            // It's the first trigger
-            vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
-            delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
-            updated = true;
-        } else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
-            // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
-            vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
-            updated = true;
-        }
+            JobVertexID vertex,
+            Instant triggerTime,
+            int parallelism,
+            boolean outsideUtilizationBound) {
+        // The vertexDelayedScaleDownInfo is updated on every trigger because the triggerTime
+        // needs to be refreshed each time.
+        updated = true;
+
+        var vertexDelayedScaleDownInfo =
+                delayedVertices.computeIfAbsent(
+                        vertex, k -> new VertexDelayedScaleDownInfo(triggerTime));
+        vertexDelayedScaleDownInfo.recordRecommendedParallelism(
+                triggerTime, parallelism, outsideUtilizationBound);
 
         return vertexDelayedScaleDownInfo;
    }
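The javadoc above describes the standard monotonic-deque optimization for a sliding-window maximum: evict smaller-or-equal values from the tail on insert, evict expired values from the head on query, and the head is always the window maximum. A minimal, self-contained sketch of that technique (the WindowMax class and the record()/maxSince() names are illustrative, not part of this commit or any Flink API):

    // Illustrative sketch only: mirrors the eviction rules of
    // recordRecommendedParallelism / getMaxRecommendedParallelism above.
    import java.time.Instant;
    import java.util.ArrayDeque;
    import java.util.Deque;

    class WindowMax {
        // Entries stay in insertion order with strictly decreasing values,
        // so the head is always the maximum of the current window.
        private record Entry(Instant time, int value) {}

        private final Deque<Entry> deque = new ArrayDeque<>();

        void record(Instant time, int value) {
            // Tail eviction: a past value <= the new one can never be the
            // window maximum again (equal values are dropped too, which
            // refreshes their timestamp).
            while (!deque.isEmpty() && deque.peekLast().value() <= value) {
                deque.pollLast();
            }
            deque.addLast(new Entry(time, value));
        }

        int maxSince(Instant windowStart) {
            // Head eviction: drop entries that fell out of the window.
            while (!deque.isEmpty() && deque.peekFirst().time().isBefore(windowStart)) {
                deque.pollFirst();
            }
            // Caller must have recorded at least one in-window value,
            // matching the checkState in the commit.
            return deque.peekFirst().value();
        }

        public static void main(String[] args) {
            var base = Instant.parse("2024-01-01T00:00:00Z");
            var w = new WindowMax();
            w.record(base, 8);
            w.record(base.plusSeconds(60), 5);  // kept: may become the max later
            w.record(base.plusSeconds(120), 6); // evicts the 5 from the tail
            System.out.println(w.maxSince(base));                 // 8
            System.out.println(w.maxSince(base.plusSeconds(90))); // 6 (the 8 expired)
        }
    }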

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 35 additions & 15 deletions
@@ -44,6 +44,7 @@
 import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
+import static org.apache.flink.autoscaler.ScalingExecutor.outsideUtilizationBound;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
@@ -92,12 +93,15 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
+        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1, false);
 
         private final int newParallelism;
 
-        private ParallelismChange(int newParallelism) {
+        private final boolean outsideUtilizationBound;
+
+        private ParallelismChange(int newParallelism, boolean outsideUtilizationBound) {
             this.newParallelism = newParallelism;
+            this.outsideUtilizationBound = outsideUtilizationBound;
         }
 
         public boolean isNoChange() {
@@ -113,24 +117,29 @@ public boolean equals(Object o) {
                 return false;
             }
             ParallelismChange that = (ParallelismChange) o;
-            return newParallelism == that.newParallelism;
+            return newParallelism == that.newParallelism
+                    && outsideUtilizationBound == that.outsideUtilizationBound;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(newParallelism);
+            return Objects.hash(newParallelism, outsideUtilizationBound);
         }
 
         @Override
         public String toString() {
             return isNoChange()
                     ? "NoParallelismChange"
-                    : "ParallelismChange{newParallelism=" + newParallelism + '}';
+                    : "ParallelismChange{newParallelism="
+                            + newParallelism
+                            + ", outsideUtilizationBound="
+                            + outsideUtilizationBound
+                            + "}";
         }
 
-        public static ParallelismChange build(int newParallelism) {
+        public static ParallelismChange build(int newParallelism, boolean outsideUtilizationBound) {
             checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
-            return new ParallelismChange(newParallelism);
+            return new ParallelismChange(newParallelism, outsideUtilizationBound);
         }
 
         public static ParallelismChange noChange() {
@@ -239,6 +248,8 @@ private ParallelismChange detectBlockScaling(
                 currentParallelism != newParallelism,
                 "The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");
 
+        var outsideUtilizationBound = outsideUtilizationBound(vertex, evaluatedMetrics);
+
         var scaledUp = currentParallelism < newParallelism;
 
         if (scaledUp) {
@@ -248,7 +259,7 @@
 
             // If we don't have past scaling actions for this vertex, don't block scale up.
             if (history.isEmpty()) {
-                return ParallelismChange.build(newParallelism);
+                return ParallelismChange.build(newParallelism, outsideUtilizationBound);
             }
 
             var lastSummary = history.get(history.lastKey());
@@ -260,28 +271,33 @@ && detectIneffectiveScaleUp(
                 return ParallelismChange.noChange();
             }
 
-            return ParallelismChange.build(newParallelism);
+            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
         } else {
-            return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
+            return applyScaleDownInterval(
+                    delayedScaleDown, vertex, conf, newParallelism, outsideUtilizationBound);
         }
     }
 
     private ParallelismChange applyScaleDownInterval(
             DelayedScaleDown delayedScaleDown,
             JobVertexID vertex,
             Configuration conf,
-            int newParallelism) {
+            int newParallelism,
+            boolean outsideUtilizationBound) {
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disabled, so don't block scaling.
-            return ParallelismChange.build(newParallelism);
+            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
         }
 
         var now = clock.instant();
-        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+        var windowStartTime = now.minus(scaleDownInterval);
+        var delayedScaleDownInfo =
+                delayedScaleDown.triggerScaleDown(
+                        vertex, now, newParallelism, outsideUtilizationBound);
 
         // Never scale down within the scale down interval
-        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+        if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
             if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
                 LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
             } else {
@@ -293,7 +309,11 @@ private ParallelismChange applyScaleDownInterval(
         } else {
             // Using the maximum parallelism within the scale down interval window instead of the
             // latest parallelism when scaling down
-            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
+            var maxRecommendedParallelism =
+                    delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
+            return ParallelismChange.build(
+                    maxRecommendedParallelism.getParallelism(),
+                    maxRecommendedParallelism.isOutsideUtilizationBound());
         }
    }
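A note on the gating arithmetic: windowStartTime = now - scaleDownInterval, so the new check windowStartTime.isBefore(firstTriggerTime) is algebraically the same test as the old now.isBefore(firstTriggerTime.plus(scaleDownInterval)); the difference is that windowStartTime is additionally reused to evict stale entries when the window maximum is read. A small sketch of the timeline with made-up interval and timestamps (the real code reads SCALE_DOWN_INTERVAL from the Flink configuration and `now` from a clock):

    import java.time.Duration;
    import java.time.Instant;

    class ScaleDownGate {
        public static void main(String[] args) {
            var scaleDownInterval = Duration.ofMinutes(60); // hypothetical scale-down.interval
            var firstTriggerTime = Instant.parse("2024-01-01T10:00:00Z");

            // 30 minutes after the first trigger: the window start is still
            // before the first trigger time, so the scale down stays blocked.
            var now = firstTriggerTime.plus(Duration.ofMinutes(30));
            var windowStartTime = now.minus(scaleDownInterval);
            System.out.println(windowStartTime.isBefore(firstTriggerTime)); // true -> delay

            // A full interval after the first trigger: the gate opens, and the
            // maximum recommendation within [windowStartTime, now] is applied
            // rather than the latest (possibly transient) low recommendation.
            now = firstTriggerTime.plus(scaleDownInterval);
            windowStartTime = now.minus(scaleDownInterval);
            System.out.println(windowStartTime.isBefore(firstTriggerTime)); // false -> scale down
        }
    }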

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

Lines changed: 35 additions & 22 deletions
@@ -49,6 +49,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
@@ -178,6 +179,7 @@ private void updateRecommendedParallelism(
                                 scalingSummary.getNewParallelism())));
     }
 
+    // TODO: how to support old testing?
     @VisibleForTesting
     static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
@@ -190,32 +192,39 @@ static boolean allChangedVerticesWithinUtilizationTarget(
         for (JobVertexID vertex : changedVertices) {
             var metrics = evaluatedMetrics.get(vertex);
 
-            double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
-            double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
-            double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
-
-            if (trueProcessingRate < scaleUpRateThreshold
-                    || trueProcessingRate > scaleDownRateThreshold) {
-                LOG.debug(
-                        "Vertex {} processing rate {} is outside ({}, {})",
-                        vertex,
-                        trueProcessingRate,
-                        scaleUpRateThreshold,
-                        scaleDownRateThreshold);
+            if (outsideUtilizationBound(vertex, metrics)) {
                 return false;
-            } else {
-                LOG.debug(
-                        "Vertex {} processing rate {} is within target ({}, {})",
-                        vertex,
-                        trueProcessingRate,
-                        scaleUpRateThreshold,
-                        scaleDownRateThreshold);
             }
         }
-        LOG.info("All vertex processing rates are within target.");
         return true;
     }
 
+    public static boolean outsideUtilizationBound(
+            JobVertexID vertex, Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
+        double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+        double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+        double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
+        if (trueProcessingRate < scaleUpRateThreshold
+                || trueProcessingRate > scaleDownRateThreshold) {
+            LOG.debug(
+                    "Vertex {} processing rate {} is outside ({}, {})",
+                    vertex,
+                    trueProcessingRate,
+                    scaleUpRateThreshold,
+                    scaleDownRateThreshold);
+            return true;
+        } else {
+            LOG.debug(
+                    "Vertex {} processing rate {} is within target ({}, {})",
+                    vertex,
+                    trueProcessingRate,
+                    scaleUpRateThreshold,
+                    scaleDownRateThreshold);
+        }
+        return false;
+    }
+
     @VisibleForTesting
     Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Context context,
@@ -235,6 +244,7 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
 
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        AtomicBoolean anyVertexOutsideBound = new AtomicBoolean(false);
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -260,6 +270,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                             if (parallelismChange.isNoChange()) {
                                 return;
                             }
+                            if (parallelismChange.isOutsideUtilizationBound()) {
+                                anyVertexOutsideBound.set(true);
+                            }
                             out.put(
                                     v,
                                     new ScalingSummary(
@@ -270,8 +283,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                         });
 
         // If the utilization of all tasks is within range, we can skip scaling.
-        if (allChangedVerticesWithinUtilizationTarget(
-                evaluatedMetrics.getVertexMetrics(), out.keySet())) {
+        if (!anyVertexOutsideBound.get()) {
+            LOG.info("All vertex processing rates are within target.");
            return Map.of();
        }
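The extracted outsideUtilizationBound treats a vertex as outside its utilization bound when its true processing rate is below the scale-up threshold (under-provisioned) or above the scale-down threshold (over-provisioned). A self-contained sketch of the predicate, with plain doubles standing in for the EvaluatedScalingMetric lookups and made-up numbers:

    // Illustrative numbers; the real method reads TRUE_PROCESSING_RATE,
    // SCALE_UP_RATE_THRESHOLD and SCALE_DOWN_RATE_THRESHOLD from vertex metrics.
    class UtilizationBoundSketch {
        static boolean outsideUtilizationBound(
                double trueProcessingRate,
                double scaleUpRateThreshold,
                double scaleDownRateThreshold) {
            return trueProcessingRate < scaleUpRateThreshold
                    || trueProcessingRate > scaleDownRateThreshold;
        }

        public static void main(String[] args) {
            System.out.println(outsideUtilizationBound(150, 100, 200)); // false: within (100, 200)
            System.out.println(outsideUtilizationBound(80, 100, 200));  // true: below scale-up threshold
            System.out.println(outsideUtilizationBound(250, 100, 200)); // true: above scale-down threshold
        }
    }

In computeScalingSummary, the per-vertex flag is aggregated through an AtomicBoolean because the forEach lambda needs an effectively final mutable holder; this lets the executor skip scaling without the separate pass over the metrics that allChangedVerticesWithinUtilizationTarget used to perform.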
