diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 3d6da4f5bd..211f2e9dc4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -851,25 +851,15 @@ public RestClusterClient getClusterClient(Configuration conf) throws Exc @Override public JobExceptionsInfoWithHistory getJobExceptions( - AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) { + AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) + throws IOException { JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance(); - int port = observeConfig.getInteger(RestOptions.PORT); - String host = - ObjectUtils.firstNonNull( - operatorConfig.getFlinkServiceHostOverride(), - ExternalServiceDecorator.getNamespacedExternalServiceName( - resource.getMetadata().getName(), - resource.getMetadata().getNamespace())); JobExceptionsMessageParameters params = new JobExceptionsMessageParameters(); params.jobPathParameter.resolve(jobId); - try (var restClient = getRestClient(observeConfig)) { - return restClient - .sendRequest( - host, - port, - jobExceptionsHeaders, - params, - EmptyRequestBody.getInstance()) + + try (var clusterClient = getClusterClient(observeConfig)) { + return clusterClient + .sendRequest(jobExceptionsHeaders, params, EmptyRequestBody.getInstance()) .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS); } catch (Exception e) { LOG.warn( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java index 4f89d04bd2..f3ff7a8b73 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java @@ -418,6 +418,313 @@ public void testExceptionEventTriggerInitialization() throws Exception { ctx.getExceptionCacheEntry().getLastTimestamp()); } + @Test + public void testSessionJobExceptionObservedEvenWhenNewStateIsTerminal() throws Exception { + var sessionJob = initSessionJob(); + var status = sessionJob.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.RUNNING); + Map configuration = new HashMap<>(); + configuration.put( + KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2"); + Configuration operatorConfig = Configuration.fromMap(configuration); + FlinkResourceContext> ctx = + getResourceContext( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient), + operatorConfig); + + var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); + ctx.getExceptionCacheEntry().setInitialized(true); + ctx.getExceptionCacheEntry().setJobId(jobId.toHexString()); + ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L)); + flinkService.addExceptionHistory(jobId, "SessionJobExceptionOne", "trace1", 1000L); + + // Submit the session job + flinkService.submitJobToSessionCluster( + sessionJob.getMetadata(), + sessionJob.getSpec(), + jobId, + ctx.getDeployConfig(sessionJob.getSpec()), + null); + + // Cancel the job to make it terminal + flinkService.cancelJob(jobId, false); + flinkService.setJobFailedErr(null); + + observer.observe(ctx); + + var events = + kubernetesClient + .v1() + .events() + .inNamespace(sessionJob.getMetadata().getNamespace()) + .list() + .getItems(); + assertEquals(2, events.size()); // one will be for job status changed + // assert that none of the events contain JOB_NOT_FOUND_ERR + assertFalse( + events.stream() + .anyMatch( + event -> + event.getMessage() + .contains(JobStatusObserver.JOB_NOT_FOUND_ERR))); + } + + @Test + public void testSessionJobExceptionNotObservedWhenOldStateIsTerminal() throws Exception { + var sessionJob = initSessionJob(); + var status = sessionJob.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.FINISHED); // Set to terminal state + + FlinkResourceContext> ctx = + getResourceContext( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + + var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); + flinkService.submitJobToSessionCluster( + sessionJob.getMetadata(), + sessionJob.getSpec(), + jobId, + ctx.getDeployConfig(sessionJob.getSpec()), + null); + ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration()); + ctx.getExceptionCacheEntry().setInitialized(true); + ctx.getExceptionCacheEntry().setJobId(jobId.toHexString()); + ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L)); + + long exceptionTime = 4000L; + flinkService.addExceptionHistory(jobId, "SessionJobException", "trace", exceptionTime); + + // Ensure jobFailedErr is null before the observe call + flinkService.setJobFailedErr(null); + observer.observe(ctx); + + var events = + kubernetesClient + .v1() + .events() + .inNamespace(sessionJob.getMetadata().getNamespace()) + .list() + .getItems(); + assertEquals( + 1, events.size()); // Only one event for job status changed, no exception events + assertEquals(EventRecorder.Reason.JobStatusChanged.name(), events.get(0).getReason()); + } + + @Test + public void testSessionJobExceptionLimitConfig() throws Exception { + var sessionJob = initSessionJob(); + var status = sessionJob.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.RUNNING); + Map configuration = new HashMap<>(); + configuration.put( + KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2"); + Configuration operatorConfig = Configuration.fromMap(configuration); + FlinkResourceContext> ctx = + getResourceContext( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient), + operatorConfig); + + var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); + flinkService.submitJobToSessionCluster( + sessionJob.getMetadata(), + sessionJob.getSpec(), + jobId, + ctx.getDeployConfig(sessionJob.getSpec()), + null); + ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration()); + ctx.getExceptionCacheEntry().setInitialized(true); + ctx.getExceptionCacheEntry().setJobId(jobId.toHexString()); + ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L)); + + // Add 3 exceptions, but only 2 should be reported due to limit + flinkService.addExceptionHistory(jobId, "SessionJobException1", "trace1", 4000L); + flinkService.addExceptionHistory(jobId, "SessionJobException2", "trace2", 5000L); + flinkService.addExceptionHistory(jobId, "SessionJobException3", "trace3", 6000L); + + // Ensure jobFailedErr is null before the observe call + flinkService.setJobFailedErr(null); + observer.observe(ctx); + + var events = + kubernetesClient + .v1() + .events() + .inNamespace(sessionJob.getMetadata().getNamespace()) + .list() + .getItems(); + assertEquals(2, events.size()); // Only 2 exceptions should be reported + } + + @Test + public void testSessionJobStackTraceTruncationConfig() throws Exception { + var sessionJob = initSessionJob(); + var status = sessionJob.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.RUNNING); + Map configuration = new HashMap<>(); + configuration.put( + KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES.key(), + "2"); + Configuration operatorConfig = Configuration.fromMap(configuration); + FlinkResourceContext> ctx = + getResourceContext( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient), + operatorConfig); + + var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); + flinkService.submitJobToSessionCluster( + sessionJob.getMetadata(), + sessionJob.getSpec(), + jobId, + ctx.getDeployConfig(sessionJob.getSpec()), + null); + ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration()); + ctx.getExceptionCacheEntry().setInitialized(true); + ctx.getExceptionCacheEntry().setJobId(jobId.toHexString()); + ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L)); + + long exceptionTime = 4000L; + String longTrace = "line1\nline2\nline3\nline4"; + flinkService.addExceptionHistory( + jobId, "SessionJobStackTraceCheck", longTrace, exceptionTime); + + // Ensure jobFailedErr is null before the observe call + flinkService.setJobFailedErr(null); + observer.observe(ctx); + + var events = + kubernetesClient + .v1() + .events() + .inNamespace(sessionJob.getMetadata().getNamespace()) + .list() + .getItems(); + assertEquals(1, events.size()); + String msg = events.get(0).getMessage(); + assertTrue(msg.contains("line1")); + assertTrue(msg.contains("line2")); + assertFalse(msg.contains("line3")); + assertTrue(msg.contains("... (2 more lines)")); + } + + @Test + public void testSessionJobIgnoreOldExceptions() throws Exception { + var sessionJob = initSessionJob(); + var status = sessionJob.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state + + FlinkResourceContext> ctx = + getResourceContext( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + ctx.getExceptionCacheEntry().setInitialized(true); + ctx.getExceptionCacheEntry().setJobId(sessionJob.getStatus().getJobStatus().getJobId()); + ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L)); + + var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); + flinkService.submitJobToSessionCluster( + sessionJob.getMetadata(), + sessionJob.getSpec(), + jobId, + ctx.getDeployConfig(sessionJob.getSpec()), + null); + + // Map exception names to timestamps + Map exceptionHistory = + Map.of( + "OldSessionException", 1000L, + "MidSessionException", 2000L, + "NewSessionException", 3000L); + String dummyStackTrace = + "org.apache.%s\n" + + "\tat org.apache.flink.kubernetes.operator.observer.JobStatusObserverTest.testSessionJobIgnoreOldExceptions(JobStatusObserverTest.java:1)\n" + + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n" + + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n" + + "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n" + + "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n"; + // Add mapped exceptions + exceptionHistory.forEach( + (exceptionName, timestamp) -> { + String fullStackTrace = String.format(dummyStackTrace, exceptionName); + flinkService.addExceptionHistory( + jobId, "org.apache." + exceptionName, fullStackTrace, timestamp); + }); + + // Ensure jobFailedErr is null before the observe call + flinkService.setJobFailedErr(null); + observer.observe(ctx); + + var events = + kubernetesClient + .v1() + .events() + .inNamespace(sessionJob.getMetadata().getNamespace()) + .list() + .getItems(); + assertEquals(1, events.size()); + assertTrue(events.get(0).getMessage().contains("org.apache.NewSessionException")); + } + + @Test + public void testSessionJobExceptionEventTriggerInitialization() throws Exception { + var sessionJob = initSessionJob(); + var status = sessionJob.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state + + FlinkResourceContext> ctx = + getResourceContext( + sessionJob, + TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); + + var now = Instant.now(); + var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); + flinkService.submitJobToSessionCluster( + sessionJob.getMetadata(), + sessionJob.getSpec(), + jobId, + ctx.getDeployConfig(sessionJob.getSpec()), + null); + + // Old exception that happened outside of kubernetes event retention should be ignored + flinkService.addExceptionHistory( + jobId, + "OldSessionException", + "OldSessionException", + now.minus(Duration.ofHours(1)).toEpochMilli()); + flinkService.addExceptionHistory( + jobId, + "NewSessionException", + "NewSessionException", + now.minus(Duration.ofMinutes(1)).toEpochMilli()); + + // Ensure jobFailedErr is null before the observe call + flinkService.setJobFailedErr(null); + observer.observe(ctx); + + var events = + kubernetesClient + .v1() + .events() + .inNamespace(sessionJob.getMetadata().getNamespace()) + .list() + .getItems(); + assertEquals(1, events.size()); + assertTrue(events.get(0).getMessage().contains("NewSessionException")); + assertTrue(ctx.getExceptionCacheEntry().isInitialized()); + assertEquals( + now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS), + ctx.getExceptionCacheEntry().getLastTimestamp()); + } + private static Stream cancellingArgs() { var args = new ArrayList(); for (var status : JobStatus.values()) {