
Commit d1cbb11

1. Add a comment about supporting a fallback key with DelegatingConfiguration. 2. Add a sample custom evaluator (SimpleTrendAdjustor) that performs a simple trend adjustment.
1 parent 89f7857 commit d1cbb11


2 files changed: +109 -1 lines changed


flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 1 addition & 1 deletion
@@ -395,7 +395,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
    public static Configuration forCustomEvaluator(
            Configuration configuration, String customEvaluatorName) {
-
+        // Add support for fallback key with DelegatingConfiguration.
        return new DelegatingConfiguration(
                configuration,
                AUTOSCALER_CONF_PREFIX + CUSTOM_EVALUATOR_CONF_PREFIX + customEvaluatorName + ".");
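
For context, DelegatingConfiguration resolves every lookup by prepending its fixed prefix to the requested key, which is how forCustomEvaluator scopes options to a single evaluator; fallback-key resolution through that same view is what the new comment refers to. Below is a minimal sketch of the prefixing behavior only. The evaluator name "trend", the key "window.size", and the literal prefix string are assumptions for illustration, not values taken from this commit.

// Illustrative sketch only: the prefix literal, the evaluator name "trend", and the
// key "window.size" are assumptions for this example, not values from the commit.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;

public class DelegatingConfigurationSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The full key as a user would set it (assumed expansion of
        // AUTOSCALER_CONF_PREFIX + CUSTOM_EVALUATOR_CONF_PREFIX + "trend" + ".").
        conf.setString("job.autoscaler.custom-evaluator.trend.window.size", "10 min");

        // Roughly what forCustomEvaluator(conf, "trend") hands to the evaluator.
        Configuration scoped =
                new DelegatingConfiguration(conf, "job.autoscaler.custom-evaluator.trend.");

        // The evaluator reads short, evaluator-local keys; the prefix is added on lookup.
        System.out.println(scoped.getString("window.size", "5 min")); // prints "10 min"
    }
}

The sketch covers prefix scoping only; how fallback keys declared on an option behave under this view is the open point the added comment calls out.
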
SimpleTrendAdjustor.java (new file)

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.autoscaler.metrics;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;

/**
 * A simple implementation of the {@link CustomEvaluator} interface that adjusts scaling metrics
 * based on recent historical trends. This evaluator applies a weighted moving average to refine the
 * target data rate for source job vertices, enabling more responsive scaling decisions.
 */
public class SimpleTrendAdjustor implements CustomEvaluator {
    @Override
    public Map<ScalingMetric, EvaluatedScalingMetric> evaluateVertexMetrics(
            JobVertexID vertex,
            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
            Context evaluationContext) {

        if (!evaluationContext.getTopology().isSource(vertex)) {
            return Collections.emptyMap();
        }

        var customEvaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();

        // Extract current target data rate
        EvaluatedScalingMetric targetDataRateMetric =
                evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE);
        double currentTargetRate =
                (targetDataRateMetric != null) ? targetDataRateMetric.getAverage() : 0.0;

        // Compute historical trend adjustment
        double trendAdjustment =
                computeTrendAdjustment(vertex, evaluationContext.getMetricsHistory());

        // Apply a dynamic adjustment based on recent trends
        double adjustedTargetRate = currentTargetRate + trendAdjustment;

        // Store the updated metric
        customEvaluatedMetrics.put(
                ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedTargetRate));

        return customEvaluatedMetrics;
    }

    /**
     * Computes a trend-based adjustment using recent historical metrics. Uses a simple weighted
     * moving average over the last few recorded metrics.
     */
    private double computeTrendAdjustment(
            JobVertexID vertex, SortedMap<Instant, CollectedMetrics> metricsHistory) {
        if (metricsHistory.isEmpty()) {
            // Fallback: apply no adjustment if no history is available
            return 0.;
        }

        double totalWeight = 0.0;
        double weightedSum = 0.0;
        // Increasing weight for more recent data points
        int weight = 1;

        // The history is sorted by time (oldest first), so skip ahead and let only the
        // most recent entries (here, the last 5 data points) contribute to the average.
        int toSkip = Math.max(0, metricsHistory.size() - 5);
        int index = 0;
        for (var entry : metricsHistory.values()) {
            if (index++ < toSkip) {
                continue;
            }
            Double historicalRate =
                    entry.getVertexMetrics().get(vertex).get(ScalingMetric.TARGET_DATA_RATE);
            if (historicalRate != null) {
                weightedSum += historicalRate * weight;
                totalWeight += weight;
                weight++;
            }
        }

        // The adjustment is the difference between the weighted historical average and the
        // latest observed target rate (assumed to be present for this vertex).
        return (totalWeight > 0)
                ? (weightedSum / totalWeight)
                        - metricsHistory
                                .get(metricsHistory.lastKey())
                                .getVertexMetrics()
                                .get(vertex)
                                .get(ScalingMetric.TARGET_DATA_RATE)
                : 0.;
    }
}
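
To make the adjustment arithmetic concrete, here is a small standalone sketch of the same weighting scheme on made-up numbers (not part of the commit). With a steadily rising rate the weighted average lags the newest sample, so the computed adjustment is negative and damps the target data rate; a falling trend produces a positive adjustment.

public class TrendAdjustmentExample {
    public static void main(String[] args) {
        // Five hypothetical TARGET_DATA_RATE samples, oldest to newest (records/s).
        double[] history = {100, 110, 120, 130, 140};

        double weightedSum = 0.0;
        double totalWeight = 0.0;
        int weight = 1;
        for (double rate : history) {
            // Newer samples receive a higher weight, as in computeTrendAdjustment.
            weightedSum += rate * weight;
            totalWeight += weight;
            weight++;
        }

        double weightedAverage = weightedSum / totalWeight; // 1900 / 15 ≈ 126.67
        double latest = history[history.length - 1];        // 140
        double adjustment = weightedAverage - latest;       // ≈ -13.33

        // The evaluator would report currentTargetRate + adjustment as the new average.
        System.out.printf(
                "weighted average = %.2f, adjustment = %.2f%n", weightedAverage, adjustment);
    }
}

In effect the evaluator shifts the current target rate by however far the recent weighted average sits from the latest recorded rate.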
