2828
2929import javax .annotation .Nonnull ;
3030
31+ import java .time .Duration ;
3132import java .time .Instant ;
3233import java .util .HashMap ;
34+ import java .util .LinkedList ;
3335import java .util .Map ;
3436
37+ import static org .apache .flink .util .Preconditions .checkState ;
38+
3539/** All delayed scale down requests. */
3640public class DelayedScaleDown {
3741
42+ @ Data
43+ private static class RecommendedParallelism {
44+ @ Nonnull private final Instant triggerTime ;
45+ private final int parallelism ;
46+
47+ @ JsonCreator
48+ public RecommendedParallelism (
49+ @ Nonnull @ JsonProperty ("triggerTime" ) Instant triggerTime ,
50+ @ JsonProperty ("parallelism" ) int parallelism ) {
51+ this .triggerTime = triggerTime ;
52+ this .parallelism = parallelism ;
53+ }
54+ }
55+
3856 /** The delayed scale down info for vertex. */
3957 @ Data
4058 public static class VertexDelayedScaleDownInfo {
4159 private final Instant firstTriggerTime ;
42- private int maxRecommendedParallelism ;
60+ // TODO : add the comment to explain how to calculate the max parallelism within the sliding
61+ // window.
62+ private final LinkedList <RecommendedParallelism > recommendedParallelisms ;
63+
64+ public VertexDelayedScaleDownInfo (Instant firstTriggerTime ) {
65+ this .firstTriggerTime = firstTriggerTime ;
66+ this .recommendedParallelisms = new LinkedList <>();
67+ }
4368
4469 @ JsonCreator
4570 public VertexDelayedScaleDownInfo (
4671 @ JsonProperty ("firstTriggerTime" ) Instant firstTriggerTime ,
47- @ JsonProperty ("maxRecommendedParallelism" ) int maxRecommendedParallelism ) {
72+ @ JsonProperty ("recommendedParallelisms" )
73+ LinkedList <RecommendedParallelism > recommendedParallelisms ) {
4874 this .firstTriggerTime = firstTriggerTime ;
49- this .maxRecommendedParallelism = maxRecommendedParallelism ;
75+ this .recommendedParallelisms = recommendedParallelisms ;
76+ }
77+
78+ /** Record current recommended parallelism. */
79+ public void recordRecommendedParallelism (
80+ Instant triggerTime , int parallelism , Duration scaleDownInterval ) {
81+ var windowStartTime = triggerTime .minus (scaleDownInterval );
82+
83+ // Remove all recommended parallelisms before the window start time.
84+ while (!recommendedParallelisms .isEmpty ()
85+ && recommendedParallelisms
86+ .peekFirst ()
87+ .getTriggerTime ()
88+ .isBefore (windowStartTime )) {
89+ recommendedParallelisms .pollFirst ();
90+ }
91+
92+ // Remove all recommended parallelisms that are lower than the latest parallelism.
93+ while (!recommendedParallelisms .isEmpty ()
94+ && recommendedParallelisms .peekLast ().getParallelism () <= parallelism ) {
95+ recommendedParallelisms .pollLast ();
96+ }
97+
98+ recommendedParallelisms .addLast (new RecommendedParallelism (triggerTime , parallelism ));
99+ }
100+
101+ @ JsonIgnore
102+ public int getMaxRecommendedParallelism () {
103+ var maxRecommendedParallelism = recommendedParallelisms .peekFirst ();
104+ checkState (
105+ maxRecommendedParallelism != null ,
106+ "The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug." );
107+ return maxRecommendedParallelism .getParallelism ();
50108 }
51109 }
52110
@@ -63,18 +121,18 @@ public DelayedScaleDown() {
63121 /** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
64122 @ Nonnull
65123 public VertexDelayedScaleDownInfo triggerScaleDown (
66- JobVertexID vertex , Instant triggerTime , int parallelism ) {
124+ JobVertexID vertex , Instant triggerTime , int parallelism , Duration scaleDownInterval ) {
125+ // The vertexDelayedScaleDownInfo is updated once scale down is triggered due to we need
126+ // update the triggerTime each time.
127+ updated = true ;
128+
67129 var vertexDelayedScaleDownInfo = delayedVertices .get (vertex );
68130 if (vertexDelayedScaleDownInfo == null ) {
69- // It's the first trigger
70- vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo (triggerTime , parallelism );
131+ vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo (triggerTime );
71132 delayedVertices .put (vertex , vertexDelayedScaleDownInfo );
72- updated = true ;
73- } else if (parallelism > vertexDelayedScaleDownInfo .getMaxRecommendedParallelism ()) {
74- // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
75- vertexDelayedScaleDownInfo .setMaxRecommendedParallelism (parallelism );
76- updated = true ;
77133 }
134+ vertexDelayedScaleDownInfo .recordRecommendedParallelism (
135+ triggerTime , parallelism , scaleDownInterval );
78136
79137 return vertexDelayedScaleDownInfo ;
80138 }
0 commit comments