2727import org .apache .flink .kubernetes .operator .controller .FlinkResourceContext ;
2828import org .apache .flink .kubernetes .operator .reconciler .ReconciliationUtils ;
2929import org .apache .flink .kubernetes .operator .utils .EventRecorder ;
30+ import org .apache .flink .kubernetes .operator .utils .EventUtils ;
3031import org .apache .flink .kubernetes .operator .utils .ExceptionUtils ;
3132import org .apache .flink .kubernetes .operator .utils .K8sAnnotationsSanitizer ;
3233import org .apache .flink .runtime .client .JobStatusMessage ;
3536import org .slf4j .Logger ;
3637import org .slf4j .LoggerFactory ;
3738
39+ import java .time .Duration ;
3840import java .time .Instant ;
39- import java .time .ZoneId ;
4041import java .util .ArrayList ;
42+ import java .util .Collections ;
4143import java .util .Comparator ;
4244import java .util .HashMap ;
43- import java .util .List ;
4445import java .util .Map ;
4546import java .util .Objects ;
4647import java .util .concurrent .TimeoutException ;
@@ -53,6 +54,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
5354 private static final Logger LOG = LoggerFactory .getLogger (JobStatusObserver .class );
5455
5556 public static final String JOB_NOT_FOUND_ERR = "Job Not Found" ;
57+ public static final String EXCEPTION_TIMESTAMP = "exception-timestamp" ; // annotation key under which the exception time is stored on emitted JobManager exception events
58+ public static final Duration MAX_K8S_EVENT_AGE = Duration .ofMinutes (30 ); // look-back window: if the newest job exception is older than this, previously triggered k8s events are not consulted
5659
5760 protected final EventRecorder eventRecorder ;
5861
@@ -132,65 +135,77 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
132135 }
133136
134137 var exceptionHistory = history .getExceptionHistory ();
135- List <JobExceptionsInfoWithHistory .RootExceptionInfo > exceptions =
136- exceptionHistory .getEntries ();
137- if (exceptions == null || exceptions .isEmpty ()) {
138- return ;
138+ var exceptions = exceptionHistory .getEntries ();
139+ if (exceptions != null ) {
140+ exceptions = new ArrayList <>(exceptions );
141+ exceptions .sort (
142+ Comparator .comparingLong (
143+ JobExceptionsInfoWithHistory .RootExceptionInfo
144+ ::getTimestamp )
145+ .reversed ());
146+ } else {
147+ exceptions = Collections .emptyList ();
139148 }
140149
141- if (exceptionHistory .isTruncated ()) {
142- LOG .warn (
143- "Job exception history is truncated for jobId '{}'. Some exceptions may be missing." ,
144- jobId );
150+ String currentJobId = jobStatus .getJobId ();
151+ var cacheEntry = ctx .getExceptionCacheEntry ();
152+
153+ if (!cacheEntry .isInitialized ()) {
154+ Instant lastExceptionTs ;
155+ if (exceptions .isEmpty ()) {
156+ // If the job has no exceptions yet, use Instant.MIN so that the next exception
157+ // is always recorded
158+ lastExceptionTs = Instant .MIN ;
159+ } else {
160+ var k8sExpirationTs = Instant .now ().minus (MAX_K8S_EVENT_AGE );
161+ var maxJobExceptionTs = Instant .ofEpochMilli (exceptions .get (0 ).getTimestamp ());
162+ if (maxJobExceptionTs .isBefore (k8sExpirationTs )) {
163+ // If the last job exception was a long time ago, then there is no point in
164+ // checking in k8s. We won't report it as an exception event
165+ lastExceptionTs = maxJobExceptionTs ;
166+ } else {
167+ // If there were recent exceptions, we check the triggered events from kube
168+ // to make sure we don't double trigger
169+ lastExceptionTs =
170+ EventUtils .findLastJobExceptionTsFromK8s (
171+ ctx .getKubernetesClient (), resource )
172+ .orElse (k8sExpirationTs );
173+ }
174+ }
175+
176+ cacheEntry .setLastTimestamp (lastExceptionTs );
177+ cacheEntry .setInitialized (true );
178+ cacheEntry .setJobId (currentJobId );
145179 }
146180
147- String currentJobId = jobStatus .getJobId ();
148- Instant lastRecorded = null ; // first reconciliation
181+ var lastRecorded =
182+ currentJobId .equals (cacheEntry .getJobId ())
183+ ? cacheEntry .getLastTimestamp ()
184+ : Instant .MIN ;
149185
150- var cacheEntry = ctx .getExceptionCacheEntry ();
151- // a cache entry is created should always be present. The timestamp for the first
152- // reconciliation would be
153- // when the job was created. This check is still necessary because even though there
154- // might be an entry,
155- // the jobId could have changed since the job was first created.
156- if (cacheEntry .getJobId () != null && cacheEntry .getJobId ().equals (currentJobId )) {
157- lastRecorded = Instant .ofEpochMilli (cacheEntry .getLastTimestamp ());
186+ if (exceptions .isEmpty ()) {
187+ return ;
158188 }
159189
160190 int maxEvents = operatorConfig .getReportedExceptionEventsMaxCount ();
161191 int maxStackTraceLines = operatorConfig .getReportedExceptionEventsMaxStackTraceLength ();
162192
163- // Sort and reverse to prioritize the newest exceptions
164- var sortedExceptions = new ArrayList <>(exceptions );
165- sortedExceptions .sort (
166- Comparator .comparingLong (
167- JobExceptionsInfoWithHistory .RootExceptionInfo ::getTimestamp )
168- .reversed ());
169193 int count = 0 ;
170- Instant latestSeen = null ;
171-
172- for (var exception : sortedExceptions ) {
173- Instant exceptionTime = Instant .ofEpochMilli (exception .getTimestamp ());
174- // Skip already recorded exceptions
175- if (lastRecorded != null && !exceptionTime .isAfter (lastRecorded )) {
194+ for (var exception : exceptions ) {
195+ var exceptionTime = Instant .ofEpochMilli (exception .getTimestamp ());
196+ // Stop at the first already recorded exception, or once the max event count is reached
197+ if (!exceptionTime .isAfter (lastRecorded ) || count ++ >= maxEvents ) {
176198 break ;
177199 }
178200 emitJobManagerExceptionEvent (ctx , exception , exceptionTime , maxStackTraceLines );
179- if (latestSeen == null ) {
180- latestSeen = exceptionTime ;
181- }
182- if (++count >= maxEvents ) {
183- break ;
184- }
185201 }
186202
187- ctx .getExceptionCacheEntry ().setJobId (currentJobId );
188- // Set to the timestamp of the latest emitted exception, if any were emitted
189- // the other option is that if no exceptions were emitted, we set this to now.
190- if (latestSeen != null ) {
191- ctx .getExceptionCacheEntry ().setLastTimestamp (latestSeen .toEpochMilli ());
203+ if (count > maxEvents ) {
204+ LOG .warn ("Job exception history is truncated. Some exceptions may be missing." );
192205 }
193206
207+ cacheEntry .setJobId (currentJobId );
208+ cacheEntry .setLastTimestamp (Instant .ofEpochMilli (exceptions .get (0 ).getTimestamp ()));
194209 } catch (Exception e ) {
195210 LOG .warn ("Failed to fetch JobManager exception info." , e );
196211 }
@@ -203,9 +218,7 @@ private void emitJobManagerExceptionEvent(
203218 int maxStackTraceLines ) {
204219 Map <String , String > annotations = new HashMap <>();
205220 if (exceptionTime != null ) {
206- annotations .put (
207- "exception-timestamp" ,
208- exceptionTime .atZone (ZoneId .systemDefault ()).toOffsetDateTime ().toString ());
221+ annotations .put (EXCEPTION_TIMESTAMP , exceptionTime .toString ());
209222 }
210223 if (exception .getTaskName () != null ) {
211224 annotations .put ("task-name" , exception .getTaskName ());
0 commit comments