@@ -188,6 +188,12 @@
<td>Duration</td>
<td>Time interval to resend the identical event</td>
</tr>
<tr>
<td><h5>job.autoscaler.scaling.key-group.partitions.adjust.mode</h5></td>
<td style="word-wrap: break-word;">EVENLY_SPREAD</td>
<td><p>Enum</p></td>
<td>How to adjust the parallelism of a Source vertex or a vertex whose upstream shuffle is keyBy.<br /><br />Possible values:<ul><li>"EVENLY_SPREAD": This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks. It is particularly effective for source vertices that are aware of partition counts or vertices after a 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew.</li><li>"MAXIMIZE_UTILISATION": This mode aims to maximize resource utilization. In this mode, an attempt is made to set the parallelism that meets the current consumption rate requirements. It is not enforced that the number of key groups or partitions is divisible by the parallelism.</li></ul></td>
</tr>
<tr>
<td><h5>job.autoscaler.stabilization.interval</h5></td>
<td style="word-wrap: break-word;">5 min</td>
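For illustration, a minimal fragment (hypothetical, mirroring the test changes later in this PR) that switches the documented option away from its default:

import org.apache.flink.autoscaler.JobVertexScaler;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

// Equivalent to setting job.autoscaler.scaling.key-group.partitions.adjust.mode: MAXIMIZE_UTILISATION
// in the user configuration.
Configuration conf = new Configuration();
conf.set(
        AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
        JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
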
@@ -25,6 +25,8 @@
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;

@@ -41,10 +43,12 @@
import java.util.Objects;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
Expand All @@ -54,6 +58,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;

/** Component responsible for computing vertex parallelism based on the scaling metrics. */
@@ -411,26 +416,30 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(

var numKeyGroupsOrPartitions =
numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
var upperBoundForAlignment =
Math.min(
// Optimize the case where newParallelism <= maxParallelism / 2
newParallelism > numKeyGroupsOrPartitions / 2
? numKeyGroupsOrPartitions
: numKeyGroupsOrPartitions / 2,
upperBound);

KeyGroupOrPartitionsAdjustMode mode =
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);

var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
Member:

Why is the upperBoundForAlignment logic updated? Would you mind sharing one case?

Contributor:

The optimization of capping the search at half of the keyspace when the parallelism is less than or equal to half the keyspace doesn't work for the new parallelism adjustment mode. It was just a shortcut anyway, to avoid checking all divisors up to the maximum. No harm in removing it.

Contributor Author:

> Why is the upperBoundForAlignment logic updated? Would you mind sharing one case?

When I ran the test case, I found this logic is wrong when the key group or partition count is not an even number.

Contributor Author:

Let me give you an example:

partition=199, newParallelism=96
With these values, the original logic calculates upperBoundForAlignment as 99.

But under EVENLY_SPREAD, the expected result is 199,

and under MAXIMIZE_UTILISATION, the expected result is 100.

Due to this wrong logic, we cannot get the expected results. This is a bug. If it is not appropriate to fix it in this PR, should it be fixed in a separate PR?

Member:

It seems the logic is wrong when numKeyGroupsOrPartitions is an odd number.

Thanks for the clarification! This change makes sense to me.
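
For illustration, the case discussed above can be reproduced with a small self-contained sketch (hypothetical class and method names, not the production code; the downward-searching fallback of the real method is omitted because it is not reached for these inputs):

public class AdjustModeExample {

    enum Mode { EVENLY_SPREAD, MAXIMIZE_UTILISATION }

    // Mirrors the alignment loop of this change with the corrected upper bound.
    static int adjust(int numKeyGroupsOrPartitions, int newParallelism, Mode mode) {
        int upperBoundForAlignment = numKeyGroupsOrPartitions; // no external upper bound assumed
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            boolean dividesEvenly = numKeyGroupsOrPartitions % p == 0;
            boolean reducesLoad =
                    mode == Mode.MAXIMIZE_UTILISATION
                            && numKeyGroupsOrPartitions / p
                                    < numKeyGroupsOrPartitions / newParallelism;
            if (dividesEvenly || reducesLoad) {
                return p;
            }
        }
        return newParallelism;
    }

    public static void main(String[] args) {
        // 199 is prime, so the only parallelism >= 96 that divides it evenly is 199 itself.
        System.out.println(adjust(199, 96, Mode.EVENLY_SPREAD)); // prints 199
        // 199 / 96 = 2 partitions per subtask (integer division); the first p with 199 / p < 2
        // is 100, i.e. 99 subtasks read two partitions each and one subtask reads a single one.
        System.out.println(adjust(199, 96, Mode.MAXIMIZE_UTILISATION)); // prints 100
    }
}

With the previous cap of 99, neither mode could reach its expected value, which is why the cap was lifted to the full key group or partition count.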


// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
if (numKeyGroupsOrPartitions % p == 0) {
if (numKeyGroupsOrPartitions % p == 0
||
// When the mode is MAXIMIZE_UTILISATION, try to find the smallest parallelism
// that can satisfy the current consumption rate.
(mode == MAXIMIZE_UTILISATION
&& numKeyGroupsOrPartitions / p
< numKeyGroupsOrPartitions / newParallelism)) {
return p;
}
}

// When adjust the parallelism after rounding up cannot be evenly divided by
// numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
// current consumption rate.
// When rounding the parallelism up cannot find a parallelism that meets the
// requirements, try to find the smallest parallelism that can satisfy the
// current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
@@ -465,4 +474,29 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
protected void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}

/** The mode of the key group or parallelism adjustment. */
public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
EVENLY_SPREAD(
"This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks"
+ ". It is particularly effective for source vertices that are aware of partition counts or vertices after "
+ "'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."),

MAXIMIZE_UTILISATION(
"This model is to maximize resource utilization. In this mode, an attempt is made to set"
+ " the parallelism that meets the current consumption rate requirements. It is not enforced "
+ "that the number of key groups or partitions is divisible by the parallelism."),
;

private final InlineElement description;

KeyGroupOrPartitionsAdjustMode(String description) {
this.description = text(description);
}

@Override
public InlineElement getDescription() {
return description;
}
}
}
@@ -17,6 +17,7 @@

package org.apache.flink.autoscaler.config;

import org.apache.flink.autoscaler.JobVertexScaler;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -351,4 +352,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
.withDescription(
"Quota of the CPU count. When scaling would go beyond this number the the scaling is not going to happen.");

public static final ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
.enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
.defaultValue(
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
.withFallbackKeys(
oldOperatorConfigKey(
"scaling.key-group.partitions.adjust.mode"))
.withDescription(
"How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");
}
@@ -323,8 +323,8 @@ public void testParallelismComputation() {
@MethodSource("adjustmentInputsProvider")
public void testParallelismComputationWithAdjustment(
Collection<ShipStrategy> inputShipStrategies) {
final int minParallelism = 1;
final int maxParallelism = Integer.MAX_VALUE;
final int parallelismLowerLimit = 1;
final int parallelismUpperLimit = Integer.MAX_VALUE;
final var vertex = new JobVertexID();

assertEquals(
@@ -336,8 +336,8 @@ public void testParallelismComputationWithAdjustment(
0,
36,
0.8,
minParallelism,
maxParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -349,8 +349,8 @@ public void testParallelismComputationWithAdjustment(
0,
128,
1.5,
minParallelism,
maxParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -362,8 +362,8 @@ public void testParallelismComputationWithAdjustment(
0,
720,
1.3,
minParallelism,
maxParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
assertEquals(
@@ -375,7 +375,44 @@ public void testParallelismComputationWithAdjustment(
0,
720,
Integer.MAX_VALUE,
minParallelism,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));

int maxParallelism = 128;
double scaleFactor = 2.5;
int currentParallelism = 10;
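// With 128 key groups and a target parallelism of 25 (10 * 2.5), EVENLY_SPREAD picks 32,
// the first divisor of 128 that is >= 25, while MAXIMIZE_UTILISATION picks 26, the first
// parallelism p with 128 / p < 128 / 25 = 5 (integer division).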
int expectedEvenly = 32;
int expectedMaximumUtilization = 26;
assertEquals(
expectedEvenly,
JobVertexScaler.scale(
vertex,
currentParallelism,
inputShipStrategies,
0,
maxParallelism,
scaleFactor,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));

Configuration conf = context.getConfiguration();
conf.set(
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);
assertEquals(
expectedMaximumUtilization,
JobVertexScaler.scale(
vertex,
currentParallelism,
inputShipStrategies,
0,
maxParallelism,
scaleFactor,
parallelismLowerLimit,
maxParallelism,
eventCollector,
context));
@@ -1004,6 +1041,47 @@ public void testNumPartitionsAdjustment() {
parallelismUpperLimit,
eventCollector,
context));

int partition = 199;
double scaleFactor = 4;
int currentParallelism = 24;
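// The target parallelism is 96 (24 * 4); since 199 is prime, the only parallelism >= 96
// that divides the partition count evenly is 199 itself.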
int expectedEvenly = 199;
// With MAXIMIZE_UTILISATION, 99 subtasks consume two partitions each
// and one subtask consumes a single partition.
int expectedMaximumUtilization = 100;

assertEquals(
expectedEvenly,
JobVertexScaler.scale(
vertex,
currentParallelism,
List.of(),
partition,
parallelismUpperLimit,
scaleFactor,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));

Configuration conf = context.getConfiguration();
conf.set(
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION);

assertEquals(
expectedMaximumUtilization,
JobVertexScaler.scale(
vertex,
currentParallelism,
List.of(),
partition,
parallelismUpperLimit,
scaleFactor,
parallelismLowerLimit,
parallelismUpperLimit,
eventCollector,
context));
}

@Test