diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java index 310bb75a5f..f8a9165ee4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java @@ -49,6 +49,8 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics> deploymentCpuUsage = new ConcurrentHashMap<>(); // map(namespace, map(deployment, memory)) private final Map> deploymentMemoryUsage = new ConcurrentHashMap<>(); + // map(namespace, map(deployment, stateSize)) + private final Map> deploymentStateSize = new ConcurrentHashMap<>(); public static final String FLINK_VERSION_GROUP_NAME = "FlinkVersion"; public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion"; public static final String UNKNOWN_VERSION = "UNKNOWN"; @@ -58,6 +60,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics { + initNamespaceStateSize(ns); + return new ConcurrentHashMap<>(); + }) + .put(deploymentName, stateSize > 0 ? stateSize : 0); } public void onRemove(FlinkDeployment flinkApp) { @@ -168,6 +183,9 @@ public void onRemove(FlinkDeployment flinkApp) { if (deploymentMemoryUsage.containsKey(namespace)) { deploymentMemoryUsage.get(namespace).remove(name); } + if (deploymentStateSize.containsKey(namespace)) { + deploymentStateSize.get(namespace).remove(name); + } } private void initNamespaceDeploymentCounts(String ns) { @@ -231,6 +249,15 @@ private void initNamespaceMemoryUsage(String ns) { .reduce(0L, Long::sum)); } + private void initNamespaceStateSize(String ns) { + parentMetricGroup + .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) + .addGroup(RESOURCE_USAGE_GROUP_NAME) + .gauge( + STATE_SIZE, + () -> deploymentStateSize.get(ns).values().stream().reduce(0L, Long::sum)); + } + private Map> createDeploymentStatusMap() { Map> statuses = new ConcurrentHashMap<>(); for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java index 9020cbfa8c..38d4b831c6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java @@ -82,7 +82,10 @@ private void observeClusterInfo(FlinkResourceContext ctx) { var flinkApp = ctx.getResource(); try { Map clusterInfo = - ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig()); + ctx.getFlinkService() + .getClusterInfo( + ctx.getObserveConfig(), + flinkApp.getStatus().getJobStatus().getJobId()); flinkApp.getStatus().getClusterInfo().putAll(clusterInfo); logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo()); } catch (Exception e) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java index 2d716bfd14..84f213e7f8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java @@ -36,7 +36,7 @@ public void observeFlinkCluster(FlinkResourceContext ctx) { // Check if session cluster can serve rest calls following our practice in JobObserver try { logger.debug("Observing session cluster"); - ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig()); + ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig(), null); var rs = ctx.getResource().getStatus().getReconciliationStatus(); if (rs.getState() == ReconciliationState.DEPLOYED) { rs.markReconciledSpecAsStable(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 68d7fe897f..6c9eb58929 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -73,6 +73,8 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; @@ -161,6 +163,7 @@ public abstract class AbstractFlinkService implements FlinkService { private static final String EMPTY_JAR_FILENAME = "empty.jar"; public static final String FIELD_NAME_TOTAL_CPU = "total-cpu"; public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory"; + public static final String FIELD_NAME_STATE_SIZE = "state-size"; protected final KubernetesClient kubernetesClient; protected final ExecutorService executorService; @@ -735,9 +738,27 @@ public CheckpointStatsResult fetchCheckpointStats( } @Override - public Map getClusterInfo(Configuration conf) throws Exception { + public Map getClusterInfo(Configuration conf, @Nullable String jobId) + throws Exception { Map clusterInfo = new HashMap<>(); + populateFlinkVersion(conf, clusterInfo); + + var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size(); + clusterInfo.put( + FIELD_NAME_TOTAL_CPU, + String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas))); + clusterInfo.put( + FIELD_NAME_TOTAL_MEMORY, + String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas))); + + populateStateSize(conf, jobId, clusterInfo); + + return clusterInfo; + } + + private void populateFlinkVersion(Configuration conf, Map clusterInfo) + throws Exception { try (var clusterClient = getClusterClient(conf)) { CustomDashboardConfiguration dashboardConfiguration = @@ -757,16 +778,35 @@ public Map getClusterInfo(Configuration conf) throws Exception { DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); } + } - var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size(); - clusterInfo.put( - FIELD_NAME_TOTAL_CPU, - String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas))); - clusterInfo.put( - FIELD_NAME_TOTAL_MEMORY, - String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas))); + private void populateStateSize( + Configuration conf, @Nullable String jobId, Map clusterInfo) + throws Exception { + if (jobId != null) { + try (RestClusterClient clusterClient = getClusterClient(conf)) { + var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance(); + var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters(); + parameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); - return clusterInfo; + CheckpointingStatistics checkpointingStatistics = + clusterClient + .sendRequest( + checkpointingStatisticsHeaders, + parameters, + EmptyRequestBody.getInstance()) + .get(); + CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics = + checkpointingStatistics + .getLatestCheckpoints() + .getCompletedCheckpointStatistics(); + if (completedCheckpointStatistics != null) { + clusterInfo.put( + FIELD_NAME_STATE_SIZE, + String.valueOf(completedCheckpointStatistics.getStateSize())); + } + } + } } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index 7707e38156..168ea4c1f4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -115,7 +115,7 @@ String triggerCheckpoint( void disposeSavepoint(String savepointPath, Configuration conf) throws Exception; - Map getClusterInfo(Configuration conf) throws Exception; + Map getClusterInfo(Configuration conf, @Nullable String jobId) throws Exception; PodList getJmPodList(FlinkDeployment deployment, Configuration conf); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 7aef5ce583..42e3ca243a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -648,7 +648,8 @@ public void markApplicationJobFailedWithError(JobID jobID, String error) throws } @Override - public Map getClusterInfo(Configuration conf) throws TimeoutException { + public Map getClusterInfo(Configuration conf, String jobId) + throws TimeoutException { if (!isPortReady) { throw new TimeoutException("JM port is unavailable"); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java index 1776cad2e0..a1f4d922da 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java @@ -38,6 +38,7 @@ import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATE_SIZE; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED; import static org.assertj.core.api.Assertions.assertThat; @@ -339,7 +340,8 @@ public void testResourceMetrics() { .putAll( Map.of( AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5", - AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024")); + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024", + AbstractFlinkService.FIELD_NAME_STATE_SIZE, "4096")); deployment2 .getStatus() @@ -347,7 +349,8 @@ public void testResourceMetrics() { .putAll( Map.of( AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "10", - AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048")); + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048", + AbstractFlinkService.FIELD_NAME_STATE_SIZE, "8192")); deployment3 .getStatus() @@ -355,7 +358,8 @@ public void testResourceMetrics() { .putAll( Map.of( AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "13", - AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096")); + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096", + AbstractFlinkService.FIELD_NAME_STATE_SIZE, "16384")); var cpuGroupId1 = listener.getNamespaceMetricId( @@ -363,17 +367,25 @@ public void testResourceMetrics() { var memoryGroupId1 = listener.getNamespaceMetricId( FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME); + var stateSizeGroupId1 = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE); var cpuGroupId2 = listener.getNamespaceMetricId( FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, CPU_NAME); var memoryGroupId2 = listener.getNamespaceMetricId( FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME); + var stateSizeGroupId2 = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE); assertTrue(listener.getGauge(cpuGroupId1).isEmpty()); assertTrue(listener.getGauge(memoryGroupId1).isEmpty()); + assertTrue(listener.getGauge(stateSizeGroupId1).isEmpty()); assertTrue(listener.getGauge(cpuGroupId2).isEmpty()); assertTrue(listener.getGauge(memoryGroupId2).isEmpty()); + assertTrue(listener.getGauge(stateSizeGroupId2).isEmpty()); metricManager.onUpdate(deployment1); metricManager.onUpdate(deployment2); @@ -381,8 +393,10 @@ public void testResourceMetrics() { assertEquals(15D, listener.getGauge(cpuGroupId1).get().getValue()); assertEquals(3072L, listener.getGauge(memoryGroupId1).get().getValue()); + assertEquals(12288L, listener.getGauge(stateSizeGroupId1).get().getValue()); assertEquals(13D, listener.getGauge(cpuGroupId2).get().getValue()); assertEquals(4096L, listener.getGauge(memoryGroupId2).get().getValue()); + assertEquals(16384L, listener.getGauge(stateSizeGroupId2).get().getValue()); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index 336de17e5f..a26cbb461f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -50,6 +50,7 @@ import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult; import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.HardwareDescription; @@ -64,8 +65,11 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters; import org.apache.flink.runtime.rest.messages.job.metrics.Metric; import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; @@ -114,6 +118,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; @@ -988,6 +993,35 @@ public void getClusterInfoTest() throws Exception { null); var tmsInfo = new TaskManagersInfo(List.of(tmInfo)); + var checkpointingStatistics = + new CheckpointingStatistics( + new CheckpointingStatistics.Counts(-1, -1, -1, -1, -1), + new CheckpointingStatistics.Summary(null, null, null, null, null, null), + new CheckpointingStatistics.LatestCheckpoints( + new CheckpointStatistics.CompletedCheckpointStatistics( + 42, + CheckpointStatsStatus.COMPLETED, + false, + null, + 123, + 1234, + -1, + 42424242, + -1, + -1, + -1, + -1, + 0, + 0, + CheckpointStatistics.RestAPICheckpointType.CHECKPOINT, + Map.of(), + "path", + false), + null, + null, + null), + List.of()); + var flinkService = getTestingService( (h, p, r) -> { @@ -995,6 +1029,8 @@ public void getClusterInfoTest() throws Exception { return CompletableFuture.completedFuture(config); } else if (h instanceof TaskManagersHeaders) { return CompletableFuture.completedFuture(tmsInfo); + } else if (h instanceof CheckpointingStatisticsHeaders) { + return CompletableFuture.completedFuture(checkpointingStatistics); } fail("unknown request"); return null; @@ -1004,7 +1040,7 @@ public void getClusterInfoTest() throws Exception { conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000)); conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000)); - assertEquals( + Map expectedEntries = Map.of( DashboardConfiguration.FIELD_NAME_FLINK_VERSION, testVersion, @@ -1013,8 +1049,16 @@ public void getClusterInfoTest() throws Exception { AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "2.0", AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, - "" + MemorySize.ofMebiBytes(1000).getBytes() * 2), - flinkService.getClusterInfo(conf)); + "" + MemorySize.ofMebiBytes(1000).getBytes() * 2); + + assertEquals(expectedEntries, flinkService.getClusterInfo(conf, null)); + + assertEquals( + ImmutableMap.builder() + .putAll(expectedEntries) + .put(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "42424242") + .build(), + flinkService.getClusterInfo(conf, JobID.generate().toHexString())); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java new file mode 100644 index 0000000000..5e839522b0 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; +import org.apache.flink.runtime.checkpoint.SavepointType; +import org.apache.flink.runtime.checkpoint.SnapshotType; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** Statistics for a checkpoint. Required for visibility of RestAPICheckpointType. */ +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "className") +@JsonSubTypes({ + @JsonSubTypes.Type( + value = CheckpointStatistics.CompletedCheckpointStatistics.class, + name = "completed"), + @JsonSubTypes.Type( + value = CheckpointStatistics.FailedCheckpointStatistics.class, + name = "failed"), + @JsonSubTypes.Type( + value = CheckpointStatistics.PendingCheckpointStatistics.class, + name = "in_progress") +}) +public class CheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_ID = "id"; + + public static final String FIELD_NAME_STATUS = "status"; + + public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; + + public static final String FIELD_NAME_SAVEPOINT_FORMAT = "savepointFormat"; + + public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp"; + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed_size"; + + /** + * The accurate name of this field should be 'checkpointed_data_size', keep it as before to not + * break backwards compatibility for old web UI. + * + * @see FLINK-13390 + */ + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_PROCESSED_DATA = "processed_data"; + + public static final String FIELD_NAME_PERSISTED_DATA = "persisted_data"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + public static final String FIELD_NAME_TASKS = "tasks"; + + public static final String FIELD_NAME_CHECKPOINT_TYPE = "checkpoint_type"; + + @JsonProperty(FIELD_NAME_ID) + private final long id; + + @JsonProperty(FIELD_NAME_STATUS) + private final CheckpointStatsStatus status; + + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) + private final boolean savepoint; + + @JsonProperty(FIELD_NAME_SAVEPOINT_FORMAT) + @Nullable + private final String savepointFormat; + + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) + private final long triggerTimestamp; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_CHECKPOINTED_SIZE) + private final long checkpointedSize; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_PROCESSED_DATA) + private final long processedData; + + @JsonProperty(FIELD_NAME_PERSISTED_DATA) + private final long persistedData; + + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) + private final int numSubtasks; + + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) + private final int numAckSubtasks; + + @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) + private final RestAPICheckpointType checkpointType; + + @JsonProperty(FIELD_NAME_TASKS) + @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class) + private final Map checkpointStatisticsPerTask; + + @JsonCreator + private CheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_SAVEPOINT_FORMAT) String savepointFormat, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_CHECKPOINTED_SIZE) long checkpointedSize, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_PROCESSED_DATA) long processedData, + @JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType, + @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) + @JsonProperty(FIELD_NAME_TASKS) + Map checkpointStatisticsPerTask) { + this.id = id; + this.status = Preconditions.checkNotNull(status); + this.savepoint = savepoint; + this.savepointFormat = savepointFormat; + this.triggerTimestamp = triggerTimestamp; + this.latestAckTimestamp = latestAckTimestamp; + this.checkpointedSize = checkpointedSize; + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + this.processedData = processedData; + this.persistedData = persistedData; + this.numSubtasks = numSubtasks; + this.numAckSubtasks = numAckSubtasks; + this.checkpointType = Preconditions.checkNotNull(checkpointType); + this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask); + } + + public long getId() { + return id; + } + + public CheckpointStatsStatus getStatus() { + return status; + } + + public boolean isSavepoint() { + return savepoint; + } + + public long getTriggerTimestamp() { + return triggerTimestamp; + } + + public long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public long getCheckpointedSize() { + return checkpointedSize; + } + + public long getStateSize() { + return stateSize; + } + + public long getDuration() { + return duration; + } + + public int getNumSubtasks() { + return numSubtasks; + } + + public int getNumAckSubtasks() { + return numAckSubtasks; + } + + public RestAPICheckpointType getCheckpointType() { + return checkpointType; + } + + @Nullable + public Map getCheckpointStatisticsPerTask() { + return checkpointStatisticsPerTask; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointStatistics that = (CheckpointStatistics) o; + return id == that.id + && savepoint == that.savepoint + && Objects.equals(savepointFormat, that.savepointFormat) + && triggerTimestamp == that.triggerTimestamp + && latestAckTimestamp == that.latestAckTimestamp + && stateSize == that.stateSize + && duration == that.duration + && alignmentBuffered == that.alignmentBuffered + && processedData == that.processedData + && persistedData == that.persistedData + && numSubtasks == that.numSubtasks + && numAckSubtasks == that.numAckSubtasks + && status == that.status + && Objects.equals(checkpointType, that.checkpointType) + && Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask); + } + + @Override + public int hashCode() { + return Objects.hash( + id, + status, + savepoint, + savepointFormat, + triggerTimestamp, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + processedData, + persistedData, + numSubtasks, + numAckSubtasks, + checkpointType, + checkpointStatisticsPerTask); + } + + // ------------------------------------------------------------------------- + // Static factory methods + // ------------------------------------------------------------------------- + + public static CheckpointStatistics generateCheckpointStatistics( + AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) { + Preconditions.checkNotNull(checkpointStats); + + Map checkpointStatisticsPerTask; + + if (includeTaskCheckpointStatistics) { + Collection taskStateStats = checkpointStats.getAllTaskStateStats(); + + checkpointStatisticsPerTask = + CollectionUtil.newHashMapWithExpectedSize(taskStateStats.size()); + + for (TaskStateStats taskStateStat : taskStateStats) { + checkpointStatisticsPerTask.put( + taskStateStat.getJobVertexId(), + new TaskCheckpointStatistics( + checkpointStats.getCheckpointId(), + checkpointStats.getStatus(), + taskStateStat.getLatestAckTimestamp(), + taskStateStat.getCheckpointedSize(), + taskStateStat.getStateSize(), + taskStateStat.getEndToEndDuration( + checkpointStats.getTriggerTimestamp()), + 0, + taskStateStat.getProcessedDataStats(), + taskStateStat.getPersistedDataStats(), + taskStateStat.getNumberOfSubtasks(), + taskStateStat.getNumberOfAcknowledgedSubtasks())); + } + } else { + checkpointStatisticsPerTask = Collections.emptyMap(); + } + String savepointFormat = null; + SnapshotType snapshotType = checkpointStats.getProperties().getCheckpointType(); + if (snapshotType instanceof SavepointType) { + savepointFormat = ((SavepointType) snapshotType).getFormatType().name(); + } + if (checkpointStats instanceof CompletedCheckpointStats) { + final CompletedCheckpointStats completedCheckpointStats = + ((CompletedCheckpointStats) checkpointStats); + + return new CompletedCheckpointStatistics( + completedCheckpointStats.getCheckpointId(), + completedCheckpointStats.getStatus(), + snapshotType.isSavepoint(), + savepointFormat, + completedCheckpointStats.getTriggerTimestamp(), + completedCheckpointStats.getLatestAckTimestamp(), + completedCheckpointStats.getCheckpointedSize(), + completedCheckpointStats.getStateSize(), + completedCheckpointStats.getEndToEndDuration(), + 0, + completedCheckpointStats.getProcessedData(), + completedCheckpointStats.getPersistedData(), + completedCheckpointStats.getNumberOfSubtasks(), + completedCheckpointStats.getNumberOfAcknowledgedSubtasks(), + RestAPICheckpointType.valueOf( + completedCheckpointStats.getProperties().getCheckpointType(), + completedCheckpointStats.isUnalignedCheckpoint()), + checkpointStatisticsPerTask, + completedCheckpointStats.getExternalPath(), + completedCheckpointStats.isDiscarded()); + } else if (checkpointStats instanceof FailedCheckpointStats) { + final FailedCheckpointStats failedCheckpointStats = + ((FailedCheckpointStats) checkpointStats); + + return new FailedCheckpointStatistics( + failedCheckpointStats.getCheckpointId(), + failedCheckpointStats.getStatus(), + failedCheckpointStats.getProperties().isSavepoint(), + savepointFormat, + failedCheckpointStats.getTriggerTimestamp(), + failedCheckpointStats.getLatestAckTimestamp(), + failedCheckpointStats.getCheckpointedSize(), + failedCheckpointStats.getStateSize(), + failedCheckpointStats.getEndToEndDuration(), + 0, + failedCheckpointStats.getProcessedData(), + failedCheckpointStats.getPersistedData(), + failedCheckpointStats.getNumberOfSubtasks(), + failedCheckpointStats.getNumberOfAcknowledgedSubtasks(), + RestAPICheckpointType.valueOf( + failedCheckpointStats.getProperties().getCheckpointType(), + failedCheckpointStats.isUnalignedCheckpoint()), + checkpointStatisticsPerTask, + failedCheckpointStats.getFailureTimestamp(), + failedCheckpointStats.getFailureMessage()); + } else if (checkpointStats instanceof PendingCheckpointStats) { + final PendingCheckpointStats pendingCheckpointStats = + ((PendingCheckpointStats) checkpointStats); + + return new PendingCheckpointStatistics( + pendingCheckpointStats.getCheckpointId(), + pendingCheckpointStats.getStatus(), + pendingCheckpointStats.getProperties().isSavepoint(), + savepointFormat, + pendingCheckpointStats.getTriggerTimestamp(), + pendingCheckpointStats.getLatestAckTimestamp(), + pendingCheckpointStats.getCheckpointedSize(), + pendingCheckpointStats.getStateSize(), + pendingCheckpointStats.getEndToEndDuration(), + 0, + pendingCheckpointStats.getProcessedData(), + pendingCheckpointStats.getPersistedData(), + pendingCheckpointStats.getNumberOfSubtasks(), + pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(), + RestAPICheckpointType.valueOf( + pendingCheckpointStats.getProperties().getCheckpointType(), + pendingCheckpointStats.isUnalignedCheckpoint()), + checkpointStatisticsPerTask); + } else { + throw new IllegalArgumentException( + "Given checkpoint stats object of type " + + checkpointStats.getClass().getName() + + " cannot be converted."); + } + } + + /** + * Backward compatibility layer between internal {@link CheckpointType} and a field used in + * {@link CheckpointStatistics}. + */ + public enum RestAPICheckpointType { + CHECKPOINT, + UNALIGNED_CHECKPOINT, + SAVEPOINT, + SYNC_SAVEPOINT; + + public static RestAPICheckpointType valueOf( + SnapshotType checkpointType, boolean isUnalignedCheckpoint) { + if (checkpointType.isSavepoint()) { + Preconditions.checkArgument( + !isUnalignedCheckpoint, + "Currently the savepoint doesn't support unaligned checkpoint."); + SavepointType savepointType = (SavepointType) checkpointType; + return savepointType.isSynchronous() ? SYNC_SAVEPOINT : SAVEPOINT; + } + if (isUnalignedCheckpoint) { + return UNALIGNED_CHECKPOINT; + } + return CHECKPOINT; + } + } + + // --------------------------------------------------------------------- + // Static inner classes + // --------------------------------------------------------------------- + + /** Statistics for a completed checkpoint. */ + public static final class CompletedCheckpointStatistics extends CheckpointStatistics { + + public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; + + public static final String FIELD_NAME_DISCARDED = "discarded"; + + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) + @Nullable + private final String externalPath; + + @JsonProperty(FIELD_NAME_DISCARDED) + private final boolean discarded; + + @JsonCreator + public CompletedCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_SAVEPOINT_FORMAT) String savepointFormat, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_CHECKPOINTED_SIZE) long checkpointedSize, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_PROCESSED_DATA) long processedData, + @JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType, + @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) + @JsonProperty(FIELD_NAME_TASKS) + Map checkpointingStatisticsPerTask, + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath, + @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) { + super( + id, + status, + savepoint, + savepointFormat, + triggerTimestamp, + latestAckTimestamp, + checkpointedSize, + stateSize, + duration, + alignmentBuffered, + processedData, + persistedData, + numSubtasks, + numAckSubtasks, + checkpointType, + checkpointingStatisticsPerTask); + + this.externalPath = externalPath; + this.discarded = discarded; + } + + @Nullable + public String getExternalPath() { + return externalPath; + } + + public boolean isDiscarded() { + return discarded; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o; + return discarded == that.discarded && Objects.equals(externalPath, that.externalPath); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), externalPath, discarded); + } + } + + /** Statistics for a failed checkpoint. */ + public static final class FailedCheckpointStatistics extends CheckpointStatistics { + + public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp"; + + public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message"; + + @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) + private final long failureTimestamp; + + @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) + @Nullable + private final String failureMessage; + + @JsonCreator + public FailedCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_SAVEPOINT_FORMAT) String savepointFormat, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_CHECKPOINTED_SIZE) long checkpointedSize, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_PROCESSED_DATA) long processedData, + @JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType, + @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) + @JsonProperty(FIELD_NAME_TASKS) + Map checkpointingStatisticsPerTask, + @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp, + @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) { + super( + id, + status, + savepoint, + savepointFormat, + triggerTimestamp, + latestAckTimestamp, + checkpointedSize, + stateSize, + duration, + alignmentBuffered, + processedData, + persistedData, + numSubtasks, + numAckSubtasks, + checkpointType, + checkpointingStatisticsPerTask); + + this.failureTimestamp = failureTimestamp; + this.failureMessage = failureMessage; + } + + public long getFailureTimestamp() { + return failureTimestamp; + } + + @Nullable + public String getFailureMessage() { + return failureMessage; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + FailedCheckpointStatistics that = (FailedCheckpointStatistics) o; + return failureTimestamp == that.failureTimestamp + && Objects.equals(failureMessage, that.failureMessage); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), failureTimestamp, failureMessage); + } + } + + /** Statistics for a pending checkpoint. */ + public static final class PendingCheckpointStatistics extends CheckpointStatistics { + + @JsonCreator + public PendingCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_SAVEPOINT_FORMAT) String savepointFormat, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_CHECKPOINTED_SIZE) long checkpointedSize, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_PROCESSED_DATA) long processedData, + @JsonProperty(FIELD_NAME_PERSISTED_DATA) long persistedData, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) RestAPICheckpointType checkpointType, + @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) + @JsonProperty(FIELD_NAME_TASKS) + Map checkpointingStatisticsPerTask) { + super( + id, + status, + savepoint, + savepointFormat, + triggerTimestamp, + latestAckTimestamp, + checkpointedSize, + stateSize, + duration, + alignmentBuffered, + processedData, + persistedData, + numSubtasks, + numAckSubtasks, + checkpointType, + checkpointingStatisticsPerTask); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode()); + } + } +}