From 2eaa03ff98bb6c99cf9611faabfbd27d915f5e1e Mon Sep 17 00:00:00 2001
From: Santwana Verma <87354839+vsantwana@users.noreply.github.com>
Date: Wed, 2 Jul 2025 20:58:25 +0530
Subject: [PATCH 1/2] [FLINK-37895][Job Manager] Fix failing collection of
 Flink Exceptions for Session Jobs

---
 .../service/AbstractFlinkService.java      |  22 +-
 .../observer/JobStatusObserverTest.java    | 307 ++++++++++++++++++
 2 files changed, 313 insertions(+), 16 deletions(-)
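[Editorial note, illustration only - not part of the commit.] The previous getJobExceptions()
rebuilt the REST host from the resource's own name and namespace. For a FlinkSessionJob that
typically resolves to the job resource rather than to the session cluster's REST service, so the
request could not reach the JobManager and exception collection failed. Routing the call through
getClusterClient(observeConfig) lets the RestClusterClient pick up the REST endpoint from the
observe Configuration, which already points at the correct cluster for both application
deployments and session jobs. A minimal sketch of the resulting call pattern; the class name,
method name and the 60 second timeout are invented for the illustration, imports are omitted, and
the real change lives in AbstractFlinkService#getJobExceptions as shown in the hunk below:

    class JobExceptionsFetchSketch {
        static JobExceptionsInfoWithHistory fetchExceptions(
                RestClusterClient<String> clusterClient, JobID jobId) throws Exception {
            JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
            params.jobPathParameter.resolve(jobId);
            // The client already carries the target cluster's REST address, so no host or
            // port needs to be derived from Kubernetes resource metadata here.
            return clusterClient
                    .sendRequest(
                            JobExceptionsHeaders.getInstance(),
                            params,
                            EmptyRequestBody.getInstance())
                    .get(60, TimeUnit.SECONDS);
        }
    }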
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 3d6da4f5bd..211f2e9dc4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -851,25 +851,15 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc

     @Override
     public JobExceptionsInfoWithHistory getJobExceptions(
-            AbstractFlinkResource<?, ?> resource, JobID jobId, Configuration observeConfig) {
+            AbstractFlinkResource<?, ?> resource, JobID jobId, Configuration observeConfig)
+            throws IOException {
         JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
-        int port = observeConfig.getInteger(RestOptions.PORT);
-        String host =
-                ObjectUtils.firstNonNull(
-                        operatorConfig.getFlinkServiceHostOverride(),
-                        ExternalServiceDecorator.getNamespacedExternalServiceName(
-                                resource.getMetadata().getName(),
-                                resource.getMetadata().getNamespace()));
         JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
         params.jobPathParameter.resolve(jobId);
-        try (var restClient = getRestClient(observeConfig)) {
-            return restClient
-                    .sendRequest(
-                            host,
-                            port,
-                            jobExceptionsHeaders,
-                            params,
-                            EmptyRequestBody.getInstance())
+
+        try (var clusterClient = getClusterClient(observeConfig)) {
+            return clusterClient
+                    .sendRequest(jobExceptionsHeaders, params, EmptyRequestBody.getInstance())
                     .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
         } catch (Exception e) {
             LOG.warn(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 4f89d04bd2..f3ff7a8b73 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -418,6 +418,313 @@ public void testExceptionEventTriggerInitialization() throws Exception {
                 ctx.getExceptionCacheEntry().getLastTimestamp());
     }

+    @Test
+    public void testSessionJobExceptionObservedEvenWhenNewStateIsTerminal() throws Exception {
+        var sessionJob = initSessionJob();
+        var status = sessionJob.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING);
+        Map<String, String> configuration = new HashMap<>();
+        configuration.put(
+                KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
+        Configuration operatorConfig = Configuration.fromMap(configuration);
+        FlinkResourceContext<FlinkSessionJob> ctx =
+                getResourceContext(
+                        sessionJob,
+                        TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
+                        operatorConfig);
+
+        var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setInitialized(true);
+        ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(500L));
+        flinkService.addExceptionHistory(jobId, "SessionJobExceptionOne", "trace1", 1000L);
+
+        // Submit the session job
+        flinkService.submitJobToSessionCluster(
+                sessionJob.getMetadata(),
+                sessionJob.getSpec(),
+                jobId,
+                ctx.getDeployConfig(sessionJob.getSpec()),
+                null);
+
+        // Cancel the job to make it terminal
+        flinkService.cancelJob(jobId, false);
+        flinkService.setJobFailedErr(null);
+
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(sessionJob.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(2, events.size()); // one will be for job status changed
+        // assert that none of the events contain JOB_NOT_FOUND_ERR
+        assertFalse(
+                events.stream()
+                        .anyMatch(
+                                event ->
+                                        event.getMessage()
+                                                .contains(JobStatusObserver.JOB_NOT_FOUND_ERR)));
+    }
+
+    @Test
+    public void testSessionJobExceptionNotObservedWhenOldStateIsTerminal() throws Exception {
+        var sessionJob = initSessionJob();
+        var status = sessionJob.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.FINISHED); // Set to terminal state
+
+        FlinkResourceContext<FlinkSessionJob> ctx =
+                getResourceContext(
+                        sessionJob,
+                        TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+
+        var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
+        flinkService.submitJobToSessionCluster(
+                sessionJob.getMetadata(),
+                sessionJob.getSpec(),
+                jobId,
+                ctx.getDeployConfig(sessionJob.getSpec()),
+                null);
+        ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration());
+        ctx.getExceptionCacheEntry().setInitialized(true);
+        ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
+
+        long exceptionTime = 4000L;
+        flinkService.addExceptionHistory(jobId, "SessionJobException", "trace", exceptionTime);
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(sessionJob.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(
+                1, events.size()); // Only one event for job status changed, no exception events
+        assertEquals(EventRecorder.Reason.JobStatusChanged.name(), events.get(0).getReason());
+    }
+
+    @Test
+    public void testSessionJobExceptionLimitConfig() throws Exception {
+        var sessionJob = initSessionJob();
+        var status = sessionJob.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING);
+        Map<String, String> configuration = new HashMap<>();
+        configuration.put(
+                KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
+        Configuration operatorConfig = Configuration.fromMap(configuration);
+        FlinkResourceContext<FlinkSessionJob> ctx =
+                getResourceContext(
+                        sessionJob,
+                        TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
+                        operatorConfig);
+
+        var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
+        flinkService.submitJobToSessionCluster(
+                sessionJob.getMetadata(),
+                sessionJob.getSpec(),
+                jobId,
+                ctx.getDeployConfig(sessionJob.getSpec()),
+                null);
+        ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration());
+        ctx.getExceptionCacheEntry().setInitialized(true);
+        ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
+
+        // Add 3 exceptions, but only 2 should be reported due to limit
+        flinkService.addExceptionHistory(jobId, "SessionJobException1", "trace1", 4000L);
+        flinkService.addExceptionHistory(jobId, "SessionJobException2", "trace2", 5000L);
+        flinkService.addExceptionHistory(jobId, "SessionJobException3", "trace3", 6000L);
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(sessionJob.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(2, events.size()); // Only 2 exceptions should be reported
+    }
+
+    @Test
+    public void testSessionJobStackTraceTruncationConfig() throws Exception {
+        var sessionJob = initSessionJob();
+        var status = sessionJob.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING);
+        Map<String, String> configuration = new HashMap<>();
+        configuration.put(
+                KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES.key(),
+                "2");
+        Configuration operatorConfig = Configuration.fromMap(configuration);
+        FlinkResourceContext<FlinkSessionJob> ctx =
+                getResourceContext(
+                        sessionJob,
+                        TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
+                        operatorConfig);
+
+        var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
+        flinkService.submitJobToSessionCluster(
+                sessionJob.getMetadata(),
+                sessionJob.getSpec(),
+                jobId,
+                ctx.getDeployConfig(sessionJob.getSpec()),
+                null);
+        ReconciliationUtils.updateStatusForDeployedSpec(sessionJob, new Configuration());
+        ctx.getExceptionCacheEntry().setInitialized(true);
+        ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(3000L));
+
+        long exceptionTime = 4000L;
+        String longTrace = "line1\nline2\nline3\nline4";
+        flinkService.addExceptionHistory(
+                jobId, "SessionJobStackTraceCheck", longTrace, exceptionTime);
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(sessionJob.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(1, events.size());
+        String msg = events.get(0).getMessage();
+        assertTrue(msg.contains("line1"));
+        assertTrue(msg.contains("line2"));
+        assertFalse(msg.contains("line3"));
+        assertTrue(msg.contains("... (2 more lines)"));
+    }
+
+    @Test
+    public void testSessionJobIgnoreOldExceptions() throws Exception {
+        var sessionJob = initSessionJob();
+        var status = sessionJob.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+
+        FlinkResourceContext<FlinkSessionJob> ctx =
+                getResourceContext(
+                        sessionJob,
+                        TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        ctx.getExceptionCacheEntry().setInitialized(true);
+        ctx.getExceptionCacheEntry().setJobId(sessionJob.getStatus().getJobStatus().getJobId());
+        ctx.getExceptionCacheEntry().setLastTimestamp(Instant.ofEpochMilli(2500L));
+
+        var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
+        flinkService.submitJobToSessionCluster(
+                sessionJob.getMetadata(),
+                sessionJob.getSpec(),
+                jobId,
+                ctx.getDeployConfig(sessionJob.getSpec()),
+                null);
+
+        // Map exception names to timestamps
+        Map<String, Long> exceptionHistory =
+                Map.of(
+                        "OldSessionException", 1000L,
+                        "MidSessionException", 2000L,
+                        "NewSessionException", 3000L);
+        String dummyStackTrace =
+                "org.apache.%s\n"
+                        + "\tat org.apache.flink.kubernetes.operator.observer.JobStatusObserverTest.testSessionJobIgnoreOldExceptions(JobStatusObserverTest.java:1)\n"
+                        + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n"
+                        + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n"
+                        + "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n"
+                        + "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n";
+        // Add mapped exceptions
+        exceptionHistory.forEach(
+                (exceptionName, timestamp) -> {
+                    String fullStackTrace = String.format(dummyStackTrace, exceptionName);
+                    flinkService.addExceptionHistory(
+                            jobId, "org.apache." + exceptionName, fullStackTrace, timestamp);
+                });
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(sessionJob.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(1, events.size());
+        assertTrue(events.get(0).getMessage().contains("org.apache.NewSessionException"));
+    }
+
+    @Test
+    public void testSessionJobExceptionEventTriggerInitialization() throws Exception {
+        var sessionJob = initSessionJob();
+        var status = sessionJob.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+
+        FlinkResourceContext<FlinkSessionJob> ctx =
+                getResourceContext(
+                        sessionJob,
+                        TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+
+        var now = Instant.now();
+        var jobId = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
+        flinkService.submitJobToSessionCluster(
+                sessionJob.getMetadata(),
+                sessionJob.getSpec(),
+                jobId,
+                ctx.getDeployConfig(sessionJob.getSpec()),
+                null);
+
+        // Old exception that happened outside of kubernetes event retention should be ignored
+        flinkService.addExceptionHistory(
+                jobId,
+                "OldSessionException",
+                "OldSessionException",
+                now.minus(Duration.ofHours(1)).toEpochMilli());
+        flinkService.addExceptionHistory(
+                jobId,
+                "NewSessionException",
+                "NewSessionException",
+                now.minus(Duration.ofMinutes(1)).toEpochMilli());
+
+        // Ensure jobFailedErr is null before the observe call
+        flinkService.setJobFailedErr(null);
+        observer.observe(ctx);
+
+        var events =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(sessionJob.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        assertEquals(1, events.size());
+        assertTrue(events.get(0).getMessage().contains("NewSessionException"));
+        assertTrue(ctx.getExceptionCacheEntry().isInitialized());
+        assertEquals(
+                now.minus(Duration.ofMinutes(1)).truncatedTo(ChronoUnit.MILLIS),
+                ctx.getExceptionCacheEntry().getLastTimestamp());
+    }
+
     private static Stream<Arguments> cancellingArgs() {
         var args = new ArrayList<Arguments>();
         for (var status : JobStatus.values()) {

From 59051c4951d11e5e9e909db38fe7b42ce0689003 Mon Sep 17 00:00:00 2001
From: Gyula Fora
Date: Wed, 2 Jul 2025 14:30:08 +0200
Subject: [PATCH 2/2] [FLINK-38033] Fix accidental upgrade snapshot dispose bug

---
 .../deployment/AbstractJobReconciler.java     | 37 +++++++------
 .../deployment/ApplicationReconcilerTest.java | 54 +++++++++++++------
 2 files changed, 59 insertions(+), 32 deletions(-)
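[Editorial note, illustration only - not part of the commit.] With FlinkStateSnapshot resources
enabled, the upgrade savepoint used to be recorded twice: once as the newly created
FlinkStateSnapshot CR and once more in the deprecated savepoint history on the job status, so the
history cleanup could dispose a savepoint the upgrade still needed. The change below keeps the
legacy bookkeeping only when no snapshot CR was created. A condensed view of the expectations
encoded by the reworked parameterized test (fragment only, reusing the names that appear in the
hunks that follow):

    if (snapshotResource) {
        // Snapshot CR enabled: the savepoint is tracked solely by the FlinkStateSnapshot
        // resource created during the upgrade.
        assertNull(spInfo.getLastSavepoint());
        assertThat(snapshots).isNotEmpty();
    } else {
        // Legacy mode: the savepoint is recorded in the deprecated status history and no
        // FlinkStateSnapshot resource is created.
        assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
        assertThat(snapshots).isEmpty();
    }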
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 023396b52a..9e79982c25 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -409,25 +409,30 @@ protected void setUpgradeSavepointPath(
                         conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                 .name());

-        FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
-                conf,
-                ctx.getOperatorConfig(),
-                ctx.getKubernetesClient(),
-                ctx.getResource(),
-                savepointFormatType,
-                savepointLocation);
+        var snapshotCrOpt =
+                FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
+                        conf,
+                        ctx.getOperatorConfig(),
+                        ctx.getKubernetesClient(),
+                        ctx.getResource(),
+                        savepointFormatType,
+                        savepointLocation);

         var jobStatus = ctx.getResource().getStatus().getJobStatus();
         jobStatus.setUpgradeSavepointPath(savepointLocation);
-        // Register created savepoint in the now deprecated savepoint info and history
-        var savepoint =
-                new Savepoint(
-                        cancelTs.toEpochMilli(),
-                        savepointLocation,
-                        SnapshotTriggerType.UPGRADE,
-                        savepointFormatType,
-                        null);
-        jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+        if (snapshotCrOpt.isEmpty()) {
+            // Register created savepoint in the now deprecated savepoint info and history
+            // only if snapshot CR was not created, otherwise it would be double recorded
+            // and disposed immediately
+            var savepoint =
+                    new Savepoint(
+                            cancelTs.toEpochMilli(),
+                            savepointLocation,
+                            SnapshotTriggerType.UPGRADE,
+                            savepointFormatType,
+                            null);
+            jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
+        }
     }

     /**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b31361b2ae..3d8ff29086 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -90,6 +90,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.ThrowingConsumer;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -112,6 +113,7 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Stream;

 import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.flink.api.common.JobStatus.RECONCILING;
@@ -137,6 +139,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;

 /**
  * @link JobStatusObserver unit tests
  */
@@ -235,9 +238,12 @@ public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersio
     }

     @ParameterizedTest
-    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
-    public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
+    @MethodSource("upgradeArgs")
+    public void testUpgrade(FlinkVersion flinkVersion, boolean snapshotResource) throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
+        conf.set(SNAPSHOT_RESOURCE_ENABLED, snapshotResource);
+        configManager.updateDefaultConfig(conf);
+        operatorConfig = configManager.getOperatorConfiguration();

         reconciler.reconcile(deployment, context);
         var runningJobs = flinkService.listJobs();
@@ -305,26 +311,35 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
         assertEquals(0, flinkService.getRunningCount());

         var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo();
-        assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
-        assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
-        assertEquals(
-                spInfo.getLastSavepoint(),
-                new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+        if (snapshotResource) {
+            assertNull(spInfo.getLastSavepoint());
+        } else {
+            assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
+            assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
+            assertEquals(
+                    spInfo.getLastSavepoint(),
+                    new LinkedList<>(spInfo.getSavepointHistory()).getLast());
+        }

         reconciler.reconcile(statefulUpgrade, context);
         runningJobs = flinkService.listJobs();
         assertEquals(1, flinkService.getRunningCount());

         var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
-        assertThat(snapshots).isNotEmpty();
-        assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0");
-        assertEquals(
-                SnapshotTriggerType.UPGRADE.name(),
-                snapshots
-                        .get(0)
-                        .getMetadata()
-                        .getLabels()
-                        .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+        if (snapshotResource) {
+            assertThat(snapshots).isNotEmpty();
+            assertThat(snapshots.get(0).getSpec().getSavepoint().getPath())
+                    .isEqualTo("savepoint_0");
+            assertEquals(
+                    SnapshotTriggerType.UPGRADE.name(),
+                    snapshots
+                            .get(0)
+                            .getMetadata()
+                            .getLabels()
+                            .get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
+        } else {
+            assertThat(snapshots).isEmpty();
+        }

         // Make sure jobId rotated on savepoint
         verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
@@ -370,6 +385,13 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
         verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
     }

+    private static Stream<Arguments> upgradeArgs() {
+        return Stream.of(
+                arguments(FlinkVersion.v1_16, true),
+                arguments(FlinkVersion.v1_20, true),
+                arguments(FlinkVersion.v1_20, false));
+    }
+
     private void verifyJobId(
             FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
         // jobId set by operator