@@ -402,7 +402,7 @@ protected static int scale(
402402
403403 // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
404404 // parallelism upper limit
405- int upperBound = Math .min (maxParallelism , parallelismUpperLimit );
405+ final int upperBound = Math .min (maxParallelism , parallelismUpperLimit );
406406
407407 // Apply min/max parallelism
408408 newParallelism = Math .min (Math .max (parallelismLowerLimit , newParallelism ), upperBound );
@@ -423,9 +423,8 @@ protected static int scale(
423423 }
424424
425425 // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
426- // we try to adjust the parallelism such that it divides the adjustableMaxParallelism
427- // without a
428- // remainder => data is evenly spread across subtasks
426+ // we try to adjust the parallelism such that it divides
427+ // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
429428 for (int p = newParallelism ; p <= adjustableUpperBound ; p ++) {
430429 if (adjustableMaxParallelism % p == 0 ) {
431430 return p ;
@@ -437,32 +436,26 @@ protected static int scale(
437436 // When adjust the parallelism after rounding up cannot be evenly divided by source
438437 // numPartitions, Try to find the smallest parallelism that can satisfy the current
439438 // consumption rate.
440- for (int p = newParallelism ; p > parallelismLowerLimit ; p --) {
441- if (numPartitions / p > numPartitions / newParallelism ) {
442- if (numPartitions % p != 0 ) {
443- p ++;
439+ int finalParallelism = newParallelism ;
440+ for (; finalParallelism > parallelismLowerLimit ; finalParallelism --) {
441+ if (numPartitions / finalParallelism > numPartitions / newParallelism ) {
442+ if (numPartitions % finalParallelism != 0 ) {
443+ finalParallelism ++;
444444 }
445- consumer .accept (
446- String .format (
447- SCALE_LIMITED_MESSAGE_FORMAT ,
448- vertex ,
449- newParallelism ,
450- p ,
451- String .format (
452- "numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s" ,
453- numPartitions , upperBound )));
454- return p ;
445+ break ;
455446 }
456447 }
457-
458448 consumer .accept (
459449 String .format (
460450 SCALE_LIMITED_MESSAGE_FORMAT ,
461451 vertex ,
462452 newParallelism ,
463- parallelismLowerLimit ,
464- String .format ("parallelismLowerLimit : %s" , parallelismLowerLimit )));
465- return parallelismLowerLimit ;
453+ finalParallelism ,
454+ String .format (
455+ "numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s, "
456+ + "parallelismLowerLimit: %s." ,
457+ numPartitions , upperBound , parallelismLowerLimit )));
458+ return finalParallelism ;
466459 }
467460
468461 // If parallelism adjustment fails, use originally computed parallelism
0 commit comments