Skip to content

Commit 8f1cab7

Browse files
committed
[FLINK-37730] Fixes the failures
1 parent 5104e13 commit 8f1cab7

File tree

4 files changed

+41
-25
lines changed

4 files changed

+41
-25
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.common.JobStatus;
22-
import org.apache.flink.autoscaler.utils.DateTimeUtils;
2322
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2423
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
2524
import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -43,6 +42,7 @@
4342
import java.util.HashMap;
4443
import java.util.List;
4544
import java.util.Map;
45+
import java.util.Objects;
4646
import java.util.concurrent.TimeoutException;
4747

4848
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
@@ -153,11 +153,10 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
153153
// when the job was created. This check is still necessary because even though there
154154
// might be an entry,
155155
// the jobId could have changed since the job was first created.
156-
if (cacheEntry.getJobId().equals(currentJobId)) {
156+
if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
157157
lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
158158
}
159159

160-
Instant now = Instant.now();
161160
int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
162161
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
163162

@@ -167,21 +166,31 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
167166
Comparator.comparingLong(
168167
JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
169168
.reversed());
170-
171169
int count = 0;
170+
Instant latestSeen = null;
171+
172172
for (var exception : sortedExceptions) {
173173
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
174174
// Skip already recorded exceptions
175-
if (lastRecorded != null && exceptionTime.isBefore(lastRecorded)) {
176-
continue;
175+
if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
176+
break;
177177
}
178178
emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
179+
if (latestSeen == null || exceptionTime.isAfter(latestSeen)) {
180+
latestSeen = exceptionTime;
181+
}
179182
if (++count >= maxEvents) {
180183
break;
181184
}
182185
}
186+
183187
ctx.getExceptionCacheEntry().setJobId(currentJobId);
184-
ctx.getExceptionCacheEntry().setLastTimestamp(now.toEpochMilli());
188+
// Advance the cached timestamp only when at least one exception was emitted, using the
189+
// timestamp of the latest emitted exception; otherwise the cached value is left unchanged.
190+
if (latestSeen != null) {
191+
ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
192+
}
193+
185194
} catch (Exception e) {
186195
LOG.warn("Failed to fetch JobManager exception info.", e);
187196
}
@@ -197,13 +206,12 @@ private void emitJobManagerExceptionEvent(
197206
if (exceptionName == null || exceptionName.isBlank()) {
198207
return;
199208
}
200-
201209
Map<String, String> annotations = new HashMap<>();
202-
annotations.put(
203-
"event-time-readable",
204-
DateTimeUtils.readable(exceptionTime, ZoneId.systemDefault()));
205-
annotations.put("event-timestamp-millis", String.valueOf(exceptionTime.toEpochMilli()));
206-
210+
if (exceptionTime != null) {
211+
annotations.put(
212+
"exception-timestamp",
213+
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
214+
}
207215
if (exception.getTaskName() != null) {
208216
annotations.put("task-name", exception.getTaskName());
209217
}
@@ -213,7 +221,6 @@ private void emitJobManagerExceptionEvent(
213221
if (exception.getTaskManagerId() != null) {
214222
annotations.put("tm-id", exception.getTaskManagerId());
215223
}
216-
217224
if (exception.getFailureLabels() != null) {
218225
exception
219226
.getFailureLabels()
@@ -236,16 +243,16 @@ private void emitJobManagerExceptionEvent(
236243
}
237244
}
238245

239-
String keyMessage =
240-
exceptionName.length() > 128 ? exceptionName.substring(0, 128) : exceptionName;
241-
242-
eventRecorder.triggerEventOnceWithAnnotations(
246+
String identityKey =
247+
"jobmanager-exception-"
248+
+ Integer.toHexString(Objects.hash(eventMessage.toString()));
249+
eventRecorder.triggerEventWithAnnotations(
243250
ctx.getResource(),
244251
EventRecorder.Type.Warning,
245252
EventRecorder.Reason.JobException,
246253
eventMessage.toString().trim(),
247254
EventRecorder.Component.JobManagerDeployment,
248-
"jobmanager-exception-" + keyMessage.hashCode(),
255+
identityKey,
249256
ctx.getKubernetesClient(),
250257
K8sAnnotationsSanitizer.sanitizeAnnotations(annotations));
251258
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public boolean triggerEventOnce(
171171
messageKey);
172172
}
173173

174-
public boolean triggerEventOnceWithAnnotations(
174+
public boolean triggerEventWithAnnotations(
175175
AbstractFlinkResource<?, ?> resource,
176176
Type type,
177177
Reason reason,
@@ -180,7 +180,7 @@ public boolean triggerEventOnceWithAnnotations(
180180
String messageKey,
181181
KubernetesClient client,
182182
Map<String, String> annotations) {
183-
return EventUtils.createWithAnnotationsIfNotExists(
183+
return EventUtils.createWithAnnotations(
184184
client,
185185
resource,
186186
type,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,11 @@ public static Event findExistingEvent(
104104
.get();
105105
}
106106

107-
public static boolean createWithAnnotationsIfNotExists(
107+
/**
108+
* Create or update an event for the target resource. If the event already exists, its message
108+
* and annotations are replaced, its last timestamp is refreshed and its count is incremented.
110+
*/
111+
public static boolean createWithAnnotations(
108112
KubernetesClient client,
109113
HasMetadata target,
110114
EventRecorder.Type type,
@@ -120,6 +124,11 @@ public static boolean createWithAnnotationsIfNotExists(
120124
Event existing = findExistingEvent(client, target, eventName);
121125

122126
if (existing != null) {
127+
existing.setLastTimestamp(Instant.now().toString());
128+
existing.setCount(existing.getCount() + 1);
129+
existing.setMessage(message);
130+
setAnnotations(existing, annotations);
131+
createOrReplaceEvent(client, existing).ifPresent(eventListener);
123132
return false;
124133
} else {
125134
Event event = buildEvent(target, type, reason, message, component, eventName);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ public void accept(Event event) {
574574
}
575575

576576
@Test
577-
public void testCreateWithAnnotationsIfNotExists() {
577+
public void testCreateWithAnnotations() {
578578
var consumer =
579579
new Consumer<Event>() {
580580
@Override
@@ -597,7 +597,7 @@ public void accept(Event event) {
597597

598598
// First call should create the event
599599
boolean created =
600-
EventUtils.createWithAnnotationsIfNotExists(
600+
EventUtils.createWithAnnotations(
601601
kubernetesClient,
602602
flinkApp,
603603
EventRecorder.Type.Normal,
@@ -627,7 +627,7 @@ public void accept(Event event) {
627627
// Second call with the same key should not create a new event
628628
eventConsumed = null;
629629
boolean createdAgain =
630-
EventUtils.createWithAnnotationsIfNotExists(
630+
EventUtils.createWithAnnotations(
631631
kubernetesClient,
632632
flinkApp,
633633
EventRecorder.Type.Normal,

0 commit comments

Comments
 (0)