Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
Expand Down Expand Up @@ -178,6 +184,12 @@ public ParallelismChange computeScaleTargetParallelism(

LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
double scaleFactor = targetCapacity / averageTrueProcessingRate;
if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
double scalingCoefficient =
JobVertexScaler.calculateObservedScalingCoefficient(
history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
scaleFactor = scaleFactor / scalingCoefficient;
}
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
if (scaleFactor < minScaleFactor) {
Expand Down Expand Up @@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
delayedScaleDown);
}

/**
* Calculates the scaling coefficient based on historical scaling data.
*
* <p>The scaling coefficient is computed using a weighted least squares approach, where more
* recent data points and those with higher parallelism are given higher weights. If there are
* not enough observations, or if the computed coefficient is invalid, a default value of {@code
* 1.0} is returned, assuming linear scaling.
*
* @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
* @param minObservations The minimum number of observations required to compute the scaling
* coefficient. If the number of historical entries is less than this threshold, a default
* coefficient of {@code 1.0} is returned.
* @return The computed scaling coefficient.
*/
@VisibleForTesting
protected static double calculateObservedScalingCoefficient(
SortedMap<Instant, ScalingSummary> history, int minObservations) {
/*
* The scaling coefficient is computed using a **weighted least squares** approach
* to fit a linear model:
*
* R_i = β * P_i * α
*
* where:
* - R_i = observed processing rate
* - P_i = parallelism
* - β = baseline processing rate
* - α = scaling coefficient to optimize
*
* The optimization minimizes the **weighted sum of squared errors**:
*
* Loss = ∑ w_i * (R_i - β * α * P_i)^2
*
* Differentiating w.r.t. α and solving for α:
*
* α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
*
* We keep the system conservative for higher returns scenario by clamping computed α within 1.0.
*/

// not enough data to compute scaling coefficient. we assume linear scaling.
if (history.isEmpty() || history.size() < minObservations) {
return 1.0;
}

var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);

if (Double.isNaN(baselineProcessingRate)) {
return 1.0;
}

Instant latestTimestamp = history.lastKey();

List<Double> parallelismList = new ArrayList<>();
List<Double> processingRateList = new ArrayList<>();
List<Double> weightList = new ArrayList<>();

for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
Instant timestamp = entry.getKey();
ScalingSummary summary = entry.getValue();
double parallelism = summary.getCurrentParallelism();
double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();

if (Double.isNaN(processingRate)) {
LOG.warn(
"True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
return 1.0;
}

// Compute weight based on recency & parallelism
double timeDiff =
Duration.between(timestamp, latestTimestamp).getSeconds()
+ 1; // Avoid division by zero
double weight = parallelism / timeDiff;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide on this particular weighting approach? To be specific, what's the benefit compared to:

  • Not weighting
  • Using weights based on the difference with the current parallelism (locally weighted regression)

I think overall weighting makes sense but maybe weighing based on the parallelism difference ( and time) makes more sense then simply parallelism

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also add an enum configuration with some strategies here if we feel that would be required, but maybe an overkill initially

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering whether applying too much recency bias could hurt the model. Simply weighing by the parallelism should already produce good results. I see how scalability of a pipeline might change over time due to factors like growing state, so maybe using recency bias is smart, as long as the recency influence isn't too strong.


parallelismList.add(parallelism);
processingRateList.add(processingRate);
weightList.add(weight);
}

var coefficient =
AutoScalerUtils.optimizeLinearScalingCoefficient(
parallelismList,
processingRateList,
weightList,
baselineProcessingRate,
1,
0.01);
return BigDecimal.valueOf(coefficient).setScale(2, RoundingMode.CEILING).doubleValue();
}

private ParallelismChange detectBlockScaling(
Context context,
JobVertexID vertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
"scaling.key-group.partitions.adjust.mode"))
.withDescription(
"How to adjust the parallelism of Source vertex or upstream shuffle is keyBy");

public static final ConfigOption<Boolean> OBSERVED_SCALABILITY_ENABLED =
autoScalerConfig("observed-scalability.enabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled"))
.withDescription(
"Enables the use of an observed scalability coefficient when computing target parallelism. If enabled, the system will estimate the scalability coefficient based on historical scaling data instead of assuming perfect linear scaling. This helps account for real-world inefficiencies such as network overhead and coordination costs.");

public static final ConfigOption<Integer> OBSERVED_SCALABILITY_MIN_OBSERVATIONS =
autoScalerConfig("observed-scalability.min-observations")
.intType()
.defaultValue(5)
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations"))
.withDescription(
"Defines the minimum number of historical scaling observations required to estimate the scalability coefficient. If the number of available observations is below this threshold, the system falls back to assuming linear scaling.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@

package org.apache.flink.autoscaler.utils;

import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/** AutoScaler utilities. */
public class AutoScalerUtils {
Expand Down Expand Up @@ -94,4 +99,94 @@ public static boolean excludeVerticesFromScaling(
conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
return anyAdded;
}

/**
* Computes the optimized linear scaling coefficient (α) by minimizing the weighted least
* squares error.
*
* <p>This method estimates the scaling coefficient in a linear scaling model by fitting
* observed processing rates and parallelism levels while applying weights to account for
* recency and significance.
*
* <p>The computed coefficient is clamped within the specified lower and upper bounds to ensure
* stability and prevent extreme scaling adjustments.
*
* @param parallelismLevels List of parallelism levels.
* @param processingRates List of observed processing rates.
* @param weights List of weights for each observation.
* @param baselineProcessingRate Baseline processing rate.
* @param upperBound Maximum allowable value for the scaling coefficient.
* @param lowerBound Minimum allowable value for the scaling coefficient.
* @return The optimized scaling coefficient (α), constrained within {@code [lowerBound,
* upperBound]}.
*/
public static double optimizeLinearScalingCoefficient(
List<Double> parallelismLevels,
List<Double> processingRates,
List<Double> weights,
double baselineProcessingRate,
double upperBound,
double lowerBound) {

double weightedSum = 0.0;
double weightedSquaredSum = 0.0;

for (int i = 0; i < parallelismLevels.size(); i++) {
double parallelism = parallelismLevels.get(i);
double processingRate = processingRates.get(i);
double weight = weights.get(i);

weightedSum += weight * parallelism * processingRate;
weightedSquaredSum += weight * parallelism * parallelism;
}

if (weightedSquaredSum == 0.0) {
return 1.0; // Fallback to linear scaling if denominator is zero
}

double alpha = weightedSum / (weightedSquaredSum * baselineProcessingRate);

return Math.max(lowerBound, Math.min(upperBound, alpha));
}

/**
* Computes the baseline processing rate from historical scaling data.
*
* <p>The baseline processing rate represents the **processing rate per unit of parallelism**.
* It is determined using the smallest observed parallelism in the history.
*
* @param history A {@code SortedMap} where keys are timestamps ({@code Instant}), and values
* are {@code ScalingSummary} objects.
* @return The computed baseline processing rate (processing rate per unit of parallelism).
*/
public static double computeBaselineProcessingRate(SortedMap<Instant, ScalingSummary> history) {
ScalingSummary latestSmallestParallelismSummary = null;

for (Map.Entry<Instant, ScalingSummary> entry :
((NavigableMap<Instant, ScalingSummary>) history).descendingMap().entrySet()) {
ScalingSummary summary = entry.getValue();
double parallelism = summary.getCurrentParallelism();

if (parallelism == 1) {
return summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
}

if (latestSmallestParallelismSummary == null
|| parallelism < latestSmallestParallelismSummary.getCurrentParallelism()) {
latestSmallestParallelismSummary = entry.getValue();
}
}

if (latestSmallestParallelismSummary == null) {
return Double.NaN;
}

double parallelism = latestSmallestParallelismSummary.getCurrentParallelism();
double processingRate =
latestSmallestParallelismSummary
.getMetrics()
.get(TRUE_PROCESSING_RATE)
.getAverage();
return processingRate / parallelism;
}
}
Loading