Skip to content

Commit 6a27b3e

Browse files
mateczaganygyfora
authored andcommitted
[FLINK-35492][snapshot] Add metrics for snapshot resources
1 parent d1827a4 commit 6a27b3e

File tree

8 files changed

+454
-29
lines changed

8 files changed

+454
-29
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ void registerSnapshotController() {
237237
reconciler,
238238
observer,
239239
eventRecorder,
240+
metricManager,
240241
statusRecorder);
241242
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
242243
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
2121
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
22+
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
2223
import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
2324
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2425
import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
@@ -65,6 +66,7 @@ public class FlinkStateSnapshotController
6566
private final StateSnapshotReconciler reconciler;
6667
private final StateSnapshotObserver observer;
6768
private final EventRecorder eventRecorder;
69+
private final MetricManager<FlinkStateSnapshot> metricManager;
6870
private final StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> statusRecorder;
6971

7072
@Override
@@ -82,7 +84,7 @@ public UpdateControl<FlinkStateSnapshot> reconcile(
8284
reconciler.reconcile(ctx);
8385
}
8486

85-
notifyListeners(ctx);
87+
notifyListenersAndMetricManager(ctx);
8688
return getUpdateControl(ctx);
8789
}
8890

@@ -91,6 +93,7 @@ public DeleteControl cleanup(
9193
FlinkStateSnapshot flinkStateSnapshot, Context<FlinkStateSnapshot> josdkContext) {
9294
var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, josdkContext);
9395
try {
96+
metricManager.onRemove(flinkStateSnapshot);
9497
return reconciler.cleanup(ctx);
9598
} catch (Exception e) {
9699
eventRecorder.triggerSnapshotEvent(
@@ -131,7 +134,7 @@ public ErrorStatusUpdateControl<FlinkStateSnapshot> updateErrorStatus(
131134
LOG.info(
132135
"Snapshot {} failed and won't be retried as failure count exceeded the backoff limit",
133136
resource.getMetadata().getName());
134-
notifyListeners(ctx);
137+
notifyListenersAndMetricManager(ctx);
135138
return ErrorStatusUpdateControl.patchStatus(resource).withNoRetry();
136139
}
137140

@@ -142,7 +145,7 @@ public ErrorStatusUpdateControl<FlinkStateSnapshot> updateErrorStatus(
142145
retrySeconds);
143146
FlinkStateSnapshotUtils.snapshotTriggerPending(resource);
144147

145-
notifyListeners(ctx);
148+
notifyListenersAndMetricManager(ctx);
146149
return ErrorStatusUpdateControl.patchStatus(resource)
147150
.rescheduleAfter(Duration.ofSeconds(retrySeconds));
148151
}
@@ -173,10 +176,11 @@ private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotCon
173176
}
174177
}
175178

176-
private void notifyListeners(FlinkStateSnapshotContext ctx) {
179+
private void notifyListenersAndMetricManager(FlinkStateSnapshotContext ctx) {
177180
if (!ctx.getOriginalStatus().equals(ctx.getResource().getStatus())) {
178181
statusRecorder.notifyListeners(ctx.getResource(), ctx.getOriginalStatus());
179182
}
183+
metricManager.onUpdate(ctx.getResource());
180184
}
181185

182186
private boolean validateSnapshot(FlinkStateSnapshotContext ctx) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import org.apache.flink.configuration.Configuration;
2121
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
22+
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
2223

2324
import java.util.Map;
25+
import java.util.Optional;
2426
import java.util.Set;
2527
import java.util.concurrent.ConcurrentHashMap;
2628

@@ -29,8 +31,14 @@ public class FlinkStateSnapshotMetrics implements CustomResourceMetrics<FlinkSta
2931

3032
private final KubernetesOperatorMetricGroup parentMetricGroup;
3133
private final Configuration configuration;
32-
private final Map<String, Set<String>> snapshots = new ConcurrentHashMap<>();
33-
public static final String COUNTER_NAME = "Count";
34+
private final Map<String, Map<FlinkStateSnapshotStatus.State, Set<String>>> checkpointStatuses =
35+
new ConcurrentHashMap<>();
36+
private final Map<String, Map<FlinkStateSnapshotStatus.State, Set<String>>> savepointStatuses =
37+
new ConcurrentHashMap<>();
38+
public static final String COUNTER_GROUP_NAME = "Count";
39+
public static final String STATE_GROUP_NAME = "State";
40+
public static final String CHECKPOINT_GROUP_NAME = "Checkpoint";
41+
public static final String SAVEPOINT_GROUP_NAME = "Savepoint";
3442

3543
public FlinkStateSnapshotMetrics(
3644
KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) {
@@ -39,29 +47,107 @@ public FlinkStateSnapshotMetrics(
3947
}
4048

4149
public void onUpdate(FlinkStateSnapshot snapshot) {
50+
if (snapshot.getStatus() == null || snapshot.getStatus().getState() == null) {
51+
return;
52+
}
53+
4254
onRemove(snapshot);
43-
snapshots
55+
getSnapshotMap(snapshot)
4456
.computeIfAbsent(
4557
snapshot.getMetadata().getNamespace(),
4658
ns -> {
59+
initNamespaceSnapshotStates(ns);
4760
initNamespaceSnapshotCounts(ns);
48-
return ConcurrentHashMap.newKeySet();
61+
return createSnapshotStateMap();
4962
})
63+
.get(snapshot.getStatus().getState())
5064
.add(snapshot.getMetadata().getName());
5165
}
5266

5367
public void onRemove(FlinkStateSnapshot snapshot) {
54-
if (!snapshots.containsKey(snapshot.getMetadata().getNamespace())) {
55-
return;
68+
var namespace = snapshot.getMetadata().getNamespace();
69+
var name = snapshot.getMetadata().getName();
70+
var snapshotMap = getSnapshotMap(snapshot);
71+
72+
if (snapshotMap.containsKey(namespace)) {
73+
snapshotMap.get(namespace).values().forEach(names -> names.remove(name));
74+
}
75+
}
76+
77+
private Map<String, Map<FlinkStateSnapshotStatus.State, Set<String>>> getSnapshotMap(
78+
FlinkStateSnapshot snapshot) {
79+
if (snapshot.getSpec().isSavepoint()) {
80+
return savepointStatuses;
81+
} else {
82+
return checkpointStatuses;
5683
}
57-
snapshots
58-
.get(snapshot.getMetadata().getNamespace())
59-
.remove(snapshot.getMetadata().getName());
6084
}
6185

6286
private void initNamespaceSnapshotCounts(String ns) {
63-
parentMetricGroup
64-
.createResourceNamespaceGroup(configuration, FlinkStateSnapshot.class, ns)
65-
.gauge(COUNTER_NAME, () -> snapshots.get(ns).size());
87+
var mainGroup =
88+
parentMetricGroup.createResourceNamespaceGroup(
89+
configuration, FlinkStateSnapshot.class, ns);
90+
91+
mainGroup
92+
.addGroup(CHECKPOINT_GROUP_NAME)
93+
.gauge(
94+
COUNTER_GROUP_NAME,
95+
() -> {
96+
if (!checkpointStatuses.containsKey(ns)) {
97+
return 0;
98+
}
99+
return checkpointStatuses.get(ns).values().stream()
100+
.mapToInt(Set::size)
101+
.sum();
102+
});
103+
mainGroup
104+
.addGroup(SAVEPOINT_GROUP_NAME)
105+
.gauge(
106+
COUNTER_GROUP_NAME,
107+
() -> {
108+
if (!savepointStatuses.containsKey(ns)) {
109+
return 0;
110+
}
111+
return savepointStatuses.get(ns).values().stream()
112+
.mapToInt(Set::size)
113+
.sum();
114+
});
115+
}
116+
117+
private void initNamespaceSnapshotStates(String ns) {
118+
var mainGroup =
119+
parentMetricGroup.createResourceNamespaceGroup(
120+
configuration, FlinkStateSnapshot.class, ns);
121+
122+
for (var state : FlinkStateSnapshotStatus.State.values()) {
123+
mainGroup
124+
.addGroup(CHECKPOINT_GROUP_NAME)
125+
.addGroup(STATE_GROUP_NAME)
126+
.addGroup(state.toString())
127+
.gauge(
128+
COUNTER_GROUP_NAME,
129+
() ->
130+
Optional.ofNullable(checkpointStatuses.get(ns))
131+
.map(s -> s.get(state).size())
132+
.orElse(0));
133+
mainGroup
134+
.addGroup(SAVEPOINT_GROUP_NAME)
135+
.addGroup(STATE_GROUP_NAME)
136+
.addGroup(state.toString())
137+
.gauge(
138+
COUNTER_GROUP_NAME,
139+
() ->
140+
Optional.ofNullable(savepointStatuses.get(ns))
141+
.map(s -> s.get(state).size())
142+
.orElse(0));
143+
}
144+
}
145+
146+
private Map<FlinkStateSnapshotStatus.State, Set<String>> createSnapshotStateMap() {
147+
Map<FlinkStateSnapshotStatus.State, Set<String>> statuses = new ConcurrentHashMap<>();
148+
for (var state : FlinkStateSnapshotStatus.State.values()) {
149+
statuses.put(state, ConcurrentHashMap.newKeySet());
150+
}
151+
return statuses;
66152
}
67153
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public StatusRecorder(
6868
this.metricManager = metricManager;
6969
}
7070

71+
/**
72+
* Notifies status update listeners of changes made to a resource.
73+
*
74+
* @param resource resource that has been updated
75+
* @param prevStatus previous status of resource
76+
*/
7177
public void notifyListeners(CR resource, STATUS prevStatus) {
7278
statusUpdateListener.accept(resource, prevStatus);
7379
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
3636
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
3737
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
38+
import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
3839
import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
3940
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
4041
import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
@@ -56,6 +57,7 @@
5657
import javax.annotation.Nullable;
5758

5859
import java.time.Instant;
60+
import java.util.Map;
5961
import java.util.Optional;
6062
import java.util.function.BiConsumer;
6163

@@ -64,13 +66,15 @@
6466
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED;
6567
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
6668
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
69+
import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetricsUtils.assertSnapshotMetrics;
6770
import static org.assertj.core.api.Assertions.assertThat;
6871
import static org.junit.jupiter.api.Assertions.assertThrows;
6972

7073
/** Test class for {@link FlinkStateSnapshotController}. */
7174
@EnableKubernetesMockClient(crud = true)
7275
public class FlinkStateSnapshotControllerTest {
73-
private static final String SNAPSHOT_NAME = "snapshot-test";
76+
private static final String SAVEPOINT_NAME = "savepoint-test";
77+
private static final String CHECKPOINT_NAME = "checkpoint-test";
7478
private static final String SAVEPOINT_PATH = "/tmp/asd";
7579
private static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d";
7680

@@ -81,6 +85,8 @@ public class FlinkStateSnapshotControllerTest {
8185
private FlinkStateSnapshotEventCollector flinkStateSnapshotEventCollector;
8286
private EventRecorder eventRecorder;
8387
private TestingFlinkResourceContextFactory ctxFactory;
88+
private TestingMetricListener listener;
89+
private MetricManager<FlinkStateSnapshot> metricManager;
8490
private StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> statusRecorder;
8591
private FlinkStateSnapshotController controller;
8692
private Context<FlinkStateSnapshot> context;
@@ -97,14 +103,20 @@ public void beforeEach() {
97103
TestUtils.createTestMetricGroup(new Configuration()),
98104
flinkService,
99105
eventRecorder);
100-
statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter);
106+
107+
listener = new TestingMetricListener(new Configuration());
108+
metricManager =
109+
MetricManager.createFlinkStateSnapshotMetricManager(
110+
new Configuration(), listener.getMetricGroup());
111+
statusRecorder = new StatusRecorder<>(metricManager, statusUpdateCounter);
101112
controller =
102113
new FlinkStateSnapshotController(
103114
ValidatorUtils.discoverValidators(configManager),
104115
ctxFactory,
105116
new StateSnapshotReconciler(ctxFactory, eventRecorder),
106117
new StateSnapshotObserver(ctxFactory, eventRecorder),
107118
eventRecorder,
119+
metricManager,
108120
statusRecorder);
109121
}
110122

@@ -514,8 +526,10 @@ public void testReconcileJobNotFound() {
514526
var snapshot = createSavepoint(deployment);
515527
var errorMessage =
516528
String.format(
517-
"Secondary resource %s (%s) for savepoint snapshot-test was not found",
518-
deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT);
529+
"Secondary resource %s (%s) for savepoint %s was not found",
530+
deployment.getMetadata().getName(),
531+
CrdConstants.KIND_FLINK_DEPLOYMENT,
532+
SAVEPOINT_NAME);
519533

520534
// First reconcile will trigger the snapshot.
521535
controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, deployment));
@@ -556,8 +570,10 @@ public void testReconcileJobNotRunning() {
556570
var snapshot = createSavepoint(deployment);
557571
var errorMessage =
558572
String.format(
559-
"Secondary resource %s (%s) for savepoint snapshot-test is not running",
560-
deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT);
573+
"Secondary resource %s (%s) for savepoint %s is not running",
574+
deployment.getMetadata().getName(),
575+
CrdConstants.KIND_FLINK_DEPLOYMENT,
576+
SAVEPOINT_NAME);
561577

562578
controller.reconcile(snapshot, context);
563579

@@ -579,6 +595,37 @@ public void testReconcileJobNotRunning() {
579595
});
580596
}
581597

598+
@Test
599+
public void testMetrics() {
600+
var deployment = createDeployment();
601+
var savepoint = createSavepoint(deployment);
602+
savepoint.getSpec().getSavepoint().setDisposeOnDelete(false);
603+
var checkpoint = createCheckpoint(deployment, CheckpointType.FULL, 1);
604+
605+
context = TestUtils.createSnapshotContext(client, deployment);
606+
607+
controller.reconcile(savepoint, context);
608+
controller.reconcile(savepoint, context);
609+
controller.reconcile(savepoint, context);
610+
assertThat(savepoint.getStatus().getState()).isEqualTo(COMPLETED);
611+
612+
controller.reconcile(checkpoint, context);
613+
controller.reconcile(checkpoint, context);
614+
controller.reconcile(checkpoint, context);
615+
assertThat(checkpoint.getStatus().getState()).isEqualTo(COMPLETED);
616+
617+
assertSnapshotMetrics(
618+
listener, TestUtils.TEST_NAMESPACE, Map.of(COMPLETED, 1), Map.of(COMPLETED, 1));
619+
620+
// Remove savepoint
621+
assertDeleteControl(controller.cleanup(savepoint, context), true, null);
622+
assertSnapshotMetrics(listener, TestUtils.TEST_NAMESPACE, Map.of(), Map.of(COMPLETED, 1));
623+
624+
// Remove checkpoint
625+
assertDeleteControl(controller.cleanup(checkpoint, context), true, null);
626+
assertSnapshotMetrics(listener, TestUtils.TEST_NAMESPACE, Map.of(), Map.of());
627+
}
628+
582629
private FlinkStateSnapshot createSavepoint(FlinkDeployment deployment) {
583630
return createSavepoint(deployment, false, 7);
584631
}
@@ -591,8 +638,8 @@ private FlinkStateSnapshot createSavepoint(
591638
FlinkDeployment deployment, boolean alreadyExists, int backoffLimit) {
592639
var snapshot =
593640
TestUtils.buildFlinkStateSnapshotSavepoint(
594-
SNAPSHOT_NAME,
595-
"test",
641+
SAVEPOINT_NAME,
642+
TestUtils.TEST_NAMESPACE,
596643
SAVEPOINT_PATH,
597644
alreadyExists,
598645
deployment == null ? null : JobReference.fromFlinkResource(deployment));
@@ -606,8 +653,8 @@ private FlinkStateSnapshot createCheckpoint(
606653
FlinkDeployment deployment, CheckpointType checkpointType, int backoffLimit) {
607654
var snapshot =
608655
TestUtils.buildFlinkStateSnapshotCheckpoint(
609-
SNAPSHOT_NAME,
610-
"test",
656+
CHECKPOINT_NAME,
657+
TestUtils.TEST_NAMESPACE,
611658
checkpointType,
612659
JobReference.fromFlinkResource(deployment));
613660
snapshot.getSpec().setBackoffLimit(backoffLimit);

0 commit comments

Comments
 (0)