Skip to content

Commit 991ba9d

Browse files
author
huyuanfeng
committed
fixed
1 parent f6bab10 commit 991ba9d

File tree

3 files changed

+12
-17
lines changed

3 files changed

+12
-17
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -409,25 +409,19 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
409409
return newParallelism;
410410
}
411411

412-
final int numKeyGroupsOrPartitions;
413-
final int upperBoundForAlignment;
414-
if (numSourcePartitions <= 0) {
415-
numKeyGroupsOrPartitions = maxParallelism;
416-
upperBoundForAlignment =
417-
Math.min(
418-
// Optimize the case where newParallelism <= maxParallelism / 2
419-
newParallelism > maxParallelism / 2
420-
? maxParallelism
421-
: maxParallelism / 2,
422-
upperBound);
423-
} else {
424-
numKeyGroupsOrPartitions = numSourcePartitions;
425-
upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
426-
}
412+
var numKeyGroupsOrPartitions =
413+
numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
414+
var upperBoundForAlignment =
415+
Math.min(
416+
// Optimize the case where newParallelism <= maxParallelism / 2
417+
newParallelism > numKeyGroupsOrPartitions / 2
418+
? numKeyGroupsOrPartitions
419+
: numKeyGroupsOrPartitions / 2,
420+
upperBound);
427421

428422
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
429423
// we try to adjust the parallelism such that it divides
430-
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
424+
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
431425
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
432426
if (numKeyGroupsOrPartitions % p == 0) {
433427
return p;

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public enum ScalingMetric {
5656

5757
/** Source vertex partition count. */
5858
NUM_SOURCE_PARTITIONS(false),
59+
5960
/** Upper boundary of the target data rate range. */
6061
SCALE_UP_RATE_THRESHOLD(false),
6162

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ public void testNumPartitionsAdjustment() {
958958
List.of(),
959959
15,
960960
128,
961-
0.8,
961+
1.2,
962962
minParallelism,
963963
maxParallelism,
964964
eventCollector,

0 commit comments

Comments
 (0)