Skip to content

Commit 9c7795a

Browse files
authored
[FLINK-37895][Job Manager] Fix failing collection of Flink Exceptions for Session Jobs
1 parent 8a18cc4 commit 9c7795a

File tree

2 files changed

+313
-16
lines changed

2 files changed

+313
-16
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -851,25 +851,15 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
851851

852852
@Override
853853
public JobExceptionsInfoWithHistory getJobExceptions(
854-
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) {
854+
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig)
855+
throws IOException {
855856
JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
856-
int port = observeConfig.getInteger(RestOptions.PORT);
857-
String host =
858-
ObjectUtils.firstNonNull(
859-
operatorConfig.getFlinkServiceHostOverride(),
860-
ExternalServiceDecorator.getNamespacedExternalServiceName(
861-
resource.getMetadata().getName(),
862-
resource.getMetadata().getNamespace()));
863857
JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
864858
params.jobPathParameter.resolve(jobId);
865-
try (var restClient = getRestClient(observeConfig)) {
866-
return restClient
867-
.sendRequest(
868-
host,
869-
port,
870-
jobExceptionsHeaders,
871-
params,
872-
EmptyRequestBody.getInstance())
859+
860+
try (var clusterClient = getClusterClient(observeConfig)) {
861+
return clusterClient
862+
.sendRequest(jobExceptionsHeaders, params, EmptyRequestBody.getInstance())
873863
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
874864
} catch (Exception e) {
875865
LOG.warn(

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,313 @@ public void testExceptionEventTriggerInitialization() throws Exception {
418418
ctx.getExceptionCacheEntry().getLastTimestamp());
419419
}
420420

421+
/**
 * Session-job scenario: the resource status starts as RUNNING, the job is submitted to the
 * session cluster and then cancelled before {@code observer.observe(ctx)} is called.
 *
 * <p>The exception cache is pre-initialized with a last timestamp of 500 ms and one exception
 * is registered at 1000 ms. The test expects exactly two Kubernetes events in the job's
 * namespace and that none of them carries {@code JobStatusObserver.JOB_NOT_FOUND_ERR}.
 */
@Test
422+
public void testSessionJobExceptionObservedEvenWhenNewStateIsTerminal() throws Exception {
423+
var sessionJob = initSessionJob();
424+
var status = sessionJob.getStatus();
425+
var jobStatus = status.getJobStatus();
426+
jobStatus.setState(JobStatus.RUNNING);
427+
Map<String, String> configuration = new HashMap<>();
428+
configuration.put(
429+
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
430+
Configuration operatorConfig = Configuration.fromMap(configuration);
431+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
432+
getResourceContext(
433+
sessionJob,
434+
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
435+
operatorConfig);
436+
437+
var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
// Seed the exception cache so that only exceptions newer than 500 ms are candidates.
438+
ctx.getExceptionCacheEntry().setInitialized(true);
439+
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
440+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
441+
flinkService.addExceptionHistory(jobId, "SessionJobExceptionOne", "trace1", 1000L);
442+
443+
// Submit the session job
444+
flinkService.submitJobToSessionCluster(
445+
sessionJob.getMetadata(),
446+
sessionJob.getSpec(),
447+
jobId,
448+
ctx.getDeployConfig(sessionJob.getSpec()),
449+
null);
450+
451+
// Cancel the job to make it terminal
452+
flinkService.cancelJob(jobId, false);
453+
flinkService.setJobFailedErr(null);
454+
455+
observer.observe(ctx);
456+
457+
var events =
458+
kubernetesClient
459+
.v1()
460+
.events()
461+
.inNamespace(sessionJob.getMetadata().getNamespace())
462+
.list()
463+
.getItems();
464+
assertEquals(2, events.size()); // one will be for job status changed
465+
// assert that none of the events contain JOB_NOT_FOUND_ERR
466+
assertFalse(
467+
events.stream()
468+
.anyMatch(
469+
event ->
470+
event.getMessage()
471+
.contains(JobStatusObserver.JOB_NOT_FOUND_ERR)));
472+
}
473+
474+
/**
 * Session-job scenario: the previously recorded job state is already terminal (FINISHED) when
 * {@code observer.observe(ctx)} runs.
 *
 * <p>The exception cache is pre-initialized with a last timestamp of 3000 ms and an exception
 * is registered at 4000 ms, i.e. newer than the cached timestamp. The test still expects a
 * single event whose reason is {@code JobStatusChanged}, so no exception event is emitted.
 */
@Test
475+
public void testSessionJobExceptionNotObservedWhenOldStateIsTerminal() throws Exception {
476+
var sessionJob = initSessionJob();
477+
var status = sessionJob.getStatus();
478+
var jobStatus = status.getJobStatus();
479+
jobStatus.setState(JobStatus.FINISHED); // Set to terminal state
480+
481+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
482+
getResourceContext(
483+
sessionJob,
484+
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
485+
486+
var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
487+
flinkService.submitJobToSessionCluster(
488+
sessionJob.getMetadata(),
489+
sessionJob.getSpec(),
490+
jobId,
491+
ctx.getDeployConfig(sessionJob.getSpec()),
492+
null);
493+
ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration());
494+
ctx.getExceptionCacheEntry().setInitialized(true);
495+
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
496+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
497+
// The exception below is newer than the cached timestamp (4000 ms > 3000 ms).
498+
long exceptionTime = 4000L;
499+
flinkService.addExceptionHistory(jobId, "SessionJobException", "trace", exceptionTime);
500+
501+
// Ensure jobFailedErr is null before the observe call
502+
flinkService.setJobFailedErr(null);
503+
observer.observe(ctx);
504+
505+
var events =
506+
kubernetesClient
507+
.v1()
508+
.events()
509+
.inNamespace(sessionJob.getMetadata().getNamespace())
510+
.list()
511+
.getItems();
512+
assertEquals(
513+
1, events.size()); // Only one event for job status changed, no exception events
514+
assertEquals(EventRecorder.Reason.JobStatusChanged.name(), events.get(0).getReason());
515+
}
516+
517+
/**
 * Session-job scenario: {@code OPERATOR_EVENT_EXCEPTION_LIMIT} is set to "2" and three
 * exceptions (4000, 5000 and 6000 ms) are registered, all newer than the cached last
 * timestamp of 3000 ms.
 *
 * <p>After {@code observer.observe(ctx)} the test expects exactly two events in the job's
 * namespace.
 */
@Test
518+
public void testSessionJobExceptionLimitConfig() throws Exception {
519+
var sessionJob = initSessionJob();
520+
var status = sessionJob.getStatus();
521+
var jobStatus = status.getJobStatus();
522+
jobStatus.setState(JobStatus.RUNNING);
523+
Map<String, String> configuration = new HashMap<>();
524+
configuration.put(
525+
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
526+
Configuration operatorConfig = Configuration.fromMap(configuration);
527+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
528+
getResourceContext(
529+
sessionJob,
530+
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
531+
operatorConfig);
532+
533+
var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
534+
flinkService.submitJobToSessionCluster(
535+
sessionJob.getMetadata(),
536+
sessionJob.getSpec(),
537+
jobId,
538+
ctx.getDeployConfig(sessionJob.getSpec()),
539+
null);
540+
ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration());
541+
ctx.getExceptionCacheEntry().setInitialized(true);
542+
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
543+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
544+
545+
// Add 3 exceptions, but only 2 should be reported due to limit
546+
flinkService.addExceptionHistory(jobId, "SessionJobException1", "trace1", 4000L);
547+
flinkService.addExceptionHistory(jobId, "SessionJobException2", "trace2", 5000L);
548+
flinkService.addExceptionHistory(jobId, "SessionJobException3", "trace3", 6000L);
549+
550+
// Ensure jobFailedErr is null before the observe call
551+
flinkService.setJobFailedErr(null);
552+
observer.observe(ctx);
553+
554+
var events =
555+
kubernetesClient
556+
.v1()
557+
.events()
558+
.inNamespace(sessionJob.getMetadata().getNamespace())
559+
.list()
560+
.getItems();
// NOTE(review): only the count is asserted; which two of the three exceptions are kept is not
// checked here.
561+
assertEquals(2, events.size()); // Only 2 exceptions should be reported
562+
}
563+
564+
/**
 * Session-job scenario: {@code OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES} is set to "2" and a
 * single exception with a four-line stack trace is registered at 4000 ms (cached last
 * timestamp is 3000 ms).
 *
 * <p>The test expects one event whose message contains "line1" and "line2", does not contain
 * "line3", and contains the marker "... (2 more lines)".
 */
@Test
565+
public void testSessionJobStackTraceTruncationConfig() throws Exception {
566+
var sessionJob = initSessionJob();
567+
var status = sessionJob.getStatus();
568+
var jobStatus = status.getJobStatus();
569+
jobStatus.setState(JobStatus.RUNNING);
570+
Map<String, String> configuration = new HashMap<>();
571+
configuration.put(
572+
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES.key(),
573+
"2");
574+
Configuration operatorConfig = Configuration.fromMap(configuration);
575+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
576+
getResourceContext(
577+
sessionJob,
578+
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
579+
operatorConfig);
580+
581+
var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
582+
flinkService.submitJobToSessionCluster(
583+
sessionJob.getMetadata(),
584+
sessionJob.getSpec(),
585+
jobId,
586+
ctx.getDeployConfig(sessionJob.getSpec()),
587+
null);
588+
ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration());
589+
ctx.getExceptionCacheEntry().setInitialized(true);
590+
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
591+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
592+
593+
long exceptionTime = 4000L;
// Four lines in total; with a limit of two, the last two are expected to be cut off.
594+
String longTrace = "line1\nline2\nline3\nline4";
595+
flinkService.addExceptionHistory(
596+
jobId, "SessionJobStackTraceCheck", longTrace, exceptionTime);
597+
598+
// Ensure jobFailedErr is null before the observe call
599+
flinkService.setJobFailedErr(null);
600+
observer.observe(ctx);
601+
602+
var events =
603+
kubernetesClient
604+
.v1()
605+
.events()
606+
.inNamespace(sessionJob.getMetadata().getNamespace())
607+
.list()
608+
.getItems();
609+
assertEquals(1, events.size());
610+
String msg = events.get(0).getMessage();
611+
assertTrue(msg.contains("line1"));
612+
assertTrue(msg.contains("line2"));
613+
assertFalse(msg.contains("line3"));
614+
assertTrue(msg.contains("... (2 more lines)"));
615+
}
616+
617+
/**
 * Session-job scenario: the exception cache is pre-initialized with a last timestamp of
 * 2500 ms and three exceptions are registered at 1000, 2000 and 3000 ms.
 *
 * <p>After {@code observer.observe(ctx)} the test expects a single event whose message
 * contains "org.apache.NewSessionException", the only exception newer than the cached
 * timestamp.
 */
@Test
618+
public void testSessionJobIgnoreOldExceptions() throws Exception {
619+
var sessionJob = initSessionJob();
620+
var status = sessionJob.getStatus();
621+
var jobStatus = status.getJobStatus();
622+
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
623+
624+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
625+
getResourceContext(
626+
sessionJob,
627+
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
628+
ctx.getExceptionCacheEntry().setInitialized(true);
629+
ctx.getExceptionCacheEntry().setJobId(sessionJob.getStatus().getJobStatus().getJobId());
630+
ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));
631+
632+
var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
633+
flinkService.submitJobToSessionCluster(
634+
sessionJob.getMetadata(),
635+
sessionJob.getSpec(),
636+
jobId,
637+
ctx.getDeployConfig(sessionJob.getSpec()),
638+
null);
639+
640+
// Map exception names to timestamps
// NOTE(review): Map.of has an unspecified iteration order, so the forEach below registers the
// three exceptions in a nondeterministic order. Presumably the assertions do not depend on
// insertion order; confirm against addExceptionHistory, or use an ordered map.
641+
Map<String, Long> exceptionHistory =
642+
Map.of(
643+
"OldSessionException", 1000L,
644+
"MidSessionException", 2000L,
645+
"NewSessionException", 3000L);
// The %s placeholder in the first line is filled with the exception name via String.format.
646+
String dummyStackTrace =
647+
"org.apache.%s\n"
648+
+ "\tat org.apache.flink.kubernetes.operator.observer.JobStatusObserverTest.testSessionJobIgnoreOldExceptions(JobStatusObserverTest.java:1)\n"
649+
+ "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n"
650+
+ "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n"
651+
+ "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n"
652+
+ "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n";
653+
// Add mapped exceptions
654+
exceptionHistory.forEach(
655+
(exceptionName, timestamp) -> {
656+
String fullStackTrace = String.format(dummyStackTrace, exceptionName);
657+
flinkService.addExceptionHistory(
658+
jobId, "org.apache." + exceptionName, fullStackTrace, timestamp);
659+
});
660+
661+
// Ensure jobFailedErr is null before the observe call
662+
flinkService.setJobFailedErr(null);
663+
observer.observe(ctx);
664+
665+
var events =
666+
kubernetesClient
667+
.v1()
668+
.events()
669+
.inNamespace(sessionJob.getMetadata().getNamespace())
670+
.list()
671+
.getItems();
672+
assertEquals(1, events.size());
673+
assertTrue(events.get(0).getMessage().contains("org.apache.NewSessionException"));
674+
}
675+
676+
/**
 * Session-job scenario: the exception cache entry is NOT pre-initialized by the test. Two
 * exceptions are registered relative to {@code Instant.now()}: one an hour old and one a
 * minute old.
 *
 * <p>After {@code observer.observe(ctx)} the test expects a single event containing
 * "NewSessionException", the cache entry to be initialized, and its last timestamp to equal
 * the newer exception's timestamp truncated to milliseconds.
 */
@Test
677+
public void testSessionJobExceptionEventTriggerInitialization() throws Exception {
678+
var sessionJob = initSessionJob();
679+
var status = sessionJob.getStatus();
680+
var jobStatus = status.getJobStatus();
681+
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
682+
683+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
684+
getResourceContext(
685+
sessionJob,
686+
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
687+
// Captured once so the registered timestamps and the final assertion use the same instant.
688+
var now = Instant.now();
689+
var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
690+
flinkService.submitJobToSessionCluster(
691+
sessionJob.getMetadata(),
692+
sessionJob.getSpec(),
693+
jobId,
694+
ctx.getDeployConfig(sessionJob.getSpec()),
695+
null);
696+
697+
// Old exception that happened outside of kubernetes event retention should be ignored
698+
flinkService.addExceptionHistory(
699+
jobId,
700+
"OldSessionException",
701+
"OldSessionException",
702+
now.minus(Duration.ofHours(1)).toEpochMilli());
703+
flinkService.addExceptionHistory(
704+
jobId,
705+
"NewSessionException",
706+
"NewSessionException",
707+
now.minus(Duration.ofMinutes(1)).toEpochMilli());
708+
709+
// Ensure jobFailedErr is null before the observe call
710+
flinkService.setJobFailedErr(null);
711+
observer.observe(ctx);
712+
713+
var events =
714+
kubernetesClient
715+
.v1()
716+
.events()
717+
.inNamespace(sessionJob.getMetadata().getNamespace())
718+
.list()
719+
.getItems();
720+
assertEquals(1, events.size());
721+
assertTrue(events.get(0).getMessage().contains("NewSessionException"));
722+
assertTrue(ctx.getExceptionCacheEntry().isInitialized());
// The exception timestamp was registered via toEpochMilli(), hence the millisecond truncation.
723+
assertEquals(
724+
now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS),
725+
ctx.getExceptionCacheEntry().getLastTimestamp());
726+
}
727+
421728
private static Stream<Arguments> cancellingArgs() {
422729
var args = new ArrayList<Arguments>();
423730
for (var status : JobStatus.values()) {

0 commit comments

Comments
 (0)