Skip to content

Commit 4539856

Browse files
committed
[FLINK-37253] Add state size in application status and deployment metrics
This adds the state size, i.e. the size of the last completed checkpoint, to the cluster info in the deployment status. It also exposes the state size, summed per namespace, as a `StateSize` deployment metric.
1 parent 379c690 commit 4539856

File tree

9 files changed

+170
-19
lines changed

9 files changed

+170
-19
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
4949
private final Map<String, Map<String, Double>> deploymentCpuUsage = new ConcurrentHashMap<>();
5050
// map(namespace, map(deployment, memory))
5151
private final Map<String, Map<String, Long>> deploymentMemoryUsage = new ConcurrentHashMap<>();
52+
// map(namespace, map(deployment, stateSize))
53+
private final Map<String, Map<String, Long>> deploymentStateSize = new ConcurrentHashMap<>();
5254
public static final String FLINK_VERSION_GROUP_NAME = "FlinkVersion";
5355
public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion";
5456
public static final String UNKNOWN_VERSION = "UNKNOWN";
@@ -58,6 +60,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
5860
public static final String COUNTER_NAME = "Count";
5961
public static final String CPU_NAME = "Cpu";
6062
public static final String MEMORY_NAME = "Memory";
63+
public static final String STATE_SIZE = "StateSize";
6164

6265
public FlinkDeploymentMetrics(
6366
KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) {
@@ -144,6 +147,18 @@ public void onUpdate(FlinkDeployment flinkApp) {
144147
NumberUtils.toLong(
145148
clusterInfo.getOrDefault(
146149
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));
150+
151+
long stateSize =
152+
NumberUtils.toLong(
153+
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "0"));
154+
deploymentStateSize
155+
.computeIfAbsent(
156+
namespace,
157+
ns -> {
158+
initNamespaceStateSize(ns);
159+
return new ConcurrentHashMap<>();
160+
})
161+
.put(deploymentName, stateSize > 0 ? stateSize : 0);
147162
}
148163

149164
public void onRemove(FlinkDeployment flinkApp) {
@@ -168,6 +183,9 @@ public void onRemove(FlinkDeployment flinkApp) {
168183
if (deploymentMemoryUsage.containsKey(namespace)) {
169184
deploymentMemoryUsage.get(namespace).remove(name);
170185
}
186+
if (deploymentStateSize.containsKey(namespace)) {
187+
deploymentStateSize.get(namespace).remove(name);
188+
}
171189
}
172190

173191
private void initNamespaceDeploymentCounts(String ns) {
@@ -231,6 +249,15 @@ private void initNamespaceMemoryUsage(String ns) {
231249
.reduce(0L, Long::sum));
232250
}
233251

252+
private void initNamespaceStateSize(String ns) {
253+
parentMetricGroup
254+
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
255+
.addGroup(RESOURCE_USAGE_GROUP_NAME)
256+
.gauge(
257+
STATE_SIZE,
258+
() -> deploymentStateSize.get(ns).values().stream().reduce(0L, Long::sum));
259+
}
260+
234261
private Map<JobManagerDeploymentStatus, Set<String>> createDeploymentStatusMap() {
235262
Map<JobManagerDeploymentStatus, Set<String>> statuses = new ConcurrentHashMap<>();
236263
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
8282
var flinkApp = ctx.getResource();
8383
try {
8484
Map<String, String> clusterInfo =
85-
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
85+
ctx.getFlinkService()
86+
.getClusterInfo(
87+
ctx.getObserveConfig(),
88+
flinkApp.getStatus().getJobStatus().getJobId());
8689
flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
8790
logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
8891
} catch (Exception e) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
3636
// Check if session cluster can serve rest calls following our practice in JobObserver
3737
try {
3838
logger.debug("Observing session cluster");
39-
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
39+
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig(), null);
4040
var rs = ctx.getResource().getStatus().getReconciliationStatus();
4141
if (rs.getState() == ReconciliationState.DEPLOYED) {
4242
rs.markReconciledSpecAsStable();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
7474
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
7575
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
76+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
77+
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
7678
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
7779
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
7880
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -161,6 +163,7 @@ public abstract class AbstractFlinkService implements FlinkService {
161163
private static final String EMPTY_JAR_FILENAME = "empty.jar";
162164
public static final String FIELD_NAME_TOTAL_CPU = "total-cpu";
163165
public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory";
166+
public static final String FIELD_NAME_STATE_SIZE = "state-size";
164167

165168
protected final KubernetesClient kubernetesClient;
166169
protected final ExecutorService executorService;
@@ -735,9 +738,27 @@ public CheckpointStatsResult fetchCheckpointStats(
735738
}
736739

737740
@Override
738-
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
741+
public Map<String, String> getClusterInfo(Configuration conf, @Nullable String jobId)
742+
throws Exception {
739743
Map<String, String> clusterInfo = new HashMap<>();
740744

745+
populateFlinkVersion(conf, clusterInfo);
746+
747+
var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
748+
clusterInfo.put(
749+
FIELD_NAME_TOTAL_CPU,
750+
String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
751+
clusterInfo.put(
752+
FIELD_NAME_TOTAL_MEMORY,
753+
String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
754+
755+
populateStateSize(conf, jobId, clusterInfo);
756+
757+
return clusterInfo;
758+
}
759+
760+
private void populateFlinkVersion(Configuration conf, Map<String, String> clusterInfo)
761+
throws Exception {
741762
try (var clusterClient = getClusterClient(conf)) {
742763

743764
CustomDashboardConfiguration dashboardConfiguration =
@@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
757778
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
758779
dashboardConfiguration.getFlinkRevision());
759780
}
781+
}
760782

761-
var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
762-
clusterInfo.put(
763-
FIELD_NAME_TOTAL_CPU,
764-
String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
765-
clusterInfo.put(
766-
FIELD_NAME_TOTAL_MEMORY,
767-
String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
783+
private void populateStateSize(
784+
Configuration conf, @Nullable String jobId, Map<String, String> clusterInfo)
785+
throws Exception {
786+
if (jobId != null) {
787+
try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
788+
var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance();
789+
var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
790+
parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
768791

769-
return clusterInfo;
792+
CheckpointingStatistics checkpointingStatistics =
793+
clusterClient
794+
.sendRequest(
795+
checkpointingStatisticsHeaders,
796+
parameters,
797+
EmptyRequestBody.getInstance())
798+
.get();
799+
CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics =
800+
checkpointingStatistics
801+
.getLatestCheckpoints()
802+
.getCompletedCheckpointStatistics();
803+
if (completedCheckpointStatistics != null) {
804+
clusterInfo.put(
805+
FIELD_NAME_STATE_SIZE,
806+
String.valueOf(completedCheckpointStatistics.getCheckpointedSize()));
807+
}
808+
}
809+
}
770810
}
771811

772812
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ String triggerCheckpoint(
115115

116116
void disposeSavepoint(String savepointPath, Configuration conf) throws Exception;
117117

118-
Map<String, String> getClusterInfo(Configuration conf) throws Exception;
118+
Map<String, String> getClusterInfo(Configuration conf, @Nullable String jobId) throws Exception;
119119

120120
PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
121121

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,28 @@ public static Long calculateClusterMemoryUsage(Configuration conf, int taskManag
403403
return tmTotalMemory + jmTotalMemory;
404404
}
405405

406+
public static Long calculateClusterStateSize(Configuration conf, int taskManagerReplicas) {
407+
var clusterSpec = new KubernetesClusterClientFactory().getClusterSpecification(conf);
408+
409+
var jmParameters = new KubernetesJobManagerParameters(conf, clusterSpec);
410+
var jmTotalMemory =
411+
Math.round(
412+
jmParameters.getJobManagerMemoryMB()
413+
* Math.pow(1024, 2)
414+
* jmParameters.getJobManagerMemoryLimitFactor()
415+
* jmParameters.getReplicas());
416+
417+
var tmTotalMemory =
418+
Math.round(
419+
clusterSpec.getTaskManagerMemoryMB()
420+
* Math.pow(1024, 2)
421+
* conf.getDouble(
422+
KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR)
423+
* taskManagerReplicas);
424+
425+
return tmTotalMemory + jmTotalMemory;
426+
}
427+
406428
public static void setGenerationAnnotation(Configuration conf, Long generation) {
407429
if (generation == null) {
408430
return;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,8 @@ public void markApplicationJobFailedWithError(JobID jobID, String error) throws
648648
}
649649

650650
@Override
651-
public Map<String, String> getClusterInfo(Configuration conf) throws TimeoutException {
651+
public Map<String, String> getClusterInfo(Configuration conf, String jobId)
652+
throws TimeoutException {
652653
if (!isPortReady) {
653654
throw new TimeoutException("JM port is unavailable");
654655
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME;
3939
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME;
4040
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME;
41+
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATE_SIZE;
4142
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
4243
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
4344
import static org.assertj.core.api.Assertions.assertThat;
@@ -339,50 +340,63 @@ public void testResourceMetrics() {
339340
.putAll(
340341
Map.of(
341342
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",
342-
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024"));
343+
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024",
344+
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "4096"));
343345

344346
deployment2
345347
.getStatus()
346348
.getClusterInfo()
347349
.putAll(
348350
Map.of(
349351
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "10",
350-
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048"));
352+
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048",
353+
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "8192"));
351354

352355
deployment3
353356
.getStatus()
354357
.getClusterInfo()
355358
.putAll(
356359
Map.of(
357360
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "13",
358-
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096"));
361+
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096",
362+
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "16384"));
359363

360364
var cpuGroupId1 =
361365
listener.getNamespaceMetricId(
362366
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
363367
var memoryGroupId1 =
364368
listener.getNamespaceMetricId(
365369
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
370+
var stateSizeGroupId1 =
371+
listener.getNamespaceMetricId(
372+
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE);
366373
var cpuGroupId2 =
367374
listener.getNamespaceMetricId(
368375
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
369376
var memoryGroupId2 =
370377
listener.getNamespaceMetricId(
371378
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
379+
var stateSizeGroupId2 =
380+
listener.getNamespaceMetricId(
381+
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE);
372382

373383
assertTrue(listener.getGauge(cpuGroupId1).isEmpty());
374384
assertTrue(listener.getGauge(memoryGroupId1).isEmpty());
385+
assertTrue(listener.getGauge(stateSizeGroupId1).isEmpty());
375386
assertTrue(listener.getGauge(cpuGroupId2).isEmpty());
376387
assertTrue(listener.getGauge(memoryGroupId2).isEmpty());
388+
assertTrue(listener.getGauge(stateSizeGroupId2).isEmpty());
377389

378390
metricManager.onUpdate(deployment1);
379391
metricManager.onUpdate(deployment2);
380392
metricManager.onUpdate(deployment3);
381393

382394
assertEquals(15D, listener.getGauge(cpuGroupId1).get().getValue());
383395
assertEquals(3072L, listener.getGauge(memoryGroupId1).get().getValue());
396+
assertEquals(12288L, listener.getGauge(stateSizeGroupId1).get().getValue());
384397
assertEquals(13D, listener.getGauge(cpuGroupId2).get().getValue());
385398
assertEquals(4096L, listener.getGauge(memoryGroupId2).get().getValue());
399+
assertEquals(16384L, listener.getGauge(stateSizeGroupId2).get().getValue());
386400
}
387401

388402
@Test

0 commit comments

Comments
 (0)