Skip to content

Commit cbb4c43

Browse files
committed
[FLINK-37470] Improve JobManager Deployment / Pod error handling
1 parent c4d460b commit cbb4c43

File tree

8 files changed

+370
-132
lines changed

8 files changed

+370
-132
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@
1717

1818
package org.apache.flink.kubernetes.operator.exception;
1919

20-
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
20+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
21+
22+
import io.fabric8.kubernetes.api.model.ContainerStatus;
2123
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
2224

25+
import java.util.Optional;
26+
import java.util.Set;
27+
2328
/** Exception to signal terminal deployment failure. */
2429
public class DeploymentFailedException extends RuntimeException {
2530

26-
public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
27-
public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
28-
public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull";
31+
public static final Set<String> CONTAINER_ERROR_REASONS =
32+
ImmutableSet.of(
33+
"CrashLoopBackOff",
34+
"ImagePullBackOff",
35+
"ErrImagePull",
36+
"RunContainerError",
37+
"CreateContainerConfigError",
38+
"OOMKilled");
2939

3040
private static final long serialVersionUID = -1070179896083579221L;
3141

@@ -36,11 +46,6 @@ public DeploymentFailedException(DeploymentCondition deployCondition) {
3646
this.reason = deployCondition.getReason();
3747
}
3848

39-
public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
40-
super(stateWaiting.getMessage());
41-
this.reason = stateWaiting.getReason();
42-
}
43-
4449
public DeploymentFailedException(String message, String reason) {
4550
super(message);
4651
this.reason = reason;
@@ -49,4 +54,24 @@ public DeploymentFailedException(String message, String reason) {
4954
public String getReason() {
5055
return reason;
5156
}
57+
58+
public static DeploymentFailedException forContainerStatus(ContainerStatus status) {
59+
var waiting = status.getState().getWaiting();
60+
var lastState = status.getLastState();
61+
String message = null;
62+
if ("CrashLoopBackOff".equals(waiting.getReason())
63+
&& lastState != null
64+
&& lastState.getTerminated() != null) {
65+
message =
66+
Optional.ofNullable(lastState.getTerminated().getMessage())
67+
.map(err -> "CrashLoop - " + err)
68+
.orElse(null);
69+
}
70+
71+
if (message == null) {
72+
message = waiting.getMessage();
73+
}
74+
return new DeploymentFailedException(
75+
String.format("[%s] %s", status.getName(), message), waiting.getReason());
76+
}
5277
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
3131
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3232
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
33+
import org.apache.flink.kubernetes.operator.utils.EventUtils;
3334
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
3435

35-
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
3636
import io.fabric8.kubernetes.api.model.ContainerStatus;
3737
import io.fabric8.kubernetes.api.model.Pod;
3838
import io.fabric8.kubernetes.api.model.PodList;
@@ -46,7 +46,7 @@
4646
import java.util.List;
4747
import java.util.Map;
4848
import java.util.Optional;
49-
import java.util.Set;
49+
import java.util.stream.Stream;
5050

5151
/** Base observer for session and application clusters. */
5252
public abstract class AbstractFlinkDeploymentObserver
@@ -135,7 +135,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
135135
try {
136136
checkFailedCreate(status);
137137
// checking the pod is expensive; only do it when the deployment isn't ready
138-
checkContainerBackoff(ctx);
138+
checkContainerErrors(ctx);
139139
} catch (DeploymentFailedException dfe) {
140140
// throw only when not already in error status to allow for spec update
141141
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
@@ -172,21 +172,29 @@ private void checkFailedCreate(DeploymentStatus status) {
172172
}
173173
}
174174

175-
private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> ctx) {
175+
private void checkContainerErrors(FlinkResourceContext<FlinkDeployment> ctx) {
176176
PodList jmPods =
177177
ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig());
178178
for (Pod pod : jmPods.getItems()) {
179-
for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
180-
ContainerStateWaiting csw = cs.getState().getWaiting();
181-
if (csw != null
182-
&& Set.of(
183-
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
184-
DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
185-
DeploymentFailedException.REASON_ERR_IMAGE_PULL)
186-
.contains(csw.getReason())) {
187-
throw new DeploymentFailedException(csw);
188-
}
189-
}
179+
var podStatus = pod.getStatus();
180+
Stream.concat(
181+
podStatus.getContainerStatuses().stream(),
182+
podStatus.getInitContainerStatuses().stream())
183+
.forEach(AbstractFlinkDeploymentObserver::checkContainerError);
184+
185+
// No obvious errors were found, check for volume mount issues
186+
EventUtils.checkForVolumeMountErrors(
187+
pod, () -> EventUtils.getPodEvents(ctx.getKubernetesClient(), pod));
188+
}
189+
}
190+
191+
private static void checkContainerError(ContainerStatus cs) {
192+
if (cs.getState() == null || cs.getState().getWaiting() == null) {
193+
return;
194+
}
195+
if (DeploymentFailedException.CONTAINER_ERROR_REASONS.contains(
196+
cs.getState().getWaiting().getReason())) {
197+
throw DeploymentFailedException.forContainerStatus(cs);
190198
}
191199
}
192200

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717

1818
package org.apache.flink.kubernetes.operator.utils;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
21+
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
22+
2023
import io.fabric8.kubernetes.api.model.Event;
2124
import io.fabric8.kubernetes.api.model.EventBuilder;
2225
import io.fabric8.kubernetes.api.model.HasMetadata;
2326
import io.fabric8.kubernetes.api.model.ObjectMeta;
24-
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
27+
import io.fabric8.kubernetes.api.model.ObjectReference;
28+
import io.fabric8.kubernetes.api.model.Pod;
29+
import io.fabric8.kubernetes.api.model.PodCondition;
2530
import io.fabric8.kubernetes.client.KubernetesClient;
2631
import io.fabric8.kubernetes.client.KubernetesClientException;
2732
import org.slf4j.Logger;
@@ -32,10 +37,15 @@
3237
import java.net.HttpURLConnection;
3338
import java.time.Duration;
3439
import java.time.Instant;
40+
import java.util.ArrayList;
41+
import java.util.List;
3542
import java.util.Map;
3643
import java.util.Optional;
3744
import java.util.function.Consumer;
45+
import java.util.function.Function;
3846
import java.util.function.Predicate;
47+
import java.util.function.Supplier;
48+
import java.util.stream.Collectors;
3949

4050
/**
4151
* The util to generate an event for the target resource. It is copied from
@@ -182,13 +192,7 @@ private static Event buildEvent(
182192
String eventName) {
183193
return new EventBuilder()
184194
.withApiVersion("v1")
185-
.withInvolvedObject(
186-
new ObjectReferenceBuilder()
187-
.withKind(target.getKind())
188-
.withUid(target.getMetadata().getUid())
189-
.withName(target.getMetadata().getName())
190-
.withNamespace(target.getMetadata().getNamespace())
191-
.build())
195+
.withInvolvedObject(getObjectReference(target))
192196
.withType(type.name())
193197
.withReason(reason)
194198
.withFirstTimestamp(Instant.now().toString())
@@ -235,4 +239,78 @@ private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Eve
235239
}
236240
return Optional.empty();
237241
}
242+
243+
public static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
244+
var ref = getObjectReference(pod);
245+
246+
var eventList =
247+
client.v1()
248+
.events()
249+
.inNamespace(pod.getMetadata().getNamespace())
250+
.withInvolvedObject(ref)
251+
.list();
252+
253+
if (eventList == null) {
254+
return new ArrayList<>();
255+
}
256+
257+
var items = eventList.getItems();
258+
if (items == null) {
259+
return new ArrayList<>();
260+
}
261+
return items;
262+
}
263+
264+
@VisibleForTesting
265+
protected static ObjectReference getObjectReference(HasMetadata resource) {
266+
var ref = new ObjectReference();
267+
ref.setApiVersion(resource.getApiVersion());
268+
ref.setKind(resource.getKind());
269+
ref.setName(resource.getMetadata().getName());
270+
ref.setNamespace(resource.getMetadata().getNamespace());
271+
ref.setUid(resource.getMetadata().getUid());
272+
return ref;
273+
}
274+
275+
/**
276+
* Check that pod is stuck during volume mount stage and throw {@link DeploymentFailedException}
277+
* with the right reason message if that's the case.
278+
*
279+
* @param pod Pod to be checked
280+
* @param podEventSupplier supplier for Pod event list. For easy testability
281+
*/
282+
public static void checkForVolumeMountErrors(Pod pod, Supplier<List<Event>> podEventSupplier) {
283+
var conditions = pod.getStatus().getConditions();
284+
if (conditions == null) {
285+
return;
286+
}
287+
var conditionMap =
288+
conditions.stream()
289+
.collect(Collectors.toMap(PodCondition::getType, Function.identity()));
290+
291+
// We use PodReadyToStartContainers if available otherwise use Initialized, but it's only
292+
// there k8s 1.29+
293+
boolean failedInitialization =
294+
checkStatusWasAlways(
295+
pod,
296+
conditionMap.getOrDefault(
297+
"PodReadyToStartContainers", conditionMap.get("Initialized")),
298+
"False");
299+
300+
boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");
301+
302+
if (notReady && failedInitialization) {
303+
podEventSupplier.get().stream()
304+
.filter(e -> e.getReason().equals("FailedMount"))
305+
.findAny()
306+
.ifPresent(
307+
e -> {
308+
throw new DeploymentFailedException(e.getMessage(), e.getReason());
309+
});
310+
}
311+
}
312+
313+
private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
314+
return condition != null && condition.getStatus().equals(status);
315+
}
238316
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,24 @@ public class TestUtils extends BaseTestUtils {
9797
public static PodList createFailedPodList(String crashLoopMessage, String reason) {
9898
ContainerStatus cs =
9999
new ContainerStatusBuilder()
100+
.withName("c1")
101+
.withNewState()
102+
.withNewWaiting()
103+
.withReason(reason)
104+
.withMessage(crashLoopMessage)
105+
.endWaiting()
106+
.endState()
107+
.build();
108+
109+
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
110+
pod.setStatus(new PodStatusBuilder().withContainerStatuses(cs).build());
111+
return new PodListBuilder().withItems(pod).build();
112+
}
113+
114+
public static PodList createFailedInitContainerPodList(String crashLoopMessage, String reason) {
115+
ContainerStatus cs =
116+
new ContainerStatusBuilder()
117+
.withName("c1")
100118
.withNewState()
101119
.withNewWaiting()
102120
.withReason(reason)
@@ -108,7 +126,8 @@ public static PodList createFailedPodList(String crashLoopMessage, String reason
108126
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
109127
pod.setStatus(
110128
new PodStatusBuilder()
111-
.withContainerStatuses(Collections.singletonList(cs))
129+
.withContainerStatuses(new ContainerStatusBuilder().withReady().build())
130+
.withInitContainerStatuses(cs)
112131
.build());
113132
return new PodListBuilder().withItems(pod).build();
114133
}

0 commit comments

Comments
 (0)