@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
try {
return deserializeDelayedScaleDown(delayedScaleDown.get());
} catch (JacksonException e) {
LOG.error(
LOG.warn(
"Could not deserialize delayed scale down, possibly the format changed. Discarding...",
e);
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)

private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
throws JacksonException {
return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
return YAML_MAPPER.writeValueAsString(delayedScaleDown);
}

private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
throws JacksonException {
Map<JobVertexID, Instant> firstTriggerTime =
YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
return new DelayedScaleDown(firstTriggerTime);
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
}
}
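With this change the state store serializes the whole DelayedScaleDown object rather than just its trigger-time map, so the Jackson annotations on the new class define the stored format. Below is a minimal round-trip sketch of the same pattern, not the Flink code: it uses plain Jackson and JSON instead of the shaded YAML_MAPPER, String keys instead of JobVertexID, and a hypothetical VertexInfo stand-in for VertexDelayedScaleDownInfo.

```java
import java.time.Instant;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

// Stand-in for VertexDelayedScaleDownInfo: an immutable trigger time plus a
// mutable max parallelism, deserialized through an annotated constructor.
class VertexInfo {
    private final Instant firstTriggerTime;
    private int maxRecommendedParallelism;

    @JsonCreator
    VertexInfo(
            @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
            @JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
        this.firstTriggerTime = firstTriggerTime;
        this.maxRecommendedParallelism = maxRecommendedParallelism;
    }

    public Instant getFirstTriggerTime() {
        return firstTriggerTime;
    }

    public int getMaxRecommendedParallelism() {
        return maxRecommendedParallelism;
    }
}

public class StateRoundTrip {
    public static void main(String[] args) throws Exception {
        // JavaTimeModule is needed so Jackson can read and write Instant.
        ObjectMapper mapper = JsonMapper.builder().addModule(new JavaTimeModule()).build();

        Map<String, VertexInfo> state =
                Map.of("vertex-1", new VertexInfo(Instant.parse("2024-12-02T10:00:00Z"), 4));

        // Serialize the whole structure, then restore it with a TypeReference,
        // mirroring serializeDelayedScaleDown/deserializeDelayedScaleDown above.
        String serialized = mapper.writeValueAsString(state);
        Map<String, VertexInfo> restored =
                mapper.readValue(serialized, new TypeReference<Map<String, VertexInfo>>() {});

        System.out.println(restored.get("vertex-1").getMaxRecommendedParallelism()); // 4
    }
}
```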
@@ -19,51 +19,81 @@

import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Data;
import lombok.Getter;

import javax.annotation.Nonnull;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** All delayed scale down requests. */
public class DelayedScaleDown {

@Getter private final Map<JobVertexID, Instant> firstTriggerTime;
/** The delayed scale down info for a vertex. */
@Data
public static class VertexDelayedScaleDownInfo {
private final Instant firstTriggerTime;
private int maxRecommendedParallelism;

@JsonCreator
public VertexDelayedScaleDownInfo(
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
this.firstTriggerTime = firstTriggerTime;
this.maxRecommendedParallelism = maxRecommendedParallelism;
}
}

@Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;

// Has any scale down request been updated? It doesn't need to be stored; it is only used to
// determine whether DelayedScaleDown needs to be stored.
@Getter private boolean isUpdated = false;
@JsonIgnore @Getter private boolean updated = false;

public DelayedScaleDown() {
this.firstTriggerTime = new HashMap<>();
}

public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
this.firstTriggerTime = firstTriggerTime;
this.delayedVertices = new HashMap<>();
}

Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
return Optional.ofNullable(firstTriggerTime.get(vertex));
}
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
@Nonnull
public VertexDelayedScaleDownInfo triggerScaleDown(
JobVertexID vertex, Instant triggerTime, int parallelism) {
var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
if (vertexDelayedScaleDownInfo == null) {
// It's the first trigger
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
updated = true;
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
updated = true;
}

void updateTriggerTime(JobVertexID vertex, Instant instant) {
firstTriggerTime.put(vertex, instant);
isUpdated = true;
return vertexDelayedScaleDownInfo;
}

// Clear the delayed scale down for the corresponding vertex when the recommended parallelism is
// greater than or equal to the currentParallelism.
void clearVertex(JobVertexID vertex) {
Instant removed = firstTriggerTime.remove(vertex);
VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
if (removed != null) {
isUpdated = true;
updated = true;
}
}

// Clear all delayed scale down when rescale happens.
void clearAll() {
if (firstTriggerTime.isEmpty()) {
if (delayedVertices.isEmpty()) {
return;
}
firstTriggerTime.clear();
isUpdated = true;
delayedVertices.clear();
updated = true;
}
}
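A short usage sketch of the semantics above (my example, assuming the caller can access DelayedScaleDown, e.g. it lives in the same package): the first trigger records both fields, and later triggers keep the earliest trigger time while only ratcheting maxRecommendedParallelism upward.

```java
import java.time.Instant;

import org.apache.flink.runtime.jobgraph.JobVertexID;

class DelayedScaleDownDemo {
    public static void main(String[] args) {
        DelayedScaleDown delayed = new DelayedScaleDown();
        JobVertexID vertex = new JobVertexID();
        Instant t0 = Instant.parse("2024-12-02T10:00:00Z");

        // First trigger: firstTriggerTime = t0, maxRecommendedParallelism = 8.
        var info = delayed.triggerScaleDown(vertex, t0, 8);

        // Higher recommendation later in the window: max ratchets up to 10,
        // but firstTriggerTime stays at t0.
        info = delayed.triggerScaleDown(vertex, t0.plusSeconds(300), 10);

        // Lower recommendation: nothing changes, the max stays at 10.
        info = delayed.triggerScaleDown(vertex, t0.plusSeconds(600), 6);

        System.out.println(info.getFirstTriggerTime());          // 2024-12-02T10:00:00Z
        System.out.println(info.getMaxRecommendedParallelism()); // 10
    }
}
```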
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
this.autoScalerEventHandler = autoScalerEventHandler;
}

/** The parallelism change type of {@link ParallelismChange}. */
public enum ParallelismChangeType {
NO_CHANGE,
REQUIRED_CHANGE,
OPTIONAL_CHANGE;
}

/**
* The rescaling will be triggered if any vertex's ParallelismChange is required. This means
* that if all vertices' ParallelismChange is optional, rescaling will be ignored.
*/
/** Rescaling will be triggered if any vertex's {@link ParallelismChange} represents an actual change. */
@Getter
public static class ParallelismChange {

private static final ParallelismChange NO_CHANGE =
new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);

private final ParallelismChangeType changeType;
private final int newParallelism;

private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
this.changeType = changeType;
private ParallelismChange(int newParallelism) {
this.newParallelism = newParallelism;
}

public boolean isNoChange() {
return this == NO_CHANGE;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -122,30 +113,24 @@ public boolean equals(Object o) {
return false;
}
ParallelismChange that = (ParallelismChange) o;
return changeType == that.changeType && newParallelism == that.newParallelism;
return newParallelism == that.newParallelism;
}

@Override
public int hashCode() {
return Objects.hash(changeType, newParallelism);
return Objects.hash(newParallelism);
}

@Override
public String toString() {
return "ParallelismChange{"
+ "changeType="
+ changeType
+ ", newParallelism="
+ newParallelism
+ '}';
return isNoChange()
? "NoParallelismChange"
: "ParallelismChange{newParallelism=" + newParallelism + '}';
}

public static ParallelismChange required(int newParallelism) {
return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
}

public static ParallelismChange optional(int newParallelism) {
return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
public static ParallelismChange build(int newParallelism) {
checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
return new ParallelismChange(newParallelism);
}

public static ParallelismChange noChange() {
@@ -263,7 +248,7 @@ private ParallelismChange detectBlockScaling(

// If we don't have past scaling actions for this vertex, don't block scale up.
if (history.isEmpty()) {
return ParallelismChange.required(newParallelism);
return ParallelismChange.build(newParallelism);
}

var lastSummary = history.get(history.lastKey());
@@ -275,7 +260,7 @@ && detectIneffectiveScaleUp(
return ParallelismChange.noChange();
}

return ParallelismChange.required(newParallelism);
return ParallelismChange.build(newParallelism);
} else {
return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
}
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
if (scaleDownInterval.toMillis() <= 0) {
// The scale down interval is disabled, so don't block scaling.
return ParallelismChange.required(newParallelism);
}

var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
if (firstTriggerTime.isEmpty()) {
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
delayedScaleDown.updateTriggerTime(vertex, clock.instant());
return ParallelismChange.optional(newParallelism);
return ParallelismChange.build(newParallelism);
}

if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
return ParallelismChange.optional(newParallelism);
var now = clock.instant();
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);

// Never scale down within scale down interval
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
Contributor (@mxm) commented:

Could we rename this method to getFirstScaledUpTime()?

Member Author (@1996fanrui) replied:

Is it a typo?

Do you mean renaming firstTriggerTime to firstScaleDownTime inside VertexDelayedScaleDownInfo?

I prefer to use firstTriggerTime because the class name already includes DelayedScaleDown, so the field and method names don't need to include scale down again. WDYT?

Contributor (@mxm) replied:

I thought that getFirstTriggerTime() returns the first time we scaled up, but we are actually recording the time we first try to scale down.

I'm not sure this is correct. We want to delay scale down from the first time we scale up, not the first time we scaled down.

Member Author (@1996fanrui) replied on Dec 2, 2024:

Thanks @mxm for the clarification!

Actually, we expect to use the first scale-down time, not the scale-up time, as the first trigger time.

  • job.autoscaler.scale-up.grace-period is based on the scale-up time.
  • job.autoscaler.scale-down.interval is based on the first scale-down time: if scale-down.interval is 1 hour, we expect the scale down to be executed after 1 hour.
    • It's a delayed scale down.
    • It can merge multiple scale-down requests into one scale-down execution.
  • If scale-down.interval used the scale-up time, then a scale-down request would be executed immediately whenever the job was scaled up more than 1 hour ago. (It could not merge multiple scale-down requests into one execution.)

As I understand it, the scale-down interval is meant to reduce the scale-down frequency.

Please correct me if anything is wrong, thank you~

if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
} else {
LOG.debug(
"Try to skip immediate scale down within scale-down interval for {}",
vertex);
}
return ParallelismChange.noChange();
} else {
return ParallelismChange.required(newParallelism);
// Using the maximum parallelism within the scale down interval window instead of the
// latest parallelism when scaling down
return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
}
}
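Taken together with the review thread above, the method implements a simple windowing rule: the first scale-down request opens a window, requests inside the window are blocked but ratchet the remembered maximum, and once the interval elapses the vertex scales down to that maximum rather than the latest recommendation. Below is a self-contained simulation of that rule (my sketch, not the Flink code; Java 16+ for records):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Standalone simulation of the scale-down interval rule sketched above. */
class ScaleDownWindow {
    record Pending(Instant firstTriggerTime, int maxRecommendedParallelism) {}

    private final Duration interval;
    private final Map<String, Pending> pending = new HashMap<>();

    ScaleDownWindow(Duration interval) {
        this.interval = interval;
    }

    /** Returns -1 while the scale down is delayed, else the parallelism to apply. */
    int onScaleDownRequest(String vertex, Instant now, int recommended) {
        // The first request opens the window; later requests only raise the max.
        Pending p = pending.merge(
                vertex,
                new Pending(now, recommended),
                (old, fresh) -> new Pending(
                        old.firstTriggerTime(),
                        Math.max(old.maxRecommendedParallelism(), recommended)));
        if (now.isBefore(p.firstTriggerTime().plus(interval))) {
            return -1; // still inside the window: delay (noChange)
        }
        pending.remove(vertex);
        return p.maxRecommendedParallelism(); // scale down to the window maximum
    }

    public static void main(String[] args) {
        var window = new ScaleDownWindow(Duration.ofHours(1));
        Instant t0 = Instant.parse("2024-12-02T10:00:00Z");
        System.out.println(window.onScaleDownRequest("v1", t0, 8));                    // -1
        System.out.println(window.onScaleDownRequest("v1", t0.plusSeconds(1800), 10)); // -1
        System.out.println(window.onScaleDownRequest("v1", t0.plusSeconds(3600), 6));  // 10, not 6
    }
}
```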

@@ -45,13 +45,10 @@
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
}

@VisibleForTesting
static boolean allRequiredVerticesWithinUtilizationTarget(
static boolean allChangedVerticesWithinUtilizationTarget(
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Set<JobVertexID> requiredVertices) {
// All vertices' ParallelismChange is optional, rescaling will be ignored.
if (requiredVertices.isEmpty()) {
Set<JobVertexID> changedVertices) {
// No vertices with changed parallelism.
if (changedVertices.isEmpty()) {
return true;
}

for (JobVertexID vertex : requiredVertices) {
for (JobVertexID vertex : changedVertices) {
var metrics = evaluatedMetrics.get(vertex);

double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}

var out = new HashMap<JobVertexID, ScalingSummary>();
var requiredVertices = new HashSet<JobVertexID>();

var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -260,10 +256,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
v, Collections.emptySortedMap()),
restartTime,
delayedScaleDown);
if (NO_CHANGE == parallelismChange.getChangeType()) {
if (parallelismChange.isNoChange()) {
return;
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
requiredVertices.add(v);
}
out.put(
v,
@@ -274,10 +268,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}
});

// If the Utilization of all required tasks is within range, we can skip scaling.
// It means that if only optional tasks are out of scope, we still need to ignore scale.
if (allRequiredVerticesWithinUtilizationTarget(
evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
// If the Utilization of all tasks is within range, we can skip scaling.
if (allChangedVerticesWithinUtilizationTarget(
evaluatedMetrics.getVertexMetrics(), out.keySet())) {
return Map.of();
}
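For orientation, the renamed check now receives out.keySet(), i.e. every vertex with a proposed parallelism change. A rough standalone sketch of the idea follows; the names and the exact bounds are illustrative (the real method reads TRUE_PROCESSING_RATE averages and evaluated rate thresholds from the metrics map), and it uses Java 16+ records.

```java
import java.util.Map;
import java.util.Set;

class UtilizationCheckSketch {
    /** Illustrative per-vertex numbers; the real code reads EvaluatedScalingMetric values. */
    record VertexRates(double trueProcessingRate, double lowerBound, double upperBound) {}

    /** True if every changed vertex already processes within its target band. */
    static boolean allChangedVerticesWithinTarget(
            Map<String, VertexRates> rates, Set<String> changedVertices) {
        if (changedVertices.isEmpty()) {
            return true; // no vertices with changed parallelism
        }
        for (String vertex : changedVertices) {
            VertexRates r = rates.get(vertex);
            if (r.trueProcessingRate() < r.lowerBound()
                    || r.trueProcessingRate() > r.upperBound()) {
                return false; // out of the band: keep the proposed rescale
            }
        }
        return true; // all within the band: the rescale can be skipped
    }
}
```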
