3434import org .apache .flink .kubernetes .operator .utils .EventRecorder ;
3535import org .apache .flink .util .SerializedThrowable ;
3636
37- import io .fabric8 .kubernetes .api .model .MicroTime ;
3837import io .fabric8 .kubernetes .client .KubernetesClient ;
3938import io .fabric8 .kubernetes .client .server .mock .EnableKubernetesMockClient ;
4039import lombok .Getter ;
4443import org .junit .jupiter .params .provider .EnumSource ;
4544import org .junit .jupiter .params .provider .MethodSource ;
4645
47- import java .time .Instant ;
48- import java .time .ZoneId ;
49- import java .time .ZonedDateTime ;
5046import java .util .ArrayList ;
5147import java .util .HashMap ;
5248import java .util .Map ;
@@ -317,11 +313,6 @@ public void testStackTraceTruncationConfig() throws Exception {
317313 assertTrue (msg .contains ("line2" ));
318314 assertFalse (msg .contains ("line3" ));
319315 assertTrue (msg .contains ("... (2 more lines)" ));
320- // check that exception time becomes the event time
321- ZonedDateTime zonedDateTime =
322- ZonedDateTime .ofInstant (Instant .ofEpochMilli (exceptionTime ), ZoneId .of ("UTC" ));
323- MicroTime expectedEventTime = new MicroTime (zonedDateTime .toString ());
324- assertEquals (expectedEventTime , events .get (0 ).getEventTime ());
325316 }
326317
327318 @ Test
@@ -338,10 +329,26 @@ public void testIgnoreOldExceptions() throws Exception {
338329 var jobId = JobID .fromHexString (deployment .getStatus ().getJobStatus ().getJobId ());
339330 flinkService .submitApplicationCluster (
340331 deployment .getSpec ().getJob (), ctx .getDeployConfig (deployment .getSpec ()), false );
341-
342- flinkService .addExceptionHistory (jobId , "OldException" , "old" , 1000L );
343- flinkService .addExceptionHistory (jobId , "MidException" , "mid" , 2000L );
344- flinkService .addExceptionHistory (jobId , "NewException" , "new" , 3000L );
332+ // Map exception names to timestamps
333+ Map <String , Long > exceptionHistory =
334+ Map .of (
335+ "OldException" , 1000L ,
336+ "MidException" , 2000L ,
337+ "NewException" , 3000L );
338+ String dummyStackTrace =
339+ "org.apache.%s\n "
340+ + "\t at org.apache.flink.kubernetes.operator.observer.JobStatusObserverTest.testIgnoreOldExceptions(JobStatusObserverTest.java:1)\n "
341+ + "\t at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n "
342+ + "\t at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n "
343+ + "\t at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n "
344+ + "\t at java.base/java.lang.reflect.Method.invoke(Method.java:566)\n " ;
345+ // Add mapped exceptions
346+ exceptionHistory .forEach (
347+ (exceptionName , timestamp ) -> {
348+ String fullStackTrace = String .format (dummyStackTrace , exceptionName );
349+ flinkService .addExceptionHistory (
350+ jobId , "org.apache." + exceptionName , fullStackTrace , timestamp );
351+ });
345352
346353 // Ensure jobFailedErr is null before the observe call
347354 flinkService .setJobFailedErr (null );
@@ -355,7 +362,7 @@ public void testIgnoreOldExceptions() throws Exception {
355362 .list ()
356363 .getItems ();
357364 assertEquals (1 , events .size ());
358- assertTrue (events .get (0 ).getMessage ().contains ("NewException" ));
365+ assertTrue (events .get (0 ).getMessage ().contains ("org.apache. NewException" ));
359366 }
360367
361368 private static Stream <Arguments > cancellingArgs () {
0 commit comments