5050import static org .apache .flink .autoscaler .config .AutoScalerOptions .VERTEX_MIN_PARALLELISM ;
5151import static org .apache .flink .autoscaler .metrics .ScalingMetric .EXPECTED_PROCESSING_RATE ;
5252import static org .apache .flink .autoscaler .metrics .ScalingMetric .MAX_PARALLELISM ;
53+ import static org .apache .flink .autoscaler .metrics .ScalingMetric .NUM_SOURCE_PARTITIONS ;
5354import static org .apache .flink .autoscaler .metrics .ScalingMetric .PARALLELISM ;
5455import static org .apache .flink .autoscaler .metrics .ScalingMetric .TRUE_PROCESSING_RATE ;
5556import static org .apache .flink .autoscaler .topology .ShipStrategy .HASH ;
@@ -66,6 +67,14 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
6667 protected static final String INEFFECTIVE_MESSAGE_FORMAT =
6768 "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s" ;
6869
70+ @ VisibleForTesting protected static final String SCALING_LIMITED = "ScalingLimited" ;
71+
72+ @ VisibleForTesting
73+ protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
74+ "Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). "
75+ + "Scaling limited due to numKeyGroupsOrPartitions : %s,"
76+ + "upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, parallelismLowerLimit: %s." ;
77+
6978 private Clock clock = Clock .system (ZoneId .systemDefault ());
7079
7180 private final AutoScalerEventHandler <KEY , Context > autoScalerEventHandler ;
@@ -193,12 +202,16 @@ public ParallelismChange computeScaleTargetParallelism(
193202
194203 int newParallelism =
195204 scale (
205+ vertex ,
196206 currentParallelism ,
197207 inputShipStrategies ,
208+ (int ) evaluatedMetrics .get (NUM_SOURCE_PARTITIONS ).getCurrent (),
198209 (int ) evaluatedMetrics .get (MAX_PARALLELISM ).getCurrent (),
199210 scaleFactor ,
200211 Math .min (currentParallelism , conf .getInteger (VERTEX_MIN_PARALLELISM )),
201- Math .max (currentParallelism , conf .getInteger (VERTEX_MAX_PARALLELISM )));
212+ Math .max (currentParallelism , conf .getInteger (VERTEX_MAX_PARALLELISM )),
213+ autoScalerEventHandler ,
214+ context );
202215
203216 if (newParallelism == currentParallelism ) {
204217 // Clear delayed scale down request if the new parallelism is equal to
@@ -345,15 +358,22 @@ private boolean detectIneffectiveScaleUp(
345358 * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
346359 * parallelism for source and keyed vertex such that it divides the maxParallelism without a
347360 * remainder.
361+ *
362+ * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
363+ * number of source partitions if a vertex has a known source partition count.
348364 */
349365 @ VisibleForTesting
350- protected static int scale (
366+ protected static <KEY , Context extends JobAutoScalerContext <KEY >> int scale (
367+ JobVertexID vertex ,
351368 int currentParallelism ,
352369 Collection <ShipStrategy > inputShipStrategies ,
370+ int numSourcePartitions ,
353371 int maxParallelism ,
354372 double scaleFactor ,
355373 int parallelismLowerLimit ,
356- int parallelismUpperLimit ) {
374+ int parallelismUpperLimit ,
375+ AutoScalerEventHandler <KEY , Context > eventHandler ,
376+ Context context ) {
357377 checkArgument (
358378 parallelismLowerLimit <= parallelismUpperLimit ,
359379 "The parallelism lower limitation must not be greater than the parallelism upper limitation." );
@@ -383,23 +403,62 @@ protected static int scale(
383403 // Apply min/max parallelism
384404 newParallelism = Math .min (Math .max (parallelismLowerLimit , newParallelism ), upperBound );
385405
386- var adjustByMaxParallelism =
387- inputShipStrategies . isEmpty () || inputShipStrategies .contains (HASH );
388- if (!adjustByMaxParallelism ) {
406+ var adjustByMaxParallelismOrPartitions =
407+ numSourcePartitions > 0 || inputShipStrategies .contains (HASH );
408+ if (!adjustByMaxParallelismOrPartitions ) {
389409 return newParallelism ;
390410 }
391411
392- // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
393- // adjust the parallelism such that it divides the maxParallelism without a remainder
394- // => data is evenly spread across subtasks
395- for (int p = newParallelism ; p <= maxParallelism / 2 && p <= upperBound ; p ++) {
396- if (maxParallelism % p == 0 ) {
412+ var numKeyGroupsOrPartitions =
413+ numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions ;
414+ var upperBoundForAlignment =
415+ Math .min (
416+ // Optimize the case where newParallelism <= maxParallelism / 2
417+ newParallelism > numKeyGroupsOrPartitions / 2
418+ ? numKeyGroupsOrPartitions
419+ : numKeyGroupsOrPartitions / 2 ,
420+ upperBound );
421+
422+ // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
423+ // we try to adjust the parallelism such that it divides
424+ // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
425+ for (int p = newParallelism ; p <= upperBoundForAlignment ; p ++) {
426+ if (numKeyGroupsOrPartitions % p == 0 ) {
397427 return p ;
398428 }
399429 }
400430
401- // If parallelism adjustment fails, use originally computed parallelism
402- return newParallelism ;
431+ // When adjust the parallelism after rounding up cannot be evenly divided by
432+ // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
433+ // current consumption rate.
434+ int p = newParallelism ;
435+ for (; p > 0 ; p --) {
436+ if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism ) {
437+ if (numKeyGroupsOrPartitions % p != 0 ) {
438+ p ++;
439+ }
440+ break ;
441+ }
442+ }
443+
444+ p = Math .max (p , parallelismLowerLimit );
445+ var message =
446+ String .format (
447+ SCALE_LIMITED_MESSAGE_FORMAT ,
448+ vertex ,
449+ newParallelism ,
450+ p ,
451+ numKeyGroupsOrPartitions ,
452+ upperBound ,
453+ parallelismLowerLimit );
454+ eventHandler .handleEvent (
455+ context ,
456+ AutoScalerEventHandler .Type .Warning ,
457+ SCALING_LIMITED ,
458+ message ,
459+ SCALING_LIMITED + vertex + (scaleFactor * currentParallelism ),
460+ context .getConfiguration ().get (SCALING_EVENT_INTERVAL ));
461+ return p ;
403462 }
404463
405464 @ VisibleForTesting
0 commit comments