Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ buildNumber.properties
.idea
*.iml
*.DS_Store

.kube
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
import org.apache.flink.runtime.client.JobStatusMessage;
Expand All @@ -35,12 +36,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
Expand All @@ -53,6 +53,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);

public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
public static final String EXCEPTION_TIMESTAMP = "exception-timestamp";
public static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);

protected final EventRecorder eventRecorder;

Expand Down Expand Up @@ -132,8 +134,50 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
}

var exceptionHistory = history.getExceptionHistory();
List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
exceptionHistory.getEntries();
var exceptions = exceptionHistory.getEntries();

String currentJobId = jobStatus.getJobId();
var cacheEntry = ctx.getExceptionCacheEntry();

if (!cacheEntry.isInitialized()) {

Instant lastExceptionTs;
if (exceptions == null || exceptions.isEmpty()) {
// If the job doesn't have any exceptions yet, set the timestamp to MIN so that the
// next exception is always recorded
lastExceptionTs = Instant.MIN;
} else {
var k8sExpirationTs = Instant.now().minus(MAX_K8S_EVENT_AGE);
var maxJobExceptionTs =
exceptions.stream()
.map(e -> Instant.ofEpochMilli(e.getTimestamp()))
.max(Comparator.naturalOrder())
.orElseThrow();

if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
// If the last job exception was a long time ago, then there is no point in
// checking in k8s.
lastExceptionTs = maxJobExceptionTs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this optimization? It complicates the code by adding another setting, and it requires the user to tune yet another setting. There is no harm in calling out to the k8s API regularly to fetch events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no config for this (nothing to tune), and the optimization can be very important when the operator starts up: at that point the cache is empty and it would otherwise fetch events for every single job. In most cases this filter eliminates those calls completely, which greatly reduces the API server load at startup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. The value is hardcoded. We would only query for the jobs with exceptions, but still those could amount to quite some jobs.

} else {
// If there were recent exceptions, we check the triggered events from kube
// to make sure we don't double trigger
lastExceptionTs =
EventUtils.findLastJobExceptionTsFromK8s(
ctx.getKubernetesClient(), resource)
.orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
Copy link
Contributor

@vsantwana vsantwana May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
.orElse(k8sExpirationTs);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I cleaned up / simplified the duplicated code in the method in a new commit, please check :)

}
}

cacheEntry.setLastTimestamp(lastExceptionTs);
cacheEntry.setInitialized(true);
cacheEntry.setJobId(currentJobId);
}

var lastRecorded =
currentJobId.equals(cacheEntry.getJobId())
? cacheEntry.getLastTimestamp()
: Instant.MIN;

if (exceptions == null || exceptions.isEmpty()) {
return;
}
Expand All @@ -144,19 +188,6 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
jobId);
}

String currentJobId = jobStatus.getJobId();
Instant lastRecorded = null; // first reconciliation

var cacheEntry = ctx.getExceptionCacheEntry();
// A cache entry is always created, so it should always be present. The timestamp for the
// first reconciliation would be
// when the job was created. This check is still necessary because even though there
// might be an entry,
// the jobId could have changed since the job was first created.
if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
}

int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();

Expand All @@ -172,7 +203,7 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
for (var exception : sortedExceptions) {
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
// Skip already recorded exceptions
if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
if (!exceptionTime.isAfter(lastRecorded)) {
break;
}
emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
Expand All @@ -188,7 +219,7 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
// Set to the timestamp of the latest emitted exception, if any were emitted
// the other option is that if no exceptions were emitted, we set this to now.
if (latestSeen != null) {
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen);
}

} catch (Exception e) {
Expand All @@ -203,9 +234,7 @@ private void emitJobManagerExceptionEvent(
int maxStackTraceLines) {
Map<String, String> annotations = new HashMap<>();
if (exceptionTime != null) {
annotations.put(
"exception-timestamp",
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
annotations.put(EXCEPTION_TIMESTAMP, exceptionTime.toString());
}
if (exception.getTaskName() != null) {
annotations.put("task-name", exception.getTaskName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -57,12 +58,8 @@ public class FlinkResourceContextFactory {
@Data
public static final class ExceptionCacheEntry {
private String jobId;
private long lastTimestamp;

public ExceptionCacheEntry(String jobId, long lastTimestamp) {
this.jobId = jobId;
this.lastTimestamp = lastTimestamp;
}
private Instant lastTimestamp;
private boolean initialized;
}

@VisibleForTesting
Expand Down Expand Up @@ -128,10 +125,7 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
configManager,
this::getFlinkService,
lastRecordedExceptionCache.computeIfAbsent(
resourceId,
id ->
new ExceptionCacheEntry(
flinkDepJobId, System.currentTimeMillis())));
resourceId, id -> new ExceptionCacheEntry()));
} else if (resource instanceof FlinkSessionJob) {
var resourceId = ResourceID.fromResource(resource);
var flinkSessionJobId = jobId;
Expand All @@ -143,11 +137,7 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
configManager,
this::getFlinkService,
lastRecordedExceptionCache.computeIfAbsent(
resourceId,
id ->
new ExceptionCacheEntry(
flinkSessionJobId,
System.currentTimeMillis())));
resourceId, id -> new ExceptionCacheEntry()));
} else {
throw new IllegalArgumentException(
"Unknown resource type " + resource.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
Expand All @@ -38,6 +39,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -283,13 +285,13 @@ private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Eve
return Optional.empty();
}

private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
var ref = getObjectReference(pod);
private static List<Event> getResourceEvents(KubernetesClient client, HasMetadata cr) {
var ref = getObjectReference(cr);

var eventList =
client.v1()
.events()
.inNamespace(pod.getMetadata().getNamespace())
.inNamespace(cr.getMetadata().getNamespace())
.withInvolvedObject(ref)
.list();

Expand Down Expand Up @@ -343,7 +345,7 @@ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");

if (notReady && failedInitialization) {
getPodEvents(client, pod).stream()
getResourceEvents(client, pod).stream()
.filter(e -> e.getReason().equals("FailedMount"))
.findAny()
.ifPresent(
Expand All @@ -356,4 +358,20 @@ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
/**
 * Checks whether the given pod condition is present and currently reports the expected status.
 *
 * @param pod pod the condition belongs to (not read by this check)
 * @param condition condition to inspect, may be {@code null}
 * @param status status value the condition is expected to have
 * @return {@code true} only if {@code condition} is non-null and its status equals {@code status}
 */
private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
    // A missing condition can never match the expected status.
    if (condition == null) {
        return false;
    }
    return condition.getStatus().equals(status);
}

/**
 * Finds the timestamp of the most recent {@code JobException} event recorded in Kubernetes for
 * the given resource.
 *
 * <p>For every matching event the {@link JobStatusObserver#EXCEPTION_TIMESTAMP} annotation is
 * preferred; if it is absent or unparsable the event's creation timestamp is used instead.
 * Events without any usable timestamp are skipped so that a single malformed event cannot
 * fail the whole lookup.
 *
 * @param client Kubernetes client used to list the events
 * @param cr resource whose events are inspected
 * @return the latest job exception timestamp, or empty if no usable event was found
 */
public static Optional<Instant> findLastJobExceptionTsFromK8s(
        KubernetesClient client, HasMetadata cr) {
    return getResourceEvents(client, cr).stream()
            .filter(e -> EventRecorder.Reason.JobException.name().equals(e.getReason()))
            .map(e -> jobExceptionTimestampOf(e))
            .flatMap(Optional::stream)
            .max(Comparator.naturalOrder());
}

/** Resolves the exception timestamp of a single event: annotation first, then creation time. */
private static Optional<Instant> jobExceptionTimestampOf(Event event) {
    var metadata = event.getMetadata();
    if (metadata == null) {
        return Optional.empty();
    }
    // The annotations map may be absent on events that carry no annotations at all.
    var annotations = metadata.getAnnotations();
    String annotated =
            annotations == null ? null : annotations.get(JobStatusObserver.EXCEPTION_TIMESTAMP);
    return parseIsoTimestamp(annotated)
            .or(() -> parseIsoTimestamp(metadata.getCreationTimestamp()));
}

/** Parses an ISO-8601 timestamp with {@code Z} or a numeric offset; empty if null or invalid. */
private static Optional<Instant> parseIsoTimestamp(String raw) {
    if (raw == null) {
        return Optional.empty();
    }
    try {
        // OffsetDateTime accepts both the Instant#toString form ("...:00Z") and offset-based
        // values that may omit seconds (e.g. "2025-05-27T12:00+02:00"), which Instant.parse
        // rejects on some Java versions.
        return Optional.of(java.time.OffsetDateTime.parse(raw).toInstant());
    } catch (java.time.format.DateTimeParseException ignored) {
        // Deliberately skipped: an unreadable timestamp must not abort the lookup.
        return Optional.empty();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,6 @@ public void addExceptionHistory(
stackTrace,
timestamp,
Map.of("label-key", "label-value"),
"task-name-1",
"location-1",
"endpoint-1",
"tm-id-1",
List.of() // concurrentExceptions
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -167,8 +170,9 @@ public void testExceptionObservedEvenWhenNewStateIsTerminal() throws Exception {
getResourceContext(deployment, operatorConfig);

var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);

// Ensure jobFailedErr is null before the observe call
Expand Down Expand Up @@ -210,8 +214,9 @@ public void testExceptionNotObservedWhenOldStateIsTerminal() throws Exception {
getResourceContext(deployment, operatorConfig);

var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);

// Ensure jobFailedErr is null before the observe call
Expand Down Expand Up @@ -247,8 +252,9 @@ public void testExceptionLimitConfig() throws Exception {
getResourceContext(deployment, operatorConfig); // set a non-terminal state

var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));

flinkService.submitApplicationCluster(
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
Expand Down Expand Up @@ -289,8 +295,9 @@ public void testStackTraceTruncationConfig() throws Exception {
flinkService.submitApplicationCluster(
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));

long exceptionTime = 4000L;
String longTrace = "line1\nline2\nline3\nline4";
Expand Down Expand Up @@ -323,8 +330,9 @@ public void testIgnoreOldExceptions() throws Exception {
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state

FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
ctx.getExceptionCacheEntry().setInitialized(true);
ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));

var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
flinkService.submitApplicationCluster(
Expand Down Expand Up @@ -365,6 +373,51 @@ public void testIgnoreOldExceptions() throws Exception {
assertTrue(events.get(0).getMessage().contains("org.apache.NewException"));
}

/**
 * Verifies the first observation with an uninitialized exception cache: of an hour-old and a
 * minute-old exception only the recent one produces an event, and the cache entry ends up
 * initialized with the recent exception's timestamp.
 */
@Test
public void testExceptionEventTriggerInitialization() throws Exception {
    var deployment = initDeployment();
    // set a non-terminal state
    deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);

    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);

    var now = Instant.now();
    var staleExceptionTs = now.minus(Duration.ofHours(1));
    var recentExceptionTs = now.minus(Duration.ofMinutes(1));

    var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
    flinkService.submitApplicationCluster(
            deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);

    // Old exception that happened outside of kubernetes event retention should be ignored
    flinkService.addExceptionHistory(
            jobId, "OldException", "OldException", staleExceptionTs.toEpochMilli());
    flinkService.addExceptionHistory(
            jobId, "NewException", "NewException", recentExceptionTs.toEpochMilli());

    // Ensure jobFailedErr is null before the observe call
    flinkService.setJobFailedErr(null);
    observer.observe(ctx);

    var namespace = deployment.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();

    assertEquals(1, events.size());
    assertTrue(events.get(0).getMessage().contains("NewException"));

    var cacheEntry = ctx.getExceptionCacheEntry();
    assertTrue(cacheEntry.isInitialized());
    // The exception was registered with millisecond precision, so compare at that precision
    assertEquals(recentExceptionTs.truncatedTo(ChronoUnit.MILLIS), cacheEntry.getLastTimestamp());
}

private static Stream<Arguments> cancellingArgs() {
var args = new ArrayList<Arguments>();
for (var status : JobStatus.values()) {
Expand Down
Loading