Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ buildNumber.properties
.idea
*.iml
*.DS_Store

.kube
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
import org.apache.flink.runtime.client.JobStatusMessage;
Expand All @@ -35,12 +36,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
Expand All @@ -53,6 +54,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);

public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
public static final String EXCEPTION_TIMESTAMP = "exception-timestamp";
public static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);

protected final EventRecorder eventRecorder;

Expand Down Expand Up @@ -132,65 +135,77 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
}

var exceptionHistory = history.getExceptionHistory();
List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
exceptionHistory.getEntries();
if (exceptions == null || exceptions.isEmpty()) {
return;
var exceptions = exceptionHistory.getEntries();
if (exceptions != null) {
exceptions = new ArrayList<>(exceptions);
exceptions.sort(
Comparator.comparingLong(
JobExceptionsInfoWithHistory.RootExceptionInfo
::getTimestamp)
.reversed());
} else {
exceptions = Collections.emptyList();
}

if (exceptionHistory.isTruncated()) {
LOG.warn(
"Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
jobId);
String currentJobId = jobStatus.getJobId();
var cacheEntry = ctx.getExceptionCacheEntry();

if (!cacheEntry.isInitialized()) {
Instant lastExceptionTs;
if (exceptions.isEmpty()) {
// If the job doesn't have any exceptions set to MIN as we always have to record
// the next
lastExceptionTs = Instant.MIN;
} else {
var k8sExpirationTs = Instant.now().minus(MAX_K8S_EVENT_AGE);
var maxJobExceptionTs = Instant.ofEpochMilli(exceptions.get(0).getTimestamp());
if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
// If the last job exception was a long time ago, then there is no point in
// checking in k8s. We won't report this as exception
lastExceptionTs = maxJobExceptionTs;
} else {
// If there were recent exceptions, we check the triggered events from kube
// to make sure we don't double trigger
lastExceptionTs =
EventUtils.findLastJobExceptionTsFromK8s(
ctx.getKubernetesClient(), resource)
.orElse(k8sExpirationTs);
}
}

cacheEntry.setLastTimestamp(lastExceptionTs);
cacheEntry.setInitialized(true);
cacheEntry.setJobId(currentJobId);
}

String currentJobId = jobStatus.getJobId();
Instant lastRecorded = null; // first reconciliation
var lastRecorded =
currentJobId.equals(cacheEntry.getJobId())
? cacheEntry.getLastTimestamp()
: Instant.MIN;

var cacheEntry = ctx.getExceptionCacheEntry();
// a cache entry is created should always be present. The timestamp for the first
// reconciliation would be
// when the job was created. This check is still necessary because even though there
// might be an entry,
// the jobId could have changed since the job was first created.
if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
if (exceptions.isEmpty()) {
return;
}

int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();

// Sort and reverse to prioritize the newest exceptions
var sortedExceptions = new ArrayList<>(exceptions);
sortedExceptions.sort(
Comparator.comparingLong(
JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
.reversed());
int count = 0;
Instant latestSeen = null;

for (var exception : sortedExceptions) {
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
// Skip already recorded exceptions
if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
for (var exception : exceptions) {
var exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
// Skip already recorded exceptions and after max count
if (!exceptionTime.isAfter(lastRecorded) || count++ >= maxEvents) {
break;
}
emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
if (latestSeen == null) {
latestSeen = exceptionTime;
}
if (++count >= maxEvents) {
break;
}
}

ctx.getExceptionCacheEntry().setJobId(currentJobId);
// Set to the timestamp of the latest emitted exception, if any were emitted
// the other option is that if no exceptions were emitted, we set this to now.
if (latestSeen != null) {
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
if (count > maxEvents) {
LOG.warn("Job exception history is truncated. Some exceptions may be missing.");
}

cacheEntry.setJobId(currentJobId);
cacheEntry.setLastTimestamp(Instant.ofEpochMilli(exceptions.get(0).getTimestamp()));
} catch (Exception e) {
LOG.warn("Failed to fetch JobManager exception info.", e);
}
Expand All @@ -203,9 +218,7 @@ private void emitJobManagerExceptionEvent(
int maxStackTraceLines) {
Map<String, String> annotations = new HashMap<>();
if (exceptionTime != null) {
annotations.put(
"exception-timestamp",
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
annotations.put(EXCEPTION_TIMESTAMP, exceptionTime.toString());
}
if (exception.getTaskName() != null) {
annotations.put("task-name", exception.getTaskName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -57,12 +58,8 @@ public class FlinkResourceContextFactory {
@Data
public static final class ExceptionCacheEntry {
private String jobId;
private long lastTimestamp;

public ExceptionCacheEntry(String jobId, long lastTimestamp) {
this.jobId = jobId;
this.lastTimestamp = lastTimestamp;
}
private Instant lastTimestamp;
private boolean initialized;
}

@VisibleForTesting
Expand Down Expand Up @@ -128,10 +125,7 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
configManager,
this::getFlinkService,
lastRecordedExceptionCache.computeIfAbsent(
resourceId,
id ->
new ExceptionCacheEntry(
flinkDepJobId, System.currentTimeMillis())));
resourceId, id -> new ExceptionCacheEntry()));
} else if (resource instanceof FlinkSessionJob) {
var resourceId = ResourceID.fromResource(resource);
var flinkSessionJobId = jobId;
Expand All @@ -143,11 +137,7 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
configManager,
this::getFlinkService,
lastRecordedExceptionCache.computeIfAbsent(
resourceId,
id ->
new ExceptionCacheEntry(
flinkSessionJobId,
System.currentTimeMillis())));
resourceId, id -> new ExceptionCacheEntry()));
} else {
throw new IllegalArgumentException(
"Unknown resource type " + resource.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
Expand All @@ -38,6 +39,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -283,13 +285,13 @@ private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Eve
return Optional.empty();
}

private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
var ref = getObjectReference(pod);
private static List<Event> getResourceEvents(KubernetesClient client, HasMetadata cr) {
var ref = getObjectReference(cr);

var eventList =
client.v1()
.events()
.inNamespace(pod.getMetadata().getNamespace())
.inNamespace(cr.getMetadata().getNamespace())
.withInvolvedObject(ref)
.list();

Expand Down Expand Up @@ -343,7 +345,7 @@ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");

if (notReady && failedInitialization) {
getPodEvents(client, pod).stream()
getResourceEvents(client, pod).stream()
.filter(e -> e.getReason().equals("FailedMount"))
.findAny()
.ifPresent(
Expand All @@ -356,4 +358,20 @@ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
/**
 * Tells whether the given pod condition currently reports the expected status.
 *
 * @param pod the pod the condition belongs to; not read by this method
 * @param condition the condition to inspect, may be {@code null}
 * @param status the status value the condition is compared against
 * @return {@code true} only when {@code condition} is non-null and its status equals {@code
 *     status}
 */
private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
    // A missing condition can never match the expected status.
    if (condition == null) {
        return false;
    }
    return condition.getStatus().equals(status);
}

/**
 * Finds the most recent job exception timestamp among the Kubernetes events attached to the
 * given resource.
 *
 * <p>Only events whose reason is {@link EventRecorder.Reason#JobException} are considered. For
 * each of them the {@link JobStatusObserver#EXCEPTION_TIMESTAMP} annotation is preferred and the
 * event's creation timestamp is used as a fallback. Events for which neither value can be
 * parsed are skipped instead of failing the whole lookup.
 *
 * @param client Kubernetes client used to list the events
 * @param cr resource whose events are inspected
 * @return the latest timestamp found, or an empty {@link Optional} if no matching event carries
 *     a usable timestamp
 */
public static Optional<Instant> findLastJobExceptionTsFromK8s(
        KubernetesClient client, HasMetadata cr) {
    return getResourceEvents(client, cr).stream()
            .filter(e -> EventRecorder.Reason.JobException.name().equals(e.getReason()))
            .flatMap(e -> resolveJobExceptionTs(e).stream())
            .max(Comparator.naturalOrder());
}

/**
 * Resolves the exception timestamp of a single event: annotation first, creation timestamp
 * second. Tolerates missing metadata and missing annotations.
 */
private static Optional<Instant> resolveJobExceptionTs(Event event) {
    var metadata = event.getMetadata();
    if (metadata == null) {
        return Optional.empty();
    }
    var annotations = metadata.getAnnotations();
    var annotated =
            annotations == null
                    ? null
                    : annotations.get(JobStatusObserver.EXCEPTION_TIMESTAMP);
    return parseEventInstant(annotated)
            .or(() -> parseEventInstant(metadata.getCreationTimestamp()));
}

/**
 * Parses an ISO-8601 timestamp into an {@link Instant}, returning empty for {@code null} or
 * malformed input.
 */
private static Optional<Instant> parseEventInstant(String raw) {
    if (raw == null) {
        return Optional.empty();
    }
    try {
        // OffsetDateTime accepts both the 'Z' suffix and explicit offsets such as +02:00.
        // NOTE(review): offset-formatted annotations are presumably present on events written
        // by earlier operator versions - confirm.
        return Optional.of(OffsetDateTime.parse(raw).toInstant());
    } catch (DateTimeParseException ignored) {
        // Best effort: an unreadable timestamp must not break exception observation.
        return Optional.empty();
    }
}
}
Loading