1818package org .apache .flink .autoscaler ;
1919
2020import org .apache .flink .annotation .VisibleForTesting ;
21- import org .apache .flink .api .java .tuple .Tuple2 ;
2221import org .apache .flink .autoscaler .config .AutoScalerOptions ;
2322import org .apache .flink .autoscaler .event .AutoScalerEventHandler ;
2423import org .apache .flink .autoscaler .metrics .EvaluatedScalingMetric ;
4039import java .util .Collection ;
4140import java .util .Map ;
4241import java .util .Objects ;
43- import java .util .Optional ;
4442import java .util .SortedMap ;
43+ import java .util .function .Consumer ;
4544
4645import static org .apache .flink .autoscaler .config .AutoScalerOptions .MAX_SCALE_DOWN_FACTOR ;
4746import static org .apache .flink .autoscaler .config .AutoScalerOptions .MAX_SCALE_UP_FACTOR ;
@@ -69,7 +68,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
6968 protected static final String INEFFECTIVE_MESSAGE_FORMAT =
7069 "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s" ;
7170
72- @ VisibleForTesting protected static final String SCALE_LIMITED = "ScalingLimited" ;
71+ @ VisibleForTesting protected static final String SCALING_LIMITED = "ScalingLimited" ;
7372
7473 @ VisibleForTesting
7574 protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
@@ -200,7 +199,7 @@ public ParallelismChange computeScaleTargetParallelism(
200199 double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor ;
201200 LOG .debug ("Capped target processing capacity for {} is {}" , vertex , cappedTargetCapacity );
202201
203- Tuple2 < Integer , Optional < String >> newParallelism =
202+ int newParallelism =
204203 scale (
205204 vertex ,
206205 currentParallelism ,
@@ -209,20 +208,17 @@ public ParallelismChange computeScaleTargetParallelism(
209208 (int ) evaluatedMetrics .get (MAX_PARALLELISM ).getCurrent (),
210209 scaleFactor ,
211210 Math .min (currentParallelism , conf .getInteger (VERTEX_MIN_PARALLELISM )),
212- Math .max (currentParallelism , conf .getInteger (VERTEX_MAX_PARALLELISM )));
213-
214- newParallelism .f1 .ifPresent (
215- message -> {
216- autoScalerEventHandler .handleEvent (
217- context ,
218- AutoScalerEventHandler .Type .Warning ,
219- SCALE_LIMITED ,
220- message ,
221- SCALE_LIMITED + vertex + cappedTargetCapacity ,
222- conf .get (SCALING_EVENT_INTERVAL ));
223- });
224-
225- if (newParallelism .f0 == currentParallelism ) {
211+ Math .max (currentParallelism , conf .getInteger (VERTEX_MAX_PARALLELISM )),
212+ message ->
213+ autoScalerEventHandler .handleEvent (
214+ context ,
215+ AutoScalerEventHandler .Type .Warning ,
216+ SCALING_LIMITED ,
217+ message ,
218+ SCALING_LIMITED + vertex + cappedTargetCapacity ,
219+ conf .get (SCALING_EVENT_INTERVAL )));
220+
221+ if (newParallelism == currentParallelism ) {
226222 // Clear delayed scale down request if the new parallelism is equal to
227223 // currentParallelism.
228224 delayedScaleDown .clearVertex (vertex );
@@ -241,7 +237,7 @@ public ParallelismChange computeScaleTargetParallelism(
241237 evaluatedMetrics ,
242238 history ,
243239 currentParallelism ,
244- newParallelism . f0 ,
240+ newParallelism ,
245241 delayedScaleDown );
246242 }
247243
@@ -372,15 +368,16 @@ private boolean detectIneffectiveScaleUp(
372368 * number of partitions if a vertex has a known partition count.
373369 */
374370 @ VisibleForTesting
375- protected static Tuple2 < Integer , Optional < String >> scale (
371+ protected static int scale (
376372 JobVertexID vertex ,
377373 int currentParallelism ,
378374 Collection <ShipStrategy > inputShipStrategies ,
379375 int numPartitions ,
380376 int maxParallelism ,
381377 double scaleFactor ,
382378 int parallelismLowerLimit ,
383- int parallelismUpperLimit ) {
379+ int parallelismUpperLimit ,
380+ Consumer <String > consumer ) {
384381 checkArgument (
385382 parallelismLowerLimit <= parallelismUpperLimit ,
386383 "The parallelism lower limitation must not be greater than the parallelism upper limitation." );
@@ -413,62 +410,63 @@ protected static Tuple2<Integer, Optional<String>> scale(
413410 var adjustByMaxParallelism =
414411 inputShipStrategies .isEmpty () || inputShipStrategies .contains (HASH );
415412 if (!adjustByMaxParallelism ) {
416- return Tuple2 . of ( newParallelism , Optional . empty ()) ;
413+ return newParallelism ;
417414 }
418415
416+ int adjustableMaxParallelism = maxParallelism ;
417+ int adjustableUpperBound ;
419418 if (numPartitions <= 0 ) {
420- // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
421- // we try to adjust the parallelism such that it divides the maxParallelism without a
422- // remainder => data is evenly spread across subtasks
423- for (int p = newParallelism ; p <= maxParallelism / 2 && p <= upperBound ; p ++) {
424- if (maxParallelism % p == 0 ) {
425- return Tuple2 .of (p , Optional .empty ());
426- }
427- }
428- // If parallelism adjustment fails, use originally computed parallelism
429- return Tuple2 .of (newParallelism , Optional .empty ());
419+ adjustableUpperBound = Math .min (maxParallelism / 2 , upperBound );
430420 } else {
421+ adjustableUpperBound = Math .min (numPartitions , upperBound );
422+ adjustableMaxParallelism = numPartitions ;
423+ }
431424
432- // When we know the numPartitions at a vertex ,
433- // adjust the parallelism such that it divides the numPartitions without a remainder
434- // => Data is evenly distributed among subtasks
435- for ( int p = newParallelism ; p <= upperBound && p <= numPartitions ; p ++) {
436- if ( numPartitions % p == 0 ) {
437- return Tuple2 . of ( p , Optional . empty ());
438- }
425+ // When the shuffle type of vertex inputs contains keyBy or vertex is a source ,
426+ // we try to adjust the parallelism such that it divides the adjustableMaxParallelism
427+ // without a
428+ // remainder => data is evenly spread across subtasks
429+ for ( int p = newParallelism ; p <= adjustableUpperBound ; p ++ ) {
430+ if ( adjustableMaxParallelism % p == 0 ) {
431+ return p ;
439432 }
433+ }
434+
435+ if (numPartitions > 0 ) {
440436
441- // When the degree of parallelism after rounding up cannot be evenly divided by source
442- // PartitionCount , Try to find the smallest parallelism that can satisfy the current
437+ // When adjust the parallelism after rounding up cannot be evenly divided by source
438+ // numPartitions , Try to find the smallest parallelism that can satisfy the current
443439 // consumption rate.
444440 for (int p = newParallelism ; p > parallelismLowerLimit ; p --) {
445441 if (numPartitions / p > numPartitions / newParallelism ) {
446- if (numPartitions % p != 0 ) {
447- p += 1 ;
442+ if (maxParallelism % p != 0 ) {
443+ p ++ ;
448444 }
449- var message =
445+ consumer . accept (
450446 String .format (
451447 SCALE_LIMITED_MESSAGE_FORMAT ,
452448 vertex ,
453449 newParallelism ,
454450 p ,
455451 String .format (
456- "numPartitions : %s,upperBound(maxParallelism or "
457- + "parallelismUpperLimit): %s" ,
458- numPartitions , upperBound ));
459- return Tuple2 .of (p , Optional .of (message ));
452+ "numPartitions : %s,upperBound(maxParallelism or upperBound): %s" ,
453+ numPartitions , upperBound )));
454+ return p ;
460455 }
461456 }
462- // If a suitable degree of parallelism cannot be found, return parallelismLowerLimit
463- var message =
457+
458+ consumer . accept (
464459 String .format (
465460 SCALE_LIMITED_MESSAGE_FORMAT ,
466461 vertex ,
467462 newParallelism ,
468463 parallelismLowerLimit ,
469- String .format ("parallelismLowerLimit : %s" , parallelismLowerLimit ));
470- return Tuple2 . of ( parallelismLowerLimit , Optional . of ( message )) ;
464+ String .format ("parallelismLowerLimit : %s" , parallelismLowerLimit ))) ;
465+ return parallelismLowerLimit ;
471466 }
467+
468+ // If parallelism adjustment fails, use originally computed parallelism
469+ return newParallelism ;
472470 }
473471
474472 @ VisibleForTesting
0 commit comments