@@ -409,13 +409,20 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
409409 return newParallelism ;
410410 }
411411
412- int numKeyGroupsOrPartitions = maxParallelism ;
413- int upperBoundForAlignment ;
412+ final int numKeyGroupsOrPartitions ;
413+ final int upperBoundForAlignment ;
414414 if (numSourcePartitions <= 0 ) {
415- upperBoundForAlignment = Math .min (maxParallelism / 2 , upperBound );
415+ numKeyGroupsOrPartitions = maxParallelism ;
416+ upperBoundForAlignment =
417+ Math .min (
418+ // Optimize the case where newParallelism <= maxParallelism / 2
419+ newParallelism > maxParallelism / 2
420+ ? maxParallelism
421+ : maxParallelism / 2 ,
422+ upperBound );
416423 } else {
417- upperBoundForAlignment = Math .min (numSourcePartitions , upperBound );
418424 numKeyGroupsOrPartitions = numSourcePartitions ;
425+ upperBoundForAlignment = Math .min (numSourcePartitions , upperBound );
419426 }
420427
421428 // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
@@ -427,44 +434,37 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
427434 }
428435 }
429436
430- if (numSourcePartitions > 0 ) {
431-
432- // When adjust the parallelism after rounding up cannot be evenly divided by source
433- // numSourcePartitions, Try to find the smallest parallelism that can satisfy the
434- // current
435- // consumption rate.
436- int p = newParallelism ;
437- for (; p > 0 ; p --) {
438- if (numSourcePartitions / p > numSourcePartitions / newParallelism ) {
439- if (numSourcePartitions % p != 0 ) {
440- p ++;
441- }
442- break ;
437+         // When the adjusted parallelism after rounding up cannot be evenly divided by
438+         // numKeyGroupsOrPartitions, try to find the smallest parallelism that can satisfy
439+         // the current consumption rate.
440+ int p = newParallelism ;
441+ for (; p > 0 ; p --) {
442+ if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism ) {
443+ if (numKeyGroupsOrPartitions % p != 0 ) {
444+ p ++;
443445 }
446+ break ;
444447 }
445-
446- p = Math .max (p , parallelismLowerLimit );
447- var message =
448- String .format (
449- SCALE_LIMITED_MESSAGE_FORMAT ,
450- vertex ,
451- newParallelism ,
452- p ,
453- numSourcePartitions ,
454- upperBound ,
455- parallelismLowerLimit );
456- eventHandler .handleEvent (
457- context ,
458- AutoScalerEventHandler .Type .Warning ,
459- SCALING_LIMITED ,
460- message ,
461- SCALING_LIMITED + vertex + (scaleFactor * currentParallelism ),
462- context .getConfiguration ().get (SCALING_EVENT_INTERVAL ));
463- return p ;
464448 }
465449
466- // If parallelism adjustment fails, use originally computed parallelism
467- return newParallelism ;
450+ p = Math .max (p , parallelismLowerLimit );
451+ var message =
452+ String .format (
453+ SCALE_LIMITED_MESSAGE_FORMAT ,
454+ vertex ,
455+ newParallelism ,
456+ p ,
457+ numSourcePartitions ,
458+ upperBound ,
459+ parallelismLowerLimit );
460+ eventHandler .handleEvent (
461+ context ,
462+ AutoScalerEventHandler .Type .Warning ,
463+ SCALING_LIMITED ,
464+ message ,
465+ SCALING_LIMITED + vertex + (scaleFactor * currentParallelism ),
466+ context .getConfiguration ().get (SCALING_EVENT_INTERVAL ));
467+ return p ;
468468 }
469469
470470 @ VisibleForTesting
0 commit comments