Skip to content

Commit 1d730f1

Browse files
author
huyuanfeng
committed
[FLINK-36527][autoscaler] Introduce a parameter to support adopt a more radical strategy when source vertex or upstream shuffle is keyBy
1 parent d72e3ce commit 1d730f1

File tree

4 files changed

+92
-5
lines changed

4 files changed

+92
-5
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@
188188
<td>Duration</td>
189189
<td>Time interval to resend the identical event</td>
190190
</tr>
191+
<tr>
192+
<td><h5>job.autoscaler.scaling.radical.enabled</h5></td>
193+
<td style="word-wrap: break-word;">false</td>
194+
<td>Boolean</td>
195+
<td>If this option is enabled, The determination of parallelism will be more radical, which will maximize resource utilization, but may also cause data skew in some vertex.</td>
196+
</tr>
191197
<tr>
192198
<td><h5>job.autoscaler.stabilization.interval</h5></td>
193199
<td style="word-wrap: break-word;">5 min</td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -416,21 +416,31 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
416416
// Optimize the case where newParallelism <= maxParallelism / 2
417417
newParallelism > numKeyGroupsOrPartitions / 2
418418
? numKeyGroupsOrPartitions
419-
: numKeyGroupsOrPartitions / 2,
419+
: numKeyGroupsOrPartitions / 2 + numKeyGroupsOrPartitions % 2,
420420
upperBound);
421421

422+
boolean scalingRadical =
423+
context.getConfiguration().get(AutoScalerOptions.SCALING_RADICAL_ENABLED);
424+
422425
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
423426
// we try to adjust the parallelism such that it divides
424427
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
425428
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
426-
if (numKeyGroupsOrPartitions % p == 0) {
429+
if (numKeyGroupsOrPartitions % p == 0
430+
||
431+
// When scaling radical is enabled, Try to find the smallest parallelism that
432+
// can satisfy the
433+
// current consumption rate.
434+
(scalingRadical
435+
&& numKeyGroupsOrPartitions / p
436+
< numKeyGroupsOrPartitions / newParallelism)) {
427437
return p;
428438
}
429439
}
430440

431-
// When adjust the parallelism after rounding up cannot be evenly divided by
432-
// numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
433-
// current consumption rate.
441+
// When adjust the parallelism after rounding up cannot be
442+
// find the right degree of parallelism to meet requirements,
443+
// Try to find the smallest parallelism that can satisfy the current consumption rate.
434444
int p = newParallelism;
435445
for (; p > 0; p--) {
436446
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,4 +351,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
351351
.withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
352352
.withDescription(
353353
"Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");
354+
355+
public static final ConfigOption<Boolean> SCALING_RADICAL_ENABLED =
356+
autoScalerConfig("scaling.radical.enabled")
357+
.booleanType()
358+
.defaultValue(false)
359+
.withFallbackKeys(oldOperatorConfigKey("scaling.radical.enabled"))
360+
.withDescription(
361+
"If this option is enabled, The determination of parallelism will be more radical, which"
362+
+ " will maximize resource utilization, but may also cause data skew in some vertex.");
354363
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,37 @@ public void testParallelismComputationWithAdjustment(
379379
maxParallelism,
380380
eventCollector,
381381
context));
382+
383+
assertEquals(
384+
32,
385+
JobVertexScaler.scale(
386+
vertex,
387+
10,
388+
inputShipStrategies,
389+
0,
390+
128,
391+
2.5,
392+
minParallelism,
393+
maxParallelism,
394+
eventCollector,
395+
context));
396+
397+
// scaling.radical.enabled = true
398+
Configuration conf = context.getConfiguration();
399+
conf.set(AutoScalerOptions.SCALING_RADICAL_ENABLED, true);
400+
assertEquals(
401+
26,
402+
JobVertexScaler.scale(
403+
vertex,
404+
10,
405+
inputShipStrategies,
406+
0,
407+
128,
408+
2.5,
409+
minParallelism,
410+
maxParallelism,
411+
eventCollector,
412+
context));
382413
}
383414

384415
@ParameterizedTest
@@ -1004,6 +1035,37 @@ public void testNumPartitionsAdjustment() {
10041035
parallelismUpperLimit,
10051036
eventCollector,
10061037
context));
1038+
1039+
assertEquals(
1040+
199,
1041+
JobVertexScaler.scale(
1042+
vertex,
1043+
24,
1044+
List.of(),
1045+
199,
1046+
256,
1047+
4,
1048+
parallelismLowerLimit,
1049+
parallelismUpperLimit,
1050+
eventCollector,
1051+
context));
1052+
1053+
// scaling.radical.enabled = true
1054+
Configuration conf = context.getConfiguration();
1055+
conf.set(AutoScalerOptions.SCALING_RADICAL_ENABLED, true);
1056+
assertEquals(
1057+
100,
1058+
JobVertexScaler.scale(
1059+
vertex,
1060+
24,
1061+
List.of(),
1062+
199,
1063+
256,
1064+
4,
1065+
parallelismLowerLimit,
1066+
parallelismUpperLimit,
1067+
eventCollector,
1068+
context));
10071069
}
10081070

10091071
@Test

0 commit comments

Comments
 (0)