Skip to content

Commit 0aa6795

Browse files
author
huyuanfeng
committed
[FLINK-36527][autoscaler] Introduce a parameter to let the autoscaler adopt a more radical strategy when the vertex is a source or its upstream shuffle is keyBy
1 parent f87d50d commit 0aa6795

File tree

4 files changed

+148
-17
lines changed

4 files changed

+148
-17
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@
188188
<td>Duration</td>
189189
<td>Time interval to resend the identical event</td>
190190
</tr>
191+
<tr>
192+
<td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
193+
<td style="word-wrap: break-word;">ABSOLUTELY_EVENLY_DISTRIBUTION</td>
194+
<td><p>Enum</p></td>
195+
<td>How to adjust the parallelism when the vertex is a source or its upstream shuffle is keyBy<br /><br />Possible values:<ul><li>"ABSOLUTELY_EVENLY_DISTRIBUTION": This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks. It is particularly effective for source vertices that are aware of partition counts or vertices after a 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew.</li><li>"MAXIMIZE_UTILISATION": This mode aims to maximize resource utilization. In this mode, an attempt is made to set the parallelism that meets the current consumption rate requirements. Unlike the default mode, it is not enforced that the number of key groups or partitions is divisible by the parallelism.</li></ul></td>
196+
</tr>
191197
<tr>
192198
<td><h5>job.autoscaler.stabilization.interval</h5></td>
193199
<td style="word-wrap: break-word;">5 min</td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ParallelismAdjuster.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@
1919

2020
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
2121
import org.apache.flink.autoscaler.topology.ShipStrategy;
22+
import org.apache.flink.configuration.DescribedEnum;
23+
import org.apache.flink.configuration.description.InlineElement;
2224
import org.apache.flink.runtime.jobgraph.JobVertexID;
2325

2426
import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
2527
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
28+
import static org.apache.flink.autoscaler.ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
2629
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
30+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
31+
import static org.apache.flink.configuration.description.TextElement.text;
2732

2833
/**
2934
* Component responsible for adjusting the parallelism of a vertex.
@@ -44,19 +49,23 @@ int adjustViaNumKeyGroupsOrPartitions(
4449
int newParallelism,
4550
int upperBound,
4651
int parallelismLowerLimit) {
47-
var upperBoundForAlignment =
48-
Math.min(
49-
// Optimize the case where newParallelism <= maxParallelism / 2
50-
newParallelism > numKeyGroupsOrPartitions / 2
51-
? numKeyGroupsOrPartitions
52-
: numKeyGroupsOrPartitions / 2,
53-
upperBound);
52+
53+
KeyGroupOrPartitionsAdjustMode mode =
54+
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
55+
56+
var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
5457

5558
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
5659
// we try to adjust the parallelism such that it divides
5760
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
5861
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
59-
if (numKeyGroupsOrPartitions % p == 0) {
62+
if (numKeyGroupsOrPartitions % p == 0
63+
||
64+
// When the mode is MAXIMIZE_UTILISATION, try to find the smallest parallelism
65+
// that can satisfy the current consumption rate.
66+
(mode == MAXIMIZE_UTILISATION
67+
&& numKeyGroupsOrPartitions / p
68+
< numKeyGroupsOrPartitions / newParallelism)) {
6069
return p;
6170
}
6271
}
@@ -100,4 +109,28 @@ private static int calculateMinimumParallelism(
100109
p = Math.max(p, parallelismLowerLimit);
101110
return p;
102111
}
112+
113+
/** The mode of the key group or parallelism adjustment. */
114+
public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
115+
ABSOLUTELY_EVENLY_DISTRIBUTION(
116+
"This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks"
117+
+ ". It is particularly effective for source vertices that are aware of partition counts or vertices after "
118+
+ "'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."),
119+
120+
MAXIMIZE_UTILISATION(
121+
"This model is to maximize resource utilization. In this mode, an attempt is made to set"
122+
+ " the parallelism that meets the current consumption rate requirements. Unlike the default mode, it is not enforced that the number of key groups or partitions is divisible by the parallelism."),
123+
;
124+
125+
private final InlineElement description;
126+
127+
KeyGroupOrPartitionsAdjustMode(String description) {
128+
this.description = text(description);
129+
}
130+
131+
@Override
132+
public InlineElement getDescription() {
133+
return description;
134+
}
135+
}
103136
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.autoscaler.config;
1919

20+
import org.apache.flink.autoscaler.ParallelismAdjuster;
2021
import org.apache.flink.autoscaler.metrics.MetricAggregator;
2122
import org.apache.flink.configuration.ConfigOption;
2223
import org.apache.flink.configuration.ConfigOptions;
@@ -351,4 +352,17 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
351352
.withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
352353
.withDescription(
353354
"Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");
355+
356+
public static final ConfigOption<ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode>
357+
SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
358+
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
359+
.enumType(ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.class)
360+
.defaultValue(
361+
ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode
362+
.ABSOLUTELY_EVENLY_DISTRIBUTION)
363+
.withFallbackKeys(
364+
oldOperatorConfigKey(
365+
"scaling.key-group.partitions.adjust.mode"))
366+
.withDescription(
367+
"How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
354368
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,8 @@ public void testParallelismComputation() {
323323
@MethodSource("adjustmentInputsProvider")
324324
public void testParallelismComputationWithAdjustment(
325325
Collection<ShipStrategy> inputShipStrategies) {
326-
final int minParallelism = 1;
327-
final int maxParallelism = Integer.MAX_VALUE;
326+
final int parallelismLowerLimit = 1;
327+
final int parallelismUpperLimit = Integer.MAX_VALUE;
328328
final var vertex = new JobVertexID();
329329

330330
assertEquals(
@@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment(
336336
0,
337337
36,
338338
0.8,
339-
minParallelism,
340-
maxParallelism,
339+
parallelismLowerLimit,
340+
parallelismUpperLimit,
341341
eventCollector,
342342
context));
343343
assertEquals(
@@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment(
349349
0,
350350
128,
351351
1.5,
352-
minParallelism,
353-
maxParallelism,
352+
parallelismLowerLimit,
353+
parallelismUpperLimit,
354354
eventCollector,
355355
context));
356356
assertEquals(
@@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment(
362362
0,
363363
720,
364364
1.3,
365-
minParallelism,
366-
maxParallelism,
365+
parallelismLowerLimit,
366+
parallelismUpperLimit,
367367
eventCollector,
368368
context));
369369
assertEquals(
@@ -375,7 +375,44 @@ public void testParallelismComputationWithAdjustment(
375375
0,
376376
720,
377377
Integer.MAX_VALUE,
378-
minParallelism,
378+
parallelismLowerLimit,
379+
parallelismUpperLimit,
380+
eventCollector,
381+
context));
382+
383+
int maxParallelism = 128;
384+
double scaleFactor = 2.5;
385+
int currentParallelism = 10;
386+
int expectedEvenly = 32;
387+
int expectedMaximumUtilization = 26;
388+
assertEquals(
389+
expectedEvenly,
390+
JobVertexScaler.scale(
391+
vertex,
392+
currentParallelism,
393+
inputShipStrategies,
394+
0,
395+
maxParallelism,
396+
scaleFactor,
397+
parallelismLowerLimit,
398+
parallelismUpperLimit,
399+
eventCollector,
400+
context));
401+
402+
Configuration conf = context.getConfiguration();
403+
conf.set(
404+
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
405+
ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
406+
assertEquals(
407+
expectedMaximumUtilization,
408+
JobVertexScaler.scale(
409+
vertex,
410+
currentParallelism,
411+
inputShipStrategies,
412+
0,
413+
maxParallelism,
414+
scaleFactor,
415+
parallelismLowerLimit,
379416
maxParallelism,
380417
eventCollector,
381418
context));
@@ -1004,6 +1041,47 @@ public void testNumPartitionsAdjustment() {
10041041
parallelismUpperLimit,
10051042
eventCollector,
10061043
context));
1044+
1045+
int partition = 199;
1046+
double scaleFactor = 4;
1047+
int currentParallelism = 24;
1048+
int expectedEvenly = 199;
1049+
// At MAXIMIZE_UTILISATION, 99 subtasks consume two partitions,
1050+
// one subtask consumes one partition.
1051+
int expectedMaximumUtilization = 100;
1052+
1053+
assertEquals(
1054+
expectedEvenly,
1055+
JobVertexScaler.scale(
1056+
vertex,
1057+
currentParallelism,
1058+
List.of(),
1059+
partition,
1060+
parallelismUpperLimit,
1061+
scaleFactor,
1062+
parallelismLowerLimit,
1063+
parallelismUpperLimit,
1064+
eventCollector,
1065+
context));
1066+
1067+
Configuration conf = context.getConfiguration();
1068+
conf.set(
1069+
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
1070+
ParallelismAdjuster.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
1071+
1072+
assertEquals(
1073+
expectedMaximumUtilization,
1074+
JobVertexScaler.scale(
1075+
vertex,
1076+
currentParallelism,
1077+
List.of(),
1078+
partition,
1079+
parallelismUpperLimit,
1080+
scaleFactor,
1081+
parallelismLowerLimit,
1082+
parallelismUpperLimit,
1083+
eventCollector,
1084+
context));
10071085
}
10081086

10091087
@Test

0 commit comments

Comments
 (0)