5656import static org .apache .flink .autoscaler .metrics .ScalingMetric .MAX_PARALLELISM ;
5757import static org .apache .flink .autoscaler .metrics .ScalingMetric .NUM_SOURCE_PARTITIONS ;
5858import static org .apache .flink .autoscaler .metrics .ScalingMetric .PARALLELISM ;
59+ import static org .apache .flink .autoscaler .metrics .ScalingMetric .SCALE_DOWN_RATE_THRESHOLD ;
60+ import static org .apache .flink .autoscaler .metrics .ScalingMetric .SCALE_UP_RATE_THRESHOLD ;
5961import static org .apache .flink .autoscaler .metrics .ScalingMetric .TRUE_PROCESSING_RATE ;
6062import static org .apache .flink .autoscaler .topology .ShipStrategy .HASH ;
6163import static org .apache .flink .configuration .description .TextElement .text ;
@@ -92,12 +94,15 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
9294 @ Getter
9395 public static class ParallelismChange {
9496
95- private static final ParallelismChange NO_CHANGE = new ParallelismChange (-1 );
97+ private static final ParallelismChange NO_CHANGE = new ParallelismChange (-1 , false );
9698
9799 private final int newParallelism ;
98100
99- private ParallelismChange (int newParallelism ) {
101+ private final boolean outsideUtilizationBound ;
102+
103+ private ParallelismChange (int newParallelism , boolean outsideUtilizationBound ) {
100104 this .newParallelism = newParallelism ;
105+ this .outsideUtilizationBound = outsideUtilizationBound ;
101106 }
102107
103108 public boolean isNoChange () {
@@ -113,24 +118,29 @@ public boolean equals(Object o) {
113118 return false ;
114119 }
115120 ParallelismChange that = (ParallelismChange ) o ;
116- return newParallelism == that .newParallelism ;
121+ return newParallelism == that .newParallelism
122+ && outsideUtilizationBound == that .outsideUtilizationBound ;
117123 }
118124
119125 @ Override
120126 public int hashCode () {
121- return Objects .hash (newParallelism );
127+ return Objects .hash (newParallelism , outsideUtilizationBound );
122128 }
123129
124130 @ Override
125131 public String toString () {
126132 return isNoChange ()
127133 ? "NoParallelismChange"
128- : "ParallelismChange{newParallelism=" + newParallelism + '}' ;
134+ : "ParallelismChange{newParallelism="
135+ + newParallelism
136+ + ", outsideUtilizationBound="
137+ + outsideUtilizationBound
138+ + "}" ;
129139 }
130140
131- public static ParallelismChange build (int newParallelism ) {
141+ public static ParallelismChange build (int newParallelism , boolean outsideUtilizationBound ) {
132142 checkArgument (newParallelism > 0 , "The parallelism should be greater than 0." );
133- return new ParallelismChange (newParallelism );
143+ return new ParallelismChange (newParallelism , outsideUtilizationBound );
134144 }
135145
136146 public static ParallelismChange noChange () {
@@ -239,6 +249,8 @@ private ParallelismChange detectBlockScaling(
239249 currentParallelism != newParallelism ,
240250 "The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug." );
241251
252+ var outsideUtilizationBound = outsideUtilizationBound (vertex , evaluatedMetrics );
253+
242254 var scaledUp = currentParallelism < newParallelism ;
243255
244256 if (scaledUp ) {
@@ -248,7 +260,7 @@ private ParallelismChange detectBlockScaling(
248260
249261 // If we don't have past scaling actions for this vertex, don't block scale up.
250262 if (history .isEmpty ()) {
251- return ParallelismChange .build (newParallelism );
263+ return ParallelismChange .build (newParallelism , outsideUtilizationBound );
252264 }
253265
254266 var lastSummary = history .get (history .lastKey ());
@@ -260,28 +272,59 @@ && detectIneffectiveScaleUp(
260272 return ParallelismChange .noChange ();
261273 }
262274
263- return ParallelismChange .build (newParallelism );
275+ return ParallelismChange .build (newParallelism , outsideUtilizationBound );
276+ } else {
277+ return applyScaleDownInterval (
278+ delayedScaleDown , vertex , conf , newParallelism , outsideUtilizationBound );
279+ }
280+ }
281+
282+ private static boolean outsideUtilizationBound (
283+ JobVertexID vertex , Map <ScalingMetric , EvaluatedScalingMetric > metrics ) {
284+ double trueProcessingRate = metrics .get (TRUE_PROCESSING_RATE ).getAverage ();
285+ double scaleUpRateThreshold = metrics .get (SCALE_UP_RATE_THRESHOLD ).getCurrent ();
286+ double scaleDownRateThreshold = metrics .get (SCALE_DOWN_RATE_THRESHOLD ).getCurrent ();
287+
288+ if (trueProcessingRate < scaleUpRateThreshold
289+ || trueProcessingRate > scaleDownRateThreshold ) {
290+ LOG .debug (
291+ "Vertex {} processing rate {} is outside ({}, {})" ,
292+ vertex ,
293+ trueProcessingRate ,
294+ scaleUpRateThreshold ,
295+ scaleDownRateThreshold );
296+ return true ;
264297 } else {
265- return applyScaleDownInterval (delayedScaleDown , vertex , conf , newParallelism );
298+ LOG .debug (
299+ "Vertex {} processing rate {} is within target ({}, {})" ,
300+ vertex ,
301+ trueProcessingRate ,
302+ scaleUpRateThreshold ,
303+ scaleDownRateThreshold );
266304 }
305+ return false ;
267306 }
268307
269308 private ParallelismChange applyScaleDownInterval (
270309 DelayedScaleDown delayedScaleDown ,
271310 JobVertexID vertex ,
272311 Configuration conf ,
273- int newParallelism ) {
312+ int newParallelism ,
313+ boolean outsideUtilizationBound ) {
274314 var scaleDownInterval = conf .get (SCALE_DOWN_INTERVAL );
275315 if (scaleDownInterval .toMillis () <= 0 ) {
276316 // The scale down interval is disabled, so don't block scaling.
277- return ParallelismChange .build (newParallelism );
317+ return ParallelismChange .build (newParallelism , outsideUtilizationBound );
278318 }
279319
280320 var now = clock .instant ();
281- var delayedScaleDownInfo = delayedScaleDown .triggerScaleDown (vertex , now , newParallelism );
321+ var windowStartTime = now .minus (scaleDownInterval );
322+ var delayedScaleDownInfo =
323+ delayedScaleDown .triggerScaleDown (
324+ vertex , now , newParallelism , outsideUtilizationBound );
282325
283326 // Never scale down within scale down interval
284- if (now .isBefore (delayedScaleDownInfo .getFirstTriggerTime (). plus ( scaleDownInterval ))) {
327+ if (windowStartTime .isBefore (delayedScaleDownInfo .getFirstTriggerTime ())) {
285328 if (now .equals (delayedScaleDownInfo .getFirstTriggerTime ())) {
286329 LOG .info ("The scale down of {} is delayed by {}." , vertex , scaleDownInterval );
287330 } else {
@@ -293,7 +336,11 @@ private ParallelismChange applyScaleDownInterval(
293336 } else {
294337 // Using the maximum parallelism within the scale down interval window instead of the
295338 // latest parallelism when scaling down
296- return ParallelismChange .build (delayedScaleDownInfo .getMaxRecommendedParallelism ());
339+ var maxRecommendedParallelism =
340+ delayedScaleDownInfo .getMaxRecommendedParallelism (windowStartTime );
341+ return ParallelismChange .build (
342+ maxRecommendedParallelism .getParallelism (),
343+ maxRecommendedParallelism .isOutsideUtilizationBound ());
297344 }
298345 }
299346
0 commit comments