3030
3131import java .time .Instant ;
3232import java .util .HashMap ;
33+ import java .util .LinkedList ;
3334import java .util .Map ;
3435
36+ import static org .apache .flink .util .Preconditions .checkState ;
37+
3538/** All delayed scale down requests. */
3639public class DelayedScaleDown {
3740
41+ @ Data
42+ private static class RecommendedParallelism {
43+ @ Nonnull private final Instant triggerTime ;
44+ private final int parallelism ;
45+
46+ @ JsonCreator
47+ public RecommendedParallelism (
48+ @ Nonnull @ JsonProperty ("triggerTime" ) Instant triggerTime ,
49+ @ JsonProperty ("parallelism" ) int parallelism ) {
50+ this .triggerTime = triggerTime ;
51+ this .parallelism = parallelism ;
52+ }
53+ }
54+
3855 /** The delayed scale down info for vertex. */
3956 @ Data
4057 public static class VertexDelayedScaleDownInfo {
4158 private final Instant firstTriggerTime ;
42- private int maxRecommendedParallelism ;
59+ // TODO : add the comment to explain how to calculate the max parallelism within the sliding
60+ // window.
61+ private final LinkedList <RecommendedParallelism > recommendedParallelisms ;
62+
63+ public VertexDelayedScaleDownInfo (Instant firstTriggerTime ) {
64+ this .firstTriggerTime = firstTriggerTime ;
65+ this .recommendedParallelisms = new LinkedList <>();
66+ }
4367
4468 @ JsonCreator
4569 public VertexDelayedScaleDownInfo (
4670 @ JsonProperty ("firstTriggerTime" ) Instant firstTriggerTime ,
47- @ JsonProperty ("maxRecommendedParallelism" ) int maxRecommendedParallelism ) {
71+ @ JsonProperty ("recommendedParallelisms" )
72+ LinkedList <RecommendedParallelism > recommendedParallelisms ) {
4873 this .firstTriggerTime = firstTriggerTime ;
49- this .maxRecommendedParallelism = maxRecommendedParallelism ;
74+ this .recommendedParallelisms = recommendedParallelisms ;
75+ }
76+
77+ /** Record current recommended parallelism. */
78+ public void recordRecommendedParallelism (Instant triggerTime , int parallelism ) {
79+
80+ // Remove all recommended parallelisms that are lower than the latest parallelism.
81+ while (!recommendedParallelisms .isEmpty ()
82+ && recommendedParallelisms .peekLast ().getParallelism () <= parallelism ) {
83+ recommendedParallelisms .pollLast ();
84+ }
85+
86+ recommendedParallelisms .addLast (new RecommendedParallelism (triggerTime , parallelism ));
87+ }
88+
89+ @ JsonIgnore
90+ public int getMaxRecommendedParallelism (Instant windowStartTime ) {
91+ // Remove all recommended parallelisms before the window start time.
92+ while (!recommendedParallelisms .isEmpty ()
93+ && recommendedParallelisms
94+ .peekFirst ()
95+ .getTriggerTime ()
96+ .isBefore (windowStartTime )) {
97+ recommendedParallelisms .pollFirst ();
98+ }
99+
100+ var maxRecommendedParallelism = recommendedParallelisms .peekFirst ();
101+ checkState (
102+ maxRecommendedParallelism != null ,
103+ "The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug." );
104+ return maxRecommendedParallelism .getParallelism ();
50105 }
51106 }
52107
@@ -64,17 +119,16 @@ public DelayedScaleDown() {
64119 @ Nonnull
65120 public VertexDelayedScaleDownInfo triggerScaleDown (
66121 JobVertexID vertex , Instant triggerTime , int parallelism ) {
122+ // The vertexDelayedScaleDownInfo is updated once scale down is triggered due to we need
123+ // update the triggerTime each time.
124+ updated = true ;
125+
67126 var vertexDelayedScaleDownInfo = delayedVertices .get (vertex );
68127 if (vertexDelayedScaleDownInfo == null ) {
69- // It's the first trigger
70- vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo (triggerTime , parallelism );
128+ vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo (triggerTime );
71129 delayedVertices .put (vertex , vertexDelayedScaleDownInfo );
72- updated = true ;
73- } else if (parallelism > vertexDelayedScaleDownInfo .getMaxRecommendedParallelism ()) {
74- // Not the first trigger, but the maxRecommendedParallelism needs to be updated.
75- vertexDelayedScaleDownInfo .setMaxRecommendedParallelism (parallelism );
76- updated = true ;
77130 }
131+ vertexDelayedScaleDownInfo .recordRecommendedParallelism (triggerTime , parallelism );
78132
79133 return vertexDelayedScaleDownInfo ;
80134 }
0 commit comments