From a339203ba0333d127d4a1207db804fcb41452e0e Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Wed, 13 Nov 2024 18:25:07 +0800 Subject: [PATCH 1/2] [FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy --- .../generated/auto_scaler_configuration.html | 6 ++ .../flink/autoscaler/JobVertexScaler.java | 56 ++++++++--- .../autoscaler/config/AutoScalerOptions.java | 13 +++ .../flink/autoscaler/JobVertexScalerTest.java | 96 +++++++++++++++++-- 4 files changed, 151 insertions(+), 20 deletions(-) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index 49ff71efa2..09baf21717 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -188,6 +188,12 @@ Duration Time interval to resend the identical event + +
job.autoscaler.scaling.key-group.partitions.adjust.mode
+ EVENLY_SPREAD +

Enum

+ How to adjust the parallelism of Source vertex or upstream shuffle is keyBy

Possible values: +
job.autoscaler.stabilization.interval
5 min diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 1d32b1aab6..a2c0a4d87d 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -25,6 +25,8 @@ import org.apache.flink.autoscaler.topology.ShipStrategy; import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -41,10 +43,12 @@ import java.util.Objects; import java.util.SortedMap; +import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE; import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; @@ -54,6 +58,7 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH; +import static 
org.apache.flink.configuration.description.TextElement.text; import static org.apache.flink.util.Preconditions.checkArgument; /** Component responsible for computing vertex parallelism based on the scaling metrics. */ @@ -411,26 +416,30 @@ protected static > int scale( var numKeyGroupsOrPartitions = numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions; - var upperBoundForAlignment = - Math.min( - // Optimize the case where newParallelism <= maxParallelism / 2 - newParallelism > numKeyGroupsOrPartitions / 2 - ? numKeyGroupsOrPartitions - : numKeyGroupsOrPartitions / 2, - upperBound); + + KeyGroupOrPartitionsAdjustMode mode = + context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE); + + var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound); // When the shuffle type of vertex inputs contains keyBy or vertex is a source, // we try to adjust the parallelism such that it divides // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks for (int p = newParallelism; p <= upperBoundForAlignment; p++) { - if (numKeyGroupsOrPartitions % p == 0) { + if (numKeyGroupsOrPartitions % p == 0 + || + // When the mode is MAXIMIZE_UTILISATION, try to find the smallest parallelism + // that can satisfy the current consumption rate. + (mode == MAXIMIZE_UTILISATION + && numKeyGroupsOrPartitions / p + < numKeyGroupsOrPartitions / newParallelism)) { return p; } } - // When adjust the parallelism after rounding up cannot be evenly divided by - // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the - // current consumption rate. + // When rounding the parallelism up cannot find + // a parallelism that meets the requirements, + // try to find the smallest parallelism that can satisfy the current consumption rate. 
int p = newParallelism; for (; p > 0; p--) { if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) { @@ -465,4 +474,29 @@ protected static > int scale( protected void setClock(Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + /** The mode of the key group or parallelism adjustment. */ + public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum { + EVENLY_SPREAD( + "This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks" + + ". It is particularly effective for source vertices that are aware of partition counts or vertices after " + + "'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."), + + MAXIMIZE_UTILISATION( + "This model is to maximize resource utilization. In this mode, an attempt is made to set" + + " the parallelism that meets the current consumption rate requirements. 
It is not enforced " + + "that the number of key groups or partitions is divisible by the parallelism."), + ; + + private final InlineElement description; + + KeyGroupOrPartitionsAdjustMode(String description) { + this.description = text(description); + } + + @Override + public InlineElement getDescription() { + return description; + } + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index e1ea6a8695..a5ffb0f9d8 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -17,6 +17,7 @@ package org.apache.flink.autoscaler.config; +import org.apache.flink.autoscaler.JobVertexScaler; import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -351,4 +352,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withFallbackKeys(oldOperatorConfigKey("quota.cpu")) .withDescription( "Quota of the CPU count. 
When scaling would go beyond this number the the scaling is not going to happen."); + + public static final ConfigOption + SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE = + autoScalerConfig("scaling.key-group.partitions.adjust.mode") + .enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class) + .defaultValue( + JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD) + .withFallbackKeys( + oldOperatorConfigKey( + "scaling.key-group.partitions.adjust.mode")) + .withDescription( + "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy"); } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index ae832073e2..704ddd58e3 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -323,8 +323,8 @@ public void testParallelismComputation() { @MethodSource("adjustmentInputsProvider") public void testParallelismComputationWithAdjustment( Collection inputShipStrategies) { - final int minParallelism = 1; - final int maxParallelism = Integer.MAX_VALUE; + final int parallelismLowerLimit = 1; + final int parallelismUpperLimit = Integer.MAX_VALUE; final var vertex = new JobVertexID(); assertEquals( @@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment( 0, 36, 0.8, - minParallelism, - maxParallelism, + parallelismLowerLimit, + parallelismUpperLimit, eventCollector, context)); assertEquals( @@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment( 0, 128, 1.5, - minParallelism, - maxParallelism, + parallelismLowerLimit, + parallelismUpperLimit, eventCollector, context)); assertEquals( @@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment( 0, 720, 1.3, - minParallelism, - maxParallelism, + parallelismLowerLimit, + parallelismUpperLimit, 
eventCollector, context)); assertEquals( @@ -375,7 +375,44 @@ public void testParallelismComputationWithAdjustment( 0, 720, Integer.MAX_VALUE, - minParallelism, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + + int maxParallelism = 128; + double scaleFactor = 2.5; + int currentParallelism = 10; + int expectedEvenly = 32; + int expectedMaximumUtilization = 26; + assertEquals( + expectedEvenly, + JobVertexScaler.scale( + vertex, + currentParallelism, + inputShipStrategies, + 0, + maxParallelism, + scaleFactor, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + + Configuration conf = context.getConfiguration(); + conf.set( + AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE, + JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION); + assertEquals( + expectedMaximumUtilization, + JobVertexScaler.scale( + vertex, + currentParallelism, + inputShipStrategies, + 0, + maxParallelism, + scaleFactor, + parallelismLowerLimit, maxParallelism, eventCollector, context)); @@ -1004,6 +1041,47 @@ public void testNumPartitionsAdjustment() { parallelismUpperLimit, eventCollector, context)); + + int partition = 199; + double scaleFactor = 4; + int currentParallelism = 24; + int expectedEvenly = 199; + // At MAXIMIZE_UTILISATION, 99 subtasks consume two partitions, + // one subtask consumes one partition. 
+ int expectedMaximumUtilization = 100; + + assertEquals( + expectedEvenly, + JobVertexScaler.scale( + vertex, + currentParallelism, + List.of(), + partition, + parallelismUpperLimit, + scaleFactor, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + + Configuration conf = context.getConfiguration(); + conf.set( + AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE, + JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION); + + assertEquals( + expectedMaximumUtilization, + JobVertexScaler.scale( + vertex, + currentParallelism, + List.of(), + partition, + parallelismUpperLimit, + scaleFactor, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); } @Test From f18634afa3c5f973e2212c412d047d66054fc7cf Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Tue, 19 Nov 2024 22:53:52 +0800 Subject: [PATCH 2/2] [FLINK-36527][autoscaler] keep `var upperBoundForAlignment = ...` position remains unchanged --- .../main/java/org/apache/flink/autoscaler/JobVertexScaler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index a2c0a4d87d..87099bc0fb 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -416,12 +416,11 @@ protected static > int scale( var numKeyGroupsOrPartitions = numSourcePartitions <= 0 ? 
maxParallelism : numSourcePartitions; + var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound); KeyGroupOrPartitionsAdjustMode mode = context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE); - var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound); - // When the shuffle type of vertex inputs contains keyBy or vertex is a source, // we try to adjust the parallelism such that it divides // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks