diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index c14d92dfe4..4bd2836f7a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -237,6 +237,7 @@ void registerSnapshotController() { reconciler, observer, eventRecorder, + metricManager, statusRecorder); registeredControllers.add(operator.register(controller, this::overrideControllerConfigs)); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java index 4f91610499..1a705967d3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java @@ -19,6 +19,7 @@ import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler; @@ -65,6 +66,7 @@ public class FlinkStateSnapshotController private final StateSnapshotReconciler reconciler; private final StateSnapshotObserver observer; private final EventRecorder eventRecorder; + private final MetricManager metricManager; private final StatusRecorder statusRecorder; @Override @@ -82,7 +84,7 @@ public UpdateControl reconcile( reconciler.reconcile(ctx); } - notifyListeners(ctx); + notifyListenersAndMetricManager(ctx); return getUpdateControl(ctx); } @@ -91,6 +93,7 @@ public DeleteControl cleanup( FlinkStateSnapshot flinkStateSnapshot, Context josdkContext) { var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, josdkContext); try { + metricManager.onRemove(flinkStateSnapshot); return reconciler.cleanup(ctx); } catch (Exception e) { eventRecorder.triggerSnapshotEvent( @@ -131,7 +134,7 @@ public ErrorStatusUpdateControl updateErrorStatus( LOG.info( "Snapshot {} failed and won't be retried as failure count exceeded the backoff limit", resource.getMetadata().getName()); - notifyListeners(ctx); + notifyListenersAndMetricManager(ctx); return ErrorStatusUpdateControl.patchStatus(resource).withNoRetry(); } @@ -142,7 +145,7 @@ public ErrorStatusUpdateControl updateErrorStatus( retrySeconds); FlinkStateSnapshotUtils.snapshotTriggerPending(resource); - notifyListeners(ctx); + notifyListenersAndMetricManager(ctx); return ErrorStatusUpdateControl.patchStatus(resource) .rescheduleAfter(Duration.ofSeconds(retrySeconds)); } @@ -173,10 +176,11 @@ private UpdateControl getUpdateControl(FlinkStateSnapshotCon } } - private void notifyListeners(FlinkStateSnapshotContext ctx) { + private void notifyListenersAndMetricManager(FlinkStateSnapshotContext ctx) { if (!ctx.getOriginalStatus().equals(ctx.getResource().getStatus())) { statusRecorder.notifyListeners(ctx.getResource(), ctx.getOriginalStatus()); } + metricManager.onUpdate(ctx.getResource()); } private boolean validateSnapshot(FlinkStateSnapshotContext ctx) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java index 6eb50c3912..fc4f092970 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java @@ -19,8 +19,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -29,8 +31,14 @@ public class FlinkStateSnapshotMetrics implements CustomResourceMetrics> snapshots = new ConcurrentHashMap<>(); - public static final String COUNTER_NAME = "Count"; + private final Map>> checkpointStatuses = + new ConcurrentHashMap<>(); + private final Map>> savepointStatuses = + new ConcurrentHashMap<>(); + public static final String COUNTER_GROUP_NAME = "Count"; + public static final String STATE_GROUP_NAME = "State"; + public static final String CHECKPOINT_GROUP_NAME = "Checkpoint"; + public static final String SAVEPOINT_GROUP_NAME = "Savepoint"; public FlinkStateSnapshotMetrics( KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) { @@ -39,29 +47,107 @@ public FlinkStateSnapshotMetrics( } public void onUpdate(FlinkStateSnapshot snapshot) { + if (snapshot.getStatus() == null || snapshot.getStatus().getState() == null) { + return; + } + onRemove(snapshot); - snapshots + getSnapshotMap(snapshot) .computeIfAbsent( snapshot.getMetadata().getNamespace(), ns -> { + initNamespaceSnapshotStates(ns); initNamespaceSnapshotCounts(ns); - return ConcurrentHashMap.newKeySet(); + return createSnapshotStateMap(); }) + .get(snapshot.getStatus().getState()) .add(snapshot.getMetadata().getName()); } public void onRemove(FlinkStateSnapshot snapshot) { - if (!snapshots.containsKey(snapshot.getMetadata().getNamespace())) { - return; + var namespace = snapshot.getMetadata().getNamespace(); + var name = snapshot.getMetadata().getName(); + var snapshotMap = getSnapshotMap(snapshot); + + if (snapshotMap.containsKey(namespace)) { + snapshotMap.get(namespace).values().forEach(names -> names.remove(name)); + } + } + + private Map>> getSnapshotMap( + FlinkStateSnapshot snapshot) { + if (snapshot.getSpec().isSavepoint()) { + return savepointStatuses; + } else { + return checkpointStatuses; } - snapshots - .get(snapshot.getMetadata().getNamespace()) - .remove(snapshot.getMetadata().getName()); } private void initNamespaceSnapshotCounts(String ns) { - parentMetricGroup - .createResourceNamespaceGroup(configuration, FlinkStateSnapshot.class, ns) - .gauge(COUNTER_NAME, () -> snapshots.get(ns).size()); + var mainGroup = + parentMetricGroup.createResourceNamespaceGroup( + configuration, FlinkStateSnapshot.class, ns); + + mainGroup + .addGroup(CHECKPOINT_GROUP_NAME) + .gauge( + COUNTER_GROUP_NAME, + () -> { + if (!checkpointStatuses.containsKey(ns)) { + return 0; + } + return checkpointStatuses.get(ns).values().stream() + .mapToInt(Set::size) + .sum(); + }); + mainGroup + .addGroup(SAVEPOINT_GROUP_NAME) + .gauge( + COUNTER_GROUP_NAME, + () -> { + if (!savepointStatuses.containsKey(ns)) { + return 0; + } + return savepointStatuses.get(ns).values().stream() + .mapToInt(Set::size) + .sum(); + }); + } + + private void initNamespaceSnapshotStates(String ns) { + var mainGroup = + parentMetricGroup.createResourceNamespaceGroup( + configuration, FlinkStateSnapshot.class, ns); + + for (var state : FlinkStateSnapshotStatus.State.values()) { + mainGroup + .addGroup(CHECKPOINT_GROUP_NAME) + .addGroup(STATE_GROUP_NAME) + .addGroup(state.toString()) + .gauge( + COUNTER_GROUP_NAME, + () -> + Optional.ofNullable(checkpointStatuses.get(ns)) + .map(s -> s.get(state).size()) + .orElse(0)); + mainGroup + .addGroup(SAVEPOINT_GROUP_NAME) + .addGroup(STATE_GROUP_NAME) + .addGroup(state.toString()) + .gauge( + COUNTER_GROUP_NAME, + () -> + Optional.ofNullable(savepointStatuses.get(ns)) + .map(s -> s.get(state).size()) + .orElse(0)); + } + } + + private Map> createSnapshotStateMap() { + Map> statuses = new ConcurrentHashMap<>(); + for (var state : FlinkStateSnapshotStatus.State.values()) { + statuses.put(state, ConcurrentHashMap.newKeySet()); + } + return statuses; } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java index 43200fa42e..1c4eb50037 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java @@ -68,6 +68,12 @@ public StatusRecorder( this.metricManager = metricManager; } + /** + * Notifies status update listeners of changes made to a resource. + * + * @param resource resource that has been updated + * @param prevStatus previous status of resource + */ public void notifyListeners(CR resource, STATUS prevStatus) { statusUpdateListener.accept(resource, prevStatus); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index 81cf121b27..9b68bea695 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.metrics.MetricManager; +import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener; import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler; @@ -56,6 +57,7 @@ import javax.annotation.Nullable; import java.time.Instant; +import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -64,13 +66,15 @@ import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetricsUtils.assertSnapshotMetrics; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; /** Test class for {@link FlinkStateSnapshotController}. */ @EnableKubernetesMockClient(crud = true) public class FlinkStateSnapshotControllerTest { - private static final String SNAPSHOT_NAME = "snapshot-test"; + private static final String SAVEPOINT_NAME = "savepoint-test"; + private static final String CHECKPOINT_NAME = "checkpoint-test"; private static final String SAVEPOINT_PATH = "/tmp/asd"; private static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d"; @@ -81,6 +85,8 @@ public class FlinkStateSnapshotControllerTest { private FlinkStateSnapshotEventCollector flinkStateSnapshotEventCollector; private EventRecorder eventRecorder; private TestingFlinkResourceContextFactory ctxFactory; + private TestingMetricListener listener; + private MetricManager metricManager; private StatusRecorder statusRecorder; private FlinkStateSnapshotController controller; private Context context; @@ -97,7 +103,12 @@ public void beforeEach() { TestUtils.createTestMetricGroup(new Configuration()), flinkService, eventRecorder); - statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter); + + listener = new TestingMetricListener(new Configuration()); + metricManager = + MetricManager.createFlinkStateSnapshotMetricManager( + new Configuration(), listener.getMetricGroup()); + statusRecorder = new StatusRecorder<>(metricManager, statusUpdateCounter); controller = new FlinkStateSnapshotController( ValidatorUtils.discoverValidators(configManager), @@ -105,6 +116,7 @@ public void beforeEach() { new StateSnapshotReconciler(ctxFactory, eventRecorder), new StateSnapshotObserver(ctxFactory, eventRecorder), eventRecorder, + metricManager, statusRecorder); } @@ -514,8 +526,10 @@ public void testReconcileJobNotFound() { var snapshot = createSavepoint(deployment); var errorMessage = String.format( - "Secondary resource %s (%s) for savepoint snapshot-test was not found", - deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT); + "Secondary resource %s (%s) for savepoint %s was not found", + deployment.getMetadata().getName(), + CrdConstants.KIND_FLINK_DEPLOYMENT, + SAVEPOINT_NAME); // First reconcile will trigger the snapshot. controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, deployment)); @@ -556,8 +570,10 @@ public void testReconcileJobNotRunning() { var snapshot = createSavepoint(deployment); var errorMessage = String.format( - "Secondary resource %s (%s) for savepoint snapshot-test is not running", - deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT); + "Secondary resource %s (%s) for savepoint %s is not running", + deployment.getMetadata().getName(), + CrdConstants.KIND_FLINK_DEPLOYMENT, + SAVEPOINT_NAME); controller.reconcile(snapshot, context); @@ -579,6 +595,37 @@ public void testReconcileJobNotRunning() { }); } + @Test + public void testMetrics() { + var deployment = createDeployment(); + var savepoint = createSavepoint(deployment); + savepoint.getSpec().getSavepoint().setDisposeOnDelete(false); + var checkpoint = createCheckpoint(deployment, CheckpointType.FULL, 1); + + context = TestUtils.createSnapshotContext(client, deployment); + + controller.reconcile(savepoint, context); + controller.reconcile(savepoint, context); + controller.reconcile(savepoint, context); + assertThat(savepoint.getStatus().getState()).isEqualTo(COMPLETED); + + controller.reconcile(checkpoint, context); + controller.reconcile(checkpoint, context); + controller.reconcile(checkpoint, context); + assertThat(checkpoint.getStatus().getState()).isEqualTo(COMPLETED); + + assertSnapshotMetrics( + listener, TestUtils.TEST_NAMESPACE, Map.of(COMPLETED, 1), Map.of(COMPLETED, 1)); + + // Remove savepoint + assertDeleteControl(controller.cleanup(savepoint, context), true, null); + assertSnapshotMetrics(listener, TestUtils.TEST_NAMESPACE, Map.of(), Map.of(COMPLETED, 1)); + + // Remove checkpoint + assertDeleteControl(controller.cleanup(checkpoint, context), true, null); + assertSnapshotMetrics(listener, TestUtils.TEST_NAMESPACE, Map.of(), Map.of()); + } + private FlinkStateSnapshot createSavepoint(FlinkDeployment deployment) { return createSavepoint(deployment, false, 7); } @@ -591,8 +638,8 @@ private FlinkStateSnapshot createSavepoint( FlinkDeployment deployment, boolean alreadyExists, int backoffLimit) { var snapshot = TestUtils.buildFlinkStateSnapshotSavepoint( - SNAPSHOT_NAME, - "test", + SAVEPOINT_NAME, + TestUtils.TEST_NAMESPACE, SAVEPOINT_PATH, alreadyExists, deployment == null ? null : JobReference.fromFlinkResource(deployment)); @@ -606,8 +653,8 @@ private FlinkStateSnapshot createCheckpoint( FlinkDeployment deployment, CheckpointType checkpointType, int backoffLimit) { var snapshot = TestUtils.buildFlinkStateSnapshotCheckpoint( - SNAPSHOT_NAME, - "test", + CHECKPOINT_NAME, + TestUtils.TEST_NAMESPACE, checkpointType, JobReference.fromFlinkResource(deployment)); snapshot.getSpec().setBackoffLimit(backoffLimit); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetricsTest.java new file mode 100644 index 0000000000..9ad5115b80 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetricsTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.status.CheckpointType; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.CHECKPOINT_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.COUNTER_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.SAVEPOINT_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetricsUtils.assertSnapshotMetrics; +import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FlinkStateSnapshotMetrics}. */ +@Slf4j +public class FlinkStateSnapshotMetricsTest { + private final Configuration configuration = new Configuration(); + private TestingMetricListener listener; + private MetricManager metricManager; + + @BeforeEach + public void init() { + listener = new TestingMetricListener(configuration); + metricManager = + MetricManager.createFlinkStateSnapshotMetricManager( + configuration, listener.getMetricGroup()); + } + + @Test + public void testMetricsSameNamespace() { + var namespace = TestUtils.TEST_NAMESPACE; + var savepoint1 = createEmptySavepoint("savepoint1", State.COMPLETED); + var savepoint2 = createEmptySavepoint("savepoint2", State.IN_PROGRESS); + + var counterIdSavepoint = + listener.getNamespaceMetricId( + FlinkStateSnapshot.class, + namespace, + SAVEPOINT_GROUP_NAME, + COUNTER_GROUP_NAME); + var counterIdCheckpoint = + listener.getNamespaceMetricId( + FlinkStateSnapshot.class, + namespace, + CHECKPOINT_GROUP_NAME, + COUNTER_GROUP_NAME); + assertThat(listener.getGauge(counterIdSavepoint)).isEmpty(); + assertThat(listener.getGauge(counterIdCheckpoint)).isEmpty(); + + metricManager.onUpdate(savepoint1); + metricManager.onUpdate(savepoint2); + assertSnapshotMetrics( + listener, namespace, Map.of(State.COMPLETED, 1, State.IN_PROGRESS, 1), Map.of()); + + savepoint2.getStatus().setState(State.COMPLETED); + metricManager.onUpdate(savepoint2); + assertSnapshotMetrics(listener, namespace, Map.of(State.COMPLETED, 2), Map.of()); + + var checkpoint1 = createEmptyCheckpoint("checkpoint1", State.FAILED); + var checkpoint2 = createEmptyCheckpoint("checkpoint2", State.COMPLETED); + var checkpoint3 = createEmptyCheckpoint("checkpoint3", State.COMPLETED); + + metricManager.onUpdate(checkpoint1); + metricManager.onUpdate(checkpoint2); + metricManager.onUpdate(checkpoint3); + assertSnapshotMetrics( + listener, + namespace, + Map.of(State.COMPLETED, 2), + Map.of(State.FAILED, 1, State.COMPLETED, 2)); + + metricManager.onRemove(savepoint1); + metricManager.onRemove(checkpoint1); + metricManager.onRemove(checkpoint2); + assertSnapshotMetrics( + listener, namespace, Map.of(State.COMPLETED, 1), Map.of(State.COMPLETED, 1)); + } + + @Test + public void testMetricsMultipleNamespace() { + var namespace1 = "namespace1"; + var namespace2 = "namespace2"; + var namespace3 = "namespace3"; + var savepoint1 = createEmptySavepoint(namespace1, "savepoint", State.IN_PROGRESS); + var savepoint2 = createEmptySavepoint(namespace2, "savepoint", State.IN_PROGRESS); + + metricManager.onUpdate(savepoint1); + metricManager.onUpdate(savepoint2); + assertSnapshotMetrics(listener, namespace1, Map.of(State.IN_PROGRESS, 1), Map.of()); + assertSnapshotMetrics(listener, namespace2, Map.of(State.IN_PROGRESS, 1), Map.of()); + + savepoint1.getStatus().setState(State.COMPLETED); + metricManager.onUpdate(savepoint1); + metricManager.onUpdate(savepoint2); + assertSnapshotMetrics(listener, namespace1, Map.of(State.COMPLETED, 1), Map.of()); + assertSnapshotMetrics(listener, namespace2, Map.of(State.IN_PROGRESS, 1), Map.of()); + + var checkpoint1 = createEmptyCheckpoint(namespace1, "checkpoint1", State.FAILED); + var checkpoint2 = createEmptyCheckpoint(namespace2, "checkpoint2", State.COMPLETED); + var checkpoint3 = createEmptyCheckpoint(namespace3, "checkpoint3", State.IN_PROGRESS); + + metricManager.onUpdate(checkpoint1); + metricManager.onUpdate(checkpoint2); + metricManager.onUpdate(checkpoint3); + assertSnapshotMetrics( + listener, namespace1, Map.of(State.COMPLETED, 1), Map.of(State.FAILED, 1)); + assertSnapshotMetrics( + listener, namespace2, Map.of(State.IN_PROGRESS, 1), Map.of(State.COMPLETED, 1)); + assertSnapshotMetrics(listener, namespace3, Map.of(), Map.of(State.IN_PROGRESS, 1)); + + metricManager.onRemove(savepoint1); + metricManager.onRemove(checkpoint1); + metricManager.onRemove(checkpoint2); + assertSnapshotMetrics(listener, namespace1, Map.of(), Map.of()); + assertSnapshotMetrics(listener, namespace2, Map.of(State.IN_PROGRESS, 1), Map.of()); + assertSnapshotMetrics(listener, namespace3, Map.of(), Map.of(State.IN_PROGRESS, 1)); + } + + @Test + public void testMetricsDisabled() { + var conf = new Configuration(); + conf.set(OPERATOR_RESOURCE_METRICS_ENABLED, false); + var listener = new TestingMetricListener(conf); + var metricManager = + MetricManager.createFlinkStateSnapshotMetricManager( + conf, listener.getMetricGroup()); + + var namespace = TestUtils.TEST_NAMESPACE; + var savepoint = createEmptySavepoint(namespace, "savepoint", State.IN_PROGRESS); + metricManager.onUpdate(savepoint); + var counterIdSavepoint = + listener.getNamespaceMetricId( + FlinkStateSnapshot.class, + namespace, + SAVEPOINT_GROUP_NAME, + COUNTER_GROUP_NAME); + var counterIdCheckpoint = + listener.getNamespaceMetricId( + FlinkStateSnapshot.class, + namespace, + CHECKPOINT_GROUP_NAME, + COUNTER_GROUP_NAME); + assertThat(listener.getGauge(counterIdSavepoint)).isEmpty(); + assertThat(listener.getGauge(counterIdCheckpoint)).isEmpty(); + } + + private FlinkStateSnapshot createEmptySavepoint(String name, State state) { + return createEmptySavepoint(TestUtils.TEST_NAMESPACE, name, state); + } + + private FlinkStateSnapshot createEmptySavepoint(String namespace, String name, State state) { + var savepoint = + TestUtils.buildFlinkStateSnapshotSavepoint(name, namespace, "", false, null); + savepoint.setStatus(FlinkStateSnapshotStatus.builder().state(state).build()); + return savepoint; + } + + private FlinkStateSnapshot createEmptyCheckpoint(String name, State state) { + return createEmptyCheckpoint(TestUtils.TEST_NAMESPACE, name, state); + } + + private FlinkStateSnapshot createEmptyCheckpoint(String namespace, String name, State state) { + var checkpoint = + TestUtils.buildFlinkStateSnapshotCheckpoint( + name, namespace, CheckpointType.FULL, null); + checkpoint.setStatus(FlinkStateSnapshotStatus.builder().state(state).build()); + return checkpoint; + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetricsUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetricsUtils.java new file mode 100644 index 0000000000..0a27289b48 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetricsUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.metrics; + +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.metrics.Gauge; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.CHECKPOINT_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.COUNTER_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.SAVEPOINT_GROUP_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetrics.STATE_GROUP_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +/** Utils for testing FlinkStateSnapshot metrics. */ +public class FlinkStateSnapshotMetricsUtils { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkStateSnapshotMetricsUtils.class); + + public static void assertSnapshotMetrics( + TestingMetricListener listener, + String namespace, + Map savepointExpectedCountMap, + Map checkpointExpectedCountMap) { + assertSnapshotMetricsForGroup( + listener, namespace, SAVEPOINT_GROUP_NAME, savepointExpectedCountMap); + assertSnapshotMetricsForGroup( + listener, namespace, CHECKPOINT_GROUP_NAME, checkpointExpectedCountMap); + } + + private static void assertSnapshotMetricsForGroup( + TestingMetricListener listener, + String namespace, + String snapshotGroupName, + Map expectedStateCountMap) { + var snapshotCount = expectedStateCountMap.values().stream().mapToInt(i -> i).sum(); + var counterIdCheckpoint = + listener.getNamespaceMetricId( + FlinkStateSnapshot.class, namespace, snapshotGroupName, COUNTER_GROUP_NAME); + assertThat(listener.getGauge(counterIdCheckpoint)) + .isPresent() + .get() + .extracting(Gauge::getValue) + .isEqualTo(snapshotCount); + + for (FlinkStateSnapshotStatus.State state : FlinkStateSnapshotStatus.State.values()) { + var stateId = + listener.getNamespaceMetricId( + FlinkStateSnapshot.class, + namespace, + snapshotGroupName, + STATE_GROUP_NAME, + state.name(), + COUNTER_GROUP_NAME); + var expectedStateCount = expectedStateCountMap.getOrDefault(state, 0); + + LOG.info("Asserting snapshot metrics {} = {}", stateId, expectedStateCount); + assertThat(listener.getGauge(stateId)) + .isPresent() + .get() + .extracting(Gauge::getValue) + .isEqualTo(expectedStateCount); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java index 0dbb5369da..208c446f26 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.metrics; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; @@ -29,6 +28,8 @@ import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import io.fabric8.kubernetes.client.CustomResource; + import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -101,7 +102,7 @@ public String getMetricId(String... identifiers) { } public String getNamespaceMetricId( - Class> resourceClass, + Class> resourceClass, String resourceNs, String... identifiers) { return metricGroup @@ -110,7 +111,7 @@ public String getNamespaceMetricId( } public String getResourceMetricId( - Class> resourceClass, + Class> resourceClass, String resourceNs, String resourceName, String... identifiers) {