Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| ----------| ---- | ---- |
| jobName | java.lang.String | Name of the job. |
| jobId | java.lang.String | Flink JobId of the Job. |
| state | java.lang.String | Last observed state of the job. |
| state | org.apache.flink.api.common.JobStatus | Last observed state of the job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
| upgradeSavepointPath | java.lang.String | |
Expand Down
4 changes: 2 additions & 2 deletions flink-kubernetes-operator-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ under the License.
fork="true" failonerror="true">
<classpath refid="maven.compile.classpath"/>
<arg value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
</java>
</target>
</configuration>
Expand All @@ -243,7 +243,7 @@ under the License.
fork="true" failonerror="true">
<classpath refid="maven.compile.classpath"/>
<arg value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
<arg value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
</java>
</target>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ public ResourceLifecycleState getLifecycleState() {
return ResourceLifecycleState.SUSPENDED;
}

var jobState = getJobStatus().getState();
if (jobState != null
&& org.apache.flink.api.common.JobStatus.valueOf(jobState)
.equals(org.apache.flink.api.common.JobStatus.FAILED)) {
if (getJobStatus().getState() == org.apache.flink.api.common.JobStatus.FAILED) {
return ResourceLifecycleState.FAILED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class JobStatus {

/** Last observed state of the job. */
@PrinterColumn(name = "Job Status")
private String state;
private org.apache.flink.api.common.JobStatus state;

/** Start time of the job. */
private String startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public void resetTrigger() {
*/
public void updateLastSavepoint(Savepoint savepoint) {
if (savepoint == null) {
// In terminal states we have to handle the case when there is actually no savepoint to
// not restore from an old one
lastSavepoint = null;
} else if (lastSavepoint == null
|| !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ protected static void verifyOtherPropsMatch(String path, JsonNode oldNode, JsonN
protected static void checkStringTypeCompatibility(
String path, JsonNode oldNode, JsonNode newNode) {
if (!oldNode.has("enum") && newNode.has("enum")) {
err("Cannot turn string into enum for " + path);
// We make an exception here for jobstatus.state, this is a backward compatible change
if (!path.equals(".status.jobStatus.state")) {
err("Cannot turn string into enum for " + path);
}
}

if (oldNode.has("enum")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void handleDeploymentFailed(
var flinkApp = ctx.getResource();
LOG.error("Flink Deployment failed", dfe);
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
eventRecorder.triggerEvent(
flinkApp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
Expand Down Expand Up @@ -73,7 +72,7 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
CommonStatus<?> status = getResource().getStatus();
String jobId = status.getJobStatus().getJobId();

JobStatus jobStatus = generateJobStatusEnum(status);
JobStatus jobStatus = status.getJobStatus().getState();

return new KubernetesJobAutoScalerContext(
jobId == null ? null : JobID.fromHexString(jobId),
Expand All @@ -84,19 +83,6 @@ private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
this);
}

@Nullable
private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
return null;
}

String state = status.getJobStatus().getState();
if (state == null) {
return null;
}
return JobStatus.valueOf(state);
}

/**
* Get the config that is currently deployed for the resource spec. The returned config may be
* null in case the resource is not accessible/ready yet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
// upgrading state and retry the upgrade (if possible)
resource.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
}
jobStatus.setState(JobStatus.RECONCILING.name());
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
resource.getStatus().setError(JOB_NOT_FOUND_ERR);
}

Expand All @@ -135,9 +135,9 @@ protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
*/
private void ifRunningMoveToReconciling(
org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus,
String previousJobStatus) {
if (JobStatus.RUNNING.name().equals(previousJobStatus)) {
jobStatus.setState(JobStatus.RECONCILING.name());
JobStatus previousJobStatus) {
if (JobStatus.RUNNING == previousJobStatus) {
jobStatus.setState(JobStatus.RECONCILING);
}
}

Expand All @@ -160,7 +160,7 @@ private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clust
var previousJobStatus = jobStatus.getState();
var currentJobStatus = clusterJobStatus.getJobState();

jobStatus.setState(clusterJobStatus.getJobState().name());
jobStatus.setState(currentJobStatus);
jobStatus.setJobName(clusterJobStatus.getJobName());
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));

Expand All @@ -177,7 +177,7 @@ private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clust

if (JobStatus.CANCELED == currentJobStatus
|| (currentJobStatus.isGloballyTerminalState()
&& JobStatus.CANCELLING.name().equals(previousJobStatus))) {
&& JobStatus.CANCELLING.equals(previousJobStatus))) {
// The job was cancelled
markSuspended(resource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
checkContainerBackoff(ctx);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for spec update
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
if (!JobManagerDeploymentStatus.ERROR.equals(
deploymentStatus.getJobManagerDeploymentStatus())) {
throw dfe;
Expand All @@ -149,7 +149,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
}

deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);

if (previousJmStatus != JobManagerDeploymentStatus.MISSING
&& previousJmStatus != JobManagerDeploymentStatus.ERROR) {
Expand Down Expand Up @@ -192,7 +192,7 @@ protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment dep) {
FlinkDeploymentStatus status = dep.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
&& !JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
&& !JobStatus.FAILED.equals(dep.getStatus().getJobStatus().getState())
&& reconciliationStatus.isLastReconciledSpecStable()) {
status.setError(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
import java.time.Instant;
import java.util.function.BiConsumer;

import static org.apache.flink.api.common.JobStatus.CANCELED;
import static org.apache.flink.api.common.JobStatus.CANCELLING;
import static org.apache.flink.api.common.JobStatus.FINISHED;
import static org.apache.flink.api.common.JobStatus.RECONCILING;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkStateSnapshotException;
Expand Down Expand Up @@ -233,7 +236,7 @@ private static TaskManagerInfo getTaskManagerInfo(
}
}

public static void updateForReconciliationError(FlinkResourceContext ctx, Throwable error) {
public static void updateForReconciliationError(FlinkResourceContext<?> ctx, Throwable error) {
updateFlinkResourceException(error, ctx.getResource(), ctx.getOperatorConfig());
}

Expand Down Expand Up @@ -352,35 +355,23 @@ private static boolean upgradeStarted(

public static boolean isJobInTerminalState(CommonStatus<?> status) {
var jobState = status.getJobStatus().getState();
if (jobState == null) {
jobState = org.apache.flink.api.common.JobStatus.RECONCILING.name();
}
return org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
return jobState != null && jobState.isGloballyTerminalState();
}

public static boolean isJobRunning(CommonStatus<?> status) {
return org.apache.flink.api.common.JobStatus.RUNNING
.name()
.equals(status.getJobStatus().getState());
return RUNNING == status.getJobStatus().getState();
}

public static boolean isJobCancelled(CommonStatus<?> status) {
return org.apache.flink.api.common.JobStatus.CANCELED
.name()
.equals(status.getJobStatus().getState());
return CANCELED == status.getJobStatus().getState();
}

public static boolean isJobCancellable(CommonStatus<?> status) {
return !org.apache.flink.api.common.JobStatus.RECONCILING
.name()
.equals(status.getJobStatus().getState());
return RECONCILING != status.getJobStatus().getState();
}

public static boolean isJobCancelling(CommonStatus<?> status) {
return status.getJobStatus() != null
&& org.apache.flink.api.common.JobStatus.CANCELLING
.name()
.equals(status.getJobStatus().getState());
return status.getJobStatus() != null && CANCELLING == status.getJobStatus().getState();
}

/**
Expand Down Expand Up @@ -503,8 +494,7 @@ public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?,
* @param status Status to be updated.
*/
public static void checkAndUpdateStableSpec(CommonStatus<?> status) {
var flinkJobStatus =
org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
var flinkJobStatus = status.getJobStatus().getState();

if (status.getReconciliationStatus().getState() != ReconciliationState.DEPLOYED) {
return;
Expand Down Expand Up @@ -542,8 +532,7 @@ public static void updateStatusForAlreadyUpgraded(AbstractFlinkResource<?, ?> re
var lastJobSpec = lastSpecWithMeta.getSpec().getJob();
if (lastJobSpec != null) {
lastJobSpec.setState(JobState.RUNNING);
status.getJobStatus()
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
status.getJobStatus().setState(RECONCILING);
}
reconciliationStatus.setState(ReconciliationState.DEPLOYED);
reconciliationStatus.setLastReconciledSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
Expand Down Expand Up @@ -511,7 +510,7 @@ private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
boolean nonTerminalApplication =
!sessionCluster
&& deployedJob.getState() == JobState.RUNNING
&& !JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState();
&& !jobStatus.getState().isGloballyTerminalState();
boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
return jmShouldBeRunning
&& (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,7 @@ protected void setUpgradeSavepointPath(FlinkResourceContext<?> ctx, String savep
@Override
public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws Exception {
var status = ctx.getResource().getStatus();
var jobStatus =
org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
var jobStatus = status.getJobStatus().getState();
if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
&& ctx.getObserveConfig().getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
LOG.info("Stopping failed Flink job...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected JobUpgrade getJobUpgrade(

private void deleteJmThatNeverStarted(
FlinkService flinkService, FlinkDeployment deployment, Configuration deployConfig) {
deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
deployment.getStatus().getJobStatus().setState(JobStatus.FAILED);
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), deployConfig, false);
LOG.info("Deleted application cluster that never started.");
Expand Down Expand Up @@ -181,7 +181,7 @@ public void deploy(
MSG_SUBMIT,
ctx.getKubernetesClient());
flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);

IngressUtils.updateIngressRules(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void deploy(
savepoint.orElse(null));

var status = ctx.getResource().getStatus();
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ protected CancelResult cancelJob(
deployment.getMetadata(), status, conf, suspendMode.deleteHaMeta());
}

status.getJobStatus().setState(JobStatus.FINISHED.name());
status.getJobStatus().setState(JobStatus.FINISHED);
return CancelResult.completed(savepointPath);
}

Expand All @@ -366,7 +366,7 @@ public CancelResult cancelSessionJob(
break;
}
}
status.getJobStatus().setState(JobStatus.FINISHED.name());
status.getJobStatus().setState(JobStatus.FINISHED);
status.getJobStatus().setJobId(null);
return CancelResult.completed(savepointPath);
}
Expand Down Expand Up @@ -404,7 +404,7 @@ public void cancelJobOrError(
"Cancellation Error", EventRecorder.Reason.CleanupFailed.name(), e);
}
}
status.getJobStatus().setState(JobStatus.CANCELLING.name());
status.getJobStatus().setState(JobStatus.CANCELLING);
}

public String savepointJobOrError(
Expand Down Expand Up @@ -1037,9 +1037,8 @@ protected void deleteHAData(String namespace, String clusterId, Configuration co
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
var currentJobState = status.getJobStatus().getState();
if (currentJobState == null
|| !JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
status.getJobStatus().setState(JobStatus.FINISHED.name());
if (currentJobState == null || !currentJobState.isGloballyTerminalState()) {
status.getJobStatus().setState(JobStatus.FINISHED);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private static boolean supportsInPlaceScaling(

var status = resource.getStatus();
if (ReconciliationUtils.isJobInTerminalState(status)
|| JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
|| JobStatus.RECONCILING.equals(status.getJobStatus().getState())) {
LOG.info("Job in terminal or reconciling state cannot be scaled in-place");
return false;
}
Expand Down
Loading
Loading