Skip to content

Commit b3b45ba

Browse files
committed
[FLINK-36425][refactor] Replace jobStatus.state string with Flink JobStatus enum
1 parent 6a27b3e commit b3b45ba

File tree

39 files changed

+323
-265
lines changed

39 files changed

+323
-265
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
402402
| ----------| ---- | ---- |
403403
| jobName | java.lang.String | Name of the job. |
404404
| jobId | java.lang.String | Flink JobId of the Job. |
405-
| state | java.lang.String | Last observed state of the job. |
405+
| state | org.apache.flink.api.common.JobStatus | Last observed state of the job. |
406406
| startTime | java.lang.String | Start time of the job. |
407407
| updateTime | java.lang.String | Update time of the job. |
408408
| upgradeSavepointPath | java.lang.String | |

flink-kubernetes-operator-api/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ under the License.
226226
fork="true" failonerror="true">
227227
<classpath refid="maven.compile.classpath"/>
228228
<arg value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
229-
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
229+
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
230230
</java>
231231
</target>
232232
</configuration>
@@ -243,7 +243,7 @@ under the License.
243243
fork="true" failonerror="true">
244244
<classpath refid="maven.compile.classpath"/>
245245
<arg value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
246-
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
246+
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
247247
</java>
248248
</target>
249249
</configuration>

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,7 @@ public ResourceLifecycleState getLifecycleState() {
8181
return ResourceLifecycleState.SUSPENDED;
8282
}
8383

84-
var jobState = getJobStatus().getState();
85-
if (jobState != null
86-
&& org.apache.flink.api.common.JobStatus.valueOf(jobState)
87-
.equals(org.apache.flink.api.common.JobStatus.FAILED)) {
84+
if (getJobStatus().getState() == org.apache.flink.api.common.JobStatus.FAILED) {
8885
return ResourceLifecycleState.FAILED;
8986
}
9087

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class JobStatus {
4242

4343
/** Last observed state of the job. */
4444
@PrinterColumn(name = "Job Status")
45-
private String state;
45+
private org.apache.flink.api.common.JobStatus state;
4646

4747
/** Start time of the job. */
4848
private String startTime;

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public void resetTrigger() {
8888
*/
8989
public void updateLastSavepoint(Savepoint savepoint) {
9090
if (savepoint == null) {
91+
// In terminal states we have to handle the case when there is actually no savepoint to
92+
// not restore from an old one
9193
lastSavepoint = null;
9294
} else if (lastSavepoint == null
9395
|| !lastSavepoint.getLocation().equals(savepoint.getLocation())) {

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ protected static void verifyOtherPropsMatch(String path, JsonNode oldNode, JsonN
160160
protected static void checkStringTypeCompatibility(
161161
String path, JsonNode oldNode, JsonNode newNode) {
162162
if (!oldNode.has("enum") && newNode.has("enum")) {
163-
err("Cannot turn string into enum for " + path);
163+
// We make an exception here for jobstatus.state, this is a backward compatible change
164+
if (!path.equals(".status.jobStatus.state")) {
165+
err("Cannot turn string into enum for " + path);
166+
}
164167
}
165168

166169
if (oldNode.has("enum")) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private void handleDeploymentFailed(
176176
var flinkApp = ctx.getResource();
177177
LOG.error("Flink Deployment failed", dfe);
178178
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
179-
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
179+
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
180180
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
181181
eventRecorder.triggerEvent(
182182
flinkApp,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2323
import org.apache.flink.configuration.Configuration;
2424
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
25-
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2625
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
2726
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
2827
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
@@ -73,7 +72,7 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
7372
CommonStatus<?> status = getResource().getStatus();
7473
String jobId = status.getJobStatus().getJobId();
7574

76-
JobStatus jobStatus = generateJobStatusEnum(status);
75+
JobStatus jobStatus = status.getJobStatus().getState();
7776

7877
return new KubernetesJobAutoScalerContext(
7978
jobId == null ? null : JobID.fromHexString(jobId),
@@ -84,19 +83,6 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
8483
this);
8584
}
8685

87-
@Nullable
88-
private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
89-
if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
90-
return null;
91-
}
92-
93-
String state = status.getJobStatus().getState();
94-
if (state == null) {
95-
return null;
96-
}
97-
return JobStatus.valueOf(state);
98-
}
99-
10086
/**
10187
* Get the config that is currently deployed for the resource spec. The returned config may be
10288
* null in case the resource is not accessible/ready yet.

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
122122
// upgrading state and retry the upgrade (if possible)
123123
resource.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
124124
}
125-
jobStatus.setState(JobStatus.RECONCILING.name());
125+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
126126
resource.getStatus().setError(JOB_NOT_FOUND_ERR);
127127
}
128128

@@ -135,9 +135,9 @@ protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
135135
*/
136136
private void ifRunningMoveToReconciling(
137137
org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus,
138-
String previousJobStatus) {
139-
if (JobStatus.RUNNING.name().equals(previousJobStatus)) {
140-
jobStatus.setState(JobStatus.RECONCILING.name());
138+
JobStatus previousJobStatus) {
139+
if (JobStatus.RUNNING == previousJobStatus) {
140+
jobStatus.setState(JobStatus.RECONCILING);
141141
}
142142
}
143143

@@ -160,7 +160,7 @@ private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clust
160160
var previousJobStatus = jobStatus.getState();
161161
var currentJobStatus = clusterJobStatus.getJobState();
162162

163-
jobStatus.setState(clusterJobStatus.getJobState().name());
163+
jobStatus.setState(currentJobStatus);
164164
jobStatus.setJobName(clusterJobStatus.getJobName());
165165
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
166166

@@ -177,7 +177,7 @@ private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clust
177177

178178
if (JobStatus.CANCELED == currentJobStatus
179179
|| (currentJobStatus.isGloballyTerminalState()
180-
&& JobStatus.CANCELLING.name().equals(previousJobStatus))) {
180+
&& JobStatus.CANCELLING.equals(previousJobStatus))) {
181181
// The job was cancelled
182182
markSuspended(resource);
183183
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
135135
checkContainerBackoff(ctx);
136136
} catch (DeploymentFailedException dfe) {
137137
// throw only when not already in error status to allow for spec update
138-
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
138+
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
139139
if (!JobManagerDeploymentStatus.ERROR.equals(
140140
deploymentStatus.getJobManagerDeploymentStatus())) {
141141
throw dfe;
@@ -149,7 +149,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
149149
}
150150

151151
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
152-
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
152+
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
153153

154154
if (previousJmStatus != JobManagerDeploymentStatus.MISSING
155155
&& previousJmStatus != JobManagerDeploymentStatus.ERROR) {
@@ -192,7 +192,7 @@ protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment dep) {
192192
FlinkDeploymentStatus status = dep.getStatus();
193193
var reconciliationStatus = status.getReconciliationStatus();
194194
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
195-
&& !JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
195+
&& !JobStatus.FAILED.equals(dep.getStatus().getJobStatus().getState())
196196
&& reconciliationStatus.isLastReconciledSpecStable()) {
197197
status.setError(null);
198198
}

0 commit comments

Comments
 (0)