Skip to content

Commit 4b297e2

Browse files
committed
[FLINK-37730] Improve exception recording ts initialization
1 parent 46ee95c commit 4b297e2

File tree

7 files changed

+193
-51
lines changed

7 files changed

+193
-51
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,5 @@ buildNumber.properties
3636
.idea
3737
*.iml
3838
*.DS_Store
39+
40+
.kube

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2828
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2929
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
30+
import org.apache.flink.kubernetes.operator.utils.EventUtils;
3031
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3132
import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
3233
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -35,12 +36,11 @@
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

39+
import java.time.Duration;
3840
import java.time.Instant;
39-
import java.time.ZoneId;
4041
import java.util.ArrayList;
4142
import java.util.Comparator;
4243
import java.util.HashMap;
43-
import java.util.List;
4444
import java.util.Map;
4545
import java.util.Objects;
4646
import java.util.concurrent.TimeoutException;
@@ -53,6 +53,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
5353
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
5454

5555
public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
56+
public static final String EXCEPTION_TIMESTAMP = "exception-timestamp";
57+
public static final Duration MAX_K8S_EVENT_AGE = Duration.ofMinutes(30);
5658

5759
protected final EventRecorder eventRecorder;
5860

@@ -132,8 +134,50 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
132134
}
133135

134136
var exceptionHistory = history.getExceptionHistory();
135-
List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
136-
exceptionHistory.getEntries();
137+
var exceptions = exceptionHistory.getEntries();
138+
139+
String currentJobId = jobStatus.getJobId();
140+
var cacheEntry = ctx.getExceptionCacheEntry();
141+
142+
if (!cacheEntry.isInitialized()) {
143+
144+
Instant lastExceptionTs;
145+
if (exceptions == null || exceptions.isEmpty()) {
146+
// If the job doesn't have any exceptions set to MIN as we always have to record
147+
// the next
148+
lastExceptionTs = Instant.MIN;
149+
} else {
150+
var k8sExpirationTs = Instant.now().minus(MAX_K8S_EVENT_AGE);
151+
var maxJobExceptionTs =
152+
exceptions.stream()
153+
.map(e -> Instant.ofEpochMilli(e.getTimestamp()))
154+
.max(Comparator.naturalOrder())
155+
.orElseThrow();
156+
157+
if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
158+
// If the last job exception was a long time ago, then there is no point in
159+
// checking in k8s.
160+
lastExceptionTs = maxJobExceptionTs;
161+
} else {
162+
// If there were recent exceptions, we check the triggered events from kube
163+
// to make sure we don't double trigger
164+
lastExceptionTs =
165+
EventUtils.findLastJobExceptionTsFromK8s(
166+
ctx.getKubernetesClient(), resource)
167+
.orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
168+
}
169+
}
170+
171+
cacheEntry.setLastTimestamp(lastExceptionTs);
172+
cacheEntry.setInitialized(true);
173+
cacheEntry.setJobId(currentJobId);
174+
}
175+
176+
var lastRecorded =
177+
currentJobId.equals(cacheEntry.getJobId())
178+
? cacheEntry.getLastTimestamp()
179+
: Instant.MIN;
180+
137181
if (exceptions == null || exceptions.isEmpty()) {
138182
return;
139183
}
@@ -144,19 +188,6 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
144188
jobId);
145189
}
146190

147-
String currentJobId = jobStatus.getJobId();
148-
Instant lastRecorded = null; // first reconciliation
149-
150-
var cacheEntry = ctx.getExceptionCacheEntry();
151-
// a cache entry is created should always be present. The timestamp for the first
152-
// reconciliation would be
153-
// when the job was created. This check is still necessary because even though there
154-
// might be an entry,
155-
// the jobId could have changed since the job was first created.
156-
if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
157-
lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
158-
}
159-
160191
int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
161192
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
162193

@@ -172,7 +203,7 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
172203
for (var exception : sortedExceptions) {
173204
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
174205
// Skip already recorded exceptions
175-
if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
206+
if (!exceptionTime.isAfter(lastRecorded)) {
176207
break;
177208
}
178209
emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
@@ -188,7 +219,7 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
188219
// Set to the timestamp of the latest emitted exception, if any were emitted
189220
// the other option is that if no exceptions were emitted, we set this to now.
190221
if (latestSeen != null) {
191-
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
222+
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen);
192223
}
193224

194225
} catch (Exception e) {
@@ -203,9 +234,7 @@ private void emitJobManagerExceptionEvent(
203234
int maxStackTraceLines) {
204235
Map<String, String> annotations = new HashMap<>();
205236
if (exceptionTime != null) {
206-
annotations.put(
207-
"exception-timestamp",
208-
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
237+
annotations.put(EXCEPTION_TIMESTAMP, exceptionTime.toString());
209238
}
210239
if (exception.getTaskName() != null) {
211240
annotations.put("task-name", exception.getTaskName());

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
4545

46+
import java.time.Instant;
4647
import java.util.Map;
4748
import java.util.concurrent.ConcurrentHashMap;
4849
import java.util.concurrent.ExecutorService;
@@ -57,12 +58,8 @@ public class FlinkResourceContextFactory {
5758
@Data
5859
public static final class ExceptionCacheEntry {
5960
private String jobId;
60-
private long lastTimestamp;
61-
62-
public ExceptionCacheEntry(String jobId, long lastTimestamp) {
63-
this.jobId = jobId;
64-
this.lastTimestamp = lastTimestamp;
65-
}
61+
private Instant lastTimestamp;
62+
private boolean initialized;
6663
}
6764

6865
@VisibleForTesting
@@ -128,10 +125,7 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
128125
configManager,
129126
this::getFlinkService,
130127
lastRecordedExceptionCache.computeIfAbsent(
131-
resourceId,
132-
id ->
133-
new ExceptionCacheEntry(
134-
flinkDepJobId, System.currentTimeMillis())));
128+
resourceId, id -> new ExceptionCacheEntry()));
135129
} else if (resource instanceof FlinkSessionJob) {
136130
var resourceId = ResourceID.fromResource(resource);
137131
var flinkSessionJobId = jobId;
@@ -143,11 +137,7 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
143137
configManager,
144138
this::getFlinkService,
145139
lastRecordedExceptionCache.computeIfAbsent(
146-
resourceId,
147-
id ->
148-
new ExceptionCacheEntry(
149-
flinkSessionJobId,
150-
System.currentTimeMillis())));
140+
resourceId, id -> new ExceptionCacheEntry()));
151141
} else {
152142
throw new IllegalArgumentException(
153143
"Unknown resource type " + resource.getClass().getSimpleName());

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.annotation.VisibleForTesting;
2121
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
22+
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
2223

2324
import io.fabric8.kubernetes.api.model.Event;
2425
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -38,6 +39,7 @@
3839
import java.time.Duration;
3940
import java.time.Instant;
4041
import java.util.ArrayList;
42+
import java.util.Comparator;
4143
import java.util.List;
4244
import java.util.Map;
4345
import java.util.Optional;
@@ -283,13 +285,13 @@ private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Eve
283285
return Optional.empty();
284286
}
285287

286-
private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
287-
var ref = getObjectReference(pod);
288+
private static List<Event> getResourceEvents(KubernetesClient client, HasMetadata cr) {
289+
var ref = getObjectReference(cr);
288290

289291
var eventList =
290292
client.v1()
291293
.events()
292-
.inNamespace(pod.getMetadata().getNamespace())
294+
.inNamespace(cr.getMetadata().getNamespace())
293295
.withInvolvedObject(ref)
294296
.list();
295297

@@ -343,7 +345,7 @@ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
343345
boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");
344346

345347
if (notReady && failedInitialization) {
346-
getPodEvents(client, pod).stream()
348+
getResourceEvents(client, pod).stream()
347349
.filter(e -> e.getReason().equals("FailedMount"))
348350
.findAny()
349351
.ifPresent(
@@ -356,4 +358,20 @@ public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
356358
private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
357359
return condition != null && condition.getStatus().equals(status);
358360
}
361+
362+
public static Optional<Instant> findLastJobExceptionTsFromK8s(
363+
KubernetesClient client, HasMetadata cr) {
364+
var events = getResourceEvents(client, cr);
365+
return events.stream()
366+
.filter(e -> EventRecorder.Reason.JobException.name().equals(e.getReason()))
367+
.map(
368+
e ->
369+
Instant.parse(
370+
e.getMetadata()
371+
.getAnnotations()
372+
.getOrDefault(
373+
JobStatusObserver.EXCEPTION_TIMESTAMP,
374+
e.getMetadata().getCreationTimestamp())))
375+
.max(Comparator.naturalOrder());
376+
}
359377
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -749,10 +749,6 @@ public void addExceptionHistory(
749749
stackTrace,
750750
timestamp,
751751
Map.of("label-key", "label-value"),
752-
"task-name-1",
753-
"location-1",
754-
"endpoint-1",
755-
"tm-id-1",
756752
List.of() // concurrentExceptions
757753
);
758754

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
import org.junit.jupiter.params.provider.EnumSource;
4444
import org.junit.jupiter.params.provider.MethodSource;
4545

46+
import java.time.Duration;
47+
import java.time.Instant;
48+
import java.time.temporal.ChronoUnit;
4649
import java.util.ArrayList;
4750
import java.util.HashMap;
4851
import java.util.Map;
@@ -167,8 +170,9 @@ public void testExceptionObservedEvenWhenNewStateIsTerminal() throws Exception {
167170
getResourceContext(deployment, operatorConfig);
168171

169172
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
173+
ctx.getExceptionCacheEntry().setInitialized(true);
170174
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
171-
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
175+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
172176
flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
173177

174178
// Ensure jobFailedErr is null before the observe call
@@ -210,8 +214,9 @@ public void testExceptionNotObservedWhenOldStateIsTerminal() throws Exception {
210214
getResourceContext(deployment, operatorConfig);
211215

212216
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
217+
ctx.getExceptionCacheEntry().setInitialized(true);
213218
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
214-
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
219+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
215220
flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
216221

217222
// Ensure jobFailedErr is null before the observe call
@@ -247,8 +252,9 @@ public void testExceptionLimitConfig() throws Exception {
247252
getResourceContext(deployment, operatorConfig); // set a non-terminal state
248253

249254
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
255+
ctx.getExceptionCacheEntry().setInitialized(true);
250256
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
251-
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
257+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
252258

253259
flinkService.submitApplicationCluster(
254260
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
@@ -289,8 +295,9 @@ public void testStackTraceTruncationConfig() throws Exception {
289295
flinkService.submitApplicationCluster(
290296
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
291297
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
298+
ctx.getExceptionCacheEntry().setInitialized(true);
292299
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
293-
ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
300+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
294301

295302
long exceptionTime = 4000L;
296303
String longTrace = "line1\nline2\nline3\nline4";
@@ -323,8 +330,9 @@ public void testIgnoreOldExceptions() throws Exception {
323330
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
324331

325332
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
333+
ctx.getExceptionCacheEntry().setInitialized(true);
326334
ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
327-
ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
335+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));
328336

329337
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
330338
flinkService.submitApplicationCluster(
@@ -365,6 +373,51 @@ public void testIgnoreOldExceptions() throws Exception {
365373
assertTrue(events.get(0).getMessage().contains("org.apache.NewException"));
366374
}
367375

376+
@Test
377+
public void testExceptionEventTriggerInitialization() throws Exception {
378+
var deployment = initDeployment();
379+
var status = deployment.getStatus();
380+
var jobStatus = status.getJobStatus();
381+
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
382+
383+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
384+
385+
var now = Instant.now();
386+
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
387+
flinkService.submitApplicationCluster(
388+
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
389+
390+
// Old exception that happened outside of kubernetes event retention should be ignored
391+
flinkService.addExceptionHistory(
392+
jobId,
393+
"OldException",
394+
"OldException",
395+
now.minus(Duration.ofHours(1)).toEpochMilli());
396+
flinkService.addExceptionHistory(
397+
jobId,
398+
"NewException",
399+
"NewException",
400+
now.minus(Duration.ofMinutes(1)).toEpochMilli());
401+
402+
// Ensure jobFailedErr is null before the observe call
403+
flinkService.setJobFailedErr(null);
404+
observer.observe(ctx);
405+
406+
var events =
407+
kubernetesClient
408+
.v1()
409+
.events()
410+
.inNamespace(deployment.getMetadata().getNamespace())
411+
.list()
412+
.getItems();
413+
assertEquals(1, events.size());
414+
assertTrue(events.get(0).getMessage().contains("NewException"));
415+
assertTrue(ctx.getExceptionCacheEntry().isInitialized());
416+
assertEquals(
417+
now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS),
418+
ctx.getExceptionCacheEntry().getLastTimestamp());
419+
}
420+
368421
private static Stream<Arguments> cancellingArgs() {
369422
var args = new ArrayList<Arguments>();
370423
for (var status : JobStatus.values()) {

0 commit comments

Comments
 (0)