 package org.apache.flink.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
@@ -66,6 +69,12 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     protected static final String INEFFECTIVE_MESSAGE_FORMAT =
             "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s";
 
+    @VisibleForTesting protected static final String SCALE_LIMITED = "ScalingLimited";
+
+    @VisibleForTesting
+    protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
76+ "Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). Scaling limited due to %s" ;
+
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
     private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
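Note: the new SCALE_LIMITED_MESSAGE_FORMAT is filled in the same way as the existing INEFFECTIVE_MESSAGE_FORMAT. A minimal sketch of how an event text would be produced (the vertex id and all numbers below are made up for illustration, not taken from this PR or its tests):

    // Illustration only: hypothetical values.
    String eventText =
            String.format(
                    SCALE_LIMITED_MESSAGE_FORMAT,
                    new JobVertexID(),   // the vertex whose scaling was limited
                    16,                  // parallelism the scaler wanted
                    12,                  // parallelism it settled on
                    "numPartitions: 12, upperBound (maxParallelism or parallelismUpperLimit): 24");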
@@ -191,16 +200,29 @@ public ParallelismChange computeScaleTargetParallelism(
         double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
         LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
 
-        int newParallelism =
+        Tuple2<Integer, Optional<String>> newParallelism =
                 scale(
+                        vertex,
                         currentParallelism,
                         inputShipStrategies,
+                        (int) evaluatedMetrics.get(NUM_PARTITIONS).getCurrent(),
                         (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism) {
+        newParallelism.f1.ifPresent(
+                message -> {
+                    autoScalerEventHandler.handleEvent(
+                            context,
+                            AutoScalerEventHandler.Type.Warning,
+                            SCALE_LIMITED,
+                            message,
+                            SCALE_LIMITED + vertex + cappedTargetCapacity,
+                            conf.get(SCALING_EVENT_INTERVAL));
+                });
+
+        if (newParallelism.f0 == currentParallelism) {
             // Clear delayed scale down request if the new parallelism is equal to
             // currentParallelism.
             delayedScaleDown.clearVertex(vertex);
@@ -219,7 +241,7 @@ public ParallelismChange computeScaleTargetParallelism(
                 evaluatedMetrics,
                 history,
                 currentParallelism,
-                newParallelism,
+                newParallelism.f0,
                 delayedScaleDown);
     }
 
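The helper now reports a pair instead of a bare int: f0 carries the decided parallelism and f1 optionally explains why the request was limited. A minimal sketch of that calling convention with made-up values (LOG stands in for the class logger; the real caller is shown in the hunk above):

    // Sketch only: illustrative values, not production code.
    Tuple2<Integer, Optional<String>> result =
            Tuple2.of(12, Optional.of("numPartitions: 12, upperBound (maxParallelism or parallelismUpperLimit): 24"));
    int decidedParallelism = result.f0;   // used wherever the old int was used
    result.f1.ifPresent(reason -> LOG.warn("Scaling limited: {}", reason));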
@@ -345,11 +367,16 @@ private boolean detectIneffectiveScaleUp(
      * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
      * parallelism for source and keyed vertex such that it divides the maxParallelism without a
      * remainder.
+     *
+     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
+     * number of partitions if a vertex has a known partition count.
      */
     @VisibleForTesting
-    protected static int scale(
+    protected static Tuple2<Integer, Optional<String>> scale(
+            JobVertexID vertex,
             int currentParallelism,
             Collection<ShipStrategy> inputShipStrategies,
+            int numPartitions,
             int maxParallelism,
             double scaleFactor,
             int parallelismLowerLimit,
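Since scale() stays @VisibleForTesting, tests can exercise the widened signature directly. A hypothetical invocation with made-up values, just to show the new parameter order (vertex and numPartitions are the additions; assumes java.util.List is imported):

    // Hypothetical test-style call; values are illustrative only.
    Tuple2<Integer, Optional<String>> result =
            JobVertexScaler.scale(
                    new JobVertexID(),   // vertex
                    2,                   // currentParallelism
                    List.of(HASH),       // inputShipStrategies
                    8,                   // numPartitions (new)
                    128,                 // maxParallelism
                    2.0,                 // scaleFactor
                    1,                   // parallelismLowerLimit
                    24);                 // parallelismUpperLimit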
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+            // we try to adjust the parallelism such that it divides the maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions of a vertex, adjust the parallelism such that it
+            // divides the numPartitions without a remainder => data is evenly distributed
+            // among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }
-        }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+            // When the rounded-up parallelism cannot evenly divide the source partition
+            // count, try to find the smallest parallelism that can still satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }
+                    var message =
+                            String.format(
+                                    SCALE_LIMITED_MESSAGE_FORMAT,
+                                    vertex,
+                                    newParallelism,
+                                    p,
+                                    String.format(
+                                            "numPartitions: %s, upperBound (maxParallelism or "
+                                                    + "parallelismUpperLimit): %s",
+                                            numPartitions, upperBound));
+                    return Tuple2.of(p, Optional.of(message));
+                }
+            }
+            // If no suitable parallelism can be found, fall back to parallelismLowerLimit
+            var message =
+                    String.format(
+                            SCALE_LIMITED_MESSAGE_FORMAT,
+                            vertex,
+                            newParallelism,
+                            parallelismLowerLimit,
+                            String.format("parallelismLowerLimit: %s", parallelismLowerLimit));
+            return Tuple2.of(parallelismLowerLimit, Optional.of(message));
+        }
     }
 
     @VisibleForTesting
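To make the partition-alignment rule easier to follow, here is a standalone, simplified sketch (my own illustration, not code from this PR): it repeats only the two loops applied when numPartitions is known, plus two worked inputs. The production method additionally returns an explanatory message that becomes the ScalingLimited event.

// Simplified, self-contained sketch of the numPartitions branch added to scale().
// It omits metrics, events and ship strategies and keeps only the arithmetic.
public final class PartitionAlignmentSketch {

    /** Mirrors the adjustment performed when a vertex has a known partition count. */
    static int align(int requested, int numPartitions, int upperBound, int lowerLimit) {
        // 1) Prefer the next parallelism that divides numPartitions without a remainder.
        for (int p = requested; p <= upperBound && p <= numPartitions; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        // 2) Otherwise scan downward to the point where the per-subtask partition count
        //    (integer division) grows, stepping back up by one if that value is not a divisor.
        for (int p = requested; p > lowerLimit; p--) {
            if (numPartitions / p > numPartitions / requested) {
                return numPartitions % p != 0 ? p + 1 : p;
            }
        }
        // 3) No suitable value found: fall back to the lower limit.
        return lowerLimit;
    }

    public static void main(String[] args) {
        // 8 partitions, 3 requested: 4 divides 8 evenly, so 4 is chosen.
        System.out.println(align(3, 8, 24, 1));
        // 7 partitions, 5 requested, upper bound 6: no divisor in [5, 6]; the fallback
        // settles on 4 (at most ceil(7/4) = 2 partitions per subtask, same as 5 would give).
        System.out.println(align(5, 7, 6, 1));
    }
}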