Skip to content

Commit d2a68eb

Browse files
author
huyuanfeng
committed
Add a new NumKeyGroupsOrPartitionsParallelismAdjuster to make the logic clearer
1 parent 9f6fba9 commit d2a68eb

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NumKeyGroupsOrPartitionsParallelismAdjuster.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.autoscaler;
219

320
import org.apache.flink.autoscaler.config.AutoScalerOptions;
@@ -34,23 +51,16 @@ public static <KEY, Context extends JobAutoScalerContext<KEY>> int adjust(
3451
context.getConfiguration()
3552
.get(AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
3653

37-
var upperBoundForAlignment =
38-
Math.min(
39-
// Optimize the case where newParallelism <= maxParallelism / 2
40-
newParallelism > numKeyGroupsOrPartitions / 2
41-
? numKeyGroupsOrPartitions
42-
: numKeyGroupsOrPartitions / 2 + numKeyGroupsOrPartitions % 2,
43-
upperBound);
54+
var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
4455

4556
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
4657
// we try to adjust the parallelism such that it divides
4758
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
4859
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
4960
if (numKeyGroupsOrPartitions % p == 0
5061
||
51-
// When MAXIMIZE_UTILISATION is enabled, Try to find the smallest parallelism
52-
// that
53-
// can satisfy the current consumption rate.
62+
// When Mode is MAXIMIZE_UTILISATION , Try to find the smallest parallelism
63+
// that can satisfy the current consumption rate.
5464
(mode == Mode.MAXIMIZE_UTILISATION
5565
&& numKeyGroupsOrPartitions / p
5666
< numKeyGroupsOrPartitions / newParallelism)) {

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,6 @@ public void testParallelismComputationWithAdjustment(
394394
eventCollector,
395395
context));
396396

397-
// scaling.radical.enabled = true
398397
Configuration conf = context.getConfiguration();
399398
conf.set(
400399
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
@@ -1052,7 +1051,6 @@ public void testNumPartitionsAdjustment() {
10521051
eventCollector,
10531052
context));
10541053

1055-
// scaling.radical.enabled = true
10561054
Configuration conf = context.getConfiguration();
10571055
conf.set(
10581056
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,

0 commit comments

Comments
 (0)