Commit f87d50d

Author: huyuanfeng (committed)
[FLINK-36527][autoscaler] Refactor JobVertexScaler to implement the parallelism adjustment logic in a separate component
1 parent 670bfcf commit f87d50d

2 files changed: +113, -52 lines

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 10 additions & 52 deletions
@@ -355,12 +355,9 @@ private boolean detectIneffectiveScaleUp(
      * But we limit newParallelism between parallelismLowerLimit and min(parallelismUpperLimit,
      * maxParallelism).
      *
-     * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
-     * parallelism for source and keyed vertex such that it divides the maxParallelism without a
-     * remainder.
-     *
-     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
-     * number of source partitions if a vertex has a known source partition count.
+     * <p>Also, if we know the number of partitions or key groups corresponding to the current
+     * vertex, the parallelism will be adjusted accordingly. For specific logic, please refer to
+     * {@link ParallelismAdjuster}.
      */
     @VisibleForTesting
     protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
@@ -411,54 +408,15 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
 
         var numKeyGroupsOrPartitions =
                 numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
-        var upperBoundForAlignment =
-                Math.min(
-                        // Optimize the case where newParallelism <= maxParallelism / 2
-                        newParallelism > numKeyGroupsOrPartitions / 2
-                                ? numKeyGroupsOrPartitions
-                                : numKeyGroupsOrPartitions / 2,
-                        upperBound);
-
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
-        // we try to adjust the parallelism such that it divides
-        // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-            if (numKeyGroupsOrPartitions % p == 0) {
-                return p;
-            }
-        }
-
-        // When adjust the parallelism after rounding up cannot be evenly divided by
-        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
-        // current consumption rate.
-        int p = newParallelism;
-        for (; p > 0; p--) {
-            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
-                if (numKeyGroupsOrPartitions % p != 0) {
-                    p++;
-                }
-                break;
-            }
-        }
 
-        p = Math.max(p, parallelismLowerLimit);
-        var message =
-                String.format(
-                        SCALE_LIMITED_MESSAGE_FORMAT,
-                        vertex,
-                        newParallelism,
-                        p,
-                        numKeyGroupsOrPartitions,
-                        upperBound,
-                        parallelismLowerLimit);
-        eventHandler.handleEvent(
+        return ParallelismAdjuster.adjustViaNumKeyGroupsOrPartitions(
+                vertex,
                 context,
-                AutoScalerEventHandler.Type.Warning,
-                SCALING_LIMITED,
-                message,
-                SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
-                context.getConfiguration().get(SCALING_EVENT_INTERVAL));
-        return p;
+                eventHandler,
+                numKeyGroupsOrPartitions,
+                newParallelism,
+                upperBound,
+                parallelismLowerLimit);
     }
 
     @VisibleForTesting
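
For intuition on the alignment behaviour the updated Javadoc points to, here is a minimal, dependency-free sketch of the divisor-alignment pass (the class and method names below are illustrative, not part of the autoscaler API; the real logic lives in ParallelismAdjuster and also takes the autoscaler context and event handler). With 128 key groups and a proposed parallelism of 12, the pass bumps the parallelism to 16, the next divisor of 128 within the alignment bound.

// Illustrative sketch only; mirrors the divisor-alignment pass described above.
public class AlignmentSketch {

    static int alignToDivisor(int numKeyGroupsOrPartitions, int newParallelism, int upperBound) {
        // Same search bound as the autoscaler: at most the key-group/partition count, or half
        // of it when the proposed parallelism is already in the lower half.
        int upperBoundForAlignment =
                Math.min(
                        newParallelism > numKeyGroupsOrPartitions / 2
                                ? numKeyGroupsOrPartitions
                                : numKeyGroupsOrPartitions / 2,
                        upperBound);
        // Scan upward for the first parallelism that divides the key-group/partition count.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p; // every subtask gets exactly numKeyGroupsOrPartitions / p units
            }
        }
        return -1; // no divisor found; the real code falls back to a minimum-parallelism search
    }

    public static void main(String[] args) {
        System.out.println(alignToDivisor(128, 12, 200)); // prints 16
        System.out.println(alignToDivisor(128, 100, 200)); // prints 128
    }
}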
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ParallelismAdjuster.java

Lines changed: 103 additions & 0 deletions (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.autoscaler;

import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;

/**
 * Component responsible for adjusting the parallelism of a vertex.
 *
 * <p>When the input {@link ShipStrategy} of a vertex is {@link ShipStrategy#HASH}, or the number
 * of partitions of the vertex is known, the parallelism of the vertex is adjusted according to
 * the number of key groups or partitions, so that data is evenly distributed among subtasks and
 * utilization is maximized.
 */
public class ParallelismAdjuster {

    public static <KEY, Context extends JobAutoScalerContext<KEY>>
            int adjustViaNumKeyGroupsOrPartitions(
                    JobVertexID vertex,
                    Context context,
                    AutoScalerEventHandler<KEY, Context> eventHandler,
                    int numKeyGroupsOrPartitions,
                    int newParallelism,
                    int upperBound,
                    int parallelismLowerLimit) {
        var upperBoundForAlignment =
                Math.min(
                        // Optimize the case where newParallelism <= maxParallelism / 2
                        newParallelism > numKeyGroupsOrPartitions / 2
                                ? numKeyGroupsOrPartitions
                                : numKeyGroupsOrPartitions / 2,
                        upperBound);

        // When the shuffle type of the vertex inputs contains keyBy, or the vertex is a source,
        // we try to adjust the parallelism such that it divides numKeyGroupsOrPartitions without
        // a remainder => data is evenly spread across subtasks.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p;
            }
        }

        // When the upward alignment pass cannot find a parallelism that meets the requirements,
        // try to find the smallest parallelism that can satisfy the current consumption rate.
        int p =
                calculateMinimumParallelism(
                        numKeyGroupsOrPartitions, newParallelism, parallelismLowerLimit);
        var message =
                String.format(
                        SCALE_LIMITED_MESSAGE_FORMAT,
                        vertex,
                        newParallelism,
                        p,
                        numKeyGroupsOrPartitions,
                        upperBound,
                        parallelismLowerLimit);
        eventHandler.handleEvent(
                context,
                AutoScalerEventHandler.Type.Warning,
                SCALING_LIMITED,
                message,
                SCALING_LIMITED + vertex + newParallelism,
                context.getConfiguration().get(SCALING_EVENT_INTERVAL));
        return p;
    }

    private static int calculateMinimumParallelism(
            int numKeyGroupsOrPartitions, int newParallelism, int parallelismLowerLimit) {
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
                if (numKeyGroupsOrPartitions % p != 0) {
                    p++;
                }
                break;
            }
        }
        p = Math.max(p, parallelismLowerLimit);
        return p;
    }
}
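
When the upward scan finds no divisor within the bound, the fallback in calculateMinimumParallelism walks downward from the requested parallelism and stops just before floor(numKeyGroupsOrPartitions / p) would grow, i.e. before each subtask would have to take on more key groups than at the requested parallelism (unless the lower value divides evenly, in which case it is accepted). A minimal, dependency-free sketch with concrete numbers follows (names illustrative, not the real Flink API).

// Illustrative sketch only; mirrors the fallback search in calculateMinimumParallelism.
public class MinimumParallelismSketch {

    static int minimumParallelism(
            int numKeyGroupsOrPartitions, int newParallelism, int parallelismLowerLimit) {
        int p = newParallelism;
        // Walk down while floor(numKeyGroups / p) stays the same as at the requested parallelism.
        for (; p > 0; p--) {
            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
                if (numKeyGroupsOrPartitions % p != 0) {
                    p++; // step back up unless the lower value happens to divide evenly
                }
                break;
            }
        }
        return Math.max(p, parallelismLowerLimit);
    }

    public static void main(String[] args) {
        System.out.println(minimumParallelism(200, 70, 1)); // prints 67
        System.out.println(minimumParallelism(200, 60, 1)); // prints 50 (200 splits evenly, 4 each)
    }
}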
