@@ -157,6 +157,85 @@ void testFailed() throws Exception {
157157 assertTrue (flinkResourceEventCollector .events .isEmpty ());
158158 }
159159
160+ @ Test
161+ public void testExceptionObservedEvenWhenNewStateIsTerminal () throws Exception {
162+ var deployment = initDeployment ();
163+ var status = deployment .getStatus ();
164+ var jobStatus = status .getJobStatus ();
165+ jobStatus .setState (JobStatus .RUNNING );
166+ Map <String , String > configuration = new HashMap <>();
167+ configuration .put (
168+ KubernetesOperatorConfigOptions .OPERATOR_EVENT_EXCEPTION_LIMIT .key (), "2" );
169+ Configuration operatorConfig = Configuration .fromMap (configuration );
170+ FlinkResourceContext <AbstractFlinkResource <?, ?>> ctx =
171+ getResourceContext (deployment , operatorConfig );
172+
173+ var jobId = JobID .fromHexString (deployment .getStatus ().getJobStatus ().getJobId ());
174+ ctx .getExceptionCacheEntry ().setJobId (jobId .toHexString ());
175+ ctx .getExceptionCacheEntry ().setLastTimestamp (500L );
176+ flinkService .addExceptionHistory (jobId , "ExceptionOne" , "trace1" , 1000L );
177+
178+ // Ensure jobFailedErr is null before the observe call
179+ flinkService .submitApplicationCluster (
180+ deployment .getSpec ().getJob (), ctx .getDeployConfig (deployment .getSpec ()), false );
181+ flinkService .cancelJob (JobID .fromHexString (jobStatus .getJobId ()), false );
182+ flinkService .setJobFailedErr (null );
183+
184+ observer .observe (ctx );
185+
186+ var events =
187+ kubernetesClient
188+ .v1 ()
189+ .events ()
190+ .inNamespace (deployment .getMetadata ().getNamespace ())
191+ .list ()
192+ .getItems ();
193+ assertEquals (2 , events .size ()); // one will be for job status changed
194+ // assert that none of the events contain JOB_NOT_FOUND_ERR
195+ assertFalse (
196+ events .stream ()
197+ .anyMatch (
198+ event ->
199+ event .getMessage ()
200+ .contains (JobStatusObserver .JOB_NOT_FOUND_ERR )));
201+ }
202+
203+ @ Test
204+ public void testExceptionNotObservedWhenOldStateIsTerminal () throws Exception {
205+ var deployment = initDeployment ();
206+ var status = deployment .getStatus ();
207+ var jobStatus = status .getJobStatus ();
208+ jobStatus .setState (JobStatus .CANCELED );
209+ Map <String , String > configuration = new HashMap <>();
210+ configuration .put (
211+ KubernetesOperatorConfigOptions .OPERATOR_EVENT_EXCEPTION_LIMIT .key (), "2" );
212+ Configuration operatorConfig = Configuration .fromMap (configuration );
213+ FlinkResourceContext <AbstractFlinkResource <?, ?>> ctx =
214+ getResourceContext (deployment , operatorConfig );
215+
216+ var jobId = JobID .fromHexString (deployment .getStatus ().getJobStatus ().getJobId ());
217+ ctx .getExceptionCacheEntry ().setJobId (jobId .toHexString ());
218+ ctx .getExceptionCacheEntry ().setLastTimestamp (500L );
219+ flinkService .addExceptionHistory (jobId , "ExceptionOne" , "trace1" , 1000L );
220+
221+ // Ensure jobFailedErr is null before the observe call
222+ flinkService .submitApplicationCluster (
223+ deployment .getSpec ().getJob (), ctx .getDeployConfig (deployment .getSpec ()), false );
224+ flinkService .setJobFailedErr (null );
225+
226+ observer .observe (ctx );
227+
228+ var events =
229+ kubernetesClient
230+ .v1 ()
231+ .events ()
232+ .inNamespace (deployment .getMetadata ().getNamespace ())
233+ .list ()
234+ .getItems ();
235+ assertEquals (1 , events .size ()); // only one event for job status changed
236+ assertEquals (EventRecorder .Reason .JobStatusChanged .name (), events .get (0 ).getReason ());
237+ }
238+
160239 @ Test
161240 public void testExceptionLimitConfig () throws Exception {
162241 var observer = new JobStatusObserver <>(eventRecorder );
0 commit comments