Skip to content

Commit 2c6df55

Browse files
committed
[FLINK-36863][autoscaler] Use the maximum parallelism in the past scale-down.interval window when scaling down
1 parent d9e8cce commit 2c6df55

File tree

4 files changed

+83
-19
lines changed

4 files changed

+83
-19
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,74 @@
2828

2929
import javax.annotation.Nonnull;
3030

31+
import java.time.Duration;
3132
import java.time.Instant;
3233
import java.util.HashMap;
34+
import java.util.LinkedList;
3335
import java.util.Map;
3436

37+
import static org.apache.flink.util.Preconditions.checkState;
38+
3539
/** All delayed scale down requests. */
3640
public class DelayedScaleDown {
3741

42+
@Data
43+
private static class RecommendedParallelism {
44+
@Nonnull private final Instant triggerTime;
45+
private final int parallelism;
46+
}
47+
3848
/** The delayed scale down info for vertex. */
3949
@Data
4050
public static class VertexDelayedScaleDownInfo {
4151
private final Instant firstTriggerTime;
42-
private int maxRecommendedParallelism;
52+
// TODO: add a comment explaining how the max parallelism within the sliding window is
53+
// calculated.
54+
private final LinkedList<RecommendedParallelism> recommendedParallelisms;
55+
56+
public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
57+
this.firstTriggerTime = firstTriggerTime;
58+
this.recommendedParallelisms = new LinkedList<>();
59+
}
4360

4461
@JsonCreator
4562
public VertexDelayedScaleDownInfo(
4663
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
47-
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
64+
@JsonProperty("recommendedParallelisms")
65+
LinkedList<RecommendedParallelism> recommendedParallelisms) {
4866
this.firstTriggerTime = firstTriggerTime;
49-
this.maxRecommendedParallelism = maxRecommendedParallelism;
67+
this.recommendedParallelisms = recommendedParallelisms;
68+
}
69+
70+
/** Record current recommended parallelism. */
71+
public void recordRecommendedParallelism(
72+
Instant triggerTime, int parallelism, Duration scaleDownInterval) {
73+
var windowStartTime = triggerTime.minus(scaleDownInterval);
74+
75+
// Remove all recommended parallelisms before the window start time.
76+
while (!recommendedParallelisms.isEmpty()
77+
&& recommendedParallelisms
78+
.peekFirst()
79+
.getTriggerTime()
80+
.isBefore(windowStartTime)) {
81+
recommendedParallelisms.pollFirst();
82+
}
83+
84+
// Remove all recommended parallelisms that are lower than or equal to the latest parallelism.
85+
while (!recommendedParallelisms.isEmpty()
86+
&& recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
87+
recommendedParallelisms.pollLast();
88+
}
89+
90+
recommendedParallelisms.addLast(new RecommendedParallelism(triggerTime, parallelism));
91+
}
92+
93+
public int getMaxRecommendedParallelism() {
94+
var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
95+
checkState(
96+
maxRecommendedParallelism != null,
97+
"The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug.");
98+
return maxRecommendedParallelism.getParallelism();
5099
}
51100
}
52101

@@ -63,18 +112,18 @@ public DelayedScaleDown() {
63112
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
64113
@Nonnull
65114
public VertexDelayedScaleDownInfo triggerScaleDown(
66-
JobVertexID vertex, Instant triggerTime, int parallelism) {
115+
JobVertexID vertex, Instant triggerTime, int parallelism, Duration scaleDownInterval) {
116+
// The vertexDelayedScaleDownInfo is updated whenever a scale down is triggered, because we
117+
// need to update the triggerTime each time.
118+
updated = true;
119+
67120
var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
68121
if (vertexDelayedScaleDownInfo == null) {
69-
// It's the first trigger
70-
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
122+
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime);
71123
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
72-
updated = true;
73-
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
74-
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
75-
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
76-
updated = true;
77124
}
125+
vertexDelayedScaleDownInfo.recordRecommendedParallelism(
126+
triggerTime, parallelism, scaleDownInterval);
78127

79128
return vertexDelayedScaleDownInfo;
80129
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ private ParallelismChange applyScaleDownInterval(
278278
}
279279

280280
var now = clock.instant();
281-
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
281+
var delayedScaleDownInfo =
282+
delayedScaleDown.triggerScaleDown(vertex, now, newParallelism, scaleDownInterval);
282283

283284
// Never scale down within scale down interval
284285
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.junit.jupiter.api.Test;
2323

24+
import java.time.Duration;
2425
import java.time.Instant;
2526

2627
import static org.assertj.core.api.Assertions.assertThat;
@@ -33,35 +34,46 @@ public class DelayedScaleDownTest {
3334
@Test
3435
void testTriggerUpdateAndClean() {
3536
var instant = Instant.now();
37+
var scaleDownInterval = Duration.ofHours(1);
3638
var delayedScaleDown = new DelayedScaleDown();
3739
assertThat(delayedScaleDown.isUpdated()).isFalse();
3840

3941
// The first trigger time is set to the trigger time, and it won't be updated afterwards.
4042
assertVertexDelayedScaleDownInfo(
41-
delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
43+
delayedScaleDown.triggerScaleDown(vertex, instant, 5, scaleDownInterval),
44+
instant,
45+
5);
4246
assertThat(delayedScaleDown.isUpdated()).isTrue();
4347

4448
// The lower parallelism doesn't update the result
4549
assertVertexDelayedScaleDownInfo(
46-
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
50+
delayedScaleDown.triggerScaleDown(
51+
vertex, instant.plusSeconds(5), 3, scaleDownInterval),
52+
instant,
53+
5);
4754

4855
// The higher parallelism will update the result
4956
assertVertexDelayedScaleDownInfo(
50-
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
57+
delayedScaleDown.triggerScaleDown(
58+
vertex, instant.plusSeconds(10), 8, scaleDownInterval),
59+
instant,
60+
8);
5161

5262
// The scale down could be re-triggered again after clean
5363
delayedScaleDown.clearVertex(vertex);
5464
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
5565
assertVertexDelayedScaleDownInfo(
56-
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
66+
delayedScaleDown.triggerScaleDown(
67+
vertex, instant.plusSeconds(15), 4, scaleDownInterval),
5768
instant.plusSeconds(15),
5869
4);
5970

6071
// The scale down could be re-triggered again after cleanAll
6172
delayedScaleDown.clearAll();
6273
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
6374
assertVertexDelayedScaleDownInfo(
64-
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
75+
delayedScaleDown.triggerScaleDown(
76+
vertex, instant.plusSeconds(15), 2, scaleDownInterval),
6577
instant.plusSeconds(15),
6678
2);
6779
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,10 @@ protected void testDiscardAllState() throws Exception {
207207
stateStore.storeScalingTracking(ctx, scalingTracking);
208208

209209
var delayedScaleDown = new DelayedScaleDown();
210-
delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
211-
delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
210+
var scaleDownInterval = Duration.ofHours(1);
211+
delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10, scaleDownInterval);
212+
delayedScaleDown.triggerScaleDown(
213+
new JobVertexID(), Instant.now().plusSeconds(10), 12, scaleDownInterval);
212214

213215
stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
214216

0 commit comments

Comments
 (0)