Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,96 @@

import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkState;

/** All delayed scale down requests. */
public class DelayedScaleDown {

/**
 * Details of the recommended parallelism.
 *
 * <p>One instance is created for each recommendation that is recorded for a vertex. All fields
 * are final, so an instance does not change after construction.
 */
@Data
public static class RecommendedParallelism {
/** Time at which this recommendation was recorded; compared against the window start time. */
@Nonnull private final Instant triggerTime;
/** The parallelism that was recommended at {@link #triggerTime}. */
private final int parallelism;
/** The utilization-bound flag that was passed in together with this recommendation. */
private final boolean outsideUtilizationBound;

/**
 * Creates a recommendation. Annotated with {@code @JsonCreator}; each argument is bound to the
 * JSON property named in its {@code @JsonProperty} annotation.
 *
 * @param triggerTime time at which the recommendation was recorded
 * @param parallelism the recommended parallelism
 * @param outsideUtilizationBound the utilization-bound flag of this recommendation
 */
@JsonCreator
public RecommendedParallelism(
@Nonnull @JsonProperty("triggerTime") Instant triggerTime,
@JsonProperty("parallelism") int parallelism,
@JsonProperty("outsideUtilizationBound") boolean outsideUtilizationBound) {
this.triggerTime = triggerTime;
this.parallelism = parallelism;
this.outsideUtilizationBound = outsideUtilizationBound;
}
}

/** The delayed scale down info for vertex. */
@Data
public static class VertexDelayedScaleDownInfo {
private final Instant firstTriggerTime;
private int maxRecommendedParallelism;

/**
* In theory, it maintains all recommended parallelisms at each time within the past
* `scale-down.interval` window period, so all recommended parallelisms before the window
* start time will be evicted.
*
* <p>Also, if the latest parallelism is greater than or equal to a past parallelism, that
* past parallelism can never be the max recommended parallelism again, so all such past
* entries can be evicted. This is the general optimization for calculating the max value of
* a sliding window. So we only need to maintain a list with monotonically decreasing
* parallelism within the past window, and the first parallelism will be the max recommended
* parallelism within the past `scale-down.interval` window period.
*/
private final LinkedList<RecommendedParallelism> recommendedParallelisms;

/**
 * Creates the delayed scale down info with an empty list of recommended parallelisms.
 *
 * @param firstTriggerTime the first trigger time to remember for the vertex
 */
public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
    // Start without any recorded recommendation; entries are added later.
    this.recommendedParallelisms = new LinkedList<>();
    this.firstTriggerTime = firstTriggerTime;
}

@JsonCreator
public VertexDelayedScaleDownInfo(
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
@JsonProperty("recommendedParallelisms")
LinkedList<RecommendedParallelism> recommendedParallelisms) {
this.firstTriggerTime = firstTriggerTime;
this.maxRecommendedParallelism = maxRecommendedParallelism;
this.recommendedParallelisms = recommendedParallelisms;
}

/** Record current recommended parallelism. */
public void recordRecommendedParallelism(
Instant triggerTime, int parallelism, boolean outsideUtilizationBound) {
// Evict all recommended parallelisms that are lower than or equal to the latest
// parallelism. When the past parallelism is equal to the latest parallelism,
// triggerTime needs to be updated, so it also needs to be evicted.
while (!recommendedParallelisms.isEmpty()
&& recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
recommendedParallelisms.pollLast();
}
Comment on lines +98 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good optimization 👍

Copy link
Member Author

@1996fanrui 1996fanrui Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gyfora for the comment in advance, I'm still adding more tests and comments for this PR.

I have some other work to do this week, so I expect this PR to be ready next week.


recommendedParallelisms.addLast(
new RecommendedParallelism(triggerTime, parallelism, outsideUtilizationBound));
}

/**
 * Returns the first remaining recommendation after dropping every leading entry whose
 * trigger time is before {@code windowStartTime}.
 *
 * <p>Note that this call mutates the list: the dropped entries are removed permanently.
 *
 * @param windowStartTime entries triggered strictly before this instant are evicted
 * @return the first recommendation left in the list
 * @throws IllegalStateException (via {@code checkState}) if no recommendation is left
 */
@JsonIgnore
public RecommendedParallelism getMaxRecommendedParallelism(Instant windowStartTime) {
    // Drop expired entries from the head until the head lies inside the window.
    while (!recommendedParallelisms.isEmpty()) {
        var oldest = recommendedParallelisms.getFirst();
        if (!oldest.getTriggerTime().isBefore(windowStartTime)) {
            break;
        }
        recommendedParallelisms.removeFirst();
    }

    var head = recommendedParallelisms.peekFirst();
    checkState(
            head != null,
            "The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug.");
    return head;
}
}

Expand All @@ -63,18 +136,19 @@ public DelayedScaleDown() {
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
@Nonnull
public VertexDelayedScaleDownInfo triggerScaleDown(
JobVertexID vertex, Instant triggerTime, int parallelism) {
var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
if (vertexDelayedScaleDownInfo == null) {
// It's the first trigger
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
updated = true;
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
updated = true;
}
JobVertexID vertex,
Instant triggerTime,
int parallelism,
boolean outsideUtilizationBound) {
// The vertexDelayedScaleDownInfo is updated every time a scale down is triggered, because
// the triggerTime needs to be updated each time.
updated = true;

var vertexDelayedScaleDownInfo =
delayedVertices.computeIfAbsent(
vertex, k -> new VertexDelayedScaleDownInfo(triggerTime));
vertexDelayedScaleDownInfo.recordRecommendedParallelism(
triggerTime, parallelism, outsideUtilizationBound);

return vertexDelayedScaleDownInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,7 @@ private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(Context ctx) {
/**
 * Replaces the clock of this instance and hands the same clock to the metrics collector and
 * the scaling executor.
 *
 * @param clock the clock to use; must not be null
 */
@VisibleForTesting
void setClock(Clock clock) {
    // Validate first so that nothing is modified when the argument is null.
    final Clock checkedClock = Preconditions.checkNotNull(clock);
    this.clock = checkedClock;
    metricsCollector.setClock(checkedClock);
    scalingExecutor.setClock(checkedClock);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
import static org.apache.flink.configuration.description.TextElement.text;
Expand Down Expand Up @@ -92,12 +94,15 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
@Getter
public static class ParallelismChange {

private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1, false);

private final int newParallelism;

private ParallelismChange(int newParallelism) {
private final boolean outsideUtilizationBound;

private ParallelismChange(int newParallelism, boolean outsideUtilizationBound) {
this.newParallelism = newParallelism;
this.outsideUtilizationBound = outsideUtilizationBound;
}

public boolean isNoChange() {
Expand All @@ -113,24 +118,29 @@ public boolean equals(Object o) {
return false;
}
ParallelismChange that = (ParallelismChange) o;
return newParallelism == that.newParallelism;
return newParallelism == that.newParallelism
&& outsideUtilizationBound == that.outsideUtilizationBound;
}

@Override
public int hashCode() {
return Objects.hash(newParallelism);
return Objects.hash(newParallelism, outsideUtilizationBound);
}

@Override
public String toString() {
return isNoChange()
? "NoParallelismChange"
: "ParallelismChange{newParallelism=" + newParallelism + '}';
: "ParallelismChange{newParallelism="
+ newParallelism
+ ", outsideUtilizationBound="
+ outsideUtilizationBound
+ "}";
}

public static ParallelismChange build(int newParallelism) {
public static ParallelismChange build(int newParallelism, boolean outsideUtilizationBound) {
checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
return new ParallelismChange(newParallelism);
return new ParallelismChange(newParallelism, outsideUtilizationBound);
}

public static ParallelismChange noChange() {
Expand Down Expand Up @@ -239,6 +249,8 @@ private ParallelismChange detectBlockScaling(
currentParallelism != newParallelism,
"The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");

var outsideUtilizationBound = outsideUtilizationBound(vertex, evaluatedMetrics);

var scaledUp = currentParallelism < newParallelism;

if (scaledUp) {
Expand All @@ -248,7 +260,7 @@ private ParallelismChange detectBlockScaling(

// If we don't have past scaling actions for this vertex, don't block scale up.
if (history.isEmpty()) {
return ParallelismChange.build(newParallelism);
return ParallelismChange.build(newParallelism, outsideUtilizationBound);
}

var lastSummary = history.get(history.lastKey());
Expand All @@ -260,28 +272,59 @@ && detectIneffectiveScaleUp(
return ParallelismChange.noChange();
}

return ParallelismChange.build(newParallelism);
return ParallelismChange.build(newParallelism, outsideUtilizationBound);
} else {
return applyScaleDownInterval(
delayedScaleDown, vertex, conf, newParallelism, outsideUtilizationBound);
}
}

/**
 * Checks whether the averaged true processing rate of the vertex lies outside the range
 * spanned by the scale-up and scale-down rate thresholds.
 *
 * @param vertex the vertex the metrics belong to; only used for logging
 * @param metrics evaluated metrics of the vertex; must contain {@code TRUE_PROCESSING_RATE},
 *     {@code SCALE_UP_RATE_THRESHOLD} and {@code SCALE_DOWN_RATE_THRESHOLD}
 * @return {@code true} if the rate is below the scale-up threshold or above the scale-down
 *     threshold, {@code false} otherwise
 */
private static boolean outsideUtilizationBound(
        JobVertexID vertex, Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
    double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
    double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
    double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();

    if (trueProcessingRate < scaleUpRateThreshold
            || trueProcessingRate > scaleDownRateThreshold) {
        LOG.debug(
                "Vertex {} processing rate {} is outside ({}, {})",
                vertex,
                trueProcessingRate,
                scaleUpRateThreshold,
                scaleDownRateThreshold);
        return true;
    }

    // This method only classifies the rate; it must not trigger any scaling decision itself.
    LOG.debug(
            "Vertex {} processing rate {} is within target ({}, {})",
            vertex,
            trueProcessingRate,
            scaleUpRateThreshold,
            scaleDownRateThreshold);
    return false;
}

private ParallelismChange applyScaleDownInterval(
DelayedScaleDown delayedScaleDown,
JobVertexID vertex,
Configuration conf,
int newParallelism) {
int newParallelism,
boolean outsideUtilizationBound) {
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
if (scaleDownInterval.toMillis() <= 0) {
// The scale down interval is disabled, so don't block scaling.
return ParallelismChange.build(newParallelism);
return ParallelismChange.build(newParallelism, outsideUtilizationBound);
}

var now = clock.instant();
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
var windowStartTime = now.minus(scaleDownInterval);
var delayedScaleDownInfo =
delayedScaleDown.triggerScaleDown(
vertex, now, newParallelism, outsideUtilizationBound);

// Never scale down within scale down interval
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
} else {
Expand All @@ -293,7 +336,11 @@ private ParallelismChange applyScaleDownInterval(
} else {
// Using the maximum parallelism within the scale down interval window instead of the
// latest parallelism when scaling down
return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
var maxRecommendedParallelism =
delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
return ParallelismChange.build(
maxRecommendedParallelism.getParallelism(),
maxRecommendedParallelism.isOutsideUtilizationBound());
}
}

Expand Down
Loading
Loading