Skip to content

Commit f6bab10

Browse files
author
huyuanfeng
committed
fix comment
1 parent 9a33ac1 commit f6bab10

File tree

2 files changed

+40
-44
lines changed

2 files changed

+40
-44
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -409,13 +409,20 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
409409
return newParallelism;
410410
}
411411

412-
int numKeyGroupsOrPartitions = maxParallelism;
413-
int upperBoundForAlignment;
412+
final int numKeyGroupsOrPartitions;
413+
final int upperBoundForAlignment;
414414
if (numSourcePartitions <= 0) {
415-
upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
415+
numKeyGroupsOrPartitions = maxParallelism;
416+
upperBoundForAlignment =
417+
Math.min(
418+
// Optimize the case where newParallelism <= maxParallelism / 2
419+
newParallelism > maxParallelism / 2
420+
? maxParallelism
421+
: maxParallelism / 2,
422+
upperBound);
416423
} else {
417-
upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
418424
numKeyGroupsOrPartitions = numSourcePartitions;
425+
upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
419426
}
420427

421428
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
@@ -427,44 +434,37 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
427434
}
428435
}
429436

430-
if (numSourcePartitions > 0) {
431-
432-
// When adjust the parallelism after rounding up cannot be evenly divided by source
433-
// numSourcePartitions, Try to find the smallest parallelism that can satisfy the
434-
// current
435-
// consumption rate.
436-
int p = newParallelism;
437-
for (; p > 0; p--) {
438-
if (numSourcePartitions / p > numSourcePartitions / newParallelism) {
439-
if (numSourcePartitions % p != 0) {
440-
p++;
441-
}
442-
break;
437+
// When adjust the parallelism after rounding up cannot be evenly divided by
438+
// numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
439+
// current consumption rate.
440+
int p = newParallelism;
441+
for (; p > 0; p--) {
442+
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
443+
if (numKeyGroupsOrPartitions % p != 0) {
444+
p++;
443445
}
446+
break;
444447
}
445-
446-
p = Math.max(p, parallelismLowerLimit);
447-
var message =
448-
String.format(
449-
SCALE_LIMITED_MESSAGE_FORMAT,
450-
vertex,
451-
newParallelism,
452-
p,
453-
numSourcePartitions,
454-
upperBound,
455-
parallelismLowerLimit);
456-
eventHandler.handleEvent(
457-
context,
458-
AutoScalerEventHandler.Type.Warning,
459-
SCALING_LIMITED,
460-
message,
461-
SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
462-
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
463-
return p;
464448
}
465449

466-
// If parallelism adjustment fails, use originally computed parallelism
467-
return newParallelism;
450+
p = Math.max(p, parallelismLowerLimit);
451+
var message =
452+
String.format(
453+
SCALE_LIMITED_MESSAGE_FORMAT,
454+
vertex,
455+
newParallelism,
456+
p,
457+
numSourcePartitions,
458+
upperBound,
459+
parallelismLowerLimit);
460+
eventHandler.handleEvent(
461+
context,
462+
AutoScalerEventHandler.Type.Warning,
463+
SCALING_LIMITED,
464+
message,
465+
SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
466+
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
467+
return p;
468468
}
469469

470470
@VisibleForTesting

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,6 @@ private static List<Collection<ShipStrategy>> adjustmentInputsProvider() {
7474
List.of(ShipStrategy.REBALANCE, ShipStrategy.HASH, ShipStrategy.RESCALE));
7575
}
7676

77-
private static List<Collection<ShipStrategy>> sourceInputsProvider() {
78-
return List.of(List.of());
79-
}
80-
8177
@BeforeEach
8278
public void setup() {
8379
eventCollector = new TestingEventCollector<>();
@@ -444,7 +440,7 @@ public void testParallelismComputationWithLimit(Collection<ShipStrategy> inputSh
444440
eventCollector,
445441
context));
446442
assertEquals(
447-
300,
443+
240,
448444
JobVertexScaler.scale(
449445
vertex,
450446
200,
@@ -457,7 +453,7 @@ public void testParallelismComputationWithLimit(Collection<ShipStrategy> inputSh
457453
eventCollector,
458454
context));
459455
assertEquals(
460-
600,
456+
360,
461457
JobVertexScaler.scale(
462458
vertex,
463459
200,

0 commit comments

Comments
 (0)