Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flink-kubernetes-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ under the License.
</exclusions>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kube-api-test-client-inject</artifactId>
<version>${fabric8.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@

package org.apache.flink.kubernetes.operator.exception;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;

import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;

import java.util.Optional;
import java.util.Set;

/** Exception to signal terminal deployment failure. */
public class DeploymentFailedException extends RuntimeException {

public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull";
public static final Set<String> CONTAINER_ERROR_REASONS =
ImmutableSet.of(
"CrashLoopBackOff",
"ImagePullBackOff",
"ErrImagePull",
"RunContainerError",
"CreateContainerConfigError",
"OOMKilled");

private static final long serialVersionUID = -1070179896083579221L;

Expand All @@ -36,11 +46,6 @@ public DeploymentFailedException(DeploymentCondition deployCondition) {
this.reason = deployCondition.getReason();
}

public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
super(stateWaiting.getMessage());
this.reason = stateWaiting.getReason();
}

public DeploymentFailedException(String message, String reason) {
super(message);
this.reason = reason;
Expand All @@ -49,4 +54,24 @@ public DeploymentFailedException(String message, String reason) {
public String getReason() {
return reason;
}

public static DeploymentFailedException forContainerStatus(ContainerStatus status) {
var waiting = status.getState().getWaiting();
var lastState = status.getLastState();
String message = null;
if ("CrashLoopBackOff".equals(waiting.getReason())
&& lastState != null
&& lastState.getTerminated() != null) {
message =
Optional.ofNullable(lastState.getTerminated().getMessage())
.map(err -> "CrashLoop - " + err)
.orElse(null);
}

if (message == null) {
message = waiting.getMessage();
}
return new DeploymentFailedException(
String.format("[%s] %s", status.getName(), message), waiting.getReason());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
Expand All @@ -46,7 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

/** Base observer for session and application clusters. */
public abstract class AbstractFlinkDeploymentObserver
Expand Down Expand Up @@ -135,7 +135,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
try {
checkFailedCreate(status);
// checking the pod is expensive; only do it when the deployment isn't ready
checkContainerBackoff(ctx);
checkContainerErrors(ctx);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for spec update
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
Expand Down Expand Up @@ -172,21 +172,28 @@ private void checkFailedCreate(DeploymentStatus status) {
}
}

private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> ctx) {
private void checkContainerErrors(FlinkResourceContext<FlinkDeployment> ctx) {
PodList jmPods =
ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig());
for (Pod pod : jmPods.getItems()) {
for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
ContainerStateWaiting csw = cs.getState().getWaiting();
if (csw != null
&& Set.of(
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
DeploymentFailedException.REASON_ERR_IMAGE_PULL)
.contains(csw.getReason())) {
throw new DeploymentFailedException(csw);
}
}
var podStatus = pod.getStatus();
Stream.concat(
podStatus.getContainerStatuses().stream(),
podStatus.getInitContainerStatuses().stream())
.forEach(AbstractFlinkDeploymentObserver::checkContainerError);

// No obvious errors were found, check for volume mount issues
EventUtils.checkForVolumeMountErrors(ctx.getKubernetesClient(), pod);
}
}

private static void checkContainerError(ContainerStatus cs) {
if (cs.getState() == null || cs.getState().getWaiting() == null) {
return;
}
if (DeploymentFailedException.CONTAINER_ERROR_REASONS.contains(
cs.getState().getWaiting().getReason())) {
throw DeploymentFailedException.forContainerStatus(cs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package org.apache.flink.kubernetes.operator.utils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodCondition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
Expand All @@ -32,10 +37,14 @@
import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* The util to generate an event for the target resource. It is copied from
Expand Down Expand Up @@ -182,13 +191,7 @@ private static Event buildEvent(
String eventName) {
return new EventBuilder()
.withApiVersion("v1")
.withInvolvedObject(
new ObjectReferenceBuilder()
.withKind(target.getKind())
.withUid(target.getMetadata().getUid())
.withName(target.getMetadata().getName())
.withNamespace(target.getMetadata().getNamespace())
.build())
.withInvolvedObject(getObjectReference(target))
.withType(type.name())
.withReason(reason)
.withFirstTimestamp(Instant.now().toString())
Expand Down Expand Up @@ -235,4 +238,78 @@ private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Eve
}
return Optional.empty();
}

private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
var ref = getObjectReference(pod);

var eventList =
client.v1()
.events()
.inNamespace(pod.getMetadata().getNamespace())
.withInvolvedObject(ref)
.list();

if (eventList == null) {
return new ArrayList<>();
}

var items = eventList.getItems();
if (items == null) {
return new ArrayList<>();
}
return items;
}

@VisibleForTesting
protected static ObjectReference getObjectReference(HasMetadata resource) {
var ref = new ObjectReference();
ref.setApiVersion(resource.getApiVersion());
ref.setKind(resource.getKind());
ref.setName(resource.getMetadata().getName());
ref.setNamespace(resource.getMetadata().getNamespace());
ref.setUid(resource.getMetadata().getUid());
return ref;
}

/**
* Check that pod is stuck during volume mount stage and throw {@link DeploymentFailedException}
* with the right reason message if that's the case.
*
* @param client Kubernetes client
* @param pod Pod to be checked
*/
public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
var conditions = pod.getStatus().getConditions();
if (conditions == null) {
return;
}
var conditionMap =
conditions.stream()
.collect(Collectors.toMap(PodCondition::getType, Function.identity()));

// We use PodReadyToStartContainers if available otherwise use Initialized, but it's only
// there k8s 1.29+
boolean failedInitialization =
checkStatusWasAlways(
pod,
conditionMap.getOrDefault(
"PodReadyToStartContainers", conditionMap.get("Initialized")),
"False");

boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");

if (notReady && failedInitialization) {
getPodEvents(client, pod).stream()
.filter(e -> e.getReason().equals("FailedMount"))
.findAny()
.ifPresent(
e -> {
throw new DeploymentFailedException(e.getMessage(), e.getReason());
});
}
}

private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
return condition != null && condition.getStatus().equals(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ public class TestUtils extends BaseTestUtils {
public static PodList createFailedPodList(String crashLoopMessage, String reason) {
ContainerStatus cs =
new ContainerStatusBuilder()
.withName("c1")
.withNewState()
.withNewWaiting()
.withReason(reason)
.withMessage(crashLoopMessage)
.endWaiting()
.endState()
.build();

Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
pod.setStatus(new PodStatusBuilder().withContainerStatuses(cs).build());
return new PodListBuilder().withItems(pod).build();
}

public static PodList createFailedInitContainerPodList(String crashLoopMessage, String reason) {
ContainerStatus cs =
new ContainerStatusBuilder()
.withName("c1")
.withNewState()
.withNewWaiting()
.withReason(reason)
Expand All @@ -108,7 +126,8 @@ public static PodList createFailedPodList(String crashLoopMessage, String reason
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
pod.setStatus(
new PodStatusBuilder()
.withContainerStatuses(Collections.singletonList(cs))
.withContainerStatuses(new ContainerStatusBuilder().withReady().build())
.withInitContainerStatuses(cs)
.build());
return new PodListBuilder().withItems(pod).build();
}
Expand Down
Loading
Loading