
Commit d22c463

Author: huyuanfeng
Parent: 670bfcf

[FLINK-36527][autoscaler] Refactor JobVertexScaler to implement the parallelism adjustment logic in a separate component

File tree: 2 files changed, +121 −62 lines


flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
12 additions & 62 deletions
@@ -53,7 +53,6 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
-import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Component responsible for computing vertex parallelism based on the scaling metrics. */
@@ -355,12 +354,9 @@ private boolean detectIneffectiveScaleUp(
      * But we limit newParallelism between parallelismLowerLimit and min(parallelismUpperLimit,
      * maxParallelism).
      *
-     * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
-     * parallelism for source and keyed vertex such that it divides the maxParallelism without a
-     * remainder.
-     *
-     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
-     * number of source partitions if a vertex has a known source partition count.
+     * <p>Also, if we know the number of partitions or key groups corresponding to the current
+     * vertex, the parallelism will be adjusted accordingly. For the specific logic, please
+     * refer to {@link ParallelismAdjuster}.
      */
     @VisibleForTesting
     protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
@@ -403,62 +399,16 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
-        var adjustByMaxParallelismOrPartitions =
-                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelismOrPartitions) {
-            return newParallelism;
-        }
-
-        var numKeyGroupsOrPartitions =
-                numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
-        var upperBoundForAlignment =
-                Math.min(
-                        // Optimize the case where newParallelism <= maxParallelism / 2
-                        newParallelism > numKeyGroupsOrPartitions / 2
-                                ? numKeyGroupsOrPartitions
-                                : numKeyGroupsOrPartitions / 2,
-                        upperBound);
-
-        // When the shuffle type of the vertex inputs contains keyBy, or the vertex is a source,
-        // we try to adjust the parallelism such that it divides
-        // numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-            if (numKeyGroupsOrPartitions % p == 0) {
-                return p;
-            }
-        }
-
-        // When the parallelism after rounding up cannot evenly divide
-        // numKeyGroupsOrPartitions, try to find the smallest parallelism that can satisfy the
-        // current consumption rate.
-        int p = newParallelism;
-        for (; p > 0; p--) {
-            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
-                if (numKeyGroupsOrPartitions % p != 0) {
-                    p++;
-                }
-                break;
-            }
-        }
-
-        p = Math.max(p, parallelismLowerLimit);
-        var message =
-                String.format(
-                        SCALE_LIMITED_MESSAGE_FORMAT,
-                        vertex,
-                        newParallelism,
-                        p,
-                        numKeyGroupsOrPartitions,
-                        upperBound,
-                        parallelismLowerLimit);
-        eventHandler.handleEvent(
+        return ParallelismAdjuster.adjust(
+                vertex,
                 context,
-                AutoScalerEventHandler.Type.Warning,
-                SCALING_LIMITED,
-                message,
-                SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
-                context.getConfiguration().get(SCALING_EVENT_INTERVAL));
-        return p;
+                eventHandler,
+                maxParallelism,
+                numSourcePartitions,
+                newParallelism,
+                upperBound,
+                parallelismLowerLimit,
+                inputShipStrategies);
     }
 
     @VisibleForTesting
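
The Javadoc in the hunk above describes the only bounding that stays in scale() itself; everything else is delegated. A minimal illustrative snippet of that clamp, with all numbers made up (not part of the commit):

    // Hypothetical values, for illustration only: newParallelism is clamped
    // into [parallelismLowerLimit, min(parallelismUpperLimit, maxParallelism)]
    // before ParallelismAdjuster takes over.
    int parallelismLowerLimit = 2;
    int upperBound = Math.min(200, 128); // min(parallelismUpperLimit, maxParallelism)
    int newParallelism = Math.min(Math.max(parallelismLowerLimit, 150), upperBound);
    // newParallelism == 128; any further adjustment is ParallelismAdjuster's job.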
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ParallelismAdjuster.java
109 additions & 0 deletions (new file)

@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+
+import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
+import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+
+/**
+ * Component responsible for adjusting the parallelism of a vertex.
+ *
+ * <p>When the input {@link ShipStrategy} of a vertex is {@link ShipStrategy#HASH}, or the
+ * number of partitions of the vertex is known, we try to adjust the parallelism of the current
+ * vertex according to the number of key groups or partitions, so that data is evenly
+ * distributed among subtasks and utilization is maximized.
+ */
+public class ParallelismAdjuster {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> int adjust(
+            JobVertexID vertex,
+            Context context,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            int maxParallelism,
+            int numSourcePartitions,
+            int newParallelism,
+            int upperBound,
+            int parallelismLowerLimit,
+            Collection<ShipStrategy> inputShipStrategies) {
+
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
+            return newParallelism;
+        }
+
+        var numKeyGroupsOrPartitions =
+                numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
+        var upperBoundForAlignment =
+                Math.min(
+                        // Optimize the case where newParallelism <= maxParallelism / 2
+                        newParallelism > numKeyGroupsOrPartitions / 2
+                                ? numKeyGroupsOrPartitions
+                                : numKeyGroupsOrPartitions / 2,
+                        upperBound);
+
+        // When the shuffle type of the vertex inputs contains keyBy, or the vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
+                return p;
+            }
+        }
+
+        // When the parallelism after rounding up cannot evenly divide
+        // numKeyGroupsOrPartitions, try to find the smallest parallelism that can satisfy the
+        // current consumption rate.
+        int p = newParallelism;
+        for (; p > 0; p--) {
+            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
+                if (numKeyGroupsOrPartitions % p != 0) {
+                    p++;
+                }
+                break;
+            }
+        }
+
+        p = Math.max(p, parallelismLowerLimit);
+        var message =
+                String.format(
+                        SCALE_LIMITED_MESSAGE_FORMAT,
+                        vertex,
+                        newParallelism,
+                        p,
+                        numKeyGroupsOrPartitions,
+                        upperBound,
+                        parallelismLowerLimit);
+        eventHandler.handleEvent(
+                context,
+                AutoScalerEventHandler.Type.Warning,
+                SCALING_LIMITED,
+                message,
+                SCALING_LIMITED + vertex + newParallelism,
+                context.getConfiguration().get(SCALING_EVENT_INTERVAL));
+        return p;
+    }
+}
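
To make the new component's arithmetic concrete, here is a minimal, self-contained sketch (not part of the commit; the class and method names and all sample numbers are invented for illustration) that reproduces the divisor search and the shrink-down fallback with the bounds checks and event reporting stripped away:

    // Illustrative sketch of ParallelismAdjuster's core arithmetic.
    public class AlignmentSketch {

        static int align(int numKeyGroupsOrPartitions, int newParallelism,
                         int upperBound, int parallelismLowerLimit) {
            int upperBoundForAlignment =
                    Math.min(
                            newParallelism > numKeyGroupsOrPartitions / 2
                                    ? numKeyGroupsOrPartitions
                                    : numKeyGroupsOrPartitions / 2,
                            upperBound);

            // Prefer the first parallelism >= newParallelism that divides the
            // key-group/partition count evenly.
            for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
                if (numKeyGroupsOrPartitions % p == 0) {
                    return p;
                }
            }

            // Fallback: shrink to the smallest parallelism whose per-subtask
            // load ceiling matches that of newParallelism.
            int p = newParallelism;
            for (; p > 0; p--) {
                if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
                    if (numKeyGroupsOrPartitions % p != 0) {
                        p++;
                    }
                    break;
                }
            }
            return Math.max(p, parallelismLowerLimit);
        }

        public static void main(String[] args) {
            // 128 key groups, proposed parallelism 7: scan finds 8 (128 % 8 == 0).
            System.out.println(align(128, 7, 64, 1)); // prints 8
            // 7 partitions, proposed parallelism 5, upper bound 6: no divisor
            // of 7 exists in [5, 6], so the fallback settles on 4.
            System.out.println(align(7, 5, 6, 1)); // prints 4
        }
    }

In the first call the forward scan returns 8, the first value at or above the proposed parallelism that divides 128 evenly. In the second there is no divisor of 7 within [5, 6], so the fallback shrinks to 4, which keeps the same per-subtask ceiling as 5 (ceil(7/4) == ceil(7/5) == 2) while using one less slot.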
