Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/auto_scaler_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,22 @@
<td>Stabilization period in which no new scaling will be executed</td>
</tr>
<tr>
<td><h5>job.autoscaler.target.utilization</h5></td>
<td style="word-wrap: break-word;">0.7</td>
<td><h5>job.autoscaler.utilization.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Double</td>
<td>Target vertex utilization</td>
<td>Max vertex utilization</td>
</tr>
<tr>
<td><h5>job.autoscaler.target.utilization.boundary</h5></td>
<td style="word-wrap: break-word;">0.3</td>
<td><h5>job.autoscaler.utilization.min</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Double</td>
<td>Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), target_rate / (target_utilization + boundary)]</td>
<td>Min vertex utilization</td>
</tr>
<tr>
<td><h5>job.autoscaler.utilization.target</h5></td>
<td style="word-wrap: break-word;">0.7</td>
<td>Double</td>
<td>Target vertex utilization</td>
</tr>
<tr>
<td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
Expand Down Expand Up @@ -158,7 +158,7 @@ public ParallelismChange computeScaleTargetParallelism(

double targetCapacity =
AutoScalerUtils.getTargetProcessingCapacity(
evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true, restartTime);
evaluatedMetrics, conf, conf.get(UTILIZATION_TARGET), true, restartTime);
if (Double.isNaN(targetCapacity)) {
LOG.warn(
"Target data rate is not available for {}, cannot compute new parallelism",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import java.util.SortedMap;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
Expand Down Expand Up @@ -284,8 +286,8 @@ protected static void computeProcessingRateThresholds(
boolean processingBacklog,
Duration restartTime) {

double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
double targetUtilization = conf.get(TARGET_UTILIZATION);
double targetUtilization = conf.get(UTILIZATION_TARGET);
double utilizationBoundary = conf.get(TARGET_UTILIZATION_BOUNDARY);

double upperUtilization;
double lowerUtilization;
Expand All @@ -296,8 +298,12 @@ protected static void computeProcessingRateThresholds(
upperUtilization = 1.0;
lowerUtilization = 0.0;
} else {
upperUtilization = targetUtilization + utilizationBoundary;
lowerUtilization = targetUtilization - utilizationBoundary;
upperUtilization =
conf.getOptional(UTILIZATION_MAX)
.orElse(targetUtilization + utilizationBoundary);
lowerUtilization =
conf.getOptional(UTILIZATION_MIN)
.orElse(targetUtilization - utilizationBoundary);
}

double scaleUpThreshold =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,17 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
+ "seconds suffix, daily expression's formation is startTime-endTime, such as 9:30:30-10:50:20, when exclude from 9:30:30-10:50:20 in Monday and Thursday "
+ "we can express it as 9:30:30-10:50:20 && * * * ? * 2,5");

public static final ConfigOption<Double> TARGET_UTILIZATION =
autoScalerConfig("target.utilization")
public static final ConfigOption<Double> UTILIZATION_TARGET =
autoScalerConfig("utilization.target")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to change this well-established key? This might confuse users, even though we have a fallback.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand its cleaner, but perhaps we can simply add the new keys and leave this unchanged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand its cleaner, but perhaps we can simply add the new keys and leave this unchanged?
Do we have to change this well-established key? This might confuse users, even though we have a fallback.

I think the method withFallbackKeys is good enough.
WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the long term it's preferable to have consistent well named keys. I think the fallback keys should be enough. I would expect that users will gravitate towards the new min/max options and at that point this will make a lot of sense for everyone.

.doubleType()
.defaultValue(0.7)
.withFallbackKeys(oldOperatorConfigKey("target.utilization"))
.withDeprecatedKeys(autoScalerConfigKey("target.utilization"))
.withFallbackKeys(
oldOperatorConfigKey("utilization.target"),
oldOperatorConfigKey("target.utilization"))
.withDescription("Target vertex utilization");

@Deprecated
public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
autoScalerConfig("target.utilization.boundary")
.doubleType()
Expand All @@ -104,6 +108,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"Target vertex utilization boundary. Scaling won't be performed if the processing capacity is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");

public static final ConfigOption<Double> UTILIZATION_MAX =
autoScalerConfig("utilization.max")
.doubleType()
.noDefaultValue()
.withFallbackKeys(oldOperatorConfigKey("utilization.max"))
.withDescription("Max vertex utilization");

public static final ConfigOption<Double> UTILIZATION_MIN =
autoScalerConfig("utilization.min")
.doubleType()
.noDefaultValue()
.withFallbackKeys(oldOperatorConfigKey("utilization.min"))
.withDescription("Min vertex utilization");
Comment on lines +111 to +123
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 options have no default value, I'm not sure whether it's expected.

If the target.utilization.boundary is removed in the future, how do we handle the default value?

Usually, when a new option is introduced, a default value is also designed. When deprecated options are removed in subsequent versions, we only need to remove unnecessary compatibility code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, maybe retaining the target.utilization.boundary parameter is also a feasible solution,like latest code changes, I don't know if my understanding is consistent with @gyfora and @mxm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, we will only keep the following three options in the end, and make it compatible with job.autoscaler.target.utilization.boundary before it is deprecated.

job.autoscaler.utilization.target
job.autoscaler.utilization.min
job.autoscaler.utilization.max

Do you mean we keep all 4 options in the end?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove the boundary at the end (deprecate now), but still min/max should not have fixed defaults as the default would be derived from the current target.


public static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
autoScalerConfig("scale-down.interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ public void setup() {
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void setup() {
@MethodSource("adjustmentInputsProvider")
public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies) {
var op = new JobVertexID();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var delayedScaleDown = new DelayedScaleDown();

Expand All @@ -113,7 +114,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
ParallelismChange.build(8),
vertexScaler.computeScaleTargetParallelism(
Expand All @@ -125,7 +126,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
ParallelismChange.noChange(),
vertexScaler.computeScaleTargetParallelism(
Expand All @@ -137,7 +138,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
ParallelismChange.build(8),
vertexScaler.computeScaleTargetParallelism(
Expand All @@ -160,7 +161,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
conf.set(UTILIZATION_TARGET, 0.5);
assertEquals(
ParallelismChange.build(10),
vertexScaler.computeScaleTargetParallelism(
Expand All @@ -172,7 +173,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
conf.set(UTILIZATION_TARGET, 0.6);
assertEquals(
ParallelismChange.build(4),
vertexScaler.computeScaleTargetParallelism(
Expand All @@ -184,7 +185,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
ParallelismChange.build(5),
Expand All @@ -209,7 +210,7 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
restartTime,
delayedScaleDown));

conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
assertEquals(
ParallelismChange.build(15),
Expand Down Expand Up @@ -558,7 +559,7 @@ public void testMinParallelismLimitIsUsed() {
@Test
public void testMaxParallelismLimitIsUsed() {
conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
var delayedScaleDown = new DelayedScaleDown();

assertEquals(
Expand Down Expand Up @@ -587,7 +588,7 @@ public void testMaxParallelismLimitIsUsed() {

@Test
public void testDisableScaleDownInterval() {
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(0));

var delayedScaleDown = new DelayedScaleDown();
Expand All @@ -597,7 +598,7 @@ public void testDisableScaleDownInterval() {

@Test
public void testScaleDownAfterInterval() {
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
var instant = Instant.now();

Expand Down Expand Up @@ -629,7 +630,7 @@ public void testScaleDownAfterInterval() {

@Test
public void testImmediateScaleUpWithinScaleDownInterval() {
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
var instant = Instant.now();

Expand All @@ -655,7 +656,7 @@ public void testImmediateScaleUpWithinScaleDownInterval() {

@Test
public void testCancelDelayedScaleDownAfterNewParallelismIsSame() {
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
var instant = Instant.now();

Expand Down Expand Up @@ -701,7 +702,7 @@ private void assertParallelismChange(
public void testIneffectiveScalingDetection() {
var op = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);

var evaluated = evaluated(5, 100, 50);
Expand Down Expand Up @@ -826,7 +827,7 @@ public void testIneffectiveScalingDetection() {
public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputShipStrategies) {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
conf.set(UTILIZATION_TARGET, 1.0);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);

var evaluated = evaluated(5, 100, 50);
Expand Down Expand Up @@ -1082,7 +1083,7 @@ public void testNumPartitionsAdjustment() {
@Test
public void testSendingScalingLimitedEvents() {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
conf.set(UTILIZATION_TARGET, 1.0);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
var evaluated = evaluated(10, 200, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ public void setup() {
@Test
public void testEndToEnd() throws Exception {
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);

setDefaultMetrics(metricsCollector);

Expand Down Expand Up @@ -344,8 +345,9 @@ public void testMetricCollectorWindow() throws Exception {
@Test
public void testClearHistoryOnTopoChange() throws Exception {
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
conf.set(AutoScalerOptions.UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.UTILIZATION_MIN, 1.);
conf.set(AutoScalerOptions.UTILIZATION_MAX, 1.);

setDefaultMetrics(metricsCollector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public void setup() {
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8);
defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9);
defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7);
defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);

autoscaler =
Expand Down
Loading
Loading