diff --git a/docs/content/docs/custom-resource/snapshots.md b/docs/content/docs/custom-resource/snapshots.md index 3c1771bc36..d40a7ca2de 100644 --- a/docs/content/docs/custom-resource/snapshots.md +++ b/docs/content/docs/custom-resource/snapshots.md @@ -127,7 +127,6 @@ This however requires the referenced Flink resource to be alive, as this operati This feature is not available for checkpoints. - ## Triggering snapshots Upgrade savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections. @@ -208,3 +207,19 @@ Legacy savepoints found in FlinkDeployment/FlinkSessionJob CRs under the depreca - For max count and FlinkStateSnapshot resources **disabled**, it will be cleaned up when `savepointHistory` exceeds max count - For max count and FlinkStateSnapshot resources **enabled**, it will be cleaned up when `savepointHistory` + number of FlinkStateSnapshot CRs related to the job exceed max count + +## Advanced Snapshot Filtering + +At the end of each snapshot reconciliation phase, the operator will update its labels to reflect the latest status and spec of the resources. +This will allow the Kubernetes API server to filter snapshots without having to query all resources, since filtering by status or spec fields of custom resources is not supported in Kubernetes by default. +Example queries with label selectors using `kubectl`: +```shell +# Query all checkpoints +kubectl -n flink get flinksnp -l 'snapshot.type=CHECKPOINT' + +# Query all savepoints with states +kubectl -n flink get flinksnp -l 'snapshot.state in (COMPLETED,ABANDONED),snapshot.type=SAVEPOINT' + +# Query all savepoints/checkpoints with job reference +kubectl -n flink get flinksnp -l 'job-reference.kind=FlinkDeployment,job-reference.name=test-job' +``` \ No newline at end of file diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java index ba68e5e16f..01e3dd5d50 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java @@ -30,4 +30,8 @@ public class CrdConstants { public static final String EPHEMERAL_STORAGE = "ephemeral-storage"; public static final String LABEL_SNAPSHOT_TYPE = "snapshot.type"; + public static final String LABEL_SNAPSHOT_TRIGGER_TYPE = "snapshot.trigger-type"; + public static final String LABEL_SNAPSHOT_STATE = "snapshot.state"; + public static final String LABEL_SNAPSHOT_JOB_REFERENCE_KIND = "job-reference.kind"; + public static final String LABEL_SNAPSHOT_JOB_REFERENCE_NAME = "job-reference.name"; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java index 793b722b30..64c466195c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java @@ -33,6 +33,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; +import java.util.Map; import java.util.Optional; /** Context for reconciling a snapshot. */ @@ -42,6 +43,7 @@ public class FlinkStateSnapshotContext { private final FlinkStateSnapshot resource; private final FlinkStateSnapshotStatus originalStatus; + private final Map originalLabels; private final Context josdkContext; private final FlinkConfigManager configManager; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java index 1a705967d3..1a516c3d9f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -84,6 +85,8 @@ public UpdateControl reconcile( reconciler.reconcile(ctx); } + updateLabels(ctx); + notifyListenersAndMetricManager(ctx); return getUpdateControl(ctx); } @@ -157,13 +160,27 @@ public Map prepareEventSources( EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context)); } + /** + * Checks whether status and/or labels were changed on this resource, and returns an + * UpdateControl instance accordingly. Unless the snapshot state is terminal, the update control + * will be configured to reschedule the reconciliation. + * + * @param ctx snapshot context + * @return update control + */ private UpdateControl getUpdateControl(FlinkStateSnapshotContext ctx) { var resource = ctx.getResource(); - UpdateControl updateControl; - if (!ctx.getOriginalStatus().equals(resource.getStatus())) { + var updateControl = UpdateControl.noUpdate(); + + var labelsChanged = resourceLabelsChanged(ctx); + var statusChanged = resourceStatusChanged(ctx); + + if (labelsChanged && statusChanged) { + updateControl = UpdateControl.updateResourceAndPatchStatus(resource); + } else if (labelsChanged) { + updateControl = UpdateControl.updateResource(resource); + } else if (statusChanged) { updateControl = UpdateControl.patchStatus(resource); - } else { - updateControl = UpdateControl.noUpdate(); } switch (resource.getStatus().getState()) { @@ -177,7 +194,7 @@ private UpdateControl getUpdateControl(FlinkStateSnapshotCon } private void notifyListenersAndMetricManager(FlinkStateSnapshotContext ctx) { - if (!ctx.getOriginalStatus().equals(ctx.getResource().getStatus())) { + if (resourceStatusChanged(ctx)) { statusRecorder.notifyListeners(ctx.getResource(), ctx.getOriginalStatus()); } metricManager.onUpdate(ctx.getResource()); @@ -201,4 +218,38 @@ private boolean validateSnapshot(FlinkStateSnapshotContext ctx) { } return true; } + + /** + * Updates FlinkStateSnapshot resource labels with labels that represent its current state and + * spec. + * + * @param ctx snapshot context + */ + private void updateLabels(FlinkStateSnapshotContext ctx) { + var labels = new HashMap<>(ctx.getResource().getMetadata().getLabels()); + labels.putAll( + FlinkStateSnapshotUtils.getSnapshotLabels( + ctx.getResource(), ctx.getSecondaryResource())); + ctx.getResource().getMetadata().setLabels(labels); + } + + /** + * Checks if the resource status has changed since the start of reconciliation. + * + * @param ctx snapshot context + * @return true if resource status changed + */ + private boolean resourceStatusChanged(FlinkStateSnapshotContext ctx) { + return !ctx.getOriginalStatus().equals(ctx.getResource().getStatus()); + } + + /** + * Checks if the resource labels have changed since the start of reconciliation. + * + * @param ctx snapshot context + * @return true if resource labels changed + */ + private boolean resourceLabelsChanged(FlinkStateSnapshotContext ctx) { + return !ctx.getOriginalLabels().equals(ctx.getResource().getMetadata().getLabels()); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java index a09e8ffd90..21cad5e62b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java @@ -80,7 +80,7 @@ public Instant getLastPeriodicTriggerInstant( .getLabels() .get( CrdConstants - .LABEL_SNAPSHOT_TYPE))) + .LABEL_SNAPSHOT_TRIGGER_TYPE))) .map( s -> DateTimeUtils.parseKubernetes( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java index 58715c133a..d9d3688b20 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java @@ -35,6 +35,8 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.slf4j.Logger; @@ -76,7 +78,11 @@ public FlinkResourceContextFactory( public FlinkStateSnapshotContext getFlinkStateSnapshotContext( FlinkStateSnapshot savepoint, Context josdkContext) { return new FlinkStateSnapshotContext( - savepoint, savepoint.getStatus().toBuilder().build(), josdkContext, configManager); + savepoint, + savepoint.getStatus().toBuilder().build(), + ImmutableMap.copyOf(savepoint.getMetadata().getLabels()), + josdkContext, + configManager); } public > FlinkResourceContext getResourceContext( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index ecc6c5c96d..24ecb462d0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -61,7 +61,7 @@ public class EventSourceUtils { .map(Enum::name) .collect(Collectors.joining(",")); var labelSelector = - String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TYPE, labelFilters); + String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, labelFilters); var configuration = InformerConfiguration.from(FlinkStateSnapshot.class, context) .withLabelSelector(labelSelector) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java index 5cfbcb1e02..5e25816af2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java @@ -42,6 +42,8 @@ import javax.annotation.Nullable; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -66,7 +68,7 @@ protected static FlinkStateSnapshot createFlinkStateSnapshot( var metadata = new ObjectMeta(); metadata.setNamespace(namespace); metadata.setName(name); - metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name()); + metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, triggerType.name()); var snapshot = new FlinkStateSnapshot(); snapshot.setSpec(spec); @@ -84,7 +86,7 @@ protected static FlinkStateSnapshot createFlinkStateSnapshot( */ public static SnapshotTriggerType getSnapshotTriggerType(FlinkStateSnapshot snapshot) { var triggerTypeStr = - snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE); + snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE); try { return SnapshotTriggerType.valueOf(triggerTypeStr); } catch (NullPointerException | IllegalArgumentException e) { @@ -345,7 +347,9 @@ public static void snapshotSuccessful( public static void snapshotInProgress(FlinkStateSnapshot snapshot, String triggerId) { snapshot.getMetadata() .getLabels() - .putIfAbsent(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name()); + .putIfAbsent( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.MANUAL.name()); snapshot.getStatus().setState(IN_PROGRESS); snapshot.getStatus().setTriggerId(triggerId); snapshot.getStatus().setTriggerTimestamp(DateTimeUtils.kubernetes(Instant.now())); @@ -361,6 +365,52 @@ public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) { snapshot.getStatus().setState(TRIGGER_PENDING); } + /** + * Creates a map of labels that can be applied to a snapshot resource based on its current spec + * and status. As described in FLINK-36109, we should set up selectable spec fields instead of + * labels once the Kubernetes feature is GA and widely supported. + * + * @param snapshot snapshot instance + * @param secondaryResourceOpt optional referenced Flink resource + * @return map of auto-generated labels + */ + public static Map getSnapshotLabels( + FlinkStateSnapshot snapshot, + Optional> secondaryResourceOpt) { + var labels = new HashMap(); + labels.put( + CrdConstants.LABEL_SNAPSHOT_TYPE, + snapshot.getSpec().isSavepoint() + ? SnapshotType.SAVEPOINT.name() + : SnapshotType.CHECKPOINT.name()); + labels.put( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + snapshot.getMetadata() + .getLabels() + .getOrDefault( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.MANUAL.name())); + + Optional.ofNullable(snapshot.getStatus()) + .ifPresent( + status -> + labels.put( + CrdConstants.LABEL_SNAPSHOT_STATE, + status.getState().name())); + + secondaryResourceOpt.ifPresent( + secondaryResource -> { + labels.put( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND, + secondaryResource.getKind()); + labels.put( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME, + secondaryResource.getMetadata().getName()); + }); + + return labels; + } + /** * Extracts the namespace of the job reference from a snapshot resource. This is either * explicitly specified in the job reference, or it will fallback to the namespace of the diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index 593841b5e5..98c98a0e31 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; @@ -38,6 +39,7 @@ import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener; import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.reconciler.SnapshotType; import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector; @@ -49,9 +51,11 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -63,6 +67,11 @@ import static org.apache.flink.api.common.JobStatus.CANCELED; import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.kubernetes.operator.api.CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND; +import static org.apache.flink.kubernetes.operator.api.CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME; +import static org.apache.flink.kubernetes.operator.api.CrdConstants.LABEL_SNAPSHOT_STATE; +import static org.apache.flink.kubernetes.operator.api.CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE; +import static org.apache.flink.kubernetes.operator.api.CrdConstants.LABEL_SNAPSHOT_TYPE; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED; @@ -161,6 +170,73 @@ public void testReconcileSavepointAlreadyExists(boolean jobReferenced) { assertThat(statusUpdateCounter.getCount()).isEqualTo(1); } + @ParameterizedTest + @EnumSource(SnapshotType.class) + public void testReconcileLabels(SnapshotType snapshotType) { + var deployment = createDeployment(); + context = TestUtils.createSnapshotContext(client, null); + FlinkStateSnapshot snapshot; + if (snapshotType == SnapshotType.SAVEPOINT) { + snapshot = createSavepoint(deployment); + } else { + snapshot = createCheckpoint(deployment, CheckpointType.FULL, 0); + } + + // First we have empty secondary resource, update labels but not status + assertThat(snapshot.getMetadata().getLabels()).isEmpty(); + assertUpdateControl(controller.reconcile(snapshot, context), true, false); + assertLabels(snapshot, null, snapshotType, SnapshotTriggerType.MANUAL, TRIGGER_PENDING); + + // Correct secondary resource, update status to IN_PROGRESS, update labels too + context = TestUtils.createSnapshotContext(client, deployment); + assertUpdateControl(controller.reconcile(snapshot, context), true, true); + assertLabels(snapshot, deployment, snapshotType, SnapshotTriggerType.MANUAL, IN_PROGRESS); + + // No update to status or labels + assertUpdateControl(controller.reconcile(snapshot, context), false, false); + assertLabels(snapshot, deployment, snapshotType, SnapshotTriggerType.MANUAL, IN_PROGRESS); + + // Update to both status and labels + assertUpdateControl(controller.reconcile(snapshot, context), true, true); + assertLabels(snapshot, deployment, snapshotType, SnapshotTriggerType.MANUAL, COMPLETED); + + // Try to manually modify label + snapshot.getMetadata().getLabels().put(LABEL_SNAPSHOT_TYPE, "custom-value"); + assertUpdateControl(controller.reconcile(snapshot, context), true, false); + assertLabels(snapshot, deployment, snapshotType, SnapshotTriggerType.MANUAL, COMPLETED); + } + + private void assertLabels( + FlinkStateSnapshot snapshot, + @Nullable AbstractFlinkResource secondaryResource, + SnapshotType snapshotType, + SnapshotTriggerType snapshotTriggerType, + FlinkStateSnapshotStatus.State state) { + assertThat(snapshot.getMetadata().getLabels().get(LABEL_SNAPSHOT_TYPE)) + .isEqualTo(snapshotType.name()); + assertThat(snapshot.getMetadata().getLabels().get(LABEL_SNAPSHOT_TRIGGER_TYPE)) + .isEqualTo(snapshotTriggerType.name()); + assertThat(snapshot.getMetadata().getLabels().get(LABEL_SNAPSHOT_STATE)) + .isEqualTo(state.name()); + if (secondaryResource == null) { + assertThat(snapshot.getMetadata().getLabels()) + .doesNotContainKey(LABEL_SNAPSHOT_JOB_REFERENCE_KIND); + assertThat(snapshot.getMetadata().getLabels()) + .doesNotContainKey(LABEL_SNAPSHOT_JOB_REFERENCE_NAME); + } else { + assertThat(snapshot.getMetadata().getLabels().get(LABEL_SNAPSHOT_JOB_REFERENCE_KIND)) + .isEqualTo(secondaryResource.getKind()); + assertThat(snapshot.getMetadata().getLabels().get(LABEL_SNAPSHOT_JOB_REFERENCE_NAME)) + .isEqualTo(secondaryResource.getMetadata().getName()); + } + } + + private void assertUpdateControl( + UpdateControl actual, boolean updateResource, boolean patchStatus) { + assertThat(actual.isUpdateResource()).isEqualTo(updateResource); + assertThat(actual.isPatchStatus()).isEqualTo(patchStatus); + } + @Test public void testReconcileSnapshotDeploymentDoesNotExist() { var deployment = createDeployment(); @@ -218,7 +294,7 @@ public void testReconcileNewSavepoint() { assertThat(status.getError()).isNull(); assertThat(status.getTriggerId()).isEqualTo("savepoint_trigger_0"); assertThat(status.getState()).isEqualTo(IN_PROGRESS); - assertThat(snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE)) + assertThat(snapshot.getMetadata().getLabels().get(LABEL_SNAPSHOT_TRIGGER_TYPE)) .isEqualTo(SnapshotTriggerType.MANUAL.name()); assertThat(statusUpdateCounter.getCount()).isEqualTo(1); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java index eebac80647..ed50072f87 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverTest.java @@ -266,7 +266,10 @@ private static FlinkStateSnapshot createSnapshot( var metadata = new ObjectMetaBuilder() .withName(UUID.randomUUID().toString()) - .withLabels(Map.of(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name())) + .withLabels( + Map.of( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + triggerType.name())) .withCreationTimestamp( DateTimeUtils.kubernetes(Instant.ofEpochMilli(timestamp))) .build(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStoreTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStoreTest.java index d17e64fe22..3c1fe766e4 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStoreTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStoreTest.java @@ -135,7 +135,7 @@ private FlinkStateSnapshot createSnapshot( } snapshot.getMetadata() .getLabels() - .put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name()); + .put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, triggerType.name()); snapshot.setStatus(new FlinkStateSnapshotStatus()); snapshot.getStatus().setState(COMPLETED); return snapshot; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 8bf19ff110..2c236b68cf 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -310,7 +310,11 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception { assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0"); assertEquals( SnapshotTriggerType.UPGRADE.name(), - snapshots.get(0).getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE)); + snapshots + .get(0) + .getMetadata() + .getLabels() + .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); // Make sure jobId rotated on savepoint verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java index 9d4893023c..a208a00f80 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java @@ -179,7 +179,11 @@ private void testUpgradeToSavepoint(FlinkVersion flinkVersion, UpgradeMode fromU assertEquals("savepoint_0", snapshots.get(0).getSpec().getSavepoint().getPath()); assertEquals( SnapshotTriggerType.UPGRADE.name(), - snapshots.get(0).getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE)); + snapshots + .get(0) + .getMetadata() + .getLabels() + .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); } @ParameterizedTest diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index c72b50826d..b001ed7739 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -410,7 +410,7 @@ public void testSavepointUpgrade(boolean legacySnapshots) throws Exception { .get(0) .getMetadata() .getLabels() - .get(CrdConstants.LABEL_SNAPSHOT_TYPE)); + .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); assertEquals( snapshots.get(0).getSpec().getSavepoint().getPath(), statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java index 7b05226bbc..0e9b7dfe8f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java @@ -22,15 +22,18 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobKind; import org.apache.flink.kubernetes.operator.api.spec.JobReference; +import org.apache.flink.kubernetes.operator.api.status.CheckpointType; import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType; import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.reconciler.SnapshotType; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -40,12 +43,17 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.flink.api.common.JobStatus.FAILED; import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec; +import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS; +import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.TEST_DEPLOYMENT_NAME; +import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.TEST_SESSION_JOB_NAME; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -68,25 +76,25 @@ public void testGetSnapshotTriggerType() { assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot)) .isEqualTo(SnapshotTriggerType.UNKNOWN); - snapshot.getMetadata().getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, ""); + snapshot.getMetadata().getLabels().put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, ""); assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot)) .isEqualTo(SnapshotTriggerType.UNKNOWN); snapshot.getMetadata() .getLabels() - .put(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name()); + .put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, SnapshotTriggerType.MANUAL.name()); assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot)) .isEqualTo(SnapshotTriggerType.MANUAL); snapshot.getMetadata() .getLabels() - .put(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.UPGRADE.name()); + .put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, SnapshotTriggerType.UPGRADE.name()); assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot)) .isEqualTo(SnapshotTriggerType.UPGRADE); snapshot.getMetadata() .getLabels() - .put(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.PERIODIC.name()); + .put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, SnapshotTriggerType.PERIODIC.name()); assertThat(FlinkStateSnapshotUtils.getSnapshotTriggerType(snapshot)) .isEqualTo(SnapshotTriggerType.PERIODIC); } @@ -304,6 +312,86 @@ public void testAbandonSnapshotIfJobNotRunningJobDeleted() { }); } + @Test + public void testGetSnapshotLabels() { + var snapshot = initSavepoint(IN_PROGRESS, null); + assertThat(FlinkStateSnapshotUtils.getSnapshotLabels(snapshot, Optional.empty())) + .containsExactlyInAnyOrderEntriesOf( + Map.ofEntries( + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TYPE, + SnapshotType.SAVEPOINT.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.MANUAL.name()), + Map.entry(CrdConstants.LABEL_SNAPSHOT_STATE, IN_PROGRESS.name()))); + + var deployment = initDeployment(); + snapshot = initCheckpoint(COMPLETED, JobReference.fromFlinkResource(deployment)); + assertThat(FlinkStateSnapshotUtils.getSnapshotLabels(snapshot, Optional.of(deployment))) + .containsExactlyInAnyOrderEntriesOf( + Map.ofEntries( + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TYPE, + SnapshotType.CHECKPOINT.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.MANUAL.name()), + Map.entry(CrdConstants.LABEL_SNAPSHOT_STATE, COMPLETED.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND, + "FlinkDeployment"), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME, + TEST_DEPLOYMENT_NAME))); + + // Null status should be handled correctly + snapshot.setStatus(null); + assertThat(FlinkStateSnapshotUtils.getSnapshotLabels(snapshot, Optional.of(deployment))) + .containsExactlyInAnyOrderEntriesOf( + Map.ofEntries( + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TYPE, + SnapshotType.CHECKPOINT.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.MANUAL.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND, + "FlinkDeployment"), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME, + TEST_DEPLOYMENT_NAME))); + + var sessionJob = initFlinkSessionJob(); + snapshot = initCheckpoint(ABANDONED, JobReference.fromFlinkResource(sessionJob)); + assertThat(FlinkStateSnapshotUtils.getSnapshotLabels(snapshot, Optional.of(sessionJob))) + .containsExactlyInAnyOrderEntriesOf( + Map.ofEntries( + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TYPE, + SnapshotType.CHECKPOINT.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.MANUAL.name()), + Map.entry(CrdConstants.LABEL_SNAPSHOT_STATE, ABANDONED.name()), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND, + "FlinkSessionJob"), + Map.entry( + CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME, + TEST_SESSION_JOB_NAME))); + + // Trigger type should not be overridden + snapshot.getMetadata() + .getLabels() + .put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, SnapshotTriggerType.UPGRADE.name()); + assertThat(FlinkStateSnapshotUtils.getSnapshotLabels(snapshot, Optional.of(deployment))) + .containsEntry( + CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, + SnapshotTriggerType.UPGRADE.name()); + } + private void assertSavepointResource( FlinkStateSnapshot snapshot, FlinkDeployment deployment, @@ -313,7 +401,7 @@ private void assertSavepointResource( boolean expectedAlreadyExists) { assertEquals( triggerType.name(), - snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE)); + snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); assertTrue(snapshot.getSpec().isSavepoint()); assertEquals(SAVEPOINT_PATH, snapshot.getSpec().getSavepoint().getPath()); assertEquals(expectedFormatType, snapshot.getSpec().getSavepoint().getFormatType()); @@ -332,7 +420,7 @@ private void assertCheckpointResource( SnapshotTriggerType triggerType) { assertEquals( triggerType.name(), - snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE)); + snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE)); assertTrue(snapshot.getSpec().isCheckpoint()); assertEquals( @@ -341,13 +429,19 @@ private void assertCheckpointResource( } private static FlinkDeployment initDeployment() { - FlinkDeployment deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_19); + var deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_19); deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING); deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); reconcileSpec(deployment); return deployment; } + private static FlinkSessionJob initFlinkSessionJob() { + var sessionJob = TestUtils.buildSessionJob(); + sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING); + return sessionJob; + } + private static FlinkStateSnapshot initSavepoint( FlinkStateSnapshotStatus.State snapshotState, JobReference jobReference) { var snapshot = @@ -361,4 +455,14 @@ private static FlinkStateSnapshot initSavepoint( return snapshot; } + + private static FlinkStateSnapshot initCheckpoint( + FlinkStateSnapshotStatus.State snapshotState, JobReference jobReference) { + var snapshot = + TestUtils.buildFlinkStateSnapshotCheckpoint( + SAVEPOINT_NAME, NAMESPACE, CheckpointType.FULL, jobReference); + snapshot.setStatus(FlinkStateSnapshotStatus.builder().state(snapshotState).build()); + + return snapshot; + } }