diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 49ff71efa2..09baf21717 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -188,6 +188,12 @@
            <td>Duration</td>
            <td>Time interval to resend the identical event</td>
        </tr>
+        <tr>
+            <td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
+            <td style="word-wrap: break-word;">EVENLY_SPREAD</td>
+            <td><p>Enum</p></td>
+            <td>How to adjust the parallelism of a source vertex or a vertex whose upstream shuffle is keyBy.<br /><br />Possible values:<ul><li>"EVENLY_SPREAD": This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks. It is particularly effective for source vertices that are aware of partition counts or vertices after a 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew.</li><li>"MAXIMIZE_UTILISATION": This mode aims to maximize resource utilization. In this mode, an attempt is made to set the smallest parallelism that meets the current consumption rate requirements. It is not enforced that the number of key groups or partitions is divisible by the parallelism.</li></ul></td>
+        </tr>
        <tr>
            <td><h5>job.autoscaler.stabilization.interval</h5></td>
            <td style="word-wrap: break-word;">5 min</td>
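
For illustration, a minimal sketch of how a user could select the non-default mode; the key and value come from the generated table above, while the surrounding setup (where this Configuration is built and handed to the autoscaler) is assumed:

```java
import org.apache.flink.configuration.Configuration;

public class AdjustModeConfigExample {
    public static void main(String[] args) {
        // Hypothetical setup: select the non-default mode by its string key,
        // exactly as documented in the table above.
        Configuration conf = new Configuration();
        conf.setString(
                "job.autoscaler.scaling.key-group.partitions.adjust.mode",
                "MAXIMIZE_UTILISATION");
        System.out.println(conf);
    }
}
```
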
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 1d32b1aab6..87099bc0fb 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -25,6 +25,8 @@
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -41,10 +43,12 @@
import java.util.Objects;
import java.util.SortedMap;
+import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -54,6 +58,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Component responsible for computing vertex parallelism based on the scaling metrics. */
@@ -411,26 +416,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
var numKeyGroupsOrPartitions =
numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
- var upperBoundForAlignment =
- Math.min(
- // Optimize the case where newParallelism <= maxParallelism / 2
- newParallelism > numKeyGroupsOrPartitions / 2
- ? numKeyGroupsOrPartitions
- : numKeyGroupsOrPartitions / 2,
- upperBound);
+ var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
+
+ KeyGroupOrPartitionsAdjustMode mode =
+ context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
- if (numKeyGroupsOrPartitions % p == 0) {
+ if (numKeyGroupsOrPartitions % p == 0
+ ||
+                    // When the mode is MAXIMIZE_UTILISATION, try to find the smallest
+                    // parallelism that can satisfy the current consumption rate.
+ (mode == MAXIMIZE_UTILISATION
+ && numKeyGroupsOrPartitions / p
+ < numKeyGroupsOrPartitions / newParallelism)) {
return p;
}
}
- // When adjust the parallelism after rounding up cannot be evenly divided by
- // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
- // current consumption rate.
+        // If the upward adjustment cannot find a parallelism that meets the
+        // requirements, try to find the smallest parallelism that can satisfy
+        // the current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
@@ -465,4 +473,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
protected void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}
+
+    /** The mode for adjusting parallelism to the number of key groups or partitions. */
+ public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
+        EVENLY_SPREAD(
+                "This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks"
+                        + ". It is particularly effective for source vertices that are aware of partition counts or vertices after "
+                        + "a 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."),
+
+        MAXIMIZE_UTILISATION(
+                "This mode aims to maximize resource utilization. In this mode, an attempt is made to set"
+                        + " the smallest parallelism that meets the current consumption rate requirements. It is not enforced "
+                        + "that the number of key groups or partitions is divisible by the parallelism."),
+ ;
+
+ private final InlineElement description;
+
+ KeyGroupOrPartitionsAdjustMode(String description) {
+ this.description = text(description);
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+ }
}
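
To make the divergence of the two modes concrete, here is a small self-contained sketch (hypothetical class, not part of this patch) mirroring the upward scan added in scale() above; the downward fallback is omitted since these inputs never reach it:

```java
/** Hypothetical sketch mirroring the upward scan in JobVertexScaler#scale. */
public class AdjustModeSketch {

    enum Mode { EVENLY_SPREAD, MAXIMIZE_UTILISATION }

    static int adjust(int target, int numKeyGroupsOrPartitions, int upperBound, Mode mode) {
        int limit = Math.min(numKeyGroupsOrPartitions, upperBound);
        for (int p = target; p <= limit; p++) {
            // EVENLY_SPREAD only accepts divisors of the key group / partition count;
            // MAXIMIZE_UTILISATION also accepts any p that strictly lowers the maximum
            // number of key groups or partitions per subtask.
            if (numKeyGroupsOrPartitions % p == 0
                    || (mode == Mode.MAXIMIZE_UTILISATION
                            && numKeyGroupsOrPartitions / p
                                    < numKeyGroupsOrPartitions / target)) {
                return p;
            }
        }
        // The real method falls back to a downward search here (omitted in this sketch).
        return target;
    }

    public static void main(String[] args) {
        // 128 key groups, target parallelism 25, as in the new unit test:
        System.out.println(adjust(25, 128, Integer.MAX_VALUE, Mode.EVENLY_SPREAD)); // 32
        System.out.println(adjust(25, 128, Integer.MAX_VALUE, Mode.MAXIMIZE_UTILISATION)); // 26
    }
}
```
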
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e1ea6a8695..a5ffb0f9d8 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -17,6 +17,7 @@
package org.apache.flink.autoscaler.config;
+import org.apache.flink.autoscaler.JobVertexScaler;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -351,4 +352,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
.withDescription(
"Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");
+
+    public static final ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
+            SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
+                    autoScalerConfig("scaling.key-group.partitions.adjust.mode")
+                            .enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
+                            .defaultValue(
+                                    JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
+                            .withFallbackKeys(
+                                    oldOperatorConfigKey(
+                                            "scaling.key-group.partitions.adjust.mode"))
+                            .withDescription(
+                                    "How to adjust the parallelism of a source vertex or a vertex whose upstream shuffle is keyBy.");
}
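
Downstream code can then read the option in its enum form, as JobVertexScaler does above; a minimal usage sketch, assuming a Configuration instance `conf` is in scope:

```java
// Sketch: enumType() lets callers read the mode without string parsing.
JobVertexScaler.KeyGroupOrPartitionsAdjustMode mode =
        conf.get(AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
```
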
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index ae832073e2..704ddd58e3 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -323,8 +323,8 @@ public void testParallelismComputation() {
@MethodSource("adjustmentInputsProvider")
public void testParallelismComputationWithAdjustment(
            Collection<ShipStrategy> inputShipStrategies) {
- final int minParallelism = 1;
- final int maxParallelism = Integer.MAX_VALUE;
+ final int parallelismLowerLimit = 1;
+ final int parallelismUpperLimit = Integer.MAX_VALUE;
final var vertex = new JobVertexID();
assertEquals(
@@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment(
0,
36,
0.8,
- minParallelism,
- maxParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment(
0,
128,
1.5,
- minParallelism,
- maxParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment(
0,
720,
1.3,
- minParallelism,
- maxParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -375,7 +375,44 @@ public void testParallelismComputationWithAdjustment(
0,
720,
Integer.MAX_VALUE,
- minParallelism,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
+
+ int maxParallelism = 128;
+ double scaleFactor = 2.5;
+ int currentParallelism = 10;
+ int expectedEvenly = 32;
+ int expectedMaximumUtilization = 26;
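+        // target = ceil(10 * 2.5) = 25; EVENLY_SPREAD rounds up to the next divisor
+        // of 128 (i.e. 32), while MAXIMIZE_UTILISATION stops at 26 (128 / 26 = 4 < 128 / 25 = 5).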
+ assertEquals(
+ expectedEvenly,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ inputShipStrategies,
+ 0,
+ maxParallelism,
+ scaleFactor,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
+
+ Configuration conf = context.getConfiguration();
+ conf.set(
+ AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+ JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+ assertEquals(
+ expectedMaximumUtilization,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ inputShipStrategies,
+ 0,
+ maxParallelism,
+ scaleFactor,
+ parallelismLowerLimit,
maxParallelism,
eventCollector,
context));
@@ -1004,6 +1041,47 @@ public void testNumPartitionsAdjustment() {
parallelismUpperLimit,
eventCollector,
context));
+
+ int partition = 199;
+ double scaleFactor = 4;
+ int currentParallelism = 24;
+ int expectedEvenly = 199;
+        // With MAXIMIZE_UTILISATION, 99 subtasks consume two partitions each and
+        // one subtask consumes one partition.
+ int expectedMaximumUtilization = 100;
+
+ assertEquals(
+ expectedEvenly,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ List.of(),
+ partition,
+ parallelismUpperLimit,
+ scaleFactor,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
+
+ Configuration conf = context.getConfiguration();
+ conf.set(
+ AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
+ JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
+
+ assertEquals(
+ expectedMaximumUtilization,
+ JobVertexScaler.scale(
+ vertex,
+ currentParallelism,
+ List.of(),
+ partition,
+ parallelismUpperLimit,
+ scaleFactor,
+ parallelismLowerLimit,
+ parallelismUpperLimit,
+ eventCollector,
+ context));
}
@Test
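
As a sanity check on the expected values in testNumPartitionsAdjustment, a small standalone sketch of the arithmetic (hypothetical class, numbers taken from the test above):

```java
/** Hypothetical check of the 199-partition arithmetic in testNumPartitionsAdjustment. */
public class PartitionMathCheck {
    public static void main(String[] args) {
        int partitions = 199;
        int target = 24 * 4; // currentParallelism * scaleFactor = 96
        // EVENLY_SPREAD: 199 is prime, so no p in [96, 199) divides it evenly;
        // the upward scan only succeeds at p = 199.
        for (int p = target; p < partitions; p++) {
            if (partitions % p == 0) {
                throw new AssertionError("unexpected divisor: " + p);
            }
        }
        // MAXIMIZE_UTILISATION: at the target, 199 / 96 = 2 partitions per subtask;
        // the first p with 199 / p < 2 is 100, meaning 99 subtasks read two
        // partitions each and one subtask reads one.
        int p = target;
        while (partitions / p >= partitions / target) {
            p++;
        }
        System.out.println(p); // prints 100
    }
}
```
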