2424import java .time .Instant ;
2525
2626import static org .assertj .core .api .Assertions .assertThat ;
27+ import static org .assertj .core .api .Assertions .assertThatThrownBy ;
2728
2829/** Test for {@link DelayedScaleDown}. */
2930public class DelayedScaleDownTest {
3031
3132 private final JobVertexID vertex = new JobVertexID ();
3233
34+ @ Test
35+ void testWrongWindowStartTime () {
36+ var instant = Instant .now ();
37+ var delayedScaleDown = new DelayedScaleDown ();
38+
39+ // First trigger time as the trigger time, and it won't be updated.
40+ var vertexDelayedScaleDownInfo = delayedScaleDown .triggerScaleDown (vertex , instant , 5 );
41+ assertVertexDelayedScaleDownInfo (vertexDelayedScaleDownInfo , instant , 5 , instant );
42+
43+ // Get the max recommended parallelism from a wrong window, and no any recommended
44+ // parallelism since the start window.
45+ assertThatThrownBy (
46+ () ->
47+ vertexDelayedScaleDownInfo .getMaxRecommendedParallelism (
48+ instant .plusSeconds (1 )))
49+ .isInstanceOf (IllegalStateException .class );
50+ }
51+
52+ @ Test
53+ void testMaxRecommendedParallelismForSlidingWindow () {
54+ var instant = Instant .now ();
55+ var delayedScaleDown = new DelayedScaleDown ();
56+ assertThat (delayedScaleDown .isUpdated ()).isFalse ();
57+
58+ // [5] -> 5
59+ assertVertexDelayedScaleDownInfo (
60+ delayedScaleDown .triggerScaleDown (vertex , instant , 5 ), instant , 5 , instant );
61+ // [5, 8] -> 8
62+ assertVertexDelayedScaleDownInfo (
63+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (1 ), 8 ),
64+ instant ,
65+ 8 ,
66+ instant );
67+ // [5, 8, 6] -> 8
68+ assertVertexDelayedScaleDownInfo (
69+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (2 ), 6 ),
70+ instant ,
71+ 8 ,
72+ instant );
73+ // [5, 8, 6, 4] -> 8
74+ assertVertexDelayedScaleDownInfo (
75+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (3 ), 4 ),
76+ instant ,
77+ 8 ,
78+ instant );
79+ // 5, [8, 6, 4, 3] -> 8
80+ assertVertexDelayedScaleDownInfo (
81+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (4 ), 3 ),
82+ instant ,
83+ 8 ,
84+ instant .plusSeconds (1 ));
85+ // 5, 8, [6, 4, 3, 3] -> 6
86+ assertVertexDelayedScaleDownInfo (
87+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (5 ), 3 ),
88+ instant ,
89+ 6 ,
90+ instant .plusSeconds (2 ));
91+ // 5, 8, 6, [4, 3, 3, 3] -> 4
92+ assertVertexDelayedScaleDownInfo (
93+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (6 ), 3 ),
94+ instant ,
95+ 4 ,
96+ instant .plusSeconds (3 ));
97+ // 5, 8, 6, 4, [3, 3, 3, 3] -> 3
98+ assertVertexDelayedScaleDownInfo (
99+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (7 ), 3 ),
100+ instant ,
101+ 3 ,
102+ instant .plusSeconds (4 ));
103+ // 5, 8, 6, 4, [3, 3, 3, 3, 9] -> 9
104+ assertVertexDelayedScaleDownInfo (
105+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (8 ), 9 ),
106+ instant ,
107+ 9 ,
108+ instant .plusSeconds (4 ));
109+ }
110+
33111 @ Test
34112 void testTriggerUpdateAndClean () {
35113 var instant = Instant .now ();
@@ -38,40 +116,49 @@ void testTriggerUpdateAndClean() {
38116
39117 // First trigger time as the trigger time, and it won't be updated.
40118 assertVertexDelayedScaleDownInfo (
41- delayedScaleDown .triggerScaleDown (vertex , instant , 5 ), instant , 5 );
119+ delayedScaleDown .triggerScaleDown (vertex , instant , 5 ), instant , 5 , instant );
42120 assertThat (delayedScaleDown .isUpdated ()).isTrue ();
43121
44122 // The lower parallelism doesn't update the result
45123 assertVertexDelayedScaleDownInfo (
46- delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (5 ), 3 ), instant , 5 );
124+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (5 ), 3 ),
125+ instant ,
126+ 5 ,
127+ instant );
47128
48129 // The higher parallelism will update the result
49130 assertVertexDelayedScaleDownInfo (
50- delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (10 ), 8 ), instant , 8 );
131+ delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (10 ), 8 ),
132+ instant ,
133+ 8 ,
134+ instant );
51135
52136 // The scale down could be re-triggered again after clean
53137 delayedScaleDown .clearVertex (vertex );
54138 assertThat (delayedScaleDown .getDelayedVertices ()).isEmpty ();
55139 assertVertexDelayedScaleDownInfo (
56140 delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (15 ), 4 ),
57141 instant .plusSeconds (15 ),
58- 4 );
142+ 4 ,
143+ instant );
59144
60145 // The scale down could be re-triggered again after cleanAll
61146 delayedScaleDown .clearAll ();
62147 assertThat (delayedScaleDown .getDelayedVertices ()).isEmpty ();
63148 assertVertexDelayedScaleDownInfo (
64149 delayedScaleDown .triggerScaleDown (vertex , instant .plusSeconds (15 ), 2 ),
65150 instant .plusSeconds (15 ),
66- 2 );
151+ 2 ,
152+ instant );
67153 }
68154
69155 void assertVertexDelayedScaleDownInfo (
70156 DelayedScaleDown .VertexDelayedScaleDownInfo vertexDelayedScaleDownInfo ,
71157 Instant expectedTriggerTime ,
72- int expectedMaxRecommendedParallelism ) {
158+ int expectedMaxRecommendedParallelism ,
159+ Instant windowStartTime ) {
73160 assertThat (vertexDelayedScaleDownInfo .getFirstTriggerTime ()).isEqualTo (expectedTriggerTime );
74- assertThat (vertexDelayedScaleDownInfo .getMaxRecommendedParallelism ())
161+ assertThat (vertexDelayedScaleDownInfo .getMaxRecommendedParallelism (windowStartTime ))
75162 .isEqualTo (expectedMaxRecommendedParallelism );
76163 }
77164}
0 commit comments