diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index c336e1c407..600f6289a5 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -402,7 +402,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| ----------| ---- | ---- |
| jobName | java.lang.String | Name of the job. |
| jobId | java.lang.String | Flink JobId of the Job. |
-| state | java.lang.String | Last observed state of the job. |
+| state | org.apache.flink.api.common.JobStatus | Last observed state of the job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
| upgradeSavepointPath | java.lang.String | |
diff --git a/flink-kubernetes-operator-api/pom.xml b/flink-kubernetes-operator-api/pom.xml
index da1b59a63d..40ec264245 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -226,7 +226,7 @@ under the License.
fork="true" failonerror="true">
-
+
@@ -243,7 +243,7 @@ under the License.
fork="true" failonerror="true">
-
+
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 0e9635cbd0..9d8c6475c8 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -81,10 +81,7 @@ public ResourceLifecycleState getLifecycleState() {
return ResourceLifecycleState.SUSPENDED;
}
- var jobState = getJobStatus().getState();
- if (jobState != null
- && org.apache.flink.api.common.JobStatus.valueOf(jobState)
- .equals(org.apache.flink.api.common.JobStatus.FAILED)) {
+ if (getJobStatus().getState() == org.apache.flink.api.common.JobStatus.FAILED) {
return ResourceLifecycleState.FAILED;
}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
index 6adef53cd3..40d67a341a 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
@@ -42,7 +42,7 @@ public class JobStatus {
/** Last observed state of the job. */
@PrinterColumn(name = "Job Status")
- private String state;
+ private org.apache.flink.api.common.JobStatus state;
/** Start time of the job. */
private String startTime;
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
index bdbe8de07d..6ae9c27d1d 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
@@ -88,6 +88,8 @@ public void resetTrigger() {
*/
public void updateLastSavepoint(Savepoint savepoint) {
if (savepoint == null) {
+ // In terminal states we have to handle the case when there is actually no savepoint to
+ // not restore from an old one
lastSavepoint = null;
} else if (lastSavepoint == null
|| !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
index faf6024a5a..8dce0d89f2 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
@@ -160,7 +160,10 @@ protected static void verifyOtherPropsMatch(String path, JsonNode oldNode, JsonN
protected static void checkStringTypeCompatibility(
String path, JsonNode oldNode, JsonNode newNode) {
if (!oldNode.has("enum") && newNode.has("enum")) {
- err("Cannot turn string into enum for " + path);
+ // We make an exception here for jobstatus.state, this is a backward compatible change
+ if (!path.equals(".status.jobStatus.state")) {
+ err("Cannot turn string into enum for " + path);
+ }
}
if (oldNode.has("enum")) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dbaa6d4866..328c2fb7b5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -176,7 +176,7 @@ private void handleDeploymentFailed(
var flinkApp = ctx.getResource();
LOG.error("Flink Deployment failed", dfe);
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
- flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
eventRecorder.triggerEvent(
flinkApp,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index 0d5c55101a..3010f478c9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -22,7 +22,6 @@
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
@@ -73,7 +72,7 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
CommonStatus> status = getResource().getStatus();
String jobId = status.getJobStatus().getJobId();
- JobStatus jobStatus = generateJobStatusEnum(status);
+ JobStatus jobStatus = status.getJobStatus().getState();
return new KubernetesJobAutoScalerContext(
jobId == null ? null : JobID.fromHexString(jobId),
@@ -84,19 +83,6 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
this);
}
- @Nullable
- private JobStatus generateJobStatusEnum(CommonStatus> status) {
- if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
- return null;
- }
-
- String state = status.getJobStatus().getState();
- if (state == null) {
- return null;
- }
- return JobStatus.valueOf(state);
- }
-
/**
* Get the config that is currently deployed for the resource spec. The returned config may be
* null in case the resource is not accessible/ready yet.
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e299531fc0..c94c1231b3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -122,7 +122,7 @@ protected void onTargetJobNotFound(FlinkResourceContext ctx) {
// upgrading state and retry the upgrade (if possible)
resource.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
}
- jobStatus.setState(JobStatus.RECONCILING.name());
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
resource.getStatus().setError(JOB_NOT_FOUND_ERR);
}
@@ -135,9 +135,9 @@ protected void onTargetJobNotFound(FlinkResourceContext ctx) {
*/
private void ifRunningMoveToReconciling(
org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus,
- String previousJobStatus) {
- if (JobStatus.RUNNING.name().equals(previousJobStatus)) {
- jobStatus.setState(JobStatus.RECONCILING.name());
+ JobStatus previousJobStatus) {
+ if (JobStatus.RUNNING == previousJobStatus) {
+ jobStatus.setState(JobStatus.RECONCILING);
}
}
@@ -160,7 +160,7 @@ private void updateJobStatus(FlinkResourceContext ctx, JobStatusMessage clust
var previousJobStatus = jobStatus.getState();
var currentJobStatus = clusterJobStatus.getJobState();
- jobStatus.setState(clusterJobStatus.getJobState().name());
+ jobStatus.setState(currentJobStatus);
jobStatus.setJobName(clusterJobStatus.getJobName());
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
@@ -177,7 +177,7 @@ private void updateJobStatus(FlinkResourceContext ctx, JobStatusMessage clust
if (JobStatus.CANCELED == currentJobStatus
|| (currentJobStatus.isGloballyTerminalState()
- && JobStatus.CANCELLING.name().equals(previousJobStatus))) {
+ && JobStatus.CANCELLING.equals(previousJobStatus))) {
// The job was cancelled
markSuspended(resource);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index fd19713a9b..9020cbfa8c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -135,7 +135,7 @@ protected void observeJmDeployment(FlinkResourceContext ctx) {
checkContainerBackoff(ctx);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for spec update
- deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
+ deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
if (!JobManagerDeploymentStatus.ERROR.equals(
deploymentStatus.getJobManagerDeploymentStatus())) {
throw dfe;
@@ -149,7 +149,7 @@ protected void observeJmDeployment(FlinkResourceContext ctx) {
}
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
+ deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
if (previousJmStatus != JobManagerDeploymentStatus.MISSING
&& previousJmStatus != JobManagerDeploymentStatus.ERROR) {
@@ -192,7 +192,7 @@ protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment dep) {
FlinkDeploymentStatus status = dep.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
- && !JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
+ && !JobStatus.FAILED.equals(dep.getStatus().getJobStatus().getState())
&& reconciliationStatus.isLastReconciledSpecStable()) {
status.setError(null);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 15e15141ef..8dba93bc74 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -55,7 +55,10 @@
import java.time.Instant;
import java.util.function.BiConsumer;
+import static org.apache.flink.api.common.JobStatus.CANCELED;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkStateSnapshotException;
@@ -233,7 +236,7 @@ private static TaskManagerInfo getTaskManagerInfo(
}
}
- public static void updateForReconciliationError(FlinkResourceContext ctx, Throwable error) {
+ public static void updateForReconciliationError(FlinkResourceContext> ctx, Throwable error) {
updateFlinkResourceException(error, ctx.getResource(), ctx.getOperatorConfig());
}
@@ -352,35 +355,23 @@ private static boolean upgradeStarted(
public static boolean isJobInTerminalState(CommonStatus> status) {
var jobState = status.getJobStatus().getState();
- if (jobState == null) {
- jobState = org.apache.flink.api.common.JobStatus.RECONCILING.name();
- }
- return org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
+ return jobState != null && jobState.isGloballyTerminalState();
}
public static boolean isJobRunning(CommonStatus> status) {
- return org.apache.flink.api.common.JobStatus.RUNNING
- .name()
- .equals(status.getJobStatus().getState());
+ return RUNNING == status.getJobStatus().getState();
}
public static boolean isJobCancelled(CommonStatus> status) {
- return org.apache.flink.api.common.JobStatus.CANCELED
- .name()
- .equals(status.getJobStatus().getState());
+ return CANCELED == status.getJobStatus().getState();
}
public static boolean isJobCancellable(CommonStatus> status) {
- return !org.apache.flink.api.common.JobStatus.RECONCILING
- .name()
- .equals(status.getJobStatus().getState());
+ return RECONCILING != status.getJobStatus().getState();
}
public static boolean isJobCancelling(CommonStatus> status) {
- return status.getJobStatus() != null
- && org.apache.flink.api.common.JobStatus.CANCELLING
- .name()
- .equals(status.getJobStatus().getState());
+ return status.getJobStatus() != null && CANCELLING == status.getJobStatus().getState();
}
/**
@@ -503,8 +494,7 @@ public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource,
* @param status Status to be updated.
*/
public static void checkAndUpdateStableSpec(CommonStatus> status) {
- var flinkJobStatus =
- org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+ var flinkJobStatus = status.getJobStatus().getState();
if (status.getReconciliationStatus().getState() != ReconciliationState.DEPLOYED) {
return;
@@ -542,8 +532,7 @@ public static void updateStatusForAlreadyUpgraded(AbstractFlinkResource, ?> re
var lastJobSpec = lastSpecWithMeta.getSpec().getJob();
if (lastJobSpec != null) {
lastJobSpec.setState(JobState.RUNNING);
- status.getJobStatus()
- .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ status.getJobStatus().setState(RECONCILING);
}
reconciliationStatus.setState(ReconciliationState.DEPLOYED);
reconciliationStatus.setLastReconciledSpec(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index cc57889bb7..757e1a7cd7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -511,7 +510,7 @@ private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
boolean nonTerminalApplication =
!sessionCluster
&& deployedJob.getState() == JobState.RUNNING
- && !JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState();
+ && !jobStatus.getState().isGloballyTerminalState();
boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
return jmShouldBeRunning
&& (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 594632b29d..a6db17b989 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -420,8 +420,7 @@ protected void setUpgradeSavepointPath(FlinkResourceContext> ctx, String savep
@Override
public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Exception {
var status = ctx.getResource().getStatus();
- var jobStatus =
- org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+ var jobStatus = status.getJobStatus().getState();
if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
&& ctx.getObserveConfig().getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
LOG.info("Stopping failed Flink job...");
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index a078a5934f..71cf417958 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -124,7 +124,7 @@ protected JobUpgrade getJobUpgrade(
private void deleteJmThatNeverStarted(
FlinkService flinkService, FlinkDeployment deployment, Configuration deployConfig) {
- deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ deployment.getStatus().getJobStatus().setState(JobStatus.FAILED);
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), deployConfig, false);
LOG.info("Deleted application cluster that never started.");
@@ -181,7 +181,7 @@ public void deploy(
MSG_SUBMIT,
ctx.getKubernetesClient());
flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
IngressUtils.updateIngressRules(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index af227a9324..f1fa88859a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -94,7 +94,7 @@ public void deploy(
savepoint.orElse(null));
var status = ctx.getResource().getStatus();
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index c89baf8968..68d7fe897f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -343,7 +343,7 @@ protected CancelResult cancelJob(
deployment.getMetadata(), status, conf, suspendMode.deleteHaMeta());
}
- status.getJobStatus().setState(JobStatus.FINISHED.name());
+ status.getJobStatus().setState(JobStatus.FINISHED);
return CancelResult.completed(savepointPath);
}
@@ -366,7 +366,7 @@ public CancelResult cancelSessionJob(
break;
}
}
- status.getJobStatus().setState(JobStatus.FINISHED.name());
+ status.getJobStatus().setState(JobStatus.FINISHED);
status.getJobStatus().setJobId(null);
return CancelResult.completed(savepointPath);
}
@@ -404,7 +404,7 @@ public void cancelJobOrError(
"Cancellation Error", EventRecorder.Reason.CleanupFailed.name(), e);
}
}
- status.getJobStatus().setState(JobStatus.CANCELLING.name());
+ status.getJobStatus().setState(JobStatus.CANCELLING);
}
public String savepointJobOrError(
@@ -1037,9 +1037,8 @@ protected void deleteHAData(String namespace, String clusterId, Configuration co
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
var currentJobState = status.getJobStatus().getState();
- if (currentJobState == null
- || !JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
- status.getJobStatus().setState(JobStatus.FINISHED.name());
+ if (currentJobState == null || !currentJobState.isGloballyTerminalState()) {
+ status.getJobStatus().setState(JobStatus.FINISHED);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index ffd071e662..a5a63f8b4f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -262,7 +262,7 @@ private static boolean supportsInPlaceScaling(
var status = resource.getStatus();
if (ReconciliationUtils.isJobInTerminalState(status)
- || JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
+ || JobStatus.RECONCILING.equals(status.getJobStatus().getState())) {
LOG.info("Job in terminal or reconciling state cannot be scaled in-place");
return false;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 67857b5285..76d03c9b65 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -35,6 +34,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -101,7 +101,7 @@ public void verifyApplicationJmRecovery(FlinkVersion flinkVersion, UpgradeMode u
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals(JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
// Remove deployment
flinkService.setPortReady(false);
@@ -130,7 +130,7 @@ public void verifyApplicationJmRecovery(FlinkVersion flinkVersion, UpgradeMode u
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
assertEquals(
appCluster.getSpec(),
appCluster
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 8fcad0a571..c84fc74ca8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -42,6 +42,7 @@
import java.util.Optional;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -85,7 +86,7 @@ public void verifyFailedApplicationRecovery(FlinkVersion flinkVersion, UpgradeMo
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
// Make deployment unhealthy
flinkService.markApplicationJobFailedWithError(
@@ -101,7 +102,7 @@ public void verifyFailedApplicationRecovery(FlinkVersion flinkVersion, UpgradeMo
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
// We started without savepoint
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -129,7 +130,7 @@ public void verifyFailedApplicationRecoveryWithCheckpoint(UpgradeMode upgradeMod
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
assertNull(flinkService.getSubmittedConf().get(SavepointConfigOptions.SAVEPOINT_PATH));
// trigger checkpoint
@@ -157,7 +158,7 @@ public void verifyFailedApplicationRecoveryWithCheckpoint(UpgradeMode upgradeMod
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
// check savepoint_path
if (upgradeMode != UpgradeMode.STATELESS) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 32e49502a8..08937e70b4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -128,7 +128,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(7, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -147,7 +147,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
// Validate last stable spec is still the old one
assertEquals(
@@ -350,7 +350,7 @@ public void verifyInProgressDeploymentWithError(String reason) throws Exception
JobManagerDeploymentStatus.ERROR,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
appCluster.getStatus().getJobStatus().getState());
// Validate status status
@@ -604,7 +604,7 @@ public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
JobState.RUNNING,
@@ -693,8 +693,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
// jobStatus has not been set at this time
- assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(), jobStatus.getState());
+ assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, jobStatus.getState());
// Switches operator mode to SESSION
appCluster.getSpec().setJob(null);
@@ -716,7 +715,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
}
@Test
@@ -798,11 +797,11 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep
testController.reconcile(appCluster, context);
if (appCluster.getSpec().getJob() != null) {
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
} else {
assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.name(),
+ org.apache.flink.api.common.JobStatus.FINISHED,
appCluster.getStatus().getJobStatus().getState());
}
assertEquals(
@@ -866,7 +865,7 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep
testController.reconcile(appCluster, context);
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -894,7 +893,7 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep
testController.reconcile(appCluster, context);
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -1108,7 +1107,7 @@ public void testInitialHaError() throws Exception {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
}
@@ -1154,7 +1153,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
JobManagerDeploymentStatus.DEPLOYING,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1177,7 +1176,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1191,7 +1190,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1206,7 +1205,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1220,7 +1219,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
assertEquals(
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(),
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 5b3578f386..82ccaea51d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -51,6 +50,9 @@
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.TestUtils.MAX_RECONCILE_TIMES;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
@@ -119,7 +121,7 @@ public void verifyBasicReconcileLoop() throws Exception {
sessionJob.getSpec().getJob().setParallelism(-1);
updateControl = testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -137,7 +139,7 @@ public void verifyBasicReconcileLoop() throws Exception {
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
// Validate last stable spec is still the old one
assertEquals(
@@ -271,7 +273,7 @@ public void verifyLastStateUpgrade() throws Exception {
testController.reconcile(sessionJob, context);
// Make sure we are cancelling
- assertEquals("CANCELLING", sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState());
// Once cancelling completed make sure that last reconciled spec is correctly upgraded and
// job was started from cp
@@ -285,7 +287,7 @@ public void verifyLastStateUpgrade() throws Exception {
.deserializeLastReconciledSpec()
.getJob()
.getUpgradeMode());
- assertEquals("RECONCILING", sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(
ReconciliationState.DEPLOYED,
sessionJob.getStatus().getReconciliationStatus().getState());
@@ -296,7 +298,7 @@ public void verifyLastStateUpgrade() throws Exception {
assertEquals("cp1", jobs.get(0).f0);
testController.reconcile(sessionJob, context);
- assertEquals("RUNNING", sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
// Suspend job
flinkService.setCheckpointInfo(
@@ -330,7 +332,7 @@ public void verifyLastStateUpgradeFailure() throws Exception {
testController.reconcile(sessionJob, context);
// Make sure we are cancelling
- assertEquals("CANCELLING", sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState());
testController.events().poll();
assertEquals(
testController.events().poll().getReason(),
@@ -342,7 +344,7 @@ public void verifyLastStateUpgradeFailure() throws Exception {
testController.reconcile(sessionJob, context);
assertEquals(JobStatusObserver.JOB_NOT_FOUND_ERR, sessionJob.getStatus().getError());
- assertEquals("RECONCILING", sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(
ReconciliationState.DEPLOYED,
sessionJob.getStatus().getReconciliationStatus().getState());
@@ -496,7 +498,7 @@ public void verifyStatelessUpgrade() throws Exception {
EventRecorder.Reason.JobStatusChanged,
EventRecorder.Reason.valueOf(statusEvents.get(2).getReason()));
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(
JobState.RUNNING,
sessionJob
@@ -518,8 +520,7 @@ public void verifyReconcileWithBadConfig() throws Exception {
.put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
updateControl = testController.reconcile(sessionJob, context);
assertFalse(updateControl.isUpdateStatus());
- assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState());
// Check when the bad config is applied, observe() will change the cluster state correctly
sessionJob.getSpec().getJob().setParallelism(-1);
@@ -531,7 +532,7 @@ public void verifyReconcileWithBadConfig() throws Exception {
.getError()
.contains("Job parallelism must be larger than 0"));
assertFalse(updateControl.isUpdateStatus());
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
// Make sure we do validation before getting effective config in reconcile().
// Verify the saved headers in lastReconciledSpec is actually used in observe() by
@@ -547,7 +548,7 @@ public void verifyReconcileWithBadConfig() throws Exception {
configuration.get(
KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)));
testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
}
@Test
@@ -572,7 +573,7 @@ public void testSuccessfulObservationShouldClearErrors() throws Exception {
assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
assertNull(sessionJob.getStatus().getError());
assertEquals(
@@ -649,7 +650,7 @@ public void testCancelJobNotFound() throws Exception {
var deleteControl = testController.cleanup(sessionJob, context);
- assertEquals("CANCELLING", sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState());
assertFalse(deleteControl.isRemoveFinalizer());
assertEquals(
configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis(),
@@ -688,8 +689,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E
testController.reconcile(sessionJob, context);
// Reconciling
- assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
@@ -706,7 +706,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E
// Running
updateControl = testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
@@ -716,7 +716,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E
// Stable loop
updateControl = testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
@@ -729,7 +729,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
assertEquals(
sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index 81cf121b27..09f2c4118b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -59,6 +59,8 @@
import java.util.Optional;
import java.util.function.BiConsumer;
+import static org.apache.flink.api.common.JobStatus.CANCELED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED;
@@ -176,7 +178,7 @@ public void testReconcileSnapshotAbandoned() {
controller.reconcile(snapshot, context);
assertThat(snapshot.getStatus().getState()).isEqualTo(IN_PROGRESS);
- deployment.getStatus().getJobStatus().setState("CANCELED");
+ deployment.getStatus().getJobStatus().setState(CANCELED);
controller.reconcile(snapshot, context);
var status = snapshot.getStatus();
var createdAt = Instant.parse(snapshot.getMetadata().getCreationTimestamp());
@@ -269,14 +271,14 @@ public void testReconcileSavepointCleanup() {
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
// Failed dispose, job not running
- deployment.getStatus().getJobStatus().setState("CANCELED");
+ deployment.getStatus().getJobStatus().setState(CANCELED);
snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
assertDeleteControl(
controller.cleanup(snapshot, context),
false,
configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
- deployment.getStatus().getJobStatus().setState("RUNNING");
+ deployment.getStatus().getJobStatus().setState(RUNNING);
// Failed dispose, REST error
snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
@@ -551,7 +553,7 @@ public void testReconcileJobNotFound() {
@Test
public void testReconcileJobNotRunning() {
var deployment = createDeployment();
- deployment.getStatus().getJobStatus().setState("CANCELED");
+ deployment.getStatus().getJobStatus().setState(CANCELED);
context = TestUtils.createSnapshotContext(client, deployment);
var snapshot = createSavepoint(deployment);
var errorMessage =
@@ -623,7 +625,7 @@ private FlinkDeployment createDeployment(FlinkVersion flinkVersion) {
var deployment = TestUtils.buildApplicationCluster();
deployment
.getStatus()
- .setJobStatus(JobStatus.builder().state("RUNNING").jobId(JOB_ID).build());
+ .setJobStatus(JobStatus.builder().state(RUNNING).jobId(JOB_ID).build());
deployment.getSpec().setFlinkVersion(flinkVersion);
deployment
.getSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 753a241e60..83fb762440 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -45,6 +45,7 @@
import java.util.LinkedList;
import java.util.Map;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -110,7 +111,7 @@ public void testStatefulRollback(UpgradeMode upgradeMode) throws Exception {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING", dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, dep.getStatus().getJobStatus().getState());
assertEquals(1, flinkService.listJobs().size());
dep.getSpec().setRestartNonce(10L);
testController.reconcile(dep, context);
@@ -154,7 +155,7 @@ public void testSavepointRollbackWithoutHaMetadata() throws Exception {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING", dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, dep.getStatus().getJobStatus().getState());
assertEquals(1, flinkService.listJobs().size());
dep.getSpec().setRestartNonce(10L);
testController.reconcile(dep, context);
@@ -243,7 +244,7 @@ public void testRollbackFailureWithLastState() throws Exception {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING", dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, dep.getStatus().getJobStatus().getState());
assertEquals(1, flinkService.listJobs().size());
// Trigger deployment recovery
@@ -316,7 +317,7 @@ public void testRollbackStateless() throws Exception {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING", dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, dep.getStatus().getJobStatus().getState());
// Make sure we started from empty state even if savepoint was available
assertNull(new LinkedList<>(flinkService.listJobs()).getLast().f0);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index c7b26d1b90..e19089c507 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -36,6 +36,7 @@
import java.time.Duration;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -95,7 +96,7 @@ public void verifyApplicationUnhealthyJmRecovery(
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
// Make deployment unhealthy
flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
@@ -111,7 +112,7 @@ public void verifyApplicationUnhealthyJmRecovery(
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
}
@ParameterizedTest
@@ -129,7 +130,7 @@ public void verifyApplicationNoCompletedCheckpointsJmRecovery(
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
// Make deployment unhealthy
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
@@ -153,6 +154,6 @@ public void verifyApplicationNoCompletedCheckpointsJmRecovery(
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, appCluster.getStatus().getJobStatus().getState());
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 2620fb9b6e..a399d871cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -77,7 +77,7 @@ public void lifecycleStateTest() {
application.getStatus().setError("errr");
assertEquals(STABLE, application.getStatus().getLifecycleState());
- application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ application.getStatus().getJobStatus().setState(JobStatus.FAILED);
assertEquals(FAILED, application.getStatus().getLifecycleState());
application.getStatus().setError(null);
@@ -88,14 +88,14 @@ public void lifecycleStateTest() {
.setState(ReconciliationState.ROLLING_BACK);
assertEquals(ROLLING_BACK, application.getStatus().getLifecycleState());
- application.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ application.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK);
assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState());
- application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ application.getStatus().getJobStatus().setState(JobStatus.FAILED);
assertEquals(FAILED, application.getStatus().getLifecycleState());
- application.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ application.getStatus().getJobStatus().setState(JobStatus.RUNNING);
application.getSpec().getJob().setState(JobState.SUSPENDED);
ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
assertEquals(SUSPENDED, application.getStatus().getLifecycleState());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index f5b388871b..512e9f7c49 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -63,7 +63,7 @@ void testCancellingToMissing(
job.getSpec().getJob().setUpgradeMode(upgradeMode);
var status = job.getStatus();
var jobStatus = status.getJobStatus();
- jobStatus.setState(fromStatus.name());
+ jobStatus.setState(fromStatus);
assertEquals(
JobState.RUNNING,
status.getReconciliationStatus()
@@ -91,7 +91,7 @@ void testCancellingToTerminal(JobStatus fromStatus) throws Exception {
var deployment = initDeployment();
var status = deployment.getStatus();
var jobStatus = status.getJobStatus();
- jobStatus.setState(fromStatus.name());
+ jobStatus.setState(fromStatus);
assertEquals(
JobState.RUNNING,
status.getReconciliationStatus()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
index fdbfb87764..0eee471237 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.observer;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
@@ -328,7 +329,7 @@ public void testPeriodicSavepoint() throws Exception {
var jobStatus = status.getJobStatus();
status.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
- jobStatus.setState("RUNNING");
+ jobStatus.setState(JobStatus.RUNNING);
var savepointInfo = jobStatus.getSavepointInfo();
flinkService.triggerSavepointLegacy(null, SnapshotTriggerType.PERIODIC, deployment, conf);
@@ -364,7 +365,7 @@ public void testPeriodicCheckpoint() {
var jobStatus = status.getJobStatus();
status.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
- jobStatus.setState("RUNNING");
+ jobStatus.setState(JobStatus.RUNNING);
var checkpointInfo = jobStatus.getCheckpointInfo();
var triggerId = flinkService.triggerCheckpoint(null, CheckpointType.FULL, conf);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 2f06504b45..b9adee6242 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -128,7 +128,9 @@ public void observeApplicationCluster() throws Exception {
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(),
deployment.getStatus().getReconciliationStatus().getLastStableSpec());
@@ -137,7 +139,9 @@ public void observeApplicationCluster() throws Exception {
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getMetadata().getName(),
@@ -177,7 +181,7 @@ public void observeApplicationCluster() throws Exception {
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
deployment.getStatus().getJobStatus().getState());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
}
@@ -488,7 +492,7 @@ public void observeSavepoint() throws Exception {
observer.observe(deployment, readyContext);
assertEquals(
- org.apache.flink.api.common.JobStatus.FAILED.name(),
+ org.apache.flink.api.common.JobStatus.FAILED,
deployment.getStatus().getJobStatus().getState());
assertEquals("last-SP", deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
@@ -653,8 +657,7 @@ public void observeCheckpoint() throws Exception {
observer.observe(deployment, readyContext);
assertEquals(
- org.apache.flink.api.common.JobStatus.FAILED.name(),
- getJobStatus(deployment).getState());
+ org.apache.flink.api.common.JobStatus.FAILED, getJobStatus(deployment).getState());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
}
@@ -745,7 +748,7 @@ private void bringToReadyStatus(FlinkDeployment deployment) {
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobName("jobname");
- jobStatus.setState(JobState.RUNNING.name());
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
deployment.getStatus().setJobStatus(jobStatus);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 1b6bdb0b53..530d35844b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -85,12 +85,12 @@ public void testBasicObserve() throws Exception {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
// observe with empty context will do nothing
observer.observe(sessionJob, TestUtils.createEmptyContext());
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
var reconStatus = sessionJob.getStatus().getReconciliationStatus();
Assertions.assertNotEquals(
@@ -99,22 +99,22 @@ public void testBasicObserve() throws Exception {
// observe with ready context
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING, sessionJob.getStatus().getJobStatus().getState());
Assertions.assertEquals(
reconStatus.getLastReconciledSpec(), reconStatus.getLastStableSpec());
flinkService.setPortReady(false);
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
- sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING);
// no matched job id, update the state to unknown
flinkService.setPortReady(true);
sessionJob.getStatus().getJobStatus().setJobId(new JobID().toHexString());
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
Assertions.assertEquals(
JobState.SUSPENDED,
sessionJob
@@ -138,13 +138,13 @@ public void testBasicObserve() throws Exception {
Assertions.assertNotNull(jobID);
Assertions.assertNotEquals(jobID, jobID2);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
observer.observe(sessionJob2, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(), sessionJob2.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING, sessionJob2.getStatus().getJobStatus().getState());
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING, sessionJob.getStatus().getJobStatus().getState());
// test error behaviour if job not present
flinkService.clear();
@@ -153,7 +153,7 @@ public void testBasicObserve() throws Exception {
observer.observe(sessionJob2, readyContext);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob2.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob2.getStatus().getJobStatus().getState());
Assertions.assertTrue(
sessionJob2.getStatus().getError().contains(JobStatusObserver.JOB_NOT_FOUND_ERR));
Assertions.assertEquals(
@@ -173,14 +173,14 @@ public void testObserveWithEffectiveConfig() throws Exception {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
flinkService.setListJobConsumer(
(configuration) ->
Assertions.assertEquals(8088, configuration.getInteger(RestOptions.PORT)));
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING, sessionJob.getStatus().getJobStatus().getState());
}
@Test
@@ -192,11 +192,11 @@ public void testObserveSavepoint() throws Exception {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING, sessionJob.getStatus().getJobStatus().getState());
var savepointInfo = sessionJob.getStatus().getJobStatus().getSavepointInfo();
Assertions.assertFalse(
@@ -238,10 +238,10 @@ public void testObserveCheckpoint() throws Exception {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING, sessionJob.getStatus().getJobStatus().getState());
observer.observe(sessionJob, readyContext);
- assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(JobStatus.RUNNING, sessionJob.getStatus().getJobStatus().getState());
var checkpointInfo = sessionJob.getStatus().getJobStatus().getCheckpointInfo();
assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
@@ -280,7 +280,7 @@ public void testObserveCheckpoint() throws Exception {
@ValueSource(booleans = {true, false})
public void testObserveAlreadySubmitted(boolean submitted) {
var sessionJob = TestUtils.buildSessionJob();
- sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
sessionJob.getMetadata().setGeneration(10L);
var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
@@ -305,7 +305,7 @@ public void testObserveAlreadySubmitted(boolean submitted) {
submitted ? ReconciliationState.DEPLOYED : ReconciliationState.UPGRADING,
sessionJob.getStatus().getReconciliationStatus().getState());
Assertions.assertEquals(
- submitted ? JobStatus.RUNNING.name() : JobStatus.RECONCILING.name(),
+ submitted ? JobStatus.RUNNING : JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index e00305d3cb..8bf19ff110 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -109,6 +109,9 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobSpec;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
@@ -321,7 +324,7 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
.setLastStableSpec(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
flinkService.setHaDataAvailable(false);
- getJobStatus(deployment).setState("RECONCILING");
+ getJobStatus(deployment).setState(RECONCILING);
try {
deployment
@@ -343,7 +346,7 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(200L);
flinkService.setHaDataAvailable(false);
- getJobStatus(deployment).setState("FINISHED");
+ getJobStatus(deployment).setState(FINISHED);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler.reconcile(deployment, context);
reconciler.reconcile(deployment, context);
@@ -693,7 +696,7 @@ private void verifyAndSetRunningJobsToStatus(
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
.jobName(runningJobs.get(0).f1.getJobName())
.updateTime(Long.toString(System.currentTimeMillis()))
- .state("RUNNING")
+ .state(RUNNING)
.build());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
@@ -711,7 +714,9 @@ public void testJobUpgradeIgnorePendingSavepointLegacy() throws Exception {
getJobSpec(spDeployment).setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
reconciler.reconcile(spDeployment, context);
assertEquals("savepoint_trigger_0", getSavepointInfo(spDeployment).getTriggerId());
- assertEquals(JobState.RUNNING.name(), getJobStatus(spDeployment).getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ getJobStatus(spDeployment).getState());
// Force upgrade when savepoint is in progress.
spDeployment
@@ -724,7 +729,7 @@ public void testJobUpgradeIgnorePendingSavepointLegacy() throws Exception {
reconciler.reconcile(spDeployment, context);
assertEquals("savepoint_trigger_0", getSavepointInfo(spDeployment).getTriggerId());
assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.name(),
+ org.apache.flink.api.common.JobStatus.FINISHED,
getJobStatus(spDeployment).getState());
}
@@ -741,7 +746,7 @@ public void testRandomJobResultStorePath() throws Exception {
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta, spec);
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
@@ -750,7 +755,7 @@ public void testRandomJobResultStorePath() throws Exception {
String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
Assertions.assertTrue(path1.startsWith(haStoragePath));
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
@@ -775,7 +780,7 @@ public void testAlwaysSavepointOnFlinkVersionChange() throws Exception {
reconciler.reconcile(deployment, context);
assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
- getJobStatus(deployment).setState(JobState.RUNNING.name());
+ getJobStatus(deployment).setState(RUNNING);
getJobStatus(deployment)
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
@@ -951,7 +956,7 @@ public void scale(KubernetesJobAutoScalerContext ctx) {
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
- assertEquals("RUNNING", deployment.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING, deployment.getStatus().getJobStatus().getState());
// Test overrides are applied correctly
var v1 = new JobVertexID();
@@ -998,7 +1003,7 @@ public void testSetOwnerReference() throws Exception {
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta, spec);
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
@@ -1022,12 +1027,20 @@ public void testTerminalJmTtlOnSuspend() throws Throwable {
@Test
public void testTerminalJmTtlOnFinished() throws Throwable {
- testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FINISHED"));
+ testTerminalJmTtl(
+ dep ->
+ dep.getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.FINISHED));
}
@Test
public void testTerminalJmTtlOnFailed() throws Throwable {
- testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FAILED"));
+ testTerminalJmTtl(
+ dep ->
+ dep.getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.FAILED));
}
public void testTerminalJmTtl(ThrowingConsumer deploymentSetup)
@@ -1076,7 +1089,7 @@ public void testClusterCleanupBeforeDeploy(boolean requireMetadata) throws Excep
status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, flinkApp);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
var deleted = new AtomicBoolean(false);
@@ -1112,9 +1125,7 @@ public void testDeploymentRecoveryEvent() throws Exception {
flinkService.clear();
FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- deploymentStatus
- .getJobStatus()
- .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ deploymentStatus.getJobStatus().setState(RECONCILING);
reconciler.reconcile(deployment, context);
Assertions.assertEquals(
MSG_RECOVERY, flinkResourceEventCollector.events.remove().getMessage());
@@ -1298,7 +1309,7 @@ public void testRollbackUpgradeModeHandling(boolean jmStarted) throws Exception
ReconciliationState.ROLLED_BACK,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals(1, flinkService.listJobs().size());
- assertEquals("RECONCILING", deployment.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING, deployment.getStatus().getJobStatus().getState());
}
@ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 93a25e8297..9d4893023c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -63,6 +63,12 @@
import java.util.Optional;
import java.util.stream.Stream;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.FAILING;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RESTARTING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -210,7 +216,10 @@ private void testUpgradeToLastState(FlinkVersion flinkVersion, UpgradeMode fromU
.setLastStableSpec(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
flinkService.setHaDataAvailable(false);
- deployment.getStatus().getJobStatus().setState("RECONCILING");
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RECONCILING);
Assertions.assertThrows(
UpgradeFailureException.class,
@@ -236,7 +245,10 @@ private void testUpgradeToLastState(FlinkVersion flinkVersion, UpgradeMode fromU
deployment.getSpec().setRestartNonce(200L);
flinkService.setHaDataAvailable(false);
deployment.getStatus().getJobStatus().setUpgradeSavepointPath("finished_sp");
- deployment.getStatus().getJobStatus().setState("FINISHED");
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.FINISHED);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
deployment
.getSpec()
@@ -287,7 +299,7 @@ public void testUpgradeUsesLatestSnapshot(boolean useLegacyFields) throws Except
0L));
}
- deployment.getStatus().getJobStatus().setState("FINISHED");
+ deployment.getStatus().getJobStatus().setState(FINISHED);
reconciler.reconcile(deployment, context);
reconciler.reconcile(deployment, context);
@@ -503,7 +515,7 @@ public void testLastStateMaxCheckpointAge(boolean cancellable) throws Exception
var jobStatus = deployment.getStatus().getJobStatus();
long now = System.currentTimeMillis();
- jobStatus.setState("RUNNING");
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
jobStatus.setStartTime(Long.toString(now));
jobStatus.setJobId(new JobID().toString());
@@ -626,7 +638,7 @@ public void testFlinkVersionSwitching(
jobStatus.setJobId(new JobID().toString());
// Running state, savepoint if possible
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
var ctx = getResourceContext(deployment);
var deployConf = ctx.getDeployConfig(deployment.getSpec());
@@ -637,13 +649,13 @@ public void testFlinkVersionSwitching(
jobReconciler.getJobUpgrade(ctx, deployConf));
// Not running (but cancellable)
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+ jobStatus.setState(RESTARTING);
assertEquals(
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
jobReconciler.getJobUpgrade(ctx, deployConf));
// Unknown / reconciling
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ jobStatus.setState(RECONCILING);
assertEquals(
AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
jobReconciler.getJobUpgrade(ctx, deployConf));
@@ -690,7 +702,7 @@ public void testLastStateNoHaMeta(UpgradeMode upgradeMode, boolean allowFallback
jobStatus.setJobId(new JobID().toString());
// Running state, savepoint if possible
- jobStatus.setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+ jobStatus.setState(FAILING);
var ctx = getResourceContext(deployment);
var deployConf = ctx.getDeployConfig(deployment.getSpec());
@@ -798,7 +810,7 @@ public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
reconciler.reconcile(deployment, context);
assertEquals(0, flinkService.getRunningCount());
assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.name(),
+ org.apache.flink.api.common.JobStatus.FINISHED,
deployment.getStatus().getJobStatus().getState());
var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
@@ -845,11 +857,11 @@ public void testUpgradeModeChangedToLastStateShouldCancelWhileHADisabled() throw
// Ready for spec changes, the reconciliation should be performed
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
reconciler.reconcile(deployment, context);
- assertEquals("CANCELLING", deployment.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING, deployment.getStatus().getJobStatus().getState());
String expectedSavepointPath = "savepoint_0";
var jobStatus = deployment.getStatus().getJobStatus();
- jobStatus.setState("CANCELED");
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.CANCELED);
jobStatus
.getSavepointInfo()
.setLastSavepoint(Savepoint.of(expectedSavepointPath, SnapshotTriggerType.UNKNOWN));
@@ -947,7 +959,7 @@ private void verifyAndSetRunningJobsToStatus(
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
.jobName(runningJobs.get(0).f1.getJobName())
.startTime(Long.toString(System.currentTimeMillis()))
- .state("RUNNING")
+ .state(org.apache.flink.api.common.JobStatus.RUNNING)
.build());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 87f44b49bc..54484065fa 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -127,7 +127,7 @@ public void testSetOwnerReference() throws Exception {
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta, spec);
- status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 0185b765e8..c72b50826d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -122,7 +122,7 @@ public void testSubmitAndCleanUpWithSavepoint(boolean legacySnapshots) throws Ex
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
// clean up
reconciler.cleanup(
@@ -165,7 +165,7 @@ public void testSubmitAndCleanUpWithSavepointOnResource(boolean legacySnapshots)
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
// clean up
reconciler.cleanup(
@@ -201,7 +201,7 @@ public void testSubmitAndCleanUp() throws Exception {
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
// clean up
reconciler.cleanup(
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
@@ -217,7 +217,7 @@ public void testCancelJobRescheduled() throws Exception {
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
// clean up
flinkService.setPortReady(false);
var deleteControl =
@@ -230,7 +230,10 @@ public void testCancelJobRescheduled() throws Exception {
flinkService.setPortReady(true);
reconciler.cleanup(
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
deleteControl =
reconciler.cleanup(
sessionJob,
@@ -248,14 +251,17 @@ public void testCancelJobTerminatedWithoutCancellation() throws Exception {
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
// clean up
flinkService.setFlinkJobTerminatedWithoutCancellation(true);
var deleteControl =
reconciler.cleanup(
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
deleteControl =
reconciler.cleanup(
@@ -273,16 +279,19 @@ public void testRestart() throws Exception {
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
sessionJob.getSpec().setRestartNonce(2L);
reconciler.reconcile(
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(CANCELED, flinkService.listJobs().get(0).f1.getJobState());
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
reconciler.reconcile(
sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
}
@Test
@@ -294,9 +303,9 @@ public void testRestartWhenFailed() throws Exception {
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
- sessionJob.getStatus().getJobStatus().setState(FAILED.name());
+ sessionJob.getStatus().getJobStatus().setState(FAILED);
reconciler.reconcile(sessionJob, readyContext);
assertEquals(2, flinkService.listJobs().size());
assertEquals(RUNNING, flinkService.listJobs().get(1).f1.getJobState());
@@ -313,7 +322,7 @@ public void testSubmitWithInitialSavepointPath() throws Exception {
verifyAndSetRunningJobsToStatus(
sessionJob,
JobState.RUNNING,
- RECONCILING.name(),
+ RECONCILING,
initSavepointPath,
flinkService.listJobs());
}
@@ -326,7 +335,7 @@ public void testStatelessUpgrade() throws Exception {
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -334,18 +343,20 @@ public void testStatelessUpgrade() throws Exception {
// job suspended first
reconciler.reconcile(statelessSessionJob, readyContext);
assertEquals(CANCELED, flinkService.listJobs().get(0).f1.getJobState());
- verifyJobState(statelessSessionJob, JobState.RUNNING, "CANCELLING");
- statelessSessionJob.getStatus().getJobStatus().setState("CANCELED");
+ verifyJobState(
+ statelessSessionJob,
+ JobState.RUNNING,
+ org.apache.flink.api.common.JobStatus.CANCELLING);
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
flinkService.clear();
reconciler.reconcile(statelessSessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- statelessSessionJob,
- JobState.RUNNING,
- RECONCILING.name(),
- null,
- flinkService.listJobs());
+ statelessSessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
}
@ParameterizedTest
@@ -377,17 +388,13 @@ public void testSavepointUpgrade(boolean legacySnapshots) throws Exception {
statefulSessionJob.getSpec().getJob().setParallelism(3);
verifyAndSetRunningJobsToStatus(
- statefulSessionJob,
- JobState.RUNNING,
- RECONCILING.name(),
- null,
- flinkService.listJobs());
+ statefulSessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
reconciler.reconcile(statefulSessionJob, readyContext);
// job suspended first
assertEquals(FINISHED, flinkService.listJobs().get(0).f1.getJobState());
- verifyJobState(statefulSessionJob, JobState.SUSPENDED, "FINISHED");
+ verifyJobState(statefulSessionJob, JobState.SUSPENDED, FINISHED);
if (legacySnapshots) {
assertEquals(
"savepoint_0",
@@ -416,7 +423,7 @@ public void testSavepointUpgrade(boolean legacySnapshots) throws Exception {
verifyAndSetRunningJobsToStatus(
statefulSessionJob,
JobState.RUNNING,
- RECONCILING.name(),
+ RECONCILING,
"savepoint_0",
flinkService.listJobs());
}
@@ -431,7 +438,7 @@ public void testTriggerSavepointLegacyLegacy() throws Exception {
var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
reconciler.reconcile(sessionJob, readyContext);
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
assertFalse(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
@@ -443,12 +450,12 @@ public void testTriggerSavepointLegacyLegacy() throws Exception {
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(2L);
- sp1SessionJob.getStatus().getJobStatus().setState(CREATED.name());
+ sp1SessionJob.getStatus().getJobStatus().setState(CREATED);
reconciler.reconcile(sp1SessionJob, readyContext);
// do not trigger savepoint if job is not running
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
- sp1SessionJob.getStatus().getJobStatus().setState(RUNNING.name());
+ sp1SessionJob.getStatus().getJobStatus().setState(RUNNING);
reconciler.reconcile(sp1SessionJob, readyContext);
assertTrue(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
@@ -505,7 +512,10 @@ public void testTriggerSavepointLegacyLegacy() throws Exception {
// running -> suspended
reconciler.reconcile(sp1SessionJob, readyContext);
- sp1SessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sp1SessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
// suspended -> running
reconciler.reconcile(sp1SessionJob, readyContext);
// parallelism changed
@@ -518,7 +528,7 @@ public void testTriggerSavepointLegacyLegacy() throws Exception {
.getJob()
.getParallelism());
verifyAndSetRunningJobsToStatus(
- sp1SessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sp1SessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
@@ -555,7 +565,7 @@ public void testTriggerCheckpoint() throws Exception {
var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
reconciler.reconcile(sessionJob, readyContext);
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
@@ -567,12 +577,12 @@ public void testTriggerCheckpoint() throws Exception {
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(2L);
- getJobStatus(sp1SessionJob).setState(CREATED.name());
+ getJobStatus(sp1SessionJob).setState(CREATED);
reconciler.reconcile(sp1SessionJob, readyContext);
// do not trigger checkpoint if job is not running
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
- getJobStatus(sp1SessionJob).setState(RUNNING.name());
+ getJobStatus(sp1SessionJob).setState(RUNNING);
reconciler.reconcile(sp1SessionJob, readyContext);
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
@@ -618,7 +628,7 @@ public void testCancelStatelessSessionJob(SuspendMode suspendMode) throws Except
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
var job = flinkService.listJobs().get(0);
var jobStatusMessage = job.f1;
@@ -634,13 +644,16 @@ public void testCancelStatelessSessionJob(SuspendMode suspendMode) throws Except
RUNNING,
jobStatusMessage.getStartTime());
// Set state which must be overwritten by cancelSessionJob
- sessionJob.getStatus().getJobStatus().setState("RUNNING");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RUNNING);
flinkService.cancelSessionJob(sessionJob, suspendMode, jobConfig);
assertEquals(1, flinkService.getCancelJobCallCount());
assertEquals(CANCELED, job.f1.getJobState());
- assertEquals(CANCELLING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState());
}
private static Stream cancelSavepointSessionJobParams() {
@@ -683,7 +696,7 @@ public void testCancelSavepointSessionJob(
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
var job = flinkService.listJobs().get(0);
var jobStatusMessage = job.f1;
@@ -699,7 +712,7 @@ public void testCancelSavepointSessionJob(
fromJobStatus,
jobStatusMessage.getStartTime());
// Set state which must be overwritten by cancelSessionJob
- sessionJob.getStatus().getJobStatus().setState(fromJobStatus.name());
+ sessionJob.getStatus().getJobStatus().setState(fromJobStatus);
if (!shouldThrowException) {
flinkService.cancelSessionJob(sessionJob, SuspendMode.SAVEPOINT, jobConfig);
@@ -723,7 +736,7 @@ public void testCancelSavepointSessionJob(
assertEquals(FINISHED, job.f1.getJobState());
}
if (!shouldThrowException) {
- assertEquals(FINISHED.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(FINISHED, sessionJob.getStatus().getJobStatus().getState());
}
}
@@ -740,7 +753,7 @@ private Tuple3 verifyAndReturnTheSubmit
private void verifyAndSetRunningJobsToStatus(
FlinkSessionJob sessionJob,
JobState expectedState,
- String jobStatusObserved,
+ org.apache.flink.api.common.JobStatus jobStatusObserved,
@Nullable String expectedSavepointPath,
List> jobs) {
@@ -750,11 +763,13 @@ private void verifyAndSetRunningJobsToStatus(
verifyJobState(sessionJob, expectedState, jobStatusObserved);
JobStatus jobStatus = sessionJob.getStatus().getJobStatus();
jobStatus.setJobName(submittedJobInfo.f1.getJobName());
- jobStatus.setState("RUNNING");
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
}
private void verifyJobState(
- FlinkSessionJob sessionJob, JobState expectedState, String jobStatusObserved) {
+ FlinkSessionJob sessionJob,
+ JobState expectedState,
+ org.apache.flink.api.common.JobStatus jobStatusObserved) {
assertEquals(
expectedState,
sessionJob
@@ -773,7 +788,7 @@ public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
reconciler.reconcile(sessionJob, readyContext);
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null, flinkService.listJobs());
FlinkSessionJob spSessionJob = ReconciliationUtils.clone(sessionJob);
spSessionJob
@@ -788,7 +803,9 @@ public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
assertEquals(
"savepoint_trigger_0",
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- assertEquals(JobState.RUNNING.name(), spSessionJob.getStatus().getJobStatus().getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ spSessionJob.getStatus().getJobStatus().getState());
configManager.updateDefaultConfig(
Configuration.fromMap(
@@ -802,7 +819,7 @@ public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
assertEquals(
"savepoint_trigger_0",
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- assertEquals("CANCELLING", spSessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING, spSessionJob.getStatus().getJobStatus().getState());
}
@Test
@@ -816,8 +833,7 @@ public void testJobIdGeneration() throws Exception {
ReconciliationState.DEPLOYED,
sessionJob.getStatus().getReconciliationStatus().getState());
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
- Assertions.assertEquals(
- RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ Assertions.assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState());
Assertions.assertEquals(jobID, flinkService.listJobs().get(0).f1.getJobId().toString());
flinkService.setSessionJobSubmittedCallback(
@@ -831,7 +847,10 @@ public void testJobIdGeneration() throws Exception {
() -> {
// suspend
reconciler.reconcile(sessionJob, readyContext);
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
// upgrade
reconciler.reconcile(sessionJob, readyContext);
});
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 514acbd405..336de17e5f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -133,6 +133,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.FAILING;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -293,7 +297,7 @@ public void cancelJobWithStatelessUpgradeModeTest() throws Exception {
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
- deployment.getStatus().getJobStatus().setState("RUNNING");
+ deployment.getStatus().getJobStatus().setState(RUNNING);
flinkService.cancelJob(
deployment,
SuspendMode.STATELESS,
@@ -303,7 +307,7 @@ public void cancelJobWithStatelessUpgradeModeTest() throws Exception {
assertEquals(jobID, cancelFuture.get());
assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
assertNull(jobStatus.getUpgradeSavepointPath());
- assertEquals("FINISHED", jobStatus.getState());
+ assertEquals(FINISHED, jobStatus.getState());
assertEquals(
List.of(
Tuple2.of(
@@ -336,7 +340,7 @@ public void cancelErrorHandling(int statusCode) throws Exception {
var job = TestUtils.buildSessionJob();
var jobStatus = job.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState("RUNNING");
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());
if (statusCode == 500) {
@@ -345,10 +349,10 @@ public void cancelErrorHandling(int statusCode) throws Exception {
() ->
flinkService.cancelSessionJob(
job, SuspendMode.STATELESS, new Configuration()));
- assertEquals("RUNNING", jobStatus.getState());
+ assertEquals(RUNNING, jobStatus.getState());
} else {
flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
- assertEquals("CANCELLING", jobStatus.getState());
+ assertEquals(CANCELLING, jobStatus.getState());
}
}
@@ -380,7 +384,7 @@ public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint)
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
var result =
@@ -396,7 +400,7 @@ public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint)
assertFalse(stopWithSavepointFuture.get().f1);
assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
- assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name());
+ assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED);
assertEquals(
deployment.getStatus().getJobManagerDeploymentStatus(),
deleteAfterSavepoint
@@ -443,7 +447,7 @@ public void cancelJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnSavepoin
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
if (drainOnSavepoint) {
@@ -467,7 +471,7 @@ public void cancelJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnSavepoin
assertTrue(stopWithSavepointFuture.isDone());
assertEquals(jobID, stopWithSavepointFuture.get().f0);
- assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name());
+ assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED);
if (drainOnSavepoint) {
assertTrue(stopWithSavepointFuture.get().f1);
@@ -509,7 +513,7 @@ public void cancelSessionJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnS
JobStatus jobStatus = job.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());
if (drainOnSavepoint) {
@@ -527,7 +531,7 @@ public void cancelSessionJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnS
var result = flinkService.cancelSessionJob(job, SuspendMode.SAVEPOINT, deployConf);
assertTrue(stopWithSavepointFuture.isDone());
assertEquals(jobID, stopWithSavepointFuture.get().f0);
- assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name());
+ assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED);
assertEquals(savepointPath, result.getSavepointPath().get());
@@ -549,15 +553,13 @@ public void jobCancelTest() throws Exception {
JobID jobID = JobID.generate();
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState("FAILING");
+ jobStatus.setState(FAILING);
flinkService.cancelJob(
deployment, SuspendMode.CANCEL, configManager.getObserveConfig(deployment), false);
assertTrue(flinkService.haDeleted.isEmpty());
assertTrue(flinkService.deleted.isEmpty());
- assertEquals("CANCELLING", jobStatus.getState());
- assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
- assertNull(jobStatus.getUpgradeSavepointPath());
+ assertEquals(CANCELLING, jobStatus.getState());
}
@Test
@@ -752,7 +754,7 @@ private void runNativeSavepointFormatTest(boolean failAfterSavepointCompletes)
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
jobStatus.setJobId(jobID.toString());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 23f179b2f1..352c726cec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
@@ -288,7 +289,7 @@ protected void updateVertexResources(
Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
spec.setFlinkConfiguration(appConfig.toMap());
- flinkDep.getStatus().getJobStatus().setState("RUNNING");
+ flinkDep.getStatus().getJobStatus().setState(JobStatus.RUNNING);
current.set(
Map.of(
@@ -357,16 +358,22 @@ protected void updateVertexResources(
// Make sure we only try to rescale non-terminal
testScaleConditionDep(
- flinkDep, service, d -> d.getStatus().getJobStatus().setState("FAILED"), false);
+ flinkDep,
+ service,
+ d -> d.getStatus().getJobStatus().setState(JobStatus.FAILED),
+ false);
testScaleConditionDep(
flinkDep,
service,
- d -> d.getStatus().getJobStatus().setState("RECONCILING"),
+ d -> d.getStatus().getJobStatus().setState(JobStatus.RECONCILING),
false);
testScaleConditionDep(
- flinkDep, service, d -> d.getStatus().getJobStatus().setState("RUNNING"), true);
+ flinkDep,
+ service,
+ d -> d.getStatus().getJobStatus().setState(JobStatus.RUNNING),
+ true);
testScaleConditionDep(flinkDep, service, d -> d.getSpec().setJob(null), false);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index 1a3dc8e69f..7b05226bbc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -42,6 +42,7 @@
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.api.common.JobStatus.FAILED;
import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
@@ -243,7 +244,7 @@ public void testAbandonSnapshotIfJobNotRunningNoAbandon() {
assertFalse(result);
assertThat(eventCollector.events).isEmpty();
- deployment.getStatus().getJobStatus().setState("FAILED");
+ deployment.getStatus().getJobStatus().setState(FAILED);
result =
FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
client, snapshot, deployment, eventRecorder);
@@ -262,7 +263,7 @@ public void testAbandonSnapshotIfJobNotRunningNoAbandon() {
@Test
public void testAbandonSnapshotIfJobNotRunningJobFailed() {
var deployment = initDeployment();
- deployment.getStatus().getJobStatus().setState("FAILED");
+ deployment.getStatus().getJobStatus().setState(FAILED);
var snapshot = initSavepoint(IN_PROGRESS, null);
var eventCollector = new FlinkStateSnapshotEventCollector();
var eventRecorder = new EventRecorder((x, y) -> {}, eventCollector);
@@ -341,7 +342,7 @@ private void assertCheckpointResource(
private static FlinkDeployment initDeployment() {
FlinkDeployment deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_19);
- deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconcileSpec(deployment);
return deployment;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 37f05309e5..5b3cb541cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -359,7 +359,7 @@ private static FlinkDeployment initDeployment(FlinkVersion flinkVersion) {
.getMetadata()
.setCreationTimestamp(Instant.now().minus(Duration.ofMinutes(15)).toString());
- deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconcileSpec(deployment);
return deployment;
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index b4c2d4c0f4..6042799dc5 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -10355,6 +10355,18 @@ spec:
startTime:
type: string
state:
+ enum:
+ - CANCELED
+ - CANCELLING
+ - CREATED
+ - FAILED
+ - FAILING
+ - FINISHED
+ - INITIALIZING
+ - RECONCILING
+ - RESTARTING
+ - RUNNING
+ - SUSPENDED
type: string
updateTime:
type: string
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index b7ea567d5b..431787c7a7 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -197,6 +197,18 @@ spec:
startTime:
type: string
state:
+ enum:
+ - CANCELED
+ - CANCELLING
+ - CREATED
+ - FAILED
+ - FAILING
+ - FINISHED
+ - INITIALIZING
+ - RECONCILING
+ - RESTARTING
+ - RUNNING
+ - SUSPENDED
type: string
updateTime:
type: string