Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ void registerSnapshotController() {
reconciler,
observer,
eventRecorder,
metricManager,
statusRecorder);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class FlinkStateSnapshotController
private final StateSnapshotReconciler reconciler;
private final StateSnapshotObserver observer;
private final EventRecorder eventRecorder;
private final MetricManager<FlinkStateSnapshot> metricManager;
private final StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> statusRecorder;

@Override
Expand All @@ -82,7 +84,7 @@ public UpdateControl<FlinkStateSnapshot> reconcile(
reconciler.reconcile(ctx);
}

notifyListeners(ctx);
notifyListenersAndMetricManager(ctx);
return getUpdateControl(ctx);
}

Expand All @@ -91,6 +93,7 @@ public DeleteControl cleanup(
FlinkStateSnapshot flinkStateSnapshot, Context<FlinkStateSnapshot> josdkContext) {
var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, josdkContext);
try {
metricManager.onRemove(flinkStateSnapshot);
return reconciler.cleanup(ctx);
} catch (Exception e) {
eventRecorder.triggerSnapshotEvent(
Expand Down Expand Up @@ -131,7 +134,7 @@ public ErrorStatusUpdateControl<FlinkStateSnapshot> updateErrorStatus(
LOG.info(
"Snapshot {} failed and won't be retried as failure count exceeded the backoff limit",
resource.getMetadata().getName());
notifyListeners(ctx);
notifyListenersAndMetricManager(ctx);
return ErrorStatusUpdateControl.patchStatus(resource).withNoRetry();
}

Expand All @@ -142,7 +145,7 @@ public ErrorStatusUpdateControl<FlinkStateSnapshot> updateErrorStatus(
retrySeconds);
FlinkStateSnapshotUtils.snapshotTriggerPending(resource);

notifyListeners(ctx);
notifyListenersAndMetricManager(ctx);
return ErrorStatusUpdateControl.patchStatus(resource)
.rescheduleAfter(Duration.ofSeconds(retrySeconds));
}
Expand Down Expand Up @@ -173,10 +176,11 @@ private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotCon
}
}

private void notifyListeners(FlinkStateSnapshotContext ctx) {
private void notifyListenersAndMetricManager(FlinkStateSnapshotContext ctx) {
if (!ctx.getOriginalStatus().equals(ctx.getResource().getStatus())) {
statusRecorder.notifyListeners(ctx.getResource(), ctx.getOriginalStatus());
}
metricManager.onUpdate(ctx.getResource());
}

private boolean validateSnapshot(FlinkStateSnapshotContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -29,8 +31,14 @@ public class FlinkStateSnapshotMetrics implements CustomResourceMetrics<FlinkSta

private final KubernetesOperatorMetricGroup parentMetricGroup;
private final Configuration configuration;
private final Map<String, Set<String>> snapshots = new ConcurrentHashMap<>();
public static final String COUNTER_NAME = "Count";
private final Map<String, Map<FlinkStateSnapshotStatus.State, Set<String>>> checkpointStatuses =
new ConcurrentHashMap<>();
private final Map<String, Map<FlinkStateSnapshotStatus.State, Set<String>>> savepointStatuses =
new ConcurrentHashMap<>();
public static final String COUNTER_GROUP_NAME = "Count";
public static final String STATE_GROUP_NAME = "State";
public static final String CHECKPOINT_GROUP_NAME = "Checkpoint";
public static final String SAVEPOINT_GROUP_NAME = "Savepoint";

public FlinkStateSnapshotMetrics(
KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) {
Expand All @@ -39,29 +47,107 @@ public FlinkStateSnapshotMetrics(
}

public void onUpdate(FlinkStateSnapshot snapshot) {
if (snapshot.getStatus() == null || snapshot.getStatus().getState() == null) {
return;
}

onRemove(snapshot);
snapshots
getSnapshotMap(snapshot)
.computeIfAbsent(
snapshot.getMetadata().getNamespace(),
ns -> {
initNamespaceSnapshotStates(ns);
initNamespaceSnapshotCounts(ns);
return ConcurrentHashMap.newKeySet();
return createSnapshotStateMap();
})
.get(snapshot.getStatus().getState())
.add(snapshot.getMetadata().getName());
}

public void onRemove(FlinkStateSnapshot snapshot) {
if (!snapshots.containsKey(snapshot.getMetadata().getNamespace())) {
return;
var namespace = snapshot.getMetadata().getNamespace();
var name = snapshot.getMetadata().getName();
var snapshotMap = getSnapshotMap(snapshot);

if (snapshotMap.containsKey(namespace)) {
snapshotMap.get(namespace).values().forEach(names -> names.remove(name));
}
}

private Map<String, Map<FlinkStateSnapshotStatus.State, Set<String>>> getSnapshotMap(
FlinkStateSnapshot snapshot) {
if (snapshot.getSpec().isSavepoint()) {
return savepointStatuses;
} else {
return checkpointStatuses;
}
snapshots
.get(snapshot.getMetadata().getNamespace())
.remove(snapshot.getMetadata().getName());
}

private void initNamespaceSnapshotCounts(String ns) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkStateSnapshot.class, ns)
.gauge(COUNTER_NAME, () -> snapshots.get(ns).size());
var mainGroup =
parentMetricGroup.createResourceNamespaceGroup(
configuration, FlinkStateSnapshot.class, ns);

mainGroup
.addGroup(CHECKPOINT_GROUP_NAME)
.gauge(
COUNTER_GROUP_NAME,
() -> {
if (!checkpointStatuses.containsKey(ns)) {
return 0;
}
return checkpointStatuses.get(ns).values().stream()
.mapToInt(Set::size)
.sum();
});
mainGroup
.addGroup(SAVEPOINT_GROUP_NAME)
.gauge(
COUNTER_GROUP_NAME,
() -> {
if (!savepointStatuses.containsKey(ns)) {
return 0;
}
return savepointStatuses.get(ns).values().stream()
.mapToInt(Set::size)
.sum();
});
}

private void initNamespaceSnapshotStates(String ns) {
var mainGroup =
parentMetricGroup.createResourceNamespaceGroup(
configuration, FlinkStateSnapshot.class, ns);

for (var state : FlinkStateSnapshotStatus.State.values()) {
mainGroup
.addGroup(CHECKPOINT_GROUP_NAME)
.addGroup(STATE_GROUP_NAME)
.addGroup(state.toString())
.gauge(
COUNTER_GROUP_NAME,
() ->
Optional.ofNullable(checkpointStatuses.get(ns))
.map(s -> s.get(state).size())
.orElse(0));
mainGroup
.addGroup(SAVEPOINT_GROUP_NAME)
.addGroup(STATE_GROUP_NAME)
.addGroup(state.toString())
.gauge(
COUNTER_GROUP_NAME,
() ->
Optional.ofNullable(savepointStatuses.get(ns))
.map(s -> s.get(state).size())
.orElse(0));
}
}

private Map<FlinkStateSnapshotStatus.State, Set<String>> createSnapshotStateMap() {
Map<FlinkStateSnapshotStatus.State, Set<String>> statuses = new ConcurrentHashMap<>();
for (var state : FlinkStateSnapshotStatus.State.values()) {
statuses.put(state, ConcurrentHashMap.newKeySet());
}
return statuses;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public StatusRecorder(
this.metricManager = metricManager;
}

/**
* Notifies status update listeners of changes made to a resource.
*
* @param resource resource that has been updated
* @param prevStatus previous status of resource
*/
public void notifyListeners(CR resource, STATUS prevStatus) {
statusUpdateListener.accept(resource, prevStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
Expand All @@ -56,6 +57,7 @@
import javax.annotation.Nullable;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

Expand All @@ -64,13 +66,15 @@
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
import static org.apache.flink.kubernetes.operator.metrics.FlinkStateSnapshotMetricsUtils.assertSnapshotMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Test class for {@link FlinkStateSnapshotController}. */
@EnableKubernetesMockClient(crud = true)
public class FlinkStateSnapshotControllerTest {
private static final String SNAPSHOT_NAME = "snapshot-test";
private static final String SAVEPOINT_NAME = "savepoint-test";
private static final String CHECKPOINT_NAME = "checkpoint-test";
private static final String SAVEPOINT_PATH = "/tmp/asd";
private static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d";

Expand All @@ -81,6 +85,8 @@ public class FlinkStateSnapshotControllerTest {
private FlinkStateSnapshotEventCollector flinkStateSnapshotEventCollector;
private EventRecorder eventRecorder;
private TestingFlinkResourceContextFactory ctxFactory;
private TestingMetricListener listener;
private MetricManager<FlinkStateSnapshot> metricManager;
private StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> statusRecorder;
private FlinkStateSnapshotController controller;
private Context<FlinkStateSnapshot> context;
Expand All @@ -97,14 +103,20 @@ public void beforeEach() {
TestUtils.createTestMetricGroup(new Configuration()),
flinkService,
eventRecorder);
statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter);

listener = new TestingMetricListener(new Configuration());
metricManager =
MetricManager.createFlinkStateSnapshotMetricManager(
new Configuration(), listener.getMetricGroup());
statusRecorder = new StatusRecorder<>(metricManager, statusUpdateCounter);
controller =
new FlinkStateSnapshotController(
ValidatorUtils.discoverValidators(configManager),
ctxFactory,
new StateSnapshotReconciler(ctxFactory, eventRecorder),
new StateSnapshotObserver(ctxFactory, eventRecorder),
eventRecorder,
metricManager,
statusRecorder);
}

Expand Down Expand Up @@ -514,8 +526,10 @@ public void testReconcileJobNotFound() {
var snapshot = createSavepoint(deployment);
var errorMessage =
String.format(
"Secondary resource %s (%s) for savepoint snapshot-test was not found",
deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT);
"Secondary resource %s (%s) for savepoint %s was not found",
deployment.getMetadata().getName(),
CrdConstants.KIND_FLINK_DEPLOYMENT,
SAVEPOINT_NAME);

// First reconcile will trigger the snapshot.
controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, deployment));
Expand Down Expand Up @@ -556,8 +570,10 @@ public void testReconcileJobNotRunning() {
var snapshot = createSavepoint(deployment);
var errorMessage =
String.format(
"Secondary resource %s (%s) for savepoint snapshot-test is not running",
deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT);
"Secondary resource %s (%s) for savepoint %s is not running",
deployment.getMetadata().getName(),
CrdConstants.KIND_FLINK_DEPLOYMENT,
SAVEPOINT_NAME);

controller.reconcile(snapshot, context);

Expand All @@ -579,6 +595,37 @@ public void testReconcileJobNotRunning() {
});
}

@Test
public void testMetrics() {
var deployment = createDeployment();
var savepoint = createSavepoint(deployment);
savepoint.getSpec().getSavepoint().setDisposeOnDelete(false);
var checkpoint = createCheckpoint(deployment, CheckpointType.FULL, 1);

context = TestUtils.createSnapshotContext(client, deployment);

controller.reconcile(savepoint, context);
controller.reconcile(savepoint, context);
controller.reconcile(savepoint, context);
assertThat(savepoint.getStatus().getState()).isEqualTo(COMPLETED);

controller.reconcile(checkpoint, context);
controller.reconcile(checkpoint, context);
controller.reconcile(checkpoint, context);
assertThat(checkpoint.getStatus().getState()).isEqualTo(COMPLETED);

assertSnapshotMetrics(
listener, TestUtils.TEST_NAMESPACE, Map.of(COMPLETED, 1), Map.of(COMPLETED, 1));

// Remove savepoint
assertDeleteControl(controller.cleanup(savepoint, context), true, null);
assertSnapshotMetrics(listener, TestUtils.TEST_NAMESPACE, Map.of(), Map.of(COMPLETED, 1));

// Remove checkpoint
assertDeleteControl(controller.cleanup(checkpoint, context), true, null);
assertSnapshotMetrics(listener, TestUtils.TEST_NAMESPACE, Map.of(), Map.of());
}

private FlinkStateSnapshot createSavepoint(FlinkDeployment deployment) {
return createSavepoint(deployment, false, 7);
}
Expand All @@ -591,8 +638,8 @@ private FlinkStateSnapshot createSavepoint(
FlinkDeployment deployment, boolean alreadyExists, int backoffLimit) {
var snapshot =
TestUtils.buildFlinkStateSnapshotSavepoint(
SNAPSHOT_NAME,
"test",
SAVEPOINT_NAME,
TestUtils.TEST_NAMESPACE,
SAVEPOINT_PATH,
alreadyExists,
deployment == null ? null : JobReference.fromFlinkResource(deployment));
Expand All @@ -606,8 +653,8 @@ private FlinkStateSnapshot createCheckpoint(
FlinkDeployment deployment, CheckpointType checkpointType, int backoffLimit) {
var snapshot =
TestUtils.buildFlinkStateSnapshotCheckpoint(
SNAPSHOT_NAME,
"test",
CHECKPOINT_NAME,
TestUtils.TEST_NAMESPACE,
checkpointType,
JobReference.fromFlinkResource(deployment));
snapshot.getSpec().setBackoffLimit(backoffLimit);
Expand Down
Loading
Loading