Skip to content

Commit b26a0f5

Browse files
committed
[FLINK-36129][autoscaler] Autoscaler is compatible with Flink 1.20
1 parent 024b70b commit b26a0f5

File tree

9 files changed

+495
-24
lines changed

9 files changed

+495
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.messages.job.metrics;
20+
21+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
22+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
23+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
25+
26+
import javax.annotation.Nullable;
27+
28+
import static java.util.Objects.requireNonNull;
29+
30+
/**
31+
* TODO : It can be removed after upgrading flink dependency to 1.20.
32+
*
33+
* <p>Response type for aggregated metrics. Contains the metric name and optionally the sum,
34+
* average, minimum and maximum.
35+
*/
36+
public class AggregatedMetric {
37+
38+
private static final String FIELD_NAME_ID = "id";
39+
40+
private static final String FIELD_NAME_MIN = "min";
41+
42+
private static final String FIELD_NAME_MAX = "max";
43+
44+
private static final String FIELD_NAME_AVG = "avg";
45+
46+
private static final String FIELD_NAME_SUM = "sum";
47+
48+
private static final String FIELD_NAME_SKEW = "skew";
49+
50+
@JsonProperty(value = FIELD_NAME_ID, required = true)
51+
private final String id;
52+
53+
@JsonInclude(JsonInclude.Include.NON_NULL)
54+
@JsonProperty(FIELD_NAME_MIN)
55+
private final Double min;
56+
57+
@JsonInclude(JsonInclude.Include.NON_NULL)
58+
@JsonProperty(FIELD_NAME_MAX)
59+
private final Double max;
60+
61+
@JsonInclude(JsonInclude.Include.NON_NULL)
62+
@JsonProperty(FIELD_NAME_AVG)
63+
private final Double avg;
64+
65+
@JsonInclude(JsonInclude.Include.NON_NULL)
66+
@JsonProperty(FIELD_NAME_SUM)
67+
private final Double sum;
68+
69+
@JsonInclude(JsonInclude.Include.NON_NULL)
70+
@JsonProperty(FIELD_NAME_SKEW)
71+
private final Double skew;
72+
73+
@JsonCreator
74+
public AggregatedMetric(
75+
final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
76+
final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
77+
final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
78+
final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
79+
final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
80+
final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
81+
82+
this.id = requireNonNull(id, "id must not be null");
83+
this.min = min;
84+
this.max = max;
85+
this.avg = avg;
86+
this.sum = sum;
87+
this.skew = skew;
88+
}
89+
90+
public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
91+
this(id, null, null, null, null, null);
92+
}
93+
94+
@JsonIgnore
95+
public String getId() {
96+
return id;
97+
}
98+
99+
@JsonIgnore
100+
public Double getMin() {
101+
return min;
102+
}
103+
104+
@JsonIgnore
105+
public Double getMax() {
106+
return max;
107+
}
108+
109+
@JsonIgnore
110+
public Double getSum() {
111+
return sum;
112+
}
113+
114+
@JsonIgnore
115+
public Double getAvg() {
116+
return avg;
117+
}
118+
119+
@JsonIgnore
120+
public Double getSkew() {
121+
return skew;
122+
}
123+
124+
@Override
125+
public String toString() {
126+
return "AggregatedMetric{"
127+
+ "id='"
128+
+ id
129+
+ '\''
130+
+ ", mim='"
131+
+ min
132+
+ '\''
133+
+ ", max='"
134+
+ max
135+
+ '\''
136+
+ ", avg='"
137+
+ avg
138+
+ '\''
139+
+ ", sum='"
140+
+ sum
141+
+ '\''
142+
+ ", skew='"
143+
+ skew
144+
+ '\''
145+
+ '}';
146+
}
147+
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ private Map<FlinkMetric, AggregatedMetric> aggregateByFlinkMetric(
230230
null,
231231
m1.getSum() != null
232232
? m1.getSum() + m2.getSum()
233+
: null,
234+
m1.getSkew() != null
235+
? Math.max(m1.getSkew(), m2.getSkew())
233236
: null)));
234237
}
235238
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ public List<String> findAll(Collection<String> metrics) {
7474
}
7575

7676
private static AggregatedMetric zero() {
77-
return new AggregatedMetric("", 0., 0., 0., 0.);
77+
return new AggregatedMetric("", 0., 0., 0., 0., 0.);
7878
}
7979
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ private static double getNumRecordsInInternal(
230230
Double.NaN,
231231
Double.NaN,
232232
Double.NaN,
233-
(double) ioMetrics.getNumRecordsIn());
233+
(double) ioMetrics.getNumRecordsIn(),
234+
Double.NaN);
234235

235236
// 2. If the former is unavailable and the vertex contains a source operator, use the
236237
// corresponding source operator metric.
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.messages.job.metrics;
20+
21+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
22+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
23+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
25+
26+
import javax.annotation.Nullable;
27+
28+
import static java.util.Objects.requireNonNull;
29+
30+
/**
31+
* TODO : It can be removed after upgrading flink dependency to 1.20.
32+
*
33+
* <p>Response type for aggregated metrics. Contains the metric name and optionally the sum,
34+
* average, minimum and maximum.
35+
*/
36+
public class AggregatedMetric {
37+
38+
private static final String FIELD_NAME_ID = "id";
39+
40+
private static final String FIELD_NAME_MIN = "min";
41+
42+
private static final String FIELD_NAME_MAX = "max";
43+
44+
private static final String FIELD_NAME_AVG = "avg";
45+
46+
private static final String FIELD_NAME_SUM = "sum";
47+
48+
private static final String FIELD_NAME_SKEW = "skew";
49+
50+
@JsonProperty(value = FIELD_NAME_ID, required = true)
51+
private final String id;
52+
53+
@JsonInclude(JsonInclude.Include.NON_NULL)
54+
@JsonProperty(FIELD_NAME_MIN)
55+
private final Double min;
56+
57+
@JsonInclude(JsonInclude.Include.NON_NULL)
58+
@JsonProperty(FIELD_NAME_MAX)
59+
private final Double max;
60+
61+
@JsonInclude(JsonInclude.Include.NON_NULL)
62+
@JsonProperty(FIELD_NAME_AVG)
63+
private final Double avg;
64+
65+
@JsonInclude(JsonInclude.Include.NON_NULL)
66+
@JsonProperty(FIELD_NAME_SUM)
67+
private final Double sum;
68+
69+
@JsonInclude(JsonInclude.Include.NON_NULL)
70+
@JsonProperty(FIELD_NAME_SKEW)
71+
private final Double skew;
72+
73+
@JsonCreator
74+
public AggregatedMetric(
75+
final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
76+
final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
77+
final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
78+
final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
79+
final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum,
80+
final @Nullable @JsonProperty(FIELD_NAME_SKEW) Double skew) {
81+
82+
this.id = requireNonNull(id, "id must not be null");
83+
this.min = min;
84+
this.max = max;
85+
this.avg = avg;
86+
this.sum = sum;
87+
this.skew = skew;
88+
}
89+
90+
public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
91+
this(id, null, null, null, null, null);
92+
}
93+
94+
@JsonIgnore
95+
public String getId() {
96+
return id;
97+
}
98+
99+
@JsonIgnore
100+
public Double getMin() {
101+
return min;
102+
}
103+
104+
@JsonIgnore
105+
public Double getMax() {
106+
return max;
107+
}
108+
109+
@JsonIgnore
110+
public Double getSum() {
111+
return sum;
112+
}
113+
114+
@JsonIgnore
115+
public Double getAvg() {
116+
return avg;
117+
}
118+
119+
@JsonIgnore
120+
public Double getSkew() {
121+
return skew;
122+
}
123+
124+
@Override
125+
public String toString() {
126+
return "AggregatedMetric{"
127+
+ "id='"
128+
+ id
129+
+ '\''
130+
+ ", mim='"
131+
+ min
132+
+ '\''
133+
+ ", max='"
134+
+ max
135+
+ '\''
136+
+ ", avg='"
137+
+ avg
138+
+ '\''
139+
+ ", sum='"
140+
+ sum
141+
+ '\''
142+
+ ", skew='"
143+
+ skew
144+
+ '\''
145+
+ '}';
146+
}
147+
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,26 @@ public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Excepti
7777
var aggregatedMetricsResponse =
7878
List.of(
7979
new AggregatedMetric(
80-
"a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
80+
"a.pendingRecords",
81+
Double.NaN,
82+
Double.NaN,
83+
Double.NaN,
84+
100.,
85+
Double.NaN),
8186
new AggregatedMetric(
82-
"b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
87+
"b.pendingRecords",
88+
Double.NaN,
89+
Double.NaN,
90+
Double.NaN,
91+
100.,
92+
Double.NaN),
8393
new AggregatedMetric(
84-
"c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
94+
"c.unrelated",
95+
Double.NaN,
96+
Double.NaN,
97+
Double.NaN,
98+
100.,
99+
Double.NaN));
85100

86101
var conf = new Configuration();
87102
var restClusterClient =
@@ -233,10 +248,12 @@ CompletableFuture<P> sendRequest(M headers, U parameters, R request) {
233248
assertThrows(RuntimeException.class, () -> collector.queryTmMetrics(context));
234249

235250
// Test only heap metrics available
236-
var heapMax = new AggregatedMetric(HEAP_MAX_NAME, null, 100., null, null);
237-
var heapUsed = new AggregatedMetric(HEAP_USED_NAME, null, 50., null, null);
238-
var managedUsed = new AggregatedMetric(MANAGED_MEMORY_NAME, null, 42., null, null);
239-
var metaspaceUsed = new AggregatedMetric(METASPACE_MEMORY_NAME, null, 11., null, null);
251+
var heapMax = new AggregatedMetric(HEAP_MAX_NAME, null, 100., null, null, Double.NaN);
252+
var heapUsed = new AggregatedMetric(HEAP_USED_NAME, null, 50., null, null, Double.NaN);
253+
var managedUsed =
254+
new AggregatedMetric(MANAGED_MEMORY_NAME, null, 42., null, null, Double.NaN);
255+
var metaspaceUsed =
256+
new AggregatedMetric(METASPACE_MEMORY_NAME, null, 11., null, null, Double.NaN);
240257
metricValues.put(HEAP_MAX_NAME, heapMax);
241258
metricValues.put(HEAP_USED_NAME, heapUsed);
242259
metricValues.put(MANAGED_MEMORY_NAME, managedUsed);
@@ -256,7 +273,7 @@ CompletableFuture<P> sendRequest(M headers, U parameters, R request) {
256273
collector.cleanup(context.getJobKey());
257274

258275
// Test all metrics available
259-
var gcTime = new AggregatedMetric(GC_METRIC_NAME, null, 150., null, null);
276+
var gcTime = new AggregatedMetric(GC_METRIC_NAME, null, 150., null, null, Double.NaN);
260277
metricValues.put(GC_METRIC_NAME, gcTime);
261278

262279
assertMetricsEquals(

0 commit comments

Comments
 (0)