Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@
<td>Duration</td>
<td>Time interval to resend the identical event</td>
</tr>
<tr>
<td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
<td style="word-wrap: break-word;">EVENLY_SPREAD</td>
<td><p>Enum</p></td>
<td>How to adjust the parallelism of Source vertex or upstream shuffle is keyBy<br /><br />Possible values:<ul><li>"EVENLY_SPREAD": This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks. It is particularly effective for source vertices that are aware of partition counts or vertices after 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew.</li><li>"MAXIMIZE_UTILISATION": This model is to maximize resource utilization. In this mode, an attempt is made to set the parallelism that meets the current consumption rate requirements. It is not enforced that the number of key groups or partitions is divisible by the parallelism.</li></ul></td>
</tr>
<tr>
<td><h5>job.autoscaler.stabilization.interval</h5></td>
<td style="word-wrap: break-word;">5 min</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;

Expand All @@ -41,10 +43,12 @@
import java.util.Objects;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
Expand All @@ -54,6 +58,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;

/** Component responsible for computing vertex parallelism based on the scaling metrics. */
Expand Down Expand Up @@ -411,26 +416,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(

var numKeyGroupsOrPartitions =
numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
var upperBoundForAlignment =
Math.min(
// Optimize the case where newParallelism <= maxParallelism / 2
newParallelism > numKeyGroupsOrPartitions / 2
? numKeyGroupsOrPartitions
: numKeyGroupsOrPartitions / 2,
upperBound);
var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);

KeyGroupOrPartitionsAdjustMode mode =
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);

// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
if (numKeyGroupsOrPartitions % p == 0) {
if (numKeyGroupsOrPartitions % p == 0
||
// When Mode is MAXIMIZE_UTILISATION , Try to find the smallest parallelism
// that can satisfy the current consumption rate.
(mode == MAXIMIZE_UTILISATION
&& numKeyGroupsOrPartitions / p
< numKeyGroupsOrPartitions / newParallelism)) {
return p;
}
}

// When adjust the parallelism after rounding up cannot be evenly divided by
// numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
// current consumption rate.
// When adjusting the parallelism after rounding up cannot
// find the parallelism to meet requirements.
// Try to find the smallest parallelism that can satisfy the current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
Expand Down Expand Up @@ -465,4 +473,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
protected void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}

/** The mode of the key group or parallelism adjustment. */
public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
EVENLY_SPREAD(
"This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks"
+ ". It is particularly effective for source vertices that are aware of partition counts or vertices after "
+ "'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."),

MAXIMIZE_UTILISATION(
"This model is to maximize resource utilization. In this mode, an attempt is made to set"
+ " the parallelism that meets the current consumption rate requirements. It is not enforced "
+ "that the number of key groups or partitions is divisible by the parallelism."),
;

private final InlineElement description;

KeyGroupOrPartitionsAdjustMode(String description) {
this.description = text(description);
}

@Override
public InlineElement getDescription() {
return description;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.autoscaler.config;

import org.apache.flink.autoscaler.JobVertexScaler;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
Expand Down Expand Up @@ -351,4 +352,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
.withDescription(
"Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");

public static final ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
.enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
.defaultValue(
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
.withFallbackKeys(
oldOperatorConfigKey(
"scaling.key-group.partitions.adjust.mode"))
.withDescription(
"How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ public void testParallelismComputation() {
@MethodSource("adjustmentInputsProvider")
public void testParallelismComputationWithAdjustment(
Collection<ShipStrategy> inputShipStrategies) {
final int minParallelism = 1;
final int maxParallelism = Integer.MAX_VALUE;
final int parallelismLowerLimit = 1;
final int parallelismUpperLimit = Integer.MAX_VALUE;
final var vertex = new JobVertexID();

assertEquals(
Expand All @@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment(
0,
36,
0.8,
minParallelism,
maxParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
assertEquals(
Expand All @@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment(
0,
128,
1.5,
minParallelism,
maxParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
assertEquals(
Expand All @@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment(
0,
720,
1.3,
minParallelism,
maxParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
assertEquals(
Expand All @@ -375,7 +375,44 @@ public void testParallelismComputationWithAdjustment(
0,
720,
Integer.MAX_VALUE,
minParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));

int maxParallelism = 128;
double scaleFactor = 2.5;
int currentParallelism = 10;
int expectedEvenly = 32;
int expectedMaximumUtilization = 26;
assertEquals(
expectedEvenly,
JobVertexScaler.scale(
vertex,
currentParallelism,
inputShipStrategies,
0,
maxParallelism,
scaleFactor,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));

Configuration conf = context.getConfiguration();
conf.set(
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
assertEquals(
expectedMaximumUtilization,
JobVertexScaler.scale(
vertex,
currentParallelism,
inputShipStrategies,
0,
maxParallelism,
scaleFactor,
parallelismLowerLimit,
maxParallelism,
eventCollector,
context));
Expand Down Expand Up @@ -1004,6 +1041,47 @@ public void testNumPartitionsAdjustment() {
parallelismUpperLimit,
eventCollector,
context));

int partition = 199;
double scaleFactor = 4;
int currentParallelism = 24;
int expectedEvenly = 199;
// At MAXIMIZE_UTILISATION, 99 subtasks consume two partitions,
// one subtask consumes one partition.
int expectedMaximumUtilization = 100;

assertEquals(
expectedEvenly,
JobVertexScaler.scale(
vertex,
currentParallelism,
List.of(),
partition,
parallelismUpperLimit,
scaleFactor,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));

Configuration conf = context.getConfiguration();
conf.set(
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);

assertEquals(
expectedMaximumUtilization,
JobVertexScaler.scale(
vertex,
currentParallelism,
List.of(),
partition,
parallelismUpperLimit,
scaleFactor,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
}

@Test
Expand Down