Skip to content

Commit 84d9b74

Browse files
authored
[FLINK-35183] MinorVersion metric for tracking applications
1 parent 04829bf commit 84d9b74

File tree

2 files changed

+77
-17
lines changed

2 files changed

+77
-17
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,17 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
4242
// map(namespace, map(version, set(deployment)))
4343
private final Map<String, Map<String, Set<String>>> deploymentFlinkVersions =
4444
new ConcurrentHashMap<>();
45+
// map(namespace, map(version, set(deployment)))
46+
private final Map<String, Map<String, Set<String>>> deploymentFlinkMinorVersions =
47+
new ConcurrentHashMap<>();
4548
// map(namespace, map(deployment, cpu))
4649
private final Map<String, Map<String, Double>> deploymentCpuUsage = new ConcurrentHashMap<>();
4750
// map(namespace, map(deployment, memory))
4851
private final Map<String, Map<String, Long>> deploymentMemoryUsage = new ConcurrentHashMap<>();
4952
public static final String FLINK_VERSION_GROUP_NAME = "FlinkVersion";
53+
public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion";
54+
public static final String UNKNOWN_VERSION = "UNKNOWN";
55+
public static final String MALFORMED_MINOR_VERSION = "MALFORMED";
5056
public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
5157
public static final String RESOURCE_USAGE_GROUP_NAME = "ResourceUsage";
5258
public static final String COUNTER_NAME = "Count";
@@ -77,12 +83,13 @@ public void onUpdate(FlinkDeployment flinkApp) {
7783
.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
7884
.add(deploymentName);
7985

86+
// Full runtime version queried from the JobManager REST API
8087
var flinkVersion =
8188
flinkApp.getStatus()
8289
.getClusterInfo()
8390
.getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "");
8491
if (StringUtils.isNullOrWhitespaceOnly(flinkVersion)) {
85-
flinkVersion = "UNKNOWN";
92+
flinkVersion = UNKNOWN_VERSION;
8693
}
8794
deploymentFlinkVersions
8895
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
@@ -94,6 +101,22 @@ public void onUpdate(FlinkDeployment flinkApp) {
94101
})
95102
.add(deploymentName);
96103

104+
// Minor version computed from the above
105+
var subVersions = flinkVersion.split("\\.");
106+
String minorVersion = MALFORMED_MINOR_VERSION;
107+
if (subVersions.length >= 2) {
108+
minorVersion = subVersions[0].concat(".").concat(subVersions[1]);
109+
}
110+
deploymentFlinkMinorVersions
111+
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
112+
.computeIfAbsent(
113+
minorVersion,
114+
v -> {
115+
initFlinkMinorVersions(namespace, v);
116+
return ConcurrentHashMap.newKeySet();
117+
})
118+
.add(deploymentName);
119+
97120
var totalCpu =
98121
NumberUtils.toDouble(
99122
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"));
@@ -133,6 +156,12 @@ public void onRemove(FlinkDeployment flinkApp) {
133156
if (deploymentFlinkVersions.containsKey(namespace)) {
134157
deploymentFlinkVersions.get(namespace).values().forEach(names -> names.remove(name));
135158
}
159+
if (deploymentFlinkMinorVersions.containsKey(namespace)) {
160+
deploymentFlinkMinorVersions
161+
.get(namespace)
162+
.values()
163+
.forEach(names -> names.remove(name));
164+
}
136165
if (deploymentCpuUsage.containsKey(namespace)) {
137166
deploymentCpuUsage.get(namespace).remove(name);
138167
}
@@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
165194
private void initFlinkVersions(String ns, String flinkVersion) {
166195
parentMetricGroup
167196
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
168-
.addGroup(FLINK_VERSION_GROUP_NAME)
169-
.addGroup(flinkVersion)
197+
.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)
170198
.gauge(
171199
COUNTER_NAME,
172200
() -> deploymentFlinkVersions.get(ns).get(flinkVersion).size());
173201
}
174202

203+
private void initFlinkMinorVersions(String ns, String minorVersion) {
204+
parentMetricGroup
205+
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
206+
.addGroup(FLINK_MINOR_VERSION_GROUP_NAME, minorVersion)
207+
.gauge(
208+
COUNTER_NAME,
209+
() -> deploymentFlinkMinorVersions.get(ns).get(minorVersion).size());
210+
}
211+
175212
private void initNamespaceCpuUsage(String ns) {
176213
parentMetricGroup
177214
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
3636
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.CPU_NAME;
37+
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_MINOR_VERSION_GROUP_NAME;
3738
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME;
3839
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME;
3940
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME;
@@ -226,13 +227,14 @@ public void testMetricsMultiNamespace() {
226227
public void testFlinkVersionMetrics() {
227228
Map<String, String> ns1Values = new HashMap<>();
228229
ns1Values.put("deployment1", " ");
229-
ns1Values.put("deployment2", "1.14");
230-
ns1Values.put("deployment3", "1.14");
231-
ns1Values.put("deployment4", "1.15");
232-
ns1Values.put("deployment5", "1.15");
233-
ns1Values.put("deployment6", "1.16");
234-
ns1Values.put("deployment7", "1.17");
235-
ns1Values.put("deployment8", "1.14");
230+
ns1Values.put("deployment2", "1.14.0");
231+
ns1Values.put("deployment3", "1.14.0");
232+
ns1Values.put("deployment4", "1.15.1");
233+
ns1Values.put("deployment5", "1.15.1");
234+
ns1Values.put("deployment6", "1.16.0");
235+
ns1Values.put("deployment7", "1.17.1");
236+
ns1Values.put("deployment8", "1.14.1");
237+
ns1Values.put("deployment9", "test");
236238

237239
Map<String, String> ns2Values = new HashMap<>();
238240
ns2Values.put("deployment1", "");
@@ -248,20 +250,41 @@ public void testFlinkVersionMetrics() {
248250
var namespaceVersions = Map.of("ns1", ns1Values, "ns2", ns2Values);
249251
var expected =
250252
Map.of(
251-
"ns1", Map.of("UNKNOWN", 1, "1.14", 3, "1.15", 2, "1.16", 1, "1.17", 1),
253+
"ns1",
254+
Map.of(
255+
"UNKNOWN", 1, "1.14.0", 2, "1.14.1", 1, "1.15.1", 2,
256+
"1.16.0", 1, "1.17.1", 1, "test", 1),
252257
"ns2", Map.of("UNKNOWN", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
253-
updateFlinkVersionsAndAssert(namespaceVersions, expected);
258+
updateFlinkVersionsAndAssert(FLINK_VERSION_GROUP_NAME, namespaceVersions, expected);
254259

255-
// Remove invalid version and insert 1.14
256-
namespaceVersions.get("ns1").put("deployment1", "1.14");
260+
var expectedMinors =
261+
Map.of(
262+
"ns1", Map.of("MALFORMED", 2, "1.14", 3, "1.15", 2, "1.16", 1, "1.17", 1),
263+
"ns2", Map.of("MALFORMED", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
264+
updateFlinkVersionsAndAssert(
265+
FLINK_MINOR_VERSION_GROUP_NAME, namespaceVersions, expectedMinors);
266+
267+
// Remove invalid version and insert 1.14.1
268+
namespaceVersions.get("ns1").put("deployment1", "1.14.1");
257269
expected =
258270
Map.of(
259-
"ns1", Map.of("UNKNOWN", 0, "1.14", 4, "1.15", 2, "1.16", 1, "1.17", 1),
271+
"ns1",
272+
Map.of(
273+
"1.14.0", 2, "1.14.1", 2, "1.15.1", 2, "1.16.0", 1,
274+
"1.17.1", 1, "test", 1),
260275
"ns2", Map.of("UNKNOWN", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
261-
updateFlinkVersionsAndAssert(namespaceVersions, expected);
276+
updateFlinkVersionsAndAssert(FLINK_VERSION_GROUP_NAME, namespaceVersions, expected);
277+
278+
expectedMinors =
279+
Map.of(
280+
"ns1", Map.of("MALFORMED", 1, "1.14", 4, "1.15", 2, "1.16", 1, "1.17", 1),
281+
"ns2", Map.of("MALFORMED", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
282+
updateFlinkVersionsAndAssert(
283+
FLINK_MINOR_VERSION_GROUP_NAME, namespaceVersions, expectedMinors);
262284
}
263285

264286
private void updateFlinkVersionsAndAssert(
287+
String metricGroup,
265288
Map<String, Map<String, String>> namespaceVersions,
266289
Map<String, Map<String, Integer>> expected) {
267290
for (var namespaceEntry : namespaceVersions.entrySet()) {
@@ -291,7 +314,7 @@ private void updateFlinkVersionsAndAssert(
291314
listener.getNamespaceMetricId(
292315
FlinkDeployment.class,
293316
namespaceName,
294-
FLINK_VERSION_GROUP_NAME,
317+
metricGroup,
295318
version,
296319
COUNTER_NAME);
297320

0 commit comments

Comments
 (0)