Skip to content

Commit 5104e13

Browse files
committed
[FLINK-37730]Reverse the exception to get newer exceptions
1 parent 4722465 commit 5104e13

File tree

4 files changed

+15
-20
lines changed

4 files changed

+15
-20
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@
3838

3939
import java.time.Instant;
4040
import java.time.ZoneId;
41+
import java.util.ArrayList;
42+
import java.util.Comparator;
4143
import java.util.HashMap;
44+
import java.util.List;
4245
import java.util.Map;
4346
import java.util.concurrent.TimeoutException;
4447

@@ -129,8 +132,9 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
129132
}
130133

131134
var exceptionHistory = history.getExceptionHistory();
132-
var exceptions = exceptionHistory.getEntries();
133-
if (exceptions.isEmpty()) {
135+
List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
136+
exceptionHistory.getEntries();
137+
if (exceptions == null || exceptions.isEmpty()) {
134138
return;
135139
}
136140

@@ -157,13 +161,20 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
157161
int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
158162
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
159163

164+
// Sort and reverse to prioritize the newest exceptions
165+
var sortedExceptions = new ArrayList<>(exceptions);
166+
sortedExceptions.sort(
167+
Comparator.comparingLong(
168+
JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
169+
.reversed());
170+
160171
int count = 0;
161-
for (var exception : exceptions) {
172+
for (var exception : sortedExceptions) {
162173
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
174+
// Skip already recorded exceptions
163175
if (lastRecorded != null && exceptionTime.isBefore(lastRecorded)) {
164176
continue;
165177
}
166-
167178
emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
168179
if (++count >= maxEvents) {
169180
break;
@@ -229,7 +240,6 @@ private void emitJobManagerExceptionEvent(
229240
exceptionName.length() > 128 ? exceptionName.substring(0, 128) : exceptionName;
230241

231242
eventRecorder.triggerEventOnceWithAnnotations(
232-
exceptionTime.toEpochMilli(),
233243
ctx.getResource(),
234244
EventRecorder.Type.Warning,
235245
EventRecorder.Reason.JobException,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ public boolean triggerEventOnce(
172172
}
173173

174174
public boolean triggerEventOnceWithAnnotations(
175-
long eventTimestampMillis,
176175
AbstractFlinkResource<?, ?> resource,
177176
Type type,
178177
Reason reason,
@@ -184,7 +183,6 @@ public boolean triggerEventOnceWithAnnotations(
184183
return EventUtils.createWithAnnotationsIfNotExists(
185184
client,
186185
resource,
187-
eventTimestampMillis,
188186
type,
189187
reason.toString(),
190188
message,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.fabric8.kubernetes.api.model.Event;
2424
import io.fabric8.kubernetes.api.model.EventBuilder;
2525
import io.fabric8.kubernetes.api.model.HasMetadata;
26-
import io.fabric8.kubernetes.api.model.MicroTime;
2726
import io.fabric8.kubernetes.api.model.ObjectMeta;
2827
import io.fabric8.kubernetes.api.model.ObjectReference;
2928
import io.fabric8.kubernetes.api.model.Pod;
@@ -38,8 +37,6 @@
3837
import java.net.HttpURLConnection;
3938
import java.time.Duration;
4039
import java.time.Instant;
41-
import java.time.ZoneId;
42-
import java.time.ZonedDateTime;
4340
import java.util.ArrayList;
4441
import java.util.List;
4542
import java.util.Map;
@@ -110,7 +107,6 @@ public static Event findExistingEvent(
110107
public static boolean createWithAnnotationsIfNotExists(
111108
KubernetesClient client,
112109
HasMetadata target,
113-
long eventTime,
114110
EventRecorder.Type type,
115111
String reason,
116112
String message,
@@ -128,7 +124,6 @@ public static boolean createWithAnnotationsIfNotExists(
128124
} else {
129125
Event event = buildEvent(target, type, reason, message, component, eventName);
130126
setAnnotations(event, annotations);
131-
setEventTime(event, Instant.ofEpochMilli(eventTime));
132127
createOrReplaceEvent(client, event).ifPresent(eventListener);
133128
return true;
134129
}
@@ -222,12 +217,6 @@ private static void setAnnotations(Event existing, @Nullable Map<String, String>
222217
existing.getMetadata().setAnnotations(annotations);
223218
}
224219

225-
private static void setEventTime(Event existing, Instant eventTime) {
226-
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(eventTime, ZoneId.of("UTC"));
227-
MicroTime microTime = new MicroTime(zonedDateTime.toString());
228-
existing.setEventTime(microTime);
229-
}
230-
231220
private static Event buildEvent(
232221
HasMetadata target,
233222
EventRecorder.Type type,

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,6 @@ public void accept(Event event) {
600600
EventUtils.createWithAnnotationsIfNotExists(
601601
kubernetesClient,
602602
flinkApp,
603-
System.currentTimeMillis(),
604603
EventRecorder.Type.Normal,
605604
reason,
606605
message,
@@ -631,7 +630,6 @@ public void accept(Event event) {
631630
EventUtils.createWithAnnotationsIfNotExists(
632631
kubernetesClient,
633632
flinkApp,
634-
System.currentTimeMillis(),
635633
EventRecorder.Type.Normal,
636634
reason,
637635
message,

0 commit comments

Comments
 (0)