diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java index ca580c3728..402e7a3dcd 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java @@ -35,7 +35,9 @@ public enum ResourceLifecycleState { STABLE(true, "The resource deployment is considered to be stable and won’t be rolled back"), ROLLING_BACK(false, "The resource is being rolled back to the last stable spec"), ROLLED_BACK(true, "The resource is deployed with the last stable spec"), - FAILED(true, "The job terminally failed"); + FAILED(true, "The job terminally failed"), + DELETING(false, "The resource is being deleted"), + DELETED(true, "The resource is deleted"); @JsonIgnore private final boolean terminal; @JsonIgnore @Getter private final String description; diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java index 9d8c6475c8..34c8cc99a3 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java @@ -60,6 +60,11 @@ public abstract class CommonStatus { public abstract ReconciliationStatus getReconciliationStatus(); public ResourceLifecycleState getLifecycleState() { + if (ResourceLifecycleState.DELETING == lifecycleState + || ResourceLifecycleState.DELETED == lifecycleState) { + return lifecycleState; + } + var reconciliationStatus = getReconciliationStatus(); if (reconciliationStatus.isBeforeFirstDeployment()) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 328c2fb7b5..ac0f7356e1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; @@ -104,6 +105,7 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) { "Cleaning up FlinkDeployment", josdkContext.getClient()); statusRecorder.updateStatusFromCache(flinkApp); + flinkApp.getStatus().setLifecycleState(ResourceLifecycleState.DELETING); var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext); try { observerFactory.getOrCreate(flinkApp).observe(ctx); @@ -113,7 +115,8 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) { var deleteControl = reconcilerFactory.getOrCreate(flinkApp).cleanup(ctx); if (deleteControl.isRemoveFinalizer()) { - statusRecorder.removeCachedStatus(flinkApp); + flinkApp.getStatus().setLifecycleState(ResourceLifecycleState.DELETED); + statusRecorder.cleanupForDeletion(flinkApp); ctxFactory.cleanup(flinkApp); } else { statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient()); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index a88a2963db..1e818d6659 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -20,6 +20,7 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.health.CanaryResourceManager; @@ -145,6 +146,8 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) { EventRecorder.Component.Operator, "Cleaning up FlinkSessionJob", josdkContext.getClient()); + statusRecorder.updateStatusFromCache(sessionJob); + sessionJob.getStatus().setLifecycleState(ResourceLifecycleState.DELETING); var ctx = ctxFactory.getResourceContext(sessionJob, josdkContext); try { observer.observe(ctx); @@ -154,8 +157,9 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) { var deleteControl = reconciler.cleanup(ctx); if (deleteControl.isRemoveFinalizer()) { + sessionJob.getStatus().setLifecycleState(ResourceLifecycleState.DELETED); ctxFactory.cleanup(sessionJob); - statusRecorder.removeCachedStatus(sessionJob); + statusRecorder.cleanupForDeletion(sessionJob); } else { statusRecorder.patchAndCacheStatus(sessionJob, ctx.getKubernetesClient()); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java index 1c4eb50037..8b4fc630d4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java @@ -98,19 +98,7 @@ public void patchAndCacheStatus(CR resource, KubernetesClient client) { return; } - Class statusClass; - if (resource instanceof FlinkDeployment) { - statusClass = FlinkDeploymentStatus.class; - } else if (resource instanceof FlinkSessionJob) { - statusClass = FlinkSessionJobStatus.class; - } else if (resource instanceof FlinkStateSnapshot) { - statusClass = FlinkStateSnapshotStatus.class; - } else { - throw new RuntimeException( - String.format("Resource is unknown class: %s", resource.getClass())); - } - - var prevStatus = (STATUS) objectMapper.convertValue(previousStatusNode, statusClass); + var prevStatus = convertPreviousStatus(resource, previousStatusNode); Exception err = null; for (int i = 0; i < 3; i++) { @@ -134,6 +122,21 @@ public void patchAndCacheStatus(CR resource, KubernetesClient client) { metricManager.onUpdate(resource); } + private STATUS convertPreviousStatus(CR resource, ObjectNode previousStatusNode) { + Class statusClass; + if (resource instanceof FlinkDeployment) { + statusClass = FlinkDeploymentStatus.class; + } else if (resource instanceof FlinkSessionJob) { + statusClass = FlinkSessionJobStatus.class; + } else if (resource instanceof FlinkStateSnapshot) { + statusClass = FlinkStateSnapshotStatus.class; + } else { + throw new RuntimeException( + String.format("Resource is unknown class: %s", resource.getClass())); + } + return (STATUS) objectMapper.convertValue(previousStatusNode, statusClass); + } + private void replaceStatus(CR resource, STATUS prevStatus, KubernetesClient client) throws JsonProcessingException { int retries = 0; @@ -240,13 +243,15 @@ public void updateStatusFromCache(CR resource) { } /** - * Remove cached status for Flink resource. + * Clean up resource after deletion and send a last status update. * * @param resource Flink resource. */ - public void removeCachedStatus(CR resource) { - statusCache.remove(ResourceID.fromResource(resource)); + public void cleanupForDeletion(CR resource) { + var prevJson = statusCache.remove(ResourceID.fromResource(resource)); + var prevStatus = convertPreviousStatus(resource, prevJson); metricManager.onRemove(resource); + statusUpdateListener.accept(resource, prevStatus); } public static , CR extends AbstractFlinkResource> diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java index 4b07e3353c..7673060890 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java @@ -63,7 +63,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH; import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.IMAGE; @@ -934,13 +933,4 @@ private PodTemplateSpec createTestPodWithContainers() { TestUtils.getTestPodTemplate("hostname", List.of(mainContainer, sideCarContainer)); return pod; } - - private static Stream serviceExposedTypes() { - return Stream.of( - null, - KubernetesConfigOptions.ServiceExposedType.ClusterIP, - KubernetesConfigOptions.ServiceExposedType.LoadBalancer, - KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP, - KubernetesConfigOptions.ServiceExposedType.NodePort); - } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 08937e70b4..0354a1256e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -153,6 +153,17 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception assertEquals( appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(), appCluster.getStatus().getReconciliationStatus().getLastStableSpec()); + + testController.cleanup(appCluster, context); + // Make sure status is recorded and sent out at the end of cleanup + assertEquals( + ResourceLifecycleState.DELETED, + testController + .getStatusUpdateCounter() + .currentResource + .getStatus() + .getLifecycleState()); + assertEquals(ResourceLifecycleState.DELETED, appCluster.getStatus().getLifecycleState()); } @ParameterizedTest diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index 82ccaea51d..0a5490890f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; @@ -652,6 +653,14 @@ public void testCancelJobNotFound() throws Exception { assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState()); assertFalse(deleteControl.isRemoveFinalizer()); + assertEquals( + ResourceLifecycleState.DELETING, + testController + .getStatusUpdateCounter() + .currentResource + .getStatus() + .getLifecycleState()); + assertEquals(ResourceLifecycleState.DELETING, sessionJob.getStatus().getLifecycleState()); assertEquals( configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis(), deleteControl.getScheduleDelay().get()); @@ -660,6 +669,14 @@ public void testCancelJobNotFound() throws Exception { flinkService.setFlinkJobNotFound(true); deleteControl = testController.cleanup(sessionJob, context); assertTrue(deleteControl.isRemoveFinalizer()); + assertEquals( + ResourceLifecycleState.DELETED, + testController + .getStatusUpdateCounter() + .currentResource + .getStatus() + .getLifecycleState()); + assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState()); } private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJob) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 0ee38d7011..09885b965a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -69,7 +69,7 @@ public class TestingFlinkDeploymentController @Getter private ReconcilerFactory reconcilerFactory; private FlinkDeploymentController flinkDeploymentController; - private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter(); + @Getter private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter(); private FlinkResourceEventCollector flinkResourceEventCollector = new FlinkResourceEventCollector(); @@ -174,11 +174,12 @@ public Queue flinkResourceEvents() { return flinkResourceEventCollector.events; } - private static class StatusUpdateCounter + /** Test status consumer. */ + protected static class StatusUpdateCounter implements BiConsumer { - private FlinkDeployment currentResource; - private int counter; + FlinkDeployment currentResource; + int counter; @Override public void accept( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java index 7c116e45ca..2b200de1de 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java @@ -65,8 +65,11 @@ public class TestingFlinkSessionJobController @Getter private CanaryResourceManager canaryResourceManager; private FlinkSessionJobController flinkSessionJobController; + + @Getter private TestingFlinkSessionJobController.StatusUpdateCounter statusUpdateCounter = new TestingFlinkSessionJobController.StatusUpdateCounter(); + private FlinkResourceEventCollector flinkResourceEventCollector = new FlinkResourceEventCollector(); private EventRecorder eventRecorder; @@ -161,10 +164,11 @@ public Queue events() { return flinkResourceEventCollector.events; } - private static class StatusUpdateCounter + /** Test status consumer. */ + protected static class StatusUpdateCounter implements BiConsumer { - private FlinkSessionJob currentResource; + FlinkSessionJob currentResource; private int counter; @Override diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index c94c13871e..2f851e5ce2 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -10378,6 +10378,8 @@ spec: lifecycleState: enum: - CREATED + - DELETED + - DELETING - DEPLOYED - FAILED - ROLLED_BACK diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml index 93515bf143..f66d67ea72 100644 --- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml @@ -220,6 +220,8 @@ spec: lifecycleState: enum: - CREATED + - DELETED + - DELETING - DEPLOYED - FAILED - ROLLED_BACK