Skip to content

Commit 9419df9

Browse files
author
huyuanfeng
committed
optimization
1 parent d2a68eb commit 9419df9

File tree

4 files changed

+88
-70
lines changed

4 files changed

+88
-70
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
5454
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
5555
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
56-
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
5756
import static org.apache.flink.util.Preconditions.checkArgument;
5857

5958
/** Component responsible for computing vertex parallelism based on the scaling metrics. */
@@ -355,12 +354,9 @@ private boolean detectIneffectiveScaleUp(
355354
* But we limit newParallelism between parallelismLowerLimit and min(parallelismUpperLimit,
356355
* maxParallelism).
357356
*
358-
* <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
359-
* parallelism for source and keyed vertex such that it divides the maxParallelism without a
360-
* remainder.
361-
*
362-
* <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
363-
* number of source partitions if a vertex has a known source partition count.
357+
* <p>Also, if we know the number of partitions or key groups corresponding to the current
358+
* vertex, the degree of parallelism will be adjusted accordingly. For specific logic, please
359+
* refer to {@link ParallelismAdjuster}.
364360
*/
365361
@VisibleForTesting
366362
protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
@@ -403,23 +399,16 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
403399
// Apply min/max parallelism
404400
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
405401

406-
var adjustByMaxParallelismOrPartitions =
407-
numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
408-
if (!adjustByMaxParallelismOrPartitions) {
409-
return newParallelism;
410-
}
411-
412-
newParallelism =
413-
NumKeyGroupsOrPartitionsParallelismAdjuster.adjust(
414-
vertex,
415-
context,
416-
eventHandler,
417-
maxParallelism,
418-
numSourcePartitions,
419-
newParallelism,
420-
upperBound,
421-
parallelismLowerLimit);
422-
return newParallelism;
402+
return ParallelismAdjuster.adjust(
403+
vertex,
404+
context,
405+
eventHandler,
406+
maxParallelism,
407+
numSourcePartitions,
408+
newParallelism,
409+
upperBound,
410+
parallelismLowerLimit,
411+
inputShipStrategies);
423412
}
424413

425414
@VisibleForTesting
Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,29 @@
1919

2020
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2121
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
22+
import org.apache.flink.autoscaler.topology.ShipStrategy;
2223
import org.apache.flink.configuration.DescribedEnum;
2324
import org.apache.flink.configuration.description.InlineElement;
2425
import org.apache.flink.runtime.jobgraph.JobVertexID;
2526

27+
import java.util.Collection;
28+
2629
import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
2730
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
31+
import static org.apache.flink.autoscaler.ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
2832
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
33+
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
2934
import static org.apache.flink.configuration.description.TextElement.text;
3035

3136
/**
32-
* Component responsible adjusts the parallelism of a vertex that knows the number of partitions or
33-
* a vertex whose upstream shuffle is key by.
37+
* Component responsible for adjusting the parallelism of a vertex.
38+
*
39+
* <p>When an input {@link ShipStrategy} of the vertex is {@link ShipStrategy#HASH}, or the number
40+
* of partitions of the vertex is known, the parallelism of the vertex is adjusted according to
41+
* the number of key groups or partitions, with the goal of either distributing data evenly among
42+
* subtasks or maximizing utilization.
3443
*/
35-
public class NumKeyGroupsOrPartitionsParallelismAdjuster {
44+
public class ParallelismAdjuster {
3645

3746
public static <KEY, Context extends JobAutoScalerContext<KEY>> int adjust(
3847
JobVertexID vertex,
@@ -42,12 +51,17 @@ public static <KEY, Context extends JobAutoScalerContext<KEY>> int adjust(
4251
int numSourcePartitions,
4352
int newParallelism,
4453
int upperBound,
45-
int parallelismLowerLimit) {
46-
54+
int parallelismLowerLimit,
55+
Collection<ShipStrategy> inputShipStrategies) {
56+
var adjustByMaxParallelismOrPartitions =
57+
numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
58+
if (!adjustByMaxParallelismOrPartitions) {
59+
return newParallelism;
60+
}
4761
var numKeyGroupsOrPartitions =
4862
numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
4963

50-
Mode mode =
64+
KeyGroupOrPartitionsAdjustMode mode =
5165
context.getConfiguration()
5266
.get(AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
5367

@@ -61,7 +75,7 @@ public static <KEY, Context extends JobAutoScalerContext<KEY>> int adjust(
6175
||
6276
// When the mode is MAXIMIZE_UTILISATION, try to find the smallest parallelism
6377
// that can satisfy the current consumption rate.
64-
(mode == Mode.MAXIMIZE_UTILISATION
78+
(mode == MAXIMIZE_UTILISATION
6579
&& numKeyGroupsOrPartitions / p
6680
< numKeyGroupsOrPartitions / newParallelism)) {
6781
return p;
@@ -108,8 +122,8 @@ private static int calculateMinimumParallelism(
108122
return p;
109123
}
110124

111-
/** The mode of the parallelism adjustment. */
112-
public enum Mode implements DescribedEnum {
125+
/** The mode of the key-group- or partition-based parallelism adjustment. */
126+
public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
113127
DEFAULT(
114128
"This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks"
115129
+ ". It is particularly effective for source vertices that are aware of partition counts or vertices after "
@@ -122,7 +136,7 @@ public enum Mode implements DescribedEnum {
122136

123137
private final InlineElement description;
124138

125-
Mode(String description) {
139+
KeyGroupOrPartitionsAdjustMode(String description) {
126140
this.description = text(description);
127141
}
128142

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.flink.autoscaler.config;
1919

20-
import org.apache.flink.autoscaler.NumKeyGroupsOrPartitionsParallelismAdjuster;
20+
import org.apache.flink.autoscaler.ParallelismAdjuster;
2121
import org.apache.flink.autoscaler.metrics.MetricAggregator;
2222
import org.apache.flink.configuration.ConfigOption;
2323
import org.apache.flink.configuration.ConfigOptions;
@@ -353,11 +353,12 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
353353
.withDescription(
354354
"Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");
355355

356-
public static final ConfigOption<NumKeyGroupsOrPartitionsParallelismAdjuster.Mode>
356+
public static final ConfigOption<ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode>
357357
SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
358358
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
359-
.enumType(NumKeyGroupsOrPartitionsParallelismAdjuster.Mode.class)
360-
.defaultValue(NumKeyGroupsOrPartitionsParallelismAdjuster.Mode.DEFAULT)
359+
.enumType(ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.class)
360+
.defaultValue(
361+
ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.DEFAULT)
361362
.withFallbackKeys(
362363
oldOperatorConfigKey(
363364
"scaling.key-group.partitions.adjust.mode"))

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,8 @@ public void testParallelismComputation() {
323323
@MethodSource("adjustmentInputsProvider")
324324
public void testParallelismComputationWithAdjustment(
325325
Collection<ShipStrategy> inputShipStrategies) {
326-
final int minParallelism = 1;
327-
final int maxParallelism = Integer.MAX_VALUE;
326+
final int parallelismLowerLimit = 1;
327+
final int parallelismUpperLimit = Integer.MAX_VALUE;
328328
final var vertex = new JobVertexID();
329329

330330
assertEquals(
@@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment(
336336
0,
337337
36,
338338
0.8,
339-
minParallelism,
340-
maxParallelism,
339+
parallelismLowerLimit,
340+
parallelismUpperLimit,
341341
eventCollector,
342342
context));
343343
assertEquals(
@@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment(
349349
0,
350350
128,
351351
1.5,
352-
minParallelism,
353-
maxParallelism,
352+
parallelismLowerLimit,
353+
parallelismUpperLimit,
354354
eventCollector,
355355
context));
356356
assertEquals(
@@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment(
362362
0,
363363
720,
364364
1.3,
365-
minParallelism,
366-
maxParallelism,
365+
parallelismLowerLimit,
366+
parallelismUpperLimit,
367367
eventCollector,
368368
context));
369369
assertEquals(
@@ -375,39 +375,44 @@ public void testParallelismComputationWithAdjustment(
375375
0,
376376
720,
377377
Integer.MAX_VALUE,
378-
minParallelism,
379-
maxParallelism,
378+
parallelismLowerLimit,
379+
parallelismUpperLimit,
380380
eventCollector,
381381
context));
382382

383+
int maxParallelism = 128;
384+
double scaleFactor = 2.5;
385+
int currentParallelism = 10;
386+
int expectedEvenly = 32;
387+
int expectedMaximumUtilization = 26;
383388
assertEquals(
384-
32,
389+
expectedEvenly,
385390
JobVertexScaler.scale(
386391
vertex,
387-
10,
392+
currentParallelism,
388393
inputShipStrategies,
389394
0,
390-
128,
391-
2.5,
392-
minParallelism,
393395
maxParallelism,
396+
scaleFactor,
397+
parallelismLowerLimit,
398+
parallelismUpperLimit,
394399
eventCollector,
395400
context));
396401

397402
Configuration conf = context.getConfiguration();
398403
conf.set(
399404
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
400-
NumKeyGroupsOrPartitionsParallelismAdjuster.Mode.MAXIMIZE_UTILISATION);
405+
ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
401406
assertEquals(
402-
26,
407+
expectedMaximumUtilization,
403408
JobVertexScaler.scale(
404409
vertex,
405-
10,
410+
currentParallelism,
406411
inputShipStrategies,
407412
0,
408-
128,
409-
2.5,
410-
minParallelism,
413+
maxParallelism,
414+
scaleFactor,
415+
parallelismLowerLimit,
411416
maxParallelism,
412417
eventCollector,
413418
context));
@@ -1037,15 +1042,23 @@ public void testNumPartitionsAdjustment() {
10371042
eventCollector,
10381043
context));
10391044

1045+
int partition = 199;
1046+
double scaleFactor = 4;
1047+
int currentParallelism = 24;
1048+
int expectedEvenly = 199;
1049+
// In MAXIMIZE_UTILISATION mode, 99 subtasks consume two partitions each
1050+
// and one subtask consumes a single partition.
1051+
int expectedMaximumUtilization = 100;
1052+
10401053
assertEquals(
1041-
199,
1054+
expectedEvenly,
10421055
JobVertexScaler.scale(
10431056
vertex,
1044-
24,
1057+
currentParallelism,
10451058
List.of(),
1046-
199,
1047-
256,
1048-
4,
1059+
partition,
1060+
parallelismUpperLimit,
1061+
scaleFactor,
10491062
parallelismLowerLimit,
10501063
parallelismUpperLimit,
10511064
eventCollector,
@@ -1054,16 +1067,17 @@ public void testNumPartitionsAdjustment() {
10541067
Configuration conf = context.getConfiguration();
10551068
conf.set(
10561069
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
1057-
NumKeyGroupsOrPartitionsParallelismAdjuster.Mode.MAXIMIZE_UTILISATION);
1070+
ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
1071+
10581072
assertEquals(
1059-
100,
1073+
expectedMaximumUtilization,
10601074
JobVertexScaler.scale(
10611075
vertex,
1062-
24,
1076+
currentParallelism,
10631077
List.of(),
1064-
199,
1065-
256,
1066-
4,
1078+
partition,
1079+
parallelismUpperLimit,
1080+
scaleFactor,
10671081
parallelismLowerLimit,
10681082
parallelismUpperLimit,
10691083
eventCollector,

0 commit comments

Comments
 (0)