 import java.util.Map;
 import java.util.Objects;
 import java.util.SortedMap;
-import java.util.function.Consumer;

 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
@@ -209,14 +208,8 @@ public ParallelismChange computeScaleTargetParallelism(
                         scaleFactor,
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
-                        message ->
-                                autoScalerEventHandler.handleEvent(
-                                        context,
-                                        AutoScalerEventHandler.Type.Warning,
-                                        SCALING_LIMITED,
-                                        message,
-                                        SCALING_LIMITED + vertex + cappedTargetCapacity,
-                                        conf.get(SCALING_EVENT_INTERVAL)));
+                        autoScalerEventHandler,
+                        context);

         if (newParallelism == currentParallelism) {
             // Clear delayed scale down request if the new parallelism is equal to
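Note on the hunk above: the Consumer<String> callback that wrapped autoScalerEventHandler.handleEvent(...) is gone; the caller now hands the handler and its context straight to scale(...), which raises the event itself. A minimal sketch of the new call shape, assuming the surrounding method's locals look roughly like this (names not shown in this diff are placeholders):

    // Sketch only: vertex, numPartitions, maxParallelism etc. are illustrative.
    int newParallelism =
            scale(
                    vertex,
                    currentParallelism,
                    inputShipStrategies,
                    numPartitions,
                    maxParallelism,
                    scaleFactor,
                    Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                    Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
                    autoScalerEventHandler,
                    context);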
@@ -368,7 +361,7 @@ private boolean detectIneffectiveScaleUp(
      * number of partitions if a vertex has a known partition count.
      */
     @VisibleForTesting
-    protected static int scale(
+    protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
             JobVertexID vertex,
             int currentParallelism,
             Collection<ShipStrategy> inputShipStrategies,
@@ -377,7 +370,8 @@ protected static int scale(
             double scaleFactor,
             int parallelismLowerLimit,
             int parallelismUpperLimit,
-            Consumer<String> consumer) {
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            Context context) {
         checkArgument(
                 parallelismLowerLimit <= parallelismUpperLimit,
                 "The parallelism lower limitation must not be greater than the parallelism upper limitation.");
@@ -402,7 +396,7 @@ protected static int scale(

         // Cap parallelism at either maxParallelism (number of key groups or source partitions)
         // or the parallelism upper limit
-        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);

         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
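Worked numbers for the cap and clamp above, with made-up values (128 key groups, limits [1, 200], a computed target of 150):

    // Illustrative only: 150 exceeds the 128 key groups, so it is capped.
    int upperBound = Math.min(128, 200);                   // -> 128
    int clamped = Math.min(Math.max(1, 150), upperBound);  // -> 128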
@@ -413,21 +407,20 @@ protected static int scale(
             return newParallelism;
         }

-        int adjustableMaxParallelism = maxParallelism;
-        int adjustableUpperBound;
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
         if (numPartitions <= 0) {
-            adjustableUpperBound = Math.min(maxParallelism / 2, upperBound);
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
         } else {
-            adjustableUpperBound = Math.min(numPartitions, upperBound);
-            adjustableMaxParallelism = numPartitions;
+            upperBoundForAlignment = Math.min(numPartitions, upperBound);
+            numKeyGroupsOrPartitions = numPartitions;
         }

         // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
-        // we try to adjust the parallelism such that it divides the adjustableMaxParallelism
-        // without a
-        // remainder => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= adjustableUpperBound; p++) {
-            if (adjustableMaxParallelism % p == 0) {
+        // we try to adjust the parallelism such that it divides
+        // numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }
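The rewritten loop scans upward from the computed parallelism to the first divisor of the key-group or partition count, so every subtask gets an equal share. A standalone sketch with made-up numbers (128 key groups, computed parallelism 7, alignment bound 64 = 128 / 2):

    // Sketch of the alignment scan above; values are illustrative.
    int numKeyGroupsOrPartitions = 128;
    int upperBoundForAlignment = 64;
    for (int p = 7; p <= upperBoundForAlignment; p++) {
        if (numKeyGroupsOrPartitions % p == 0) {
            // First hit is p = 8: each of 8 subtasks gets 128 / 8 = 16 key groups.
            break;
        }
    }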
@@ -437,32 +430,36 @@ protected static int scale(
         // When the adjusted parallelism after rounding up cannot evenly divide the source
         // numPartitions, try to find the smallest parallelism that can satisfy the current
         // consumption rate.
-        for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+        int p = newParallelism;
+        for (; p > 0; p--) {
             if (numPartitions / p > numPartitions / newParallelism) {
                 if (numPartitions % p != 0) {
                     p++;
                 }
-                consumer.accept(
-                        String.format(
-                                SCALE_LIMITED_MESSAGE_FORMAT,
-                                vertex,
-                                newParallelism,
-                                p,
-                                String.format(
-                                        "numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s",
-                                        numPartitions, upperBound)));
-                return p;
+                break;
             }
         }

-        consumer.accept(
+        p = Math.max(p, parallelismLowerLimit);
+
+        var message =
                 String.format(
                         SCALE_LIMITED_MESSAGE_FORMAT,
                         vertex,
                         newParallelism,
-                        parallelismLowerLimit,
-                        String.format("parallelismLowerLimit : %s", parallelismLowerLimit)));
-        return parallelismLowerLimit;
+                        p,
+                        String.format(
+                                "numPartitions: %s, upperBound(maxParallelism or parallelismUpperLimit): %s, "
+                                        + "parallelismLowerLimit: %s.",
+                                numPartitions, upperBound, parallelismLowerLimit));
+        eventHandler.handleEvent(
+                context,
+                AutoScalerEventHandler.Type.Warning,
+                SCALING_LIMITED,
+                message,
+                SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
+                context.getConfiguration().get(SCALING_EVENT_INTERVAL));
+        return p;
     }

         // If parallelism adjustment fails, use originally computed parallelism
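When no divisor exists below the bound, the fallback scans downward for the smallest parallelism whose worst-case partitions-per-subtask still matches the computed target, clamps it to parallelismLowerLimit, and emits the SCALING_LIMITED warning. A standalone rerun with made-up numbers (10 partitions, computed parallelism 7, upper bound 8, so both 10 % 7 and 10 % 8 are nonzero and alignment fails):

    // Sketch of the fallback scan above; values are illustrative.
    int numPartitions = 10;
    int newParallelism = 7;
    int p = newParallelism;
    for (; p > 0; p--) {
        if (numPartitions / p > numPartitions / newParallelism) {
            if (numPartitions % p != 0) {
                p++;
            }
            break;
        }
    }
    // p == 5: five subtasks read exactly 2 partitions each, the same worst case
    // as parallelism 7, where the busiest subtask also reads 2 partitions.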