Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public enum ResourceLifecycleState {
STABLE(true, "The resource deployment is considered to be stable and won’t be rolled back"),
ROLLING_BACK(false, "The resource is being rolled back to the last stable spec"),
ROLLED_BACK(true, "The resource is deployed with the last stable spec"),
FAILED(true, "The job terminally failed");
FAILED(true, "The job terminally failed"),
DELETING(false, "The resource is being deleted"),
DELETED(true, "The resource is deleted");

@JsonIgnore private final boolean terminal;
@JsonIgnore @Getter private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
public abstract ReconciliationStatus<SPEC> getReconciliationStatus();

public ResourceLifecycleState getLifecycleState() {
if (ResourceLifecycleState.DELETING == lifecycleState
|| ResourceLifecycleState.DELETED == lifecycleState) {
return lifecycleState;
}

var reconciliationStatus = getReconciliationStatus();

if (reconciliationStatus.isBeforeFirstDeployment()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
Expand Down Expand Up @@ -104,6 +105,7 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {
"Cleaning up FlinkDeployment",
josdkContext.getClient());
statusRecorder.updateStatusFromCache(flinkApp);
flinkApp.getStatus().setLifecycleState(ResourceLifecycleState.DELETING);
var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext);
try {
observerFactory.getOrCreate(flinkApp).observe(ctx);
Expand All @@ -113,7 +115,8 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {

var deleteControl = reconcilerFactory.getOrCreate(flinkApp).cleanup(ctx);
if (deleteControl.isRemoveFinalizer()) {
statusRecorder.removeCachedStatus(flinkApp);
flinkApp.getStatus().setLifecycleState(ResourceLifecycleState.DELETED);
statusRecorder.cleanupForDeletion(flinkApp);
ctxFactory.cleanup(flinkApp);
} else {
statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
Expand Down Expand Up @@ -145,6 +146,8 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {
EventRecorder.Component.Operator,
"Cleaning up FlinkSessionJob",
josdkContext.getClient());
statusRecorder.updateStatusFromCache(sessionJob);
sessionJob.getStatus().setLifecycleState(ResourceLifecycleState.DELETING);
var ctx = ctxFactory.getResourceContext(sessionJob, josdkContext);
try {
observer.observe(ctx);
Expand All @@ -154,8 +157,9 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {

var deleteControl = reconciler.cleanup(ctx);
if (deleteControl.isRemoveFinalizer()) {
sessionJob.getStatus().setLifecycleState(ResourceLifecycleState.DELETED);
ctxFactory.cleanup(sessionJob);
statusRecorder.removeCachedStatus(sessionJob);
statusRecorder.cleanupForDeletion(sessionJob);
} else {
statusRecorder.patchAndCacheStatus(sessionJob, ctx.getKubernetesClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,7 @@ public void patchAndCacheStatus(CR resource, KubernetesClient client) {
return;
}

Class<?> statusClass;
if (resource instanceof FlinkDeployment) {
statusClass = FlinkDeploymentStatus.class;
} else if (resource instanceof FlinkSessionJob) {
statusClass = FlinkSessionJobStatus.class;
} else if (resource instanceof FlinkStateSnapshot) {
statusClass = FlinkStateSnapshotStatus.class;
} else {
throw new RuntimeException(
String.format("Resource is unknown class: %s", resource.getClass()));
}

var prevStatus = (STATUS) objectMapper.convertValue(previousStatusNode, statusClass);
var prevStatus = convertPreviousStatus(resource, previousStatusNode);

Exception err = null;
for (int i = 0; i < 3; i++) {
Expand All @@ -134,6 +122,21 @@ public void patchAndCacheStatus(CR resource, KubernetesClient client) {
metricManager.onUpdate(resource);
}

/**
 * Convert the JSON tree of a previous status into the status type matching the resource.
 *
 * @param resource Flink resource; its concrete kind selects the target status class.
 * @param previousStatusNode JSON tree holding the previous status.
 * @return Previous status as produced by the object mapper conversion.
 * @throws RuntimeException if the resource is not one of the handled resource kinds.
 */
private STATUS convertPreviousStatus(CR resource, ObjectNode previousStatusNode) {
    // Pick the status class for the resource kind; null marks an unhandled kind.
    final Class<?> targetType =
            resource instanceof FlinkDeployment
                    ? FlinkDeploymentStatus.class
                    : resource instanceof FlinkSessionJob
                            ? FlinkSessionJobStatus.class
                            : resource instanceof FlinkStateSnapshot
                                    ? FlinkStateSnapshotStatus.class
                                    : null;

    if (targetType == null) {
        throw new RuntimeException(
                String.format("Resource is unknown class: %s", resource.getClass()));
    }

    return (STATUS) objectMapper.convertValue(previousStatusNode, targetType);
}

private void replaceStatus(CR resource, STATUS prevStatus, KubernetesClient client)
throws JsonProcessingException {
int retries = 0;
Expand Down Expand Up @@ -240,13 +243,15 @@ public void updateStatusFromCache(CR resource) {
}

/**
* Remove cached status for Flink resource.
* Clean up resource after deletion and send a last status update.
*
* @param resource Flink resource.
*/
public void removeCachedStatus(CR resource) {
statusCache.remove(ResourceID.fromResource(resource));
public void cleanupForDeletion(CR resource) {
    // Drop the cache entry and keep whatever was stored for this resource.
    var cachedNode = statusCache.remove(ResourceID.fromResource(resource));
    // Turn the removed JSON into a typed status so it can be passed on as the previous status.
    var lastKnownStatus = convertPreviousStatus(resource, cachedNode);
    // Notify the metric manager before the final listener callback.
    metricManager.onRemove(resource);
    // Hand the resource and its previous status to the status update listener one last time.
    statusUpdateListener.accept(resource, lastKnownStatus);
}

public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.IMAGE;
Expand Down Expand Up @@ -934,13 +933,4 @@ private PodTemplateSpec createTestPodWithContainers() {
TestUtils.getTestPodTemplate("hostname", List.of(mainContainer, sideCarContainer));
return pod;
}

/**
 * Builds a stream of service exposed types: a leading {@code null} element followed by
 * ClusterIP, LoadBalancer, Headless_ClusterIP and NodePort.
 *
 * <p>NOTE(review): presumably a method source for parameterized tests — no caller is visible
 * in this view; confirm before relying on it.
 *
 * @return stream of the values listed above, in that order.
 */
private static Stream<KubernetesConfigOptions.ServiceExposedType> serviceExposedTypes() {
return Stream.of(
null,
KubernetesConfigOptions.ServiceExposedType.ClusterIP,
KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP,
KubernetesConfigOptions.ServiceExposedType.NodePort);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
assertEquals(
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(),
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());

testController.cleanup(appCluster, context);
// Make sure status is recorded and sent out at the end of cleanup
assertEquals(
ResourceLifecycleState.DELETED,
testController
.getStatusUpdateCounter()
.currentResource
.getStatus()
.getLifecycleState());
assertEquals(ResourceLifecycleState.DELETED, appCluster.getStatus().getLifecycleState());
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
Expand Down Expand Up @@ -652,6 +653,14 @@ public void testCancelJobNotFound() throws Exception {

assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState());
assertFalse(deleteControl.isRemoveFinalizer());
assertEquals(
ResourceLifecycleState.DELETING,
testController
.getStatusUpdateCounter()
.currentResource
.getStatus()
.getLifecycleState());
assertEquals(ResourceLifecycleState.DELETING, sessionJob.getStatus().getLifecycleState());
assertEquals(
configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis(),
deleteControl.getScheduleDelay().get());
Expand All @@ -660,6 +669,14 @@ public void testCancelJobNotFound() throws Exception {
flinkService.setFlinkJobNotFound(true);
deleteControl = testController.cleanup(sessionJob, context);
assertTrue(deleteControl.isRemoveFinalizer());
assertEquals(
ResourceLifecycleState.DELETED,
testController
.getStatusUpdateCounter()
.currentResource
.getStatus()
.getLifecycleState());
assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState());
}

private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJob)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class TestingFlinkDeploymentController

@Getter private ReconcilerFactory reconcilerFactory;
private FlinkDeploymentController flinkDeploymentController;
private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
@Getter private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
private FlinkResourceEventCollector flinkResourceEventCollector =
new FlinkResourceEventCollector();

Expand Down Expand Up @@ -174,11 +174,12 @@ public Queue<Event> flinkResourceEvents() {
return flinkResourceEventCollector.events;
}

private static class StatusUpdateCounter
/** Test status consumer. */
protected static class StatusUpdateCounter
implements BiConsumer<FlinkDeployment, FlinkDeploymentStatus> {

private FlinkDeployment currentResource;
private int counter;
FlinkDeployment currentResource;
int counter;

@Override
public void accept(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ public class TestingFlinkSessionJobController

@Getter private CanaryResourceManager<FlinkSessionJob> canaryResourceManager;
private FlinkSessionJobController flinkSessionJobController;

@Getter
private TestingFlinkSessionJobController.StatusUpdateCounter statusUpdateCounter =
new TestingFlinkSessionJobController.StatusUpdateCounter();

private FlinkResourceEventCollector flinkResourceEventCollector =
new FlinkResourceEventCollector();
private EventRecorder eventRecorder;
Expand Down Expand Up @@ -161,10 +164,11 @@ public Queue<Event> events() {
return flinkResourceEventCollector.events;
}

private static class StatusUpdateCounter
/** Test status consumer. */
protected static class StatusUpdateCounter
implements BiConsumer<FlinkSessionJob, FlinkSessionJobStatus> {

private FlinkSessionJob currentResource;
FlinkSessionJob currentResource;
private int counter;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10378,6 +10378,8 @@ spec:
lifecycleState:
enum:
- CREATED
- DELETED
- DELETING
- DEPLOYED
- FAILED
- ROLLED_BACK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ spec:
lifecycleState:
enum:
- CREATED
- DELETED
- DELETING
- DEPLOYED
- FAILED
- ROLLED_BACK
Expand Down
Loading