
Commit acc04da

[FLINK-36527][autoscaler] Introduce a parameter to let the autoscaler adopt a more radical strategy when the source vertex or the upstream shuffle is keyBy (#904)
1 parent efa3f10 commit acc04da

4 files changed: +150 -20 lines


docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -188,6 +188,12 @@
             <td>Duration</td>
             <td>Time interval to resend the identical event</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
+            <td style="word-wrap: break-word;">EVENLY_SPREAD</td>
+            <td><p>Enum</p></td>
+            <td>How to adjust the parallelism of Source vertex or upstream shuffle is keyBy<br /><br />Possible values:<ul><li>"EVENLY_SPREAD": This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks. It is particularly effective for source vertices that are aware of partition counts or vertices after 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew.</li><li>"MAXIMIZE_UTILISATION": This model is to maximize resource utilization. In this mode, an attempt is made to set the parallelism that meets the current consumption rate requirements. It is not enforced that the number of key groups or partitions is divisible by the parallelism.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.stabilization.interval</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
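
For reference, a minimal usage sketch (not part of this commit) showing how the new option could be set programmatically via the ConfigOption added in AutoScalerOptions.java further below; the class name AdjustModeConfigExample is hypothetical:

import org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

public class AdjustModeConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The default is EVENLY_SPREAD; MAXIMIZE_UTILISATION trades a perfectly even
        // key-group/partition distribution for a parallelism that just meets the
        // required consumption rate.
        conf.set(
                AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
                KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
        // The same value can also be supplied as a plain string under the documented key
        // "job.autoscaler.scaling.key-group.partitions.adjust.mode".
        System.out.println(conf.get(AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE));
    }
}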

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 44 additions & 11 deletions
@@ -25,6 +25,8 @@
 import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
@@ -41,10 +43,12 @@
 import java.util.Objects;
 import java.util.SortedMap;
 
+import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -54,6 +58,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Component responsible for computing vertex parallelism based on the scaling metrics. */
@@ -411,26 +416,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
 
         var numKeyGroupsOrPartitions =
                 numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
-        var upperBoundForAlignment =
-                Math.min(
-                        // Optimize the case where newParallelism <= maxParallelism / 2
-                        newParallelism > numKeyGroupsOrPartitions / 2
-                                ? numKeyGroupsOrPartitions
-                                : numKeyGroupsOrPartitions / 2,
-                        upperBound);
+        var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
+
+        KeyGroupOrPartitionsAdjustMode mode =
+                context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
 
         // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
         // we try to adjust the parallelism such that it divides
         // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
         for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
-            if (numKeyGroupsOrPartitions % p == 0) {
+            if (numKeyGroupsOrPartitions % p == 0
+                    ||
+                    // When Mode is MAXIMIZE_UTILISATION , Try to find the smallest parallelism
+                    // that can satisfy the current consumption rate.
+                    (mode == MAXIMIZE_UTILISATION
+                            && numKeyGroupsOrPartitions / p
+                                    < numKeyGroupsOrPartitions / newParallelism)) {
                 return p;
             }
         }
 
-        // When adjust the parallelism after rounding up cannot be evenly divided by
-        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
-        // current consumption rate.
+        // When adjusting the parallelism after rounding up cannot
+        // find the parallelism to meet requirements.
+        // Try to find the smallest parallelism that can satisfy the current consumption rate.
         int p = newParallelism;
         for (; p > 0; p--) {
             if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
@@ -465,4 +473,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
     protected void setClock(Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
     }
+
+    /** The mode of the key group or parallelism adjustment. */
+    public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
+        EVENLY_SPREAD(
+                "This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks"
+                        + ". It is particularly effective for source vertices that are aware of partition counts or vertices after "
+                        + "'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."),
+
+        MAXIMIZE_UTILISATION(
+                "This model is to maximize resource utilization. In this mode, an attempt is made to set"
+                        + " the parallelism that meets the current consumption rate requirements. It is not enforced "
+                        + "that the number of key groups or partitions is divisible by the parallelism."),
+        ;
+
+        private final InlineElement description;
+
+        KeyGroupOrPartitionsAdjustMode(String description) {
+            this.description = text(description);
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
 }
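
To illustrate the alignment loop above outside of the autoscaler, here is a self-contained sketch (hypothetical class and method names, not the actual JobVertexScaler API) of how the two modes diverge for the 128-key-group case exercised in the test further below, where a current parallelism of 10 and a scale factor of 2.5 yield a requested parallelism of 25:

public class AdjustModeSketch {
    enum Mode { EVENLY_SPREAD, MAXIMIZE_UTILISATION }

    // Walk upwards from the requested parallelism and stop at the first value that either
    // divides the key groups/partitions evenly, or (in MAXIMIZE_UTILISATION) already lowers
    // the per-subtask key-group/partition count compared to the requested parallelism.
    static int align(int requested, int keyGroupsOrPartitions, int upperBound, Mode mode) {
        int upperBoundForAlignment = Math.min(keyGroupsOrPartitions, upperBound);
        for (int p = requested; p <= upperBoundForAlignment; p++) {
            boolean dividesEvenly = keyGroupsOrPartitions % p == 0;
            boolean meetsRate =
                    mode == Mode.MAXIMIZE_UTILISATION
                            && keyGroupsOrPartitions / p < keyGroupsOrPartitions / requested;
            if (dividesEvenly || meetsRate) {
                return p;
            }
        }
        // The real implementation additionally falls back to a downward search when no
        // aligned parallelism is found below the upper bound; omitted here for brevity.
        return requested;
    }

    public static void main(String[] args) {
        // 128 key groups, requested parallelism 25:
        System.out.println(align(25, 128, Integer.MAX_VALUE, Mode.EVENLY_SPREAD));        // 32
        System.out.println(align(25, 128, Integer.MAX_VALUE, Mode.MAXIMIZE_UTILISATION)); // 26
    }
}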

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 13 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.JobVertexScaler;
 import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -351,4 +352,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
                     .withDescription(
                             "Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");
+
+    public static final ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
+            SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
+                    autoScalerConfig("scaling.key-group.partitions.adjust.mode")
+                            .enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
+                            .defaultValue(
+                                    JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
+                            .withFallbackKeys(
+                                    oldOperatorConfigKey(
+                                            "scaling.key-group.partitions.adjust.mode"))
+                            .withDescription(
+                                    "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
 }

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 87 additions & 9 deletions
@@ -323,8 +323,8 @@ public void testParallelismComputation() {
     @MethodSource("adjustmentInputsProvider")
     public void testParallelismComputationWithAdjustment(
             Collection<ShipStrategy> inputShipStrategies) {
-        final int minParallelism = 1;
-        final int maxParallelism = Integer.MAX_VALUE;
+        final int parallelismLowerLimit = 1;
+        final int parallelismUpperLimit = Integer.MAX_VALUE;
         final var vertex = new JobVertexID();
 
         assertEquals(
@@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment(
                         0,
                         36,
                         0.8,
-                        minParallelism,
-                        maxParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
                         eventCollector,
                         context));
         assertEquals(
@@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment(
                         0,
                         128,
                         1.5,
-                        minParallelism,
-                        maxParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
                         eventCollector,
                         context));
         assertEquals(
@@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment(
                         0,
                         720,
                         1.3,
-                        minParallelism,
-                        maxParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
                         eventCollector,
                         context));
         assertEquals(
@@ -375,7 +375,44 @@ public void testParallelismComputationWithAdjustment(
                         0,
                         720,
                         Integer.MAX_VALUE,
-                        minParallelism,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
+
+        int maxParallelism = 128;
+        double scaleFactor = 2.5;
+        int currentParallelism = 10;
+        int expectedEvenly = 32;
+        int expectedMaximumUtilization = 26;
+        assertEquals(
+                expectedEvenly,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        inputShipStrategies,
+                        0,
+                        maxParallelism,
+                        scaleFactor,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
+
+        Configuration conf = context.getConfiguration();
+        conf.set(
+                AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+                JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+        assertEquals(
+                expectedMaximumUtilization,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        inputShipStrategies,
+                        0,
+                        maxParallelism,
+                        scaleFactor,
+                        parallelismLowerLimit,
                         maxParallelism,
                         eventCollector,
                         context));
@@ -1004,6 +1041,47 @@ public void testNumPartitionsAdjustment() {
                         parallelismUpperLimit,
                         eventCollector,
                         context));
+
+        int partition = 199;
+        double scaleFactor = 4;
+        int currentParallelism = 24;
+        int expectedEvenly = 199;
+        // At MAXIMIZE_UTILISATION, 99 subtasks consume two partitions,
+        // one subtask consumes one partition.
+        int expectedMaximumUtilization = 100;
+
+        assertEquals(
+                expectedEvenly,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        List.of(),
+                        partition,
+                        parallelismUpperLimit,
+                        scaleFactor,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
+
+        Configuration conf = context.getConfiguration();
+        conf.set(
+                AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+                JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+
+        assertEquals(
+                expectedMaximumUtilization,
+                JobVertexScaler.scale(
+                        vertex,
+                        currentParallelism,
+                        List.of(),
+                        partition,
+                        parallelismUpperLimit,
+                        scaleFactor,
+                        parallelismLowerLimit,
+                        parallelismUpperLimit,
+                        eventCollector,
+                        context));
     }
 
     @Test
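
As a quick arithmetic check of the 199-partition expectations above (plain integer math in a hypothetical standalone class, not the actual test harness): with 24 * 4 = 96 requested subtasks, 199 is prime, so EVENLY_SPREAD can only align at 199, while MAXIMIZE_UTILISATION stops as soon as the partitions-per-subtask count drops below 199 / 96 = 2, which first happens at parallelism 100.

public class PartitionAdjustArithmetic {
    public static void main(String[] args) {
        int partitions = 199;                      // prime: only 1 and 199 divide it
        int requested = (int) Math.ceil(24 * 4.0); // current parallelism 24, scale factor 4 -> 96

        // EVENLY_SPREAD: the first p >= 96 with partitions % p == 0 is 199 itself.
        int evenlySpread = requested;
        while (partitions % evenlySpread != 0) {
            evenlySpread++;
        }

        // MAXIMIZE_UTILISATION: the first p >= 96 where partitions / p drops below
        // partitions / 96 (= 2); at p = 100, 99 subtasks read 2 partitions and 1 reads 1.
        int maximizeUtilisation = requested;
        while (partitions % maximizeUtilisation != 0
                && partitions / maximizeUtilisation >= partitions / requested) {
            maximizeUtilisation++;
        }

        System.out.println(evenlySpread);        // 199
        System.out.println(maximizeUtilisation); // 100
    }
}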
