Skip to content

Commit 4166c97

Browse files
committed
[FLINK-36863][autoscaler] Use the maximum parallelism in the past scale-down.interval window when scaling down
1 parent 091e803 commit 4166c97

File tree

3 files changed

+179
-19
lines changed

3 files changed

+179
-19
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,88 @@
3030

3131
import java.time.Instant;
3232
import java.util.HashMap;
33+
import java.util.LinkedList;
3334
import java.util.Map;
3435

36+
import static org.apache.flink.util.Preconditions.checkState;
37+
3538
/** All delayed scale down requests. */
3639
public class DelayedScaleDown {
3740

41+
@Data
42+
private static class RecommendedParallelism {
43+
@Nonnull private final Instant triggerTime;
44+
private final int parallelism;
45+
46+
@JsonCreator
47+
public RecommendedParallelism(
48+
@Nonnull @JsonProperty("triggerTime") Instant triggerTime,
49+
@JsonProperty("parallelism") int parallelism) {
50+
this.triggerTime = triggerTime;
51+
this.parallelism = parallelism;
52+
}
53+
}
54+
3855
/** The delayed scale down info for vertex. */
3956
@Data
4057
public static class VertexDelayedScaleDownInfo {
4158
private final Instant firstTriggerTime;
42-
private int maxRecommendedParallelism;
59+
60+
/**
61+
* It maintains all recommended parallelisms at each time within the past
62+
* `scale-down.interval` window period. So all recommended parallelisms before the window
63+
* start time will be evicted.
64+
*
65+
* <p>Also, once a newer parallelism is greater than or equal to an older one, the older one
66+
* can never be the max recommended parallelism again, so it can be evicted as well. This is
67+
* the general optimization for computing the max value of a sliding window: we only need to
68+
* maintain a list with monotonically decreasing parallelism within the past window, and
69+
* the first parallelism in the list is the max recommended parallelism within the past
70+
* scale-down.interval window period.
71+
*/
72+
private final LinkedList<RecommendedParallelism> recommendedParallelisms;
73+
74+
public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
75+
this.firstTriggerTime = firstTriggerTime;
76+
this.recommendedParallelisms = new LinkedList<>();
77+
}
4378

4479
@JsonCreator
4580
public VertexDelayedScaleDownInfo(
4681
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
47-
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
82+
@JsonProperty("recommendedParallelisms")
83+
LinkedList<RecommendedParallelism> recommendedParallelisms) {
4884
this.firstTriggerTime = firstTriggerTime;
49-
this.maxRecommendedParallelism = maxRecommendedParallelism;
85+
this.recommendedParallelisms = recommendedParallelisms;
86+
}
87+
88+
/** Record current recommended parallelism. */
89+
public void recordRecommendedParallelism(Instant triggerTime, int parallelism) {
90+
// Evict all recommended parallelisms that are lower than or equal to the latest parallelism.
91+
while (!recommendedParallelisms.isEmpty()
92+
&& recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
93+
recommendedParallelisms.pollLast();
94+
}
95+
96+
recommendedParallelisms.addLast(new RecommendedParallelism(triggerTime, parallelism));
97+
}
98+
99+
@JsonIgnore
100+
public int getMaxRecommendedParallelism(Instant windowStartTime) {
101+
// Evict all recommended parallelisms before the window start time.
102+
while (!recommendedParallelisms.isEmpty()
103+
&& recommendedParallelisms
104+
.peekFirst()
105+
.getTriggerTime()
106+
.isBefore(windowStartTime)) {
107+
recommendedParallelisms.pollFirst();
108+
}
109+
110+
var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
111+
checkState(
112+
maxRecommendedParallelism != null,
113+
"The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug.");
114+
return maxRecommendedParallelism.getParallelism();
50115
}
51116
}
52117

@@ -64,17 +129,16 @@ public DelayedScaleDown() {
64129
@Nonnull
65130
public VertexDelayedScaleDownInfo triggerScaleDown(
66131
JobVertexID vertex, Instant triggerTime, int parallelism) {
132+
// The vertexDelayedScaleDownInfo is updated every time a scale down is triggered, because we
133+
// need to record the triggerTime each time.
134+
updated = true;
135+
67136
var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
68137
if (vertexDelayedScaleDownInfo == null) {
69-
// It's the first trigger
70-
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
138+
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime);
71139
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
72-
updated = true;
73-
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
74-
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
75-
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
76-
updated = true;
77140
}
141+
vertexDelayedScaleDownInfo.recordRecommendedParallelism(triggerTime, parallelism);
78142

79143
return vertexDelayedScaleDownInfo;
80144
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,11 @@ private ParallelismChange applyScaleDownInterval(
278278
}
279279

280280
var now = clock.instant();
281+
var windowStartTime = now.minus(scaleDownInterval);
281282
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
282283

283284
// Never scale down within scale down interval
284-
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
285+
if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
285286
if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
286287
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
287288
} else {
@@ -293,7 +294,8 @@ private ParallelismChange applyScaleDownInterval(
293294
} else {
294295
// Using the maximum parallelism within the scale down interval window instead of the
295296
// latest parallelism when scaling down
296-
return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
297+
return ParallelismChange.build(
298+
delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime));
297299
}
298300
}
299301

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,97 @@
2424
import java.time.Instant;
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2728

2829
/** Test for {@link DelayedScaleDown}. */
2930
public class DelayedScaleDownTest {
3031

3132
private final JobVertexID vertex = new JobVertexID();
3233

34+
@Test
35+
void testWrongWindowStartTime() {
36+
var instant = Instant.now();
37+
var delayedScaleDown = new DelayedScaleDown();
38+
39+
// The time of the first trigger is used as the first trigger time, and it won't be updated.
40+
var vertexDelayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, instant, 5);
41+
assertVertexDelayedScaleDownInfo(vertexDelayedScaleDownInfo, instant, 5, instant);
42+
43+
// Get the max recommended parallelism with a wrong window start time: no parallelism has
44+
// been recommended since that window start time.
45+
assertThatThrownBy(
46+
() ->
47+
vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(
48+
instant.plusSeconds(1)))
49+
.isInstanceOf(IllegalStateException.class);
50+
}
51+
52+
@Test
53+
void testMaxRecommendedParallelismForSlidingWindow() {
54+
var instant = Instant.now();
55+
var delayedScaleDown = new DelayedScaleDown();
56+
assertThat(delayedScaleDown.isUpdated()).isFalse();
57+
58+
// [5] -> 5
59+
assertVertexDelayedScaleDownInfo(
60+
delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5, instant);
61+
// [5, 8] -> 8
62+
assertVertexDelayedScaleDownInfo(
63+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(1), 8),
64+
instant,
65+
8,
66+
instant);
67+
// [5, 8, 6] -> 8
68+
assertVertexDelayedScaleDownInfo(
69+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(2), 6),
70+
instant,
71+
8,
72+
instant);
73+
// [5, 8, 6, 4] -> 8
74+
assertVertexDelayedScaleDownInfo(
75+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(3), 4),
76+
instant,
77+
8,
78+
instant);
79+
// 5, [8, 6, 4, 3] -> 8
80+
assertVertexDelayedScaleDownInfo(
81+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(4), 3),
82+
instant,
83+
8,
84+
instant.plusSeconds(1));
85+
// 5, 8, [6, 4, 3, 3] -> 6
86+
assertVertexDelayedScaleDownInfo(
87+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3),
88+
instant,
89+
6,
90+
instant.plusSeconds(2));
91+
// 5, 8, 6, [4, 3, 3, 3] -> 4
92+
assertVertexDelayedScaleDownInfo(
93+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(6), 3),
94+
instant,
95+
4,
96+
instant.plusSeconds(3));
97+
// 5, 8, 6, 4, [3, 3, 3, 3] -> 3
98+
assertVertexDelayedScaleDownInfo(
99+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(7), 3),
100+
instant,
101+
3,
102+
instant.plusSeconds(4));
103+
// Check that the timestamp of the latest parallelism is maintained correctly.
104+
// 5, 8, 6, 4, 3, 3, 3, 3, [3] -> 3
105+
assertVertexDelayedScaleDownInfo(
106+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(8), 3),
107+
instant,
108+
3,
109+
instant.plusSeconds(8));
110+
// 5, 8, 6, 4, 3, 3, 3, 3, [3, 9] -> 9
111+
assertVertexDelayedScaleDownInfo(
112+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(9), 9),
113+
instant,
114+
9,
115+
instant.plusSeconds(8));
116+
}
117+
33118
@Test
34119
void testTriggerUpdateAndClean() {
35120
var instant = Instant.now();
@@ -38,40 +123,49 @@ void testTriggerUpdateAndClean() {
38123

39124
// The time of the first trigger is used as the first trigger time, and it won't be updated.
40125
assertVertexDelayedScaleDownInfo(
41-
delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
126+
delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5, instant);
42127
assertThat(delayedScaleDown.isUpdated()).isTrue();
43128

44129
// The lower parallelism doesn't update the result
45130
assertVertexDelayedScaleDownInfo(
46-
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
131+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3),
132+
instant,
133+
5,
134+
instant);
47135

48136
// The higher parallelism will update the result
49137
assertVertexDelayedScaleDownInfo(
50-
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
138+
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8),
139+
instant,
140+
8,
141+
instant);
51142

52143
// The scale down can be triggered again after clearVertex
53144
delayedScaleDown.clearVertex(vertex);
54145
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
55146
assertVertexDelayedScaleDownInfo(
56147
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
57148
instant.plusSeconds(15),
58-
4);
149+
4,
150+
instant);
59151

60152
// The scale down can be triggered again after clearAll
61153
delayedScaleDown.clearAll();
62154
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
63155
assertVertexDelayedScaleDownInfo(
64156
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
65157
instant.plusSeconds(15),
66-
2);
158+
2,
159+
instant);
67160
}
68161

69162
void assertVertexDelayedScaleDownInfo(
70163
DelayedScaleDown.VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo,
71164
Instant expectedTriggerTime,
72-
int expectedMaxRecommendedParallelism) {
165+
int expectedMaxRecommendedParallelism,
166+
Instant windowStartTime) {
73167
assertThat(vertexDelayedScaleDownInfo.getFirstTriggerTime()).isEqualTo(expectedTriggerTime);
74-
assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism())
168+
assertThat(vertexDelayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime))
75169
.isEqualTo(expectedMaxRecommendedParallelism);
76170
}
77171
}

0 commit comments

Comments
 (0)