Skip to content

Commit b7d6f9d

Browse files
authored
[FLINK-37253] Add state size in application status and deployment metrics (#941)
This adds state size, i.e. the size of the last completed checkpoint, to the deployment status. It also exposes the state size as a deployment metric.
1 parent 67aa78b commit b7d6f9d

File tree

9 files changed

+829
-19
lines changed

9 files changed

+829
-19
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
4949
private final Map<String, Map<String, Double>> deploymentCpuUsage = new ConcurrentHashMap<>();
5050
// map(namespace, map(deployment, memory))
5151
private final Map<String, Map<String, Long>> deploymentMemoryUsage = new ConcurrentHashMap<>();
52+
// map(namespace, map(deployment, stateSize))
53+
private final Map<String, Map<String, Long>> deploymentStateSize = new ConcurrentHashMap<>();
5254
public static final String FLINK_VERSION_GROUP_NAME = "FlinkVersion";
5355
public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion";
5456
public static final String UNKNOWN_VERSION = "UNKNOWN";
@@ -58,6 +60,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
5860
public static final String COUNTER_NAME = "Count";
5961
public static final String CPU_NAME = "Cpu";
6062
public static final String MEMORY_NAME = "Memory";
63+
public static final String STATE_SIZE = "StateSize";
6164

6265
public FlinkDeploymentMetrics(
6366
KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) {
@@ -144,6 +147,18 @@ public void onUpdate(FlinkDeployment flinkApp) {
144147
NumberUtils.toLong(
145148
clusterInfo.getOrDefault(
146149
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));
150+
151+
long stateSize =
152+
NumberUtils.toLong(
153+
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "0"));
154+
deploymentStateSize
155+
.computeIfAbsent(
156+
namespace,
157+
ns -> {
158+
initNamespaceStateSize(ns);
159+
return new ConcurrentHashMap<>();
160+
})
161+
.put(deploymentName, stateSize > 0 ? stateSize : 0);
147162
}
148163

149164
public void onRemove(FlinkDeployment flinkApp) {
@@ -168,6 +183,9 @@ public void onRemove(FlinkDeployment flinkApp) {
168183
if (deploymentMemoryUsage.containsKey(namespace)) {
169184
deploymentMemoryUsage.get(namespace).remove(name);
170185
}
186+
if (deploymentStateSize.containsKey(namespace)) {
187+
deploymentStateSize.get(namespace).remove(name);
188+
}
171189
}
172190

173191
private void initNamespaceDeploymentCounts(String ns) {
@@ -231,6 +249,15 @@ private void initNamespaceMemoryUsage(String ns) {
231249
.reduce(0L, Long::sum));
232250
}
233251

/**
 * Registers the {@code STATE_SIZE} gauge for namespace {@code ns} under the
 * {@code RESOURCE_USAGE_GROUP_NAME} group of the FlinkDeployment namespace metric group.
 *
 * <p>The gauge supplier sums every value currently stored in {@code deploymentStateSize}
 * for {@code ns}; the sum starts from {@code 0L}, so an empty namespace map yields 0.
 *
 * @param ns namespace whose per-deployment state sizes are summed
 */
// NOTE(review): onUpdate calls this from inside the computeIfAbsent mapping function, i.e.
// before the map for ns has been inserted. deploymentStateSize.get(ns) would be null if the
// gauge were read in that window - confirm that a reporter cannot poll it that early.
252+
private void initNamespaceStateSize(String ns) {
253+
parentMetricGroup
254+
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
255+
.addGroup(RESOURCE_USAGE_GROUP_NAME)
256+
.gauge(
257+
STATE_SIZE,
258+
() -> deploymentStateSize.get(ns).values().stream().reduce(0L, Long::sum));
259+
}
260+
234261
private Map<JobManagerDeploymentStatus, Set<String>> createDeploymentStatusMap() {
235262
Map<JobManagerDeploymentStatus, Set<String>> statuses = new ConcurrentHashMap<>();
236263
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
8282
var flinkApp = ctx.getResource();
8383
try {
8484
Map<String, String> clusterInfo =
85-
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
85+
ctx.getFlinkService()
86+
.getClusterInfo(
87+
ctx.getObserveConfig(),
88+
flinkApp.getStatus().getJobStatus().getJobId());
8689
flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
8790
logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
8891
} catch (Exception e) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
3636
// Check if session cluster can serve rest calls following our practice in JobObserver
3737
try {
3838
logger.debug("Observing session cluster");
39-
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
39+
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig(), null);
4040
var rs = ctx.getResource().getStatus().getReconciliationStatus();
4141
if (rs.getState() == ReconciliationState.DEPLOYED) {
4242
rs.markReconciledSpecAsStable();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
7474
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
7575
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
76+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
77+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
7678
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
7779
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
7880
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -161,6 +163,7 @@ public abstract class AbstractFlinkService implements FlinkService {
161163
private static final String EMPTY_JAR_FILENAME = "empty.jar";
162164
public static final String FIELD_NAME_TOTAL_CPU = "total-cpu";
163165
public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory";
166+
public static final String FIELD_NAME_STATE_SIZE = "state-size";
164167

165168
protected final KubernetesClient kubernetesClient;
166169
protected final ExecutorService executorService;
@@ -735,9 +738,27 @@ public CheckpointStatsResult fetchCheckpointStats(
735738
}
736739

/**
 * Builds a fresh map of cluster information (this describes the new two-argument version;
 * the one-argument signature below is the line removed by this diff).
 *
 * <p>Steps, in order: {@code populateFlinkVersion} adds the dashboard-derived entries, the
 * number of task managers is used to compute the {@code FIELD_NAME_TOTAL_CPU} and
 * {@code FIELD_NAME_TOTAL_MEMORY} entries, and {@code populateStateSize} adds
 * {@code FIELD_NAME_STATE_SIZE} when a job id is supplied.
 *
 * @param conf configuration used for the cluster client and the CPU/memory calculation
 * @param jobId hex string of the job id, or {@code null} to skip the state size lookup
 * @return the populated map; all values are stored as strings
 * @throws Exception propagated unchanged from any of the lookups above
 */
// NOTE(review): populateStateSize is not guarded here, so a failure in the checkpoint
// statistics request discards the version/CPU/memory entries already collected - confirm
// that callers are fine with losing the whole result in that case.
737740
@Override
738-
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
741+
public Map<String, String> getClusterInfo(Configuration conf, @Nullable String jobId)
742+
throws Exception {
739743
Map<String, String> clusterInfo = new HashMap<>();
740744

745+
populateFlinkVersion(conf, clusterInfo);
746+
747+
var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
748+
clusterInfo.put(
749+
FIELD_NAME_TOTAL_CPU,
750+
String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
751+
clusterInfo.put(
752+
FIELD_NAME_TOTAL_MEMORY,
753+
String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
754+
755+
populateStateSize(conf, jobId, clusterInfo);
756+
757+
return clusterInfo;
758+
}
759+
760+
private void populateFlinkVersion(Configuration conf, Map<String, String> clusterInfo)
761+
throws Exception {
741762
try (var clusterClient = getClusterClient(conf)) {
742763

743764
CustomDashboardConfiguration dashboardConfiguration =
@@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
757778
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
758779
dashboardConfiguration.getFlinkRevision());
759780
}
781+
}
760782

761-
var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
762-
clusterInfo.put(
763-
FIELD_NAME_TOTAL_CPU,
764-
String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
765-
clusterInfo.put(
766-
FIELD_NAME_TOTAL_MEMORY,
767-
String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
/**
 * Adds the state size of the latest completed checkpoint to {@code clusterInfo}.
 *
 * <p>Does nothing when {@code jobId} is {@code null}. Otherwise it opens a cluster client,
 * sends a {@code CheckpointingStatisticsHeaders} request for the job and, if the response
 * contains a latest completed checkpoint, stores its state size as a string under
 * {@code FIELD_NAME_STATE_SIZE}. When there is no completed checkpoint the map is left
 * untouched.
 *
 * <p>The {@code 769-} / {@code return clusterInfo;} pair inside this span is a line removed
 * by the diff and is not part of the new method.
 *
 * @param conf configuration used to create the cluster client
 * @param jobId hex string parsed with {@code JobID.fromHexString}; {@code null} skips the lookup
 * @param clusterInfo map that receives the state size entry
 * @throws Exception if the client cannot be created, the job id cannot be parsed, or the
 *     request fails
 */
// NOTE(review): the request future is awaited with get() and no timeout - confirm whether a
// client timeout should apply here.
783+
private void populateStateSize(
784+
Configuration conf, @Nullable String jobId, Map<String, String> clusterInfo)
785+
throws Exception {
786+
if (jobId != null) {
787+
try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
788+
var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance();
789+
var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
790+
parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
768791

769-
return clusterInfo;
792+
CheckpointingStatistics checkpointingStatistics =
793+
clusterClient
794+
.sendRequest(
795+
checkpointingStatisticsHeaders,
796+
parameters,
797+
EmptyRequestBody.getInstance())
798+
.get();
799+
CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics =
800+
checkpointingStatistics
801+
.getLatestCheckpoints()
802+
.getCompletedCheckpointStatistics();
803+
if (completedCheckpointStatistics != null) {
804+
clusterInfo.put(
805+
FIELD_NAME_STATE_SIZE,
806+
String.valueOf(completedCheckpointStatistics.getStateSize()));
807+
}
808+
}
809+
}
770810
}
771811

772812
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ String triggerCheckpoint(
115115

116116
void disposeSavepoint(String savepointPath, Configuration conf) throws Exception;
117117

118-
Map<String, String> getClusterInfo(Configuration conf) throws Exception;
118+
Map<String, String> getClusterInfo(Configuration conf, @Nullable String jobId) throws Exception;
119119

120120
PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
121121

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,8 @@ public void markApplicationJobFailedWithError(JobID jobID, String error) throws
648648
}
649649

650650
@Override
651-
public Map<String, String> getClusterInfo(Configuration conf) throws TimeoutException {
651+
public Map<String, String> getClusterInfo(Configuration conf, String jobId)
652+
throws TimeoutException {
652653
if (!isPortReady) {
653654
throw new TimeoutException("JM port is unavailable");
654655
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME;
3939
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME;
4040
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME;
41+
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATE_SIZE;
4142
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
4243
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
4344
import static org.assertj.core.api.Assertions.assertThat;
@@ -339,50 +340,63 @@ public void testResourceMetrics() {
339340
.putAll(
340341
Map.of(
341342
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",
342-
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024"));
343+
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024",
344+
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "4096"));
343345

344346
deployment2
345347
.getStatus()
346348
.getClusterInfo()
347349
.putAll(
348350
Map.of(
349351
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "10",
350-
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048"));
352+
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048",
353+
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "8192"));
351354

352355
deployment3
353356
.getStatus()
354357
.getClusterInfo()
355358
.putAll(
356359
Map.of(
357360
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "13",
358-
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096"));
361+
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096",
362+
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "16384"));
359363

360364
var cpuGroupId1 =
361365
listener.getNamespaceMetricId(
362366
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
363367
var memoryGroupId1 =
364368
listener.getNamespaceMetricId(
365369
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
370+
var stateSizeGroupId1 =
371+
listener.getNamespaceMetricId(
372+
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE);
366373
var cpuGroupId2 =
367374
listener.getNamespaceMetricId(
368375
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
369376
var memoryGroupId2 =
370377
listener.getNamespaceMetricId(
371378
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
379+
var stateSizeGroupId2 =
380+
listener.getNamespaceMetricId(
381+
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE);
372382

373383
assertTrue(listener.getGauge(cpuGroupId1).isEmpty());
374384
assertTrue(listener.getGauge(memoryGroupId1).isEmpty());
385+
assertTrue(listener.getGauge(stateSizeGroupId1).isEmpty());
375386
assertTrue(listener.getGauge(cpuGroupId2).isEmpty());
376387
assertTrue(listener.getGauge(memoryGroupId2).isEmpty());
388+
assertTrue(listener.getGauge(stateSizeGroupId2).isEmpty());
377389

378390
metricManager.onUpdate(deployment1);
379391
metricManager.onUpdate(deployment2);
380392
metricManager.onUpdate(deployment3);
381393

382394
assertEquals(15D, listener.getGauge(cpuGroupId1).get().getValue());
383395
assertEquals(3072L, listener.getGauge(memoryGroupId1).get().getValue());
396+
assertEquals(12288L, listener.getGauge(stateSizeGroupId1).get().getValue());
384397
assertEquals(13D, listener.getGauge(cpuGroupId2).get().getValue());
385398
assertEquals(4096L, listener.getGauge(memoryGroupId2).get().getValue());
399+
assertEquals(16384L, listener.getGauge(stateSizeGroupId2).get().getValue());
386400
}
387401

388402
@Test

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
5151
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
5252
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
53+
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
5354
import org.apache.flink.runtime.clusterframework.types.ResourceID;
5455
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
5556
import org.apache.flink.runtime.instance.HardwareDescription;
@@ -64,8 +65,11 @@
6465
import org.apache.flink.runtime.rest.messages.ResponseBody;
6566
import org.apache.flink.runtime.rest.messages.TriggerId;
6667
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
68+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
6769
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
6870
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
71+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
72+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
6973
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
7074
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
7175
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
@@ -114,6 +118,7 @@
114118
import org.junit.jupiter.params.ParameterizedTest;
115119
import org.junit.jupiter.params.provider.MethodSource;
116120
import org.junit.jupiter.params.provider.ValueSource;
121+
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
117122

118123
import java.io.File;
119124
import java.io.IOException;
@@ -988,13 +993,44 @@ public void getClusterInfoTest() throws Exception {
988993
null);
989994
var tmsInfo = new TaskManagersInfo(List.of(tmInfo));
990995

996+
var checkpointingStatistics =
997+
new CheckpointingStatistics(
998+
new CheckpointingStatistics.Counts(-1, -1, -1, -1, -1),
999+
new CheckpointingStatistics.Summary(null, null, null, null, null, null),
1000+
new CheckpointingStatistics.LatestCheckpoints(
1001+
new CheckpointStatistics.CompletedCheckpointStatistics(
1002+
42,
1003+
CheckpointStatsStatus.COMPLETED,
1004+
false,
1005+
null,
1006+
123,
1007+
1234,
1008+
-1,
1009+
42424242,
1010+
-1,
1011+
-1,
1012+
-1,
1013+
-1,
1014+
0,
1015+
0,
1016+
CheckpointStatistics.RestAPICheckpointType.CHECKPOINT,
1017+
Map.of(),
1018+
"path",
1019+
false),
1020+
null,
1021+
null,
1022+
null),
1023+
List.of());
1024+
9911025
var flinkService =
9921026
getTestingService(
9931027
(h, p, r) -> {
9941028
if (h instanceof CustomDashboardConfigurationHeaders) {
9951029
return CompletableFuture.completedFuture(config);
9961030
} else if (h instanceof TaskManagersHeaders) {
9971031
return CompletableFuture.completedFuture(tmsInfo);
1032+
} else if (h instanceof CheckpointingStatisticsHeaders) {
1033+
return CompletableFuture.completedFuture(checkpointingStatistics);
9981034
}
9991035
fail("unknown request");
10001036
return null;
@@ -1004,7 +1040,7 @@ public void getClusterInfoTest() throws Exception {
10041040
conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000));
10051041
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000));
10061042

1007-
assertEquals(
1043+
Map<String, String> expectedEntries =
10081044
Map.of(
10091045
DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
10101046
testVersion,
@@ -1013,8 +1049,16 @@ public void getClusterInfoTest() throws Exception {
10131049
AbstractFlinkService.FIELD_NAME_TOTAL_CPU,
10141050
"2.0",
10151051
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY,
1016-
"" + MemorySize.ofMebiBytes(1000).getBytes() * 2),
1017-
flinkService.getClusterInfo(conf));
1052+
"" + MemorySize.ofMebiBytes(1000).getBytes() * 2);
1053+
1054+
assertEquals(expectedEntries, flinkService.getClusterInfo(conf, null));
1055+
1056+
assertEquals(
1057+
ImmutableMap.<String, String>builder()
1058+
.putAll(expectedEntries)
1059+
.put(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "42424242")
1060+
.build(),
1061+
flinkService.getClusterInfo(conf, JobID.generate().toHexString()));
10181062
}
10191063

10201064
@Test

0 commit comments

Comments
 (0)