3939import java .time .Duration ;
4040import java .time .Instant ;
4141import java .util .ArrayList ;
42+ import java .util .Collections ;
4243import java .util .Comparator ;
4344import java .util .HashMap ;
4445import java .util .Map ;
@@ -135,36 +136,40 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
135136
136137 var exceptionHistory = history .getExceptionHistory ();
137138 var exceptions = exceptionHistory .getEntries ();
139+ if (exceptions != null ) {
140+ exceptions = new ArrayList <>(exceptions );
141+ exceptions .sort (
142+ Comparator .comparingLong (
143+ JobExceptionsInfoWithHistory .RootExceptionInfo
144+ ::getTimestamp )
145+ .reversed ());
146+ } else {
147+ exceptions = Collections .emptyList ();
148+ }
138149
139150 String currentJobId = jobStatus .getJobId ();
140151 var cacheEntry = ctx .getExceptionCacheEntry ();
141152
142153 if (!cacheEntry .isInitialized ()) {
143-
144154 Instant lastExceptionTs ;
145- if (exceptions == null || exceptions .isEmpty ()) {
155+ if (exceptions .isEmpty ()) {
146156 // If the job doesn't have any exceptions set to MIN as we always have to record
147157 // the next
148158 lastExceptionTs = Instant .MIN ;
149159 } else {
150160 var k8sExpirationTs = Instant .now ().minus (MAX_K8S_EVENT_AGE );
151- var maxJobExceptionTs =
152- exceptions .stream ()
153- .map (e -> Instant .ofEpochMilli (e .getTimestamp ()))
154- .max (Comparator .naturalOrder ())
155- .orElseThrow ();
156-
161+ var maxJobExceptionTs = Instant .ofEpochMilli (exceptions .get (0 ).getTimestamp ());
157162 if (maxJobExceptionTs .isBefore (k8sExpirationTs )) {
158163 // If the last job exception was a long time ago, then there is no point in
159- // checking in k8s.
164+ // checking in k8s. We won't report this as exception
160165 lastExceptionTs = maxJobExceptionTs ;
161166 } else {
162167 // If there were recent exceptions, we check the triggered events from kube
163168 // to make sure we don't double trigger
164169 lastExceptionTs =
165170 EventUtils .findLastJobExceptionTsFromK8s (
166171 ctx .getKubernetesClient (), resource )
167- .orElse (Instant . now (). minus ( MAX_K8S_EVENT_AGE ) );
172+ .orElse (k8sExpirationTs );
168173 }
169174 }
170175
@@ -178,50 +183,31 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
178183 ? cacheEntry .getLastTimestamp ()
179184 : Instant .MIN ;
180185
181- if (exceptions == null || exceptions .isEmpty ()) {
186+ if (exceptions .isEmpty ()) {
182187 return ;
183188 }
184189
185- if (exceptionHistory .isTruncated ()) {
186- LOG .warn (
187- "Job exception history is truncated for jobId '{}'. Some exceptions may be missing." ,
188- jobId );
189- }
190-
191190 int maxEvents = operatorConfig .getReportedExceptionEventsMaxCount ();
192191 int maxStackTraceLines = operatorConfig .getReportedExceptionEventsMaxStackTraceLength ();
193192
194- // Sort and reverse to prioritize the newest exceptions
195- var sortedExceptions = new ArrayList <>(exceptions );
196- sortedExceptions .sort (
197- Comparator .comparingLong (
198- JobExceptionsInfoWithHistory .RootExceptionInfo ::getTimestamp )
199- .reversed ());
200193 int count = 0 ;
201- Instant latestSeen = null ;
202-
203- for (var exception : sortedExceptions ) {
204- Instant exceptionTime = Instant .ofEpochMilli (exception .getTimestamp ());
205- // Skip already recorded exceptions
206- if (!exceptionTime .isAfter (lastRecorded )) {
194+ for (var exception : exceptions ) {
195+ var exceptionTime = Instant .ofEpochMilli (exception .getTimestamp ());
196+ // Skip already recorded exceptions and after max count
197+ if (!exceptionTime .isAfter (lastRecorded ) || count ++ >= maxEvents ) {
207198 break ;
208199 }
209200 emitJobManagerExceptionEvent (ctx , exception , exceptionTime , maxStackTraceLines );
210- if (latestSeen == null ) {
211- latestSeen = exceptionTime ;
212- }
213- if (++count >= maxEvents ) {
214- break ;
215- }
216201 }
217202
218- ctx .getExceptionCacheEntry ().setJobId (currentJobId );
219- // Set to the timestamp of the latest emitted exception, if any were emitted
220- // the other option is that if no exceptions were emitted, we set this to now.
221- if (latestSeen != null ) {
222- ctx .getExceptionCacheEntry ().setLastTimestamp (latestSeen );
203+ if (count < maxEvents && exceptionHistory .isTruncated ()) {
204+ LOG .warn (
205+ "Job exception history is truncated for jobId '{}'. Some exceptions may be missing." ,
206+ jobId );
223207 }
224208
209+ cacheEntry .setJobId (currentJobId );
210+ cacheEntry .setLastTimestamp (Instant .ofEpochMilli (exceptions .get (0 ).getTimestamp ()));
225211 } catch (Exception e ) {
226212 LOG .warn ("Failed to fetch JobManager exception info." , e );
227213 }
0 commit comments