diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index b455fffa5a..cff4a43f9e 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -184,6 +184,13 @@ under the License.
+
+ io.fabric8
+ kube-api-test-client-inject
+ ${fabric8.version}
+ test
+
+
com.squareup.okhttp3
mockwebserver
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index a8f9abf0b3..9fc6143f06 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -17,15 +17,25 @@
package org.apache.flink.kubernetes.operator.exception;
-import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import java.util.Optional;
+import java.util.Set;
+
/** Exception to signal terminal deployment failure. */
public class DeploymentFailedException extends RuntimeException {
- public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
- public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
- public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull";
+ public static final Set CONTAINER_ERROR_REASONS =
+ ImmutableSet.of(
+ "CrashLoopBackOff",
+ "ImagePullBackOff",
+ "ErrImagePull",
+ "RunContainerError",
+ "CreateContainerConfigError",
+ "OOMKilled");
private static final long serialVersionUID = -1070179896083579221L;
@@ -36,11 +46,6 @@ public DeploymentFailedException(DeploymentCondition deployCondition) {
this.reason = deployCondition.getReason();
}
- public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
- super(stateWaiting.getMessage());
- this.reason = stateWaiting.getReason();
- }
-
public DeploymentFailedException(String message, String reason) {
super(message);
this.reason = reason;
@@ -49,4 +54,24 @@ public DeploymentFailedException(String message, String reason) {
public String getReason() {
return reason;
}
+
+ public static DeploymentFailedException forContainerStatus(ContainerStatus status) {
+ var waiting = status.getState().getWaiting();
+ var lastState = status.getLastState();
+ String message = null;
+ if ("CrashLoopBackOff".equals(waiting.getReason())
+ && lastState != null
+ && lastState.getTerminated() != null) {
+ message =
+ Optional.ofNullable(lastState.getTerminated().getMessage())
+ .map(err -> "CrashLoop - " + err)
+ .orElse(null);
+ }
+
+ if (message == null) {
+ message = waiting.getMessage();
+ }
+ return new DeploymentFailedException(
+ String.format("[%s] %s", status.getName(), message), waiting.getReason());
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index 713e132ad2..954d9fdb50 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -30,9 +30,9 @@
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
@@ -46,7 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
+import java.util.stream.Stream;
/** Base observer for session and application clusters. */
public abstract class AbstractFlinkDeploymentObserver
@@ -135,7 +135,7 @@ protected void observeJmDeployment(FlinkResourceContext ctx) {
try {
checkFailedCreate(status);
// checking the pod is expensive; only do it when the deployment isn't ready
- checkContainerBackoff(ctx);
+ checkContainerErrors(ctx);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for spec update
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
@@ -172,21 +172,28 @@ private void checkFailedCreate(DeploymentStatus status) {
}
}
- private void checkContainerBackoff(FlinkResourceContext ctx) {
+ private void checkContainerErrors(FlinkResourceContext ctx) {
PodList jmPods =
ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig());
for (Pod pod : jmPods.getItems()) {
- for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
- ContainerStateWaiting csw = cs.getState().getWaiting();
- if (csw != null
- && Set.of(
- DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
- DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
- DeploymentFailedException.REASON_ERR_IMAGE_PULL)
- .contains(csw.getReason())) {
- throw new DeploymentFailedException(csw);
- }
- }
+ var podStatus = pod.getStatus();
+ Stream.concat(
+ podStatus.getContainerStatuses().stream(),
+ podStatus.getInitContainerStatuses().stream())
+ .forEach(AbstractFlinkDeploymentObserver::checkContainerError);
+
+ // No obvious errors were found, check for volume mount issues
+ EventUtils.checkForVolumeMountErrors(ctx.getKubernetesClient(), pod);
+ }
+ }
+
+ private static void checkContainerError(ContainerStatus cs) {
+ if (cs.getState() == null || cs.getState().getWaiting() == null) {
+ return;
+ }
+ if (DeploymentFailedException.CONTAINER_ERROR_REASONS.contains(
+ cs.getState().getWaiting().getReason())) {
+ throw DeploymentFailedException.forContainerStatus(cs);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index b515cdda38..fb0dbfc663 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -17,11 +17,16 @@
package org.apache.flink.kubernetes.operator.utils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectReference;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodCondition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
@@ -32,10 +37,14 @@
import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
/**
* The util to generate an event for the target resource. It is copied from
@@ -182,13 +191,7 @@ private static Event buildEvent(
String eventName) {
return new EventBuilder()
.withApiVersion("v1")
- .withInvolvedObject(
- new ObjectReferenceBuilder()
- .withKind(target.getKind())
- .withUid(target.getMetadata().getUid())
- .withName(target.getMetadata().getName())
- .withNamespace(target.getMetadata().getNamespace())
- .build())
+ .withInvolvedObject(getObjectReference(target))
.withType(type.name())
.withReason(reason)
.withFirstTimestamp(Instant.now().toString())
@@ -235,4 +238,78 @@ private static Optional createOrReplaceEvent(KubernetesClient client, Eve
}
return Optional.empty();
}
+
+ private static List getPodEvents(KubernetesClient client, Pod pod) {
+ var ref = getObjectReference(pod);
+
+ var eventList =
+ client.v1()
+ .events()
+ .inNamespace(pod.getMetadata().getNamespace())
+ .withInvolvedObject(ref)
+ .list();
+
+ if (eventList == null) {
+ return new ArrayList<>();
+ }
+
+ var items = eventList.getItems();
+ if (items == null) {
+ return new ArrayList<>();
+ }
+ return items;
+ }
+
+ @VisibleForTesting
+ protected static ObjectReference getObjectReference(HasMetadata resource) {
+ var ref = new ObjectReference();
+ ref.setApiVersion(resource.getApiVersion());
+ ref.setKind(resource.getKind());
+ ref.setName(resource.getMetadata().getName());
+ ref.setNamespace(resource.getMetadata().getNamespace());
+ ref.setUid(resource.getMetadata().getUid());
+ return ref;
+ }
+
+ /**
+ * Check that pod is stuck during volume mount stage and throw {@link DeploymentFailedException}
+ * with the right reason message if that's the case.
+ *
+ * @param client Kubernetes client
+ * @param pod Pod to be checked
+ */
+ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
+ var conditions = pod.getStatus().getConditions();
+ if (conditions == null) {
+ return;
+ }
+ var conditionMap =
+ conditions.stream()
+ .collect(Collectors.toMap(PodCondition::getType, Function.identity()));
+
+ // We use PodReadyToStartContainers if available otherwise use Initialized, but it's only
+ // there k8s 1.29+
+ boolean failedInitialization =
+ checkStatusWasAlways(
+ pod,
+ conditionMap.getOrDefault(
+ "PodReadyToStartContainers", conditionMap.get("Initialized")),
+ "False");
+
+ boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");
+
+ if (notReady && failedInitialization) {
+ getPodEvents(client, pod).stream()
+ .filter(e -> e.getReason().equals("FailedMount"))
+ .findAny()
+ .ifPresent(
+ e -> {
+ throw new DeploymentFailedException(e.getMessage(), e.getReason());
+ });
+ }
+ }
+
+ private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
+ return condition != null && condition.getStatus().equals(status);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 6af375edb3..774cb9df8e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -97,6 +97,24 @@ public class TestUtils extends BaseTestUtils {
public static PodList createFailedPodList(String crashLoopMessage, String reason) {
ContainerStatus cs =
new ContainerStatusBuilder()
+ .withName("c1")
+ .withNewState()
+ .withNewWaiting()
+ .withReason(reason)
+ .withMessage(crashLoopMessage)
+ .endWaiting()
+ .endState()
+ .build();
+
+ Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
+ pod.setStatus(new PodStatusBuilder().withContainerStatuses(cs).build());
+ return new PodListBuilder().withItems(pod).build();
+ }
+
+ public static PodList createFailedInitContainerPodList(String crashLoopMessage, String reason) {
+ ContainerStatus cs =
+ new ContainerStatusBuilder()
+ .withName("c1")
.withNewState()
.withNewWaiting()
.withReason(reason)
@@ -108,7 +126,8 @@ public static PodList createFailedPodList(String crashLoopMessage, String reason
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
pod.setStatus(
new PodStatusBuilder()
- .withContainerStatuses(Collections.singletonList(cs))
+ .withContainerStatuses(new ContainerStatusBuilder().withReady().build())
+ .withInitContainerStatuses(cs)
.build());
return new PodListBuilder().withItems(pod).build();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 18273c6372..97a2051786 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -35,7 +35,6 @@
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -59,7 +58,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Map;
@@ -299,94 +297,6 @@ public void verifyFailedDeployment() throws Exception {
updateControl.getScheduleDelay().get());
}
- @ParameterizedTest()
- @ValueSource(
- strings = {
- DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
- DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
- DeploymentFailedException.REASON_ERR_IMAGE_PULL
- })
- public void verifyInProgressDeploymentWithError(String reason) throws Exception {
- String crashLoopMessage = "container fails";
-
- var submittedEventValidatingResponseProvider =
- new TestUtils.ValidatingResponseProvider<>(
- mockedEvent,
- r ->
- assertTrue(
- r.getBody()
- .readUtf8()
- .contains(
- AbstractFlinkResourceReconciler
- .MSG_SUBMIT)));
- mockServer
- .expect()
- .post()
- .withPath("/api/v1/namespaces/flink-operator-test/events")
- .andReply(submittedEventValidatingResponseProvider)
- .once();
-
- var validatingResponseProvider =
- new TestUtils.ValidatingResponseProvider<>(
- mockedEvent,
- r -> {
- String recordedRequestBody = r.getBody().readUtf8();
- assertTrue(recordedRequestBody.contains(reason));
- assertTrue(recordedRequestBody.contains(crashLoopMessage));
- });
- mockServer
- .expect()
- .post()
- .withPath("/api/v1/namespaces/flink-operator-test/events")
- .andReply(validatingResponseProvider)
- .once();
-
- flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, reason));
-
- FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
- UpdateControl updateControl;
-
- testController.reconcile(appCluster, context);
- updateControl =
- testController.reconcile(
- appCluster,
- TestUtils.createContextWithInProgressDeployment(kubernetesClient));
- submittedEventValidatingResponseProvider.assertValidated();
- assertFalse(updateControl.isUpdateStatus());
- assertEquals(
- Optional.of(
- configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
- updateControl.getScheduleDelay());
-
- assertEquals(
- JobManagerDeploymentStatus.ERROR,
- appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING,
- appCluster.getStatus().getJobStatus().getState());
-
- // Validate status status
- assertNotNull(appCluster.getStatus().getError());
-
- // next cycle should not create another event
- updateControl =
- testController.reconcile(
- appCluster,
- TestUtils.createContextWithFailedJobManagerDeployment(kubernetesClient));
- assertEquals(
- JobManagerDeploymentStatus.ERROR,
- appCluster.getStatus().getJobManagerDeploymentStatus());
- assertFalse(updateControl.isUpdateStatus());
- assertEquals(
- ReconciliationUtils.rescheduleAfter(
- JobManagerDeploymentStatus.READY,
- appCluster,
- configManager.getOperatorConfiguration())
- .toMillis(),
- updateControl.getScheduleDelay().get());
- validatingResponseProvider.assertValidated();
- }
-
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) throws Exception {
@@ -915,9 +825,7 @@ private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Excep
@Test
public void testSuccessfulObservationShouldClearErrors() throws Exception {
final String crashLoopMessage = "deploy errors";
- flinkService.setPodList(
- TestUtils.createFailedPodList(
- crashLoopMessage, DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+ flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, "ErrImagePull"));
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java
new file mode 100644
index 0000000000..2437d3b990
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+import io.fabric8.kubernetes.api.model.ContainerState;
+import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
+import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DeploymentFailedException}. */
+public class DeploymentFailedExceptionTest {
+
+ @Test
+ public void testErrorFromContainerStatus() {
+ var containerStatus = new ContainerStatus();
+ containerStatus.setName("c1");
+ var state = new ContainerState();
+ var waiting = new ContainerStateWaiting();
+ waiting.setMessage("msg");
+ waiting.setReason("r");
+ state.setWaiting(waiting);
+
+ containerStatus.setState(state);
+
+ var ex = DeploymentFailedException.forContainerStatus(containerStatus);
+ assertEquals("[c1] msg", ex.getMessage());
+ assertEquals("r", ex.getReason());
+
+ waiting.setReason("CrashLoopBackOff");
+ waiting.setMessage("backing off");
+ ex = DeploymentFailedException.forContainerStatus(containerStatus);
+ assertEquals("[c1] backing off", ex.getMessage());
+ assertEquals("CrashLoopBackOff", ex.getReason());
+
+ // Last state set but not terminated
+ var lastState = new ContainerState();
+ containerStatus.setLastState(lastState);
+
+ ex = DeploymentFailedException.forContainerStatus(containerStatus);
+ assertEquals("[c1] backing off", ex.getMessage());
+ assertEquals("CrashLoopBackOff", ex.getReason());
+
+ var terminated = new ContainerStateTerminated();
+ terminated.setMessage("crash");
+ lastState.setTerminated(terminated);
+
+ ex = DeploymentFailedException.forContainerStatus(containerStatus);
+ assertEquals("[c1] CrashLoop - crash", ex.getMessage());
+ assertEquals("CrashLoopBackOff", ex.getReason());
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index aa4898d827..68233d397e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -48,12 +48,16 @@
import lombok.Getter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
@@ -753,8 +757,9 @@ private void bringToReadyStatus(FlinkDeployment deployment) {
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
- @Test
- public void observeListJobsError() {
+ @ParameterizedTest
+ @MethodSource("containerFailureReasons")
+ public void observeListJobsError(String reason, boolean initError) {
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
assertEquals(
@@ -762,9 +767,12 @@ public void observeListJobsError() {
deployment.getStatus().getJobManagerDeploymentStatus());
// simulate deployment failure
String podFailedMessage = "list jobs error";
- flinkService.setPodList(
- TestUtils.createFailedPodList(
- podFailedMessage, DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+ if (initError) {
+ flinkService.setPodList(
+ TestUtils.createFailedInitContainerPodList(podFailedMessage, reason));
+ } else {
+ flinkService.setPodList(TestUtils.createFailedPodList(podFailedMessage, reason));
+ }
flinkService.setPortReady(false);
Exception exception =
assertThrows(
@@ -774,7 +782,14 @@ public void observeListJobsError() {
deployment,
TestUtils.createContextWithInProgressDeployment(
kubernetesClient)));
- assertEquals(podFailedMessage, exception.getMessage());
+ assertEquals("[c1] " + podFailedMessage, exception.getMessage());
+ }
+
+ private static Stream containerFailureReasons() {
+ return DeploymentFailedException.CONTAINER_ERROR_REASONS.stream()
+ .flatMap(
+ reason ->
+ Stream.of(Arguments.of(reason, true), Arguments.of(reason, false)));
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
new file mode 100644
index 0000000000..d4e578097f
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+
+import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
+import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodCondition;
+import io.fabric8.kubernetes.api.model.PodConditionBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link EventUtils}. */
+@EnableKubeAPIServer
+public class PodErrorTest {
+
+ static KubernetesClient client;
+
+ @Test
+ public void testVolumeMountErrors() {
+ var pod =
+ new PodBuilder()
+ .withNewMetadata()
+ .withName("test")
+ .withNamespace("default")
+ .endMetadata()
+ .withNewStatus()
+ .endStatus()
+ .build();
+
+ // No conditions, no error expected
+ EventUtils.checkForVolumeMountErrors(client, pod);
+
+ var conditions = new ArrayList();
+ pod.getStatus().setConditions(conditions);
+
+ // No conditions, no error expected
+ EventUtils.checkForVolumeMountErrors(client, pod);
+
+ // Create error events
+ createPodEvent("e1", "reason1", "msg1", pod);
+ createPodEvent("e2", "FailedMount", "mountErr", pod);
+
+ var conditionMap = new HashMap();
+
+ // Pod initialized completely, shouldn't check events
+ conditionMap.put("Initialized", "True");
+ conditionMap.put("Ready", "False");
+
+ conditions.clear();
+ conditionMap.forEach(
+ (t, s) ->
+ conditions.add(
+ new PodConditionBuilder().withType(t).withStatus(s).build()));
+ EventUtils.checkForVolumeMountErrors(client, pod);
+
+ // Pod initialized completely, shouldn't check events
+ conditionMap.put("PodReadyToStartContainers", "True");
+ conditionMap.put("Initialized", "False");
+
+ conditions.clear();
+ conditionMap.forEach(
+ (t, s) ->
+ conditions.add(
+ new PodConditionBuilder().withType(t).withStatus(s).build()));
+ EventUtils.checkForVolumeMountErrors(client, pod);
+
+ // Check event only when not ready to start
+ conditionMap.put("PodReadyToStartContainers", "False");
+ conditions.clear();
+ conditionMap.forEach(
+ (t, s) ->
+ conditions.add(
+ new PodConditionBuilder().withType(t).withStatus(s).build()));
+
+ try {
+ EventUtils.checkForVolumeMountErrors(client, pod);
+ fail("Exception not thrown");
+ } catch (DeploymentFailedException dfe) {
+ assertEquals("FailedMount", dfe.getReason());
+ assertEquals("mountErr", dfe.getMessage());
+ }
+
+ // Old kubernetes without PodReadyToStartContainers
+ conditionMap.remove("PodReadyToStartContainers");
+ conditionMap.put("Initialized", "False");
+ conditions.clear();
+ conditionMap.forEach(
+ (t, s) ->
+ conditions.add(
+ new PodConditionBuilder().withType(t).withStatus(s).build()));
+
+ try {
+ EventUtils.checkForVolumeMountErrors(client, pod);
+ fail("Exception not thrown");
+ } catch (DeploymentFailedException dfe) {
+ assertEquals("FailedMount", dfe.getReason());
+ assertEquals("mountErr", dfe.getMessage());
+ }
+ }
+
+ private void createPodEvent(String name, String reason, String msg, Pod pod) {
+ var event =
+ new EventBuilder()
+ .withApiVersion("v1")
+ .withInvolvedObject(EventUtils.getObjectReference(pod))
+ .withType("type")
+ .withReason(reason)
+ .withFirstTimestamp(Instant.now().toString())
+ .withLastTimestamp(Instant.now().toString())
+ .withNewSource()
+ .withComponent("pod")
+ .endSource()
+ .withCount(1)
+ .withMessage(msg)
+ .withNewMetadata()
+ .withName(name)
+ .withNamespace(pod.getMetadata().getNamespace())
+ .endMetadata()
+ .build();
+ client.resource(event).create();
+ }
+}