diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java index 713e132ad2..cd817ec89d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java @@ -38,7 +38,6 @@ import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; -import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,11 +116,11 @@ protected void observeJmDeployment(FlinkResourceContext ctx) { ctx.getJosdkContext().getSecondaryResource(Deployment.class); if (deployment.isPresent()) { DeploymentStatus status = deployment.get().getStatus(); - DeploymentSpec spec = deployment.get().getSpec(); if (status != null && status.getAvailableReplicas() != null - && spec.getReplicas().intValue() == status.getReplicas() - && spec.getReplicas().intValue() == status.getAvailableReplicas() + // One available JM is enough to run the job correctly + && status.getReplicas() > 0 + && status.getAvailableReplicas() > 0 && ctx.getFlinkService().isJobManagerPortReady(ctx.getObserveConfig())) { // typically it takes a few seconds for the REST server to be ready diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 6af375edb3..d9492bfd60 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -117,7 +117,7 @@ public static Deployment createDeployment(boolean ready) { String nowTs = Instant.now().toString(); var status = new DeploymentStatus(); status.setAvailableReplicas(ready ? 1 : 0); - status.setReplicas(1); + status.setReplicas(2); var availableCondition = new DeploymentCondition(); availableCondition.setType("Available"); availableCondition.setStatus(ready ? "True" : "False"); @@ -125,7 +125,7 @@ public static Deployment createDeployment(boolean ready) { status.setConditions(List.of(availableCondition)); DeploymentSpec spec = new DeploymentSpec(); - spec.setReplicas(1); + spec.setReplicas(3); var meta = new ObjectMeta(); meta.setCreationTimestamp(nowTs);