diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 15607eed..c52439d2 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -82,6 +82,7 @@ jobs:
           - spark-versions
           - python
           - state-transition
+          - resource-retain-duration
           - watched-namespaces
         exclude:
           - mode: dynamic
@@ -90,6 +91,8 @@ jobs:
             test-group: python
           - mode: dynamic
             test-group: state-transition
+          - mode: dynamic
+            test-group: resource-retain-duration
           - mode: static
             test-group: watched-namespaces
     steps:
diff --git a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
index 0df44fa0..21d39964 100644
--- a/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml
@@ -17411,6 +17411,9 @@ spec:
                 minExecutors:
                   type: integer
               type: object
+            resourceRetainDurationMillis:
+              default: -1
+              type: integer
             resourceRetainPolicy:
               enum:
                 - Always
diff --git a/docs/spark_custom_resources.md b/docs/spark_custom_resources.md
index 5378c640..ad6f8b9f 100644
--- a/docs/spark_custom_resources.md
+++ b/docs/spark_custom_resources.md
@@ -289,12 +289,17 @@ On the other hand, when developing an application, it's possible to configure
 ```yaml
 applicationTolerations:
   # Acceptable values are 'Always', 'OnFailure', 'Never'
+  # Setting this to 'OnFailure' retains secondary resources if and only if the app fails
   resourceRetainPolicy: OnFailure
+  # Secondary resources are garbage collected 10 minutes after app termination
+  resourceRetainDurationMillis: 600000
 ```
 to avoid operator attempt to delete driver pod and driver resources if app fails.
 
 Similarly, if resourceRetainPolicy is set to `Always`, operator would not delete driver resources
-when app ends. Note that this applies only to operator-created resources (driver pod, SparkConf
+when app ends. By default, they are kept with the same lifecycle as the app. It's also
+possible to configure `resourceRetainDurationMillis` to define the maximum retain duration for
+these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
 configmap .etc). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
 and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
 resources.
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 6815da3c..3f7fff22 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -63,6 +63,8 @@ public class Constants {
           + "observed status for details.";
   public static final String APP_CANCELLED_MESSAGE =
       "Spark application has been shutdown as requested.";
+  public static final String APP_EXCEEDED_RETAIN_DURATION_MESSAGE =
+      "Spark application resources released after exceeding the configured retain duration.";
   public static final String DRIVER_UNEXPECTED_REMOVED_MESSAGE =
      "Driver removed. This could caused by 'exit' called in driver process with non-zero "
          + "code, involuntary disruptions or unintentional destroy behavior, check "
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
index 8e2bef0d..8e478171 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java
@@ -19,13 +19,18 @@
 package org.apache.spark.k8s.operator.spec;
 
+import java.time.Instant;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.generator.annotation.Default;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import org.apache.spark.k8s.operator.status.ApplicationState;
+
 /** Toleration settings for a Spark application. */
 @Data
 @NoArgsConstructor
@@ -43,4 +48,40 @@ public class ApplicationTolerations {
   @Builder.Default protected ExecutorInstanceConfig instanceConfig = new ExecutorInstanceConfig();
 
   @Builder.Default protected ResourceRetainPolicy resourceRetainPolicy = ResourceRetainPolicy.Never;
+
+  /**
+   * Time-to-live in milliseconds for secondary resources of a SparkApplication after termination.
+   * If set to a negative value, secondary resources may be retained with the same lifecycle as the
+   * application, according to the retain policy.
+   */
+  @Default("-1")
+  @Builder.Default
+  protected Long resourceRetainDurationMillis = -1L;
+
+  /**
+   * Checks whether a terminated application has exceeded the resource retain duration at the
+   * provided instant.
+   *
+   * @param lastObservedState last observed state of the application
+   * @param instant the instant to check the retain duration against
+   * @return true if the app has terminated, the resource retain duration is configured to a
+   *     positive value, and the provided instant is past the retain deadline; false otherwise.
+   */
+  public boolean exceedRetainDurationAtInstant(
+      ApplicationState lastObservedState, Instant instant) {
+    return lastObservedState != null
+        && lastObservedState.getCurrentStateSummary().isTerminated()
+        && resourceRetainDurationMillis > 0L
+        && Instant.parse(lastObservedState.getLastTransitionTime())
+            .plusMillis(resourceRetainDurationMillis)
+            .isBefore(instant);
+  }
+
+  /**
+   * Indicates whether the reconciler needs to perform the retain duration check.
+   *
+   * @return true if `resourceRetainDurationMillis` is set to a non-negative value
+   */
+  public boolean isRetainDurationEnabled() {
+    return resourceRetainDurationMillis >= 0L;
+  }
 }
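To make the new `ApplicationTolerations` contract concrete, here is a minimal sketch of how the two methods compose (illustrative only, not operator code; it assumes `TerminatedWithoutReleaseResources` reports `isTerminated() == true`, which is what the reconciler change below relies on):

```java
import java.time.Instant;

import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;

class RetainDurationContractSketch {
  public static void main(String[] args) {
    // Keep secondary resources for 10 minutes after termination, even on success.
    ApplicationTolerations tolerations =
        ApplicationTolerations.builder()
            .resourceRetainPolicy(ResourceRetainPolicy.Always)
            .resourceRetainDurationMillis(600_000L)
            .build();

    // A last observed state that terminated 15 minutes ago.
    ApplicationState state =
        new ApplicationState(
            ApplicationStateSummary.TerminatedWithoutReleaseResources, "app done");
    state.setLastTransitionTime(Instant.now().minusSeconds(900).toString());

    boolean checkNeeded = tolerations.isRetainDurationEnabled(); // true: 600000 >= 0
    boolean expired =
        tolerations.exceedRetainDurationAtInstant(state, Instant.now()); // true: 15 min > 10 min
    System.out.println(checkNeeded + " " + expired);
  }
}
```

With the default `-1`, `isRetainDurationEnabled()` is false and retained resources simply follow the lifecycle of the SparkApplication itself.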
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
index f7b61bf5..c62a5068 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java
@@ -30,7 +30,6 @@
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -47,16 +46,22 @@
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
 import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
 
 /**
  * Cleanup all secondary resources when application is deleted, or at the end of each attempt.
  * Update Application status to indicate whether another attempt would be made.
  */
-@AllArgsConstructor
 @NoArgsConstructor
 @Slf4j
 public class AppCleanUpStep extends AppReconcileStep {
   private Supplier<ApplicationState> onDemandCleanUpReason;
+  private String stateUpdateMessage;
+
+  public AppCleanUpStep(Supplier<ApplicationState> onDemandCleanUpReason) {
+    super();
+    this.onDemandCleanUpReason = onDemandCleanUpReason;
+  }
 
   /**
    * Cleanup secondary resources for an application if needed and updates application status
@@ -88,42 +93,37 @@ public ReconcileProgress reconcile(
     ApplicationStatus currentStatus = application.getStatus();
     ApplicationState currentState = currentStatus.getCurrentState();
     ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
-    if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
-      statusRecorder.removeCachedStatus(application);
-      return ReconcileProgress.completeAndNoRequeue();
-    }
-    String stateMessage = null;
-    if (isOnDemandCleanup()) {
-      log.info("Cleaning up application resources on demand");
-    } else {
-      if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
-          currentState.getCurrentStateSummary())) {
-        statusRecorder.removeCachedStatus(application);
-        return ReconcileProgress.completeAndNoRequeue();
-      } else if (currentState.getCurrentStateSummary().isStopping()) {
-        if (retainReleaseResourceForPolicyAndState(
-            tolerations.getResourceRetainPolicy(), currentState)) {
-          if (tolerations.getRestartConfig() != null
-              && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
-            stateMessage =
-                "Application is configured to restart, resources created in current "
-                    + "attempt would be force released.";
-            log.warn(stateMessage);
-          } else {
-            ApplicationState terminationState =
-                new ApplicationState(
-                    ApplicationStateSummary.TerminatedWithoutReleaseResources,
-                    "Application is terminated without releasing resources as configured.");
-            long requeueAfterMillis =
-                tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
-            return appendStateAndRequeueAfter(
-                context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
-          }
-        }
-      } else {
-        log.debug("Clean up is not expected for app, proceeding to next step.");
-        return ReconcileProgress.proceed();
-      }
-    }
+    if (currentState.getCurrentStateSummary().isTerminated()) {
+      Optional<ReconcileProgress> terminatedAppProgress =
+          checkEarlyExitForTerminatedApp(application, statusRecorder);
+      if (terminatedAppProgress.isPresent()) {
+        return terminatedAppProgress.get();
+      }
+    } else if (isOnDemandCleanup()) {
+      log.info("Releasing secondary resources for application on demand.");
+    } else if (currentState.getCurrentStateSummary().isStopping()) {
+      if (retainReleaseResourceForPolicyAndState(
+          tolerations.getResourceRetainPolicy(), currentState)) {
+        if (tolerations.getRestartConfig() != null
+            && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
+          stateUpdateMessage =
+              "Application is configured to restart, resources created in current "
+                  + "attempt would be force released.";
+          log.warn(stateUpdateMessage);
+        } else {
+          ApplicationState terminationState =
+              new ApplicationState(
+                  ApplicationStateSummary.TerminatedWithoutReleaseResources,
+                  "Application is terminated without releasing resources as configured.");
+          long requeueAfterMillis =
+              tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
+          return appendStateAndRequeueAfter(
+              context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
+        }
+      }
+    } else {
+      log.debug("Clean up is not expected for app, proceeding to next step.");
+      return ReconcileProgress.proceed();
+    }
 
     List<HasMetadata> resourcesToRemove = new ArrayList<>();
@@ -159,8 +159,8 @@ public ReconcileProgress reconcile(
     ApplicationStatus updatedStatus;
     if (onDemandCleanUpReason != null) {
       ApplicationState state = onDemandCleanUpReason.get();
-      if (StringUtils.isNotEmpty(stateMessage)) {
-        state.setMessage(stateMessage);
+      if (StringUtils.isNotEmpty(stateUpdateMessage)) {
+        state.setMessage(stateUpdateMessage);
       }
       long requeueAfterMillis =
           tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -171,7 +171,7 @@ public ReconcileProgress reconcile(
           currentStatus.terminateOrRestart(
               tolerations.getRestartConfig(),
               tolerations.getResourceRetainPolicy(),
-              stateMessage,
+              stateUpdateMessage,
               SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue());
       long requeueAfterMillis =
           tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -184,6 +184,41 @@
     }
   }
 
+  protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
+      final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
+    ApplicationStatus currentStatus = application.getStatus();
+    ApplicationState currentState = currentStatus.getCurrentState();
+    ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
+    if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
+      statusRecorder.removeCachedStatus(application);
+      return Optional.of(ReconcileProgress.completeAndNoRequeue());
+    }
+    if (isOnDemandCleanup()) {
+      return Optional.empty();
+    }
+    if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
+        currentState.getCurrentStateSummary())) {
+      if (tolerations.isRetainDurationEnabled()) {
+        Instant now = Instant.now();
+        if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
+          onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
+          return Optional.empty();
+        } else {
+          Duration nextCheckDuration =
+              Duration.between(
+                  Instant.now(),
+                  Instant.parse(currentState.getLastTransitionTime())
+                      .plusMillis(tolerations.getResourceRetainDurationMillis()));
+          return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
+        }
+      } else {
+        statusRecorder.removeCachedStatus(application);
+        return Optional.of(ReconcileProgress.completeAndNoRequeue());
+      }
+    }
+    return Optional.empty();
+  }
+
   protected boolean isOnDemandCleanup() {
     return onDemandCleanUpReason != null;
   }
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
index ecbe3e20..8b763bfc 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java
@@ -61,6 +61,11 @@ public static ApplicationState appCancelled() {
         ApplicationStateSummary.ResourceReleased, Constants.APP_CANCELLED_MESSAGE);
   }
 
+  public static ApplicationState appExceededRetainDuration() {
+    return new ApplicationState(
+        ApplicationStateSummary.ResourceReleased, Constants.APP_EXCEEDED_RETAIN_DURATION_MESSAGE);
+  }
+
   public static boolean hasReachedState(
       SparkApplication application, ApplicationState stateToCheck) {
     return isValidApplicationStatus(application)
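The scheduling math in `checkEarlyExitForTerminatedApp` is the heart of the feature: instead of waiting for the next routine resync, a retained-but-terminated app is requeued exactly when its deadline arrives, and only then flips into on-demand cleanup with `appExceededRetainDuration` as the reason. A standalone sketch with illustrative numbers (the values are hypothetical; the computation mirrors the step above):

```java
import java.time.Duration;
import java.time.Instant;

class RequeueMathSketch {
  public static void main(String[] args) {
    // Hypothetical: the app terminated 4 minutes ago, and the spec retains
    // secondary resources for 10 minutes (resourceRetainDurationMillis=600000).
    Instant lastTransitionTime = Instant.now().minus(Duration.ofMinutes(4));
    long resourceRetainDurationMillis = 600_000L;

    // Same computation as the reconcile step: requeue when the retain deadline
    // (lastTransitionTime + retain duration) arrives.
    Duration nextCheckDuration =
        Duration.between(
            Instant.now(), lastTransitionTime.plusMillis(resourceRetainDurationMillis));

    System.out.println(nextCheckDuration); // ~PT6M: next reconcile in about 6 minutes
  }
}
```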
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
index d5897650..ff7f371e 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java
@@ -116,4 +116,61 @@ void testCleanupRunningApp() {
       assertTrue(deleteControl.isRemoveFinalizer());
     }
   }
+
+  @SuppressWarnings("PMD.UnusedLocalVariable")
+  @Test
+  void testCleanupAppTerminatedWithoutReleaseResources() {
+    try (MockedConstruction<SparkAppContext> mockAppContext =
+            mockConstruction(
+                SparkAppContext.class,
+                (mock, context) -> {
+                  when(mock.getResource()).thenReturn(app);
+                  when(mock.getClient()).thenReturn(mockClient);
+                  when(mock.getDriverPod()).thenReturn(Optional.of(mockDriver));
+                  when(mock.getDriverPodSpec()).thenReturn(mockDriver);
+                  when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+                  when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+                });
+        MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
+      // delete app
+      app.setStatus(
+          app.getStatus()
+              .appendNewState(
+                  new ApplicationState(
+                      ApplicationStateSummary.TerminatedWithoutReleaseResources, "")));
+      DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
+      assertFalse(deleteControl.isRemoveFinalizer());
+      utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, mockDriver, false));
+      assertEquals(
+          ApplicationStateSummary.ResourceReleased,
+          app.getStatus().getCurrentState().getCurrentStateSummary());
+
+      // proceed delete for terminated app
+      deleteControl = reconciler.cleanup(app, mockContext);
+      assertTrue(deleteControl.isRemoveFinalizer());
+    }
+  }
+
+  @SuppressWarnings("PMD.UnusedLocalVariable")
+  @Test
+  void testCleanupAppTerminatedResourceReleased() {
+    try (MockedConstruction<SparkAppContext> mockAppContext =
+            mockConstruction(
+                SparkAppContext.class,
+                (mock, context) -> {
+                  when(mock.getResource()).thenReturn(app);
+                  when(mock.getClient()).thenReturn(mockClient);
+                  when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
+                  when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
+                });
+        MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
+      // delete app
+      app.setStatus(
+          app.getStatus()
+              .appendNewState(new ApplicationState(ApplicationStateSummary.ResourceReleased, "")));
+      DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
+      assertTrue(deleteControl.isRemoveFinalizer());
+      utils.verifyNoInteractions();
+    }
+  }
 }
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
index 1c4637e6..23ed54ba 100644
--- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java
@@ -31,6 +31,7 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -47,6 +48,8 @@
 import org.apache.spark.k8s.operator.context.SparkAppContext;
 import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.ApplicationTolerations;
+import org.apache.spark.k8s.operator.spec.ResourceRetainPolicy;
 import org.apache.spark.k8s.operator.status.ApplicationState;
 import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
@@ -56,6 +59,40 @@
 @SuppressWarnings("PMD.NcssCount")
 class AppCleanUpStepTest {
+  private final ApplicationSpec alwaysRetain =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .build())
+          .build();
+  private final ApplicationSpec neverRetain =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Never)
+                  .build())
+          .build();
+  private final ApplicationSpec exceedRetainDuration =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .resourceRetainDurationMillis(1L)
+                  .build())
+          .build();
+  private final ApplicationSpec notExceedRetainDuration =
+      ApplicationSpec.builder()
+          .applicationTolerations(
+              ApplicationTolerations.builder()
+                  .resourceRetainPolicy(ResourceRetainPolicy.Always)
+                  .resourceRetainDurationMillis(24 * 60 * 60 * 1000L)
+                  .build())
+          .build();
+
+  private final List<ApplicationSpec> specs =
+      List.of(alwaysRetain, neverRetain, exceedRetainDuration, notExceedRetainDuration);
+
   @Test
   void enableForceDelete() {
     AppCleanUpStep appCleanUpStep = new AppCleanUpStep();
@@ -120,7 +157,7 @@ void onDemandCleanupForRunningAppExpectDelete() {
           ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress);
     }
 
-    verify(mockAppContext).getResource();
+    verify(mockAppContext, times(1)).getResource();
     verify(mockApp, times(2)).getSpec();
     verify(mockApp, times(2)).getStatus();
     verify(mockAppContext).getClient();
@@ -151,9 +188,9 @@ void routineCleanupForTerminatedAppExpectNoAction() {
     when(mockApp.getSpec()).thenReturn(spec);
     ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder);
     Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), progress);
-    verify(mockAppContext).getResource();
-    verify(mockApp).getSpec();
-    verify(mockApp).getStatus();
+    verify(mockAppContext, times(1)).getResource();
+    verify(mockApp, times(2)).getSpec();
+    verify(mockApp, times(2)).getStatus();
     verify(mockRecorder).removeCachedStatus(mockApp);
     verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
   }
@@ -163,7 +200,7 @@ void routineCleanupForTerminatedAppExpectNoAction() {
   @Test
   void onDemandCleanupForTerminatedAppExpectNoAction() {
     SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class);
-    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
     ApplicationStatus status = prepareApplicationStatus(ApplicationStateSummary.ResourceReleased);
     SparkApplication mockApp = mock(SparkApplication.class);
     ApplicationSpec spec = ApplicationSpec.builder().build();
@@ -171,11 +208,11 @@ void onDemandCleanupForTerminatedAppExpectNoAction() {
     SparkAppContext mockAppContext = mock(SparkAppContext.class);
     when(mockAppContext.getResource()).thenReturn(mockApp);
     when(mockApp.getSpec()).thenReturn(spec);
-    ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder);
+    ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
     Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), progress);
-    verify(mockAppContext).getResource();
-    verify(mockApp).getSpec();
-    verify(mockApp).getStatus();
+    verify(mockAppContext, times(1)).getResource();
+    verify(mockApp, times(2)).getSpec();
+    verify(mockApp, times(2)).getStatus();
     verify(mockRecorder).removeCachedStatus(mockApp);
     verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp);
   }
@@ -206,9 +243,9 @@ void onDemandCleanupForTerminatedAppExpectDelete() {
           ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress);
     }
 
-    verify(mockAppContext).getResource();
-    verify(mockApp, times(2)).getSpec();
-    verify(mockApp, times(2)).getStatus();
+    verify(mockAppContext, times(1)).getResource();
+    verify(mockApp, times(3)).getSpec();
+    verify(mockApp, times(3)).getStatus();
     verify(mockAppContext).getClient();
     verify(mockAppContext).getDriverPod();
     ArgumentCaptor<ApplicationState> captor = ArgumentCaptor.forClass(ApplicationState.class);
@@ -269,14 +306,14 @@ void cleanupForAppExpectDeleteWithRecompute() {
           ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress2);
     }
 
-    verify(mockAppContext1).getResource();
+    verify(mockAppContext1, times(1)).getResource();
     verify(mockApp1, times(2)).getSpec();
     verify(mockApp1, times(2)).getStatus();
     verify(mockAppContext1, times(3)).getClient();
     verify(mockAppContext1).getDriverPreResourcesSpec();
     verify(mockAppContext1).getDriverPodSpec();
     verify(mockAppContext1).getDriverResourcesSpec();
-    verify(mockAppContext2).getResource();
+    verify(mockAppContext2, times(1)).getResource();
     verify(mockApp2, times(2)).getSpec();
     verify(mockApp2, times(2)).getStatus();
     verify(mockAppContext2, times(3)).getClient();
@@ -299,9 +336,195 @@ void cleanupForAppExpectDeleteWithRecompute() {
         mockAppContext1, mockAppContext2, mockRecorder, mockApp1, mockApp2, mockClient, driverPod);
   }
 
+  @Test
+  void checkEarlyExitForResourceReleasedApp() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.ResourceReleased, ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationSpec appSpec : specs) {
+      for (ApplicationStatus appStatus : statusList) {
+        SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+        SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+        SparkApplication mockApp = mock(SparkApplication.class);
+        when(mockApp.getStatus()).thenReturn(appStatus);
+        when(mockApp.getSpec()).thenReturn(appSpec);
+
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+        assertTrue(routineCheckProgress.isPresent());
+        Assertions.assertEquals(
+            ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get());
+        verify(mockRecorder1).removeCachedStatus(mockApp);
+
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+        assertTrue(onDemandProgress.isPresent());
+        Assertions.assertEquals(
+            ReconcileProgress.completeAndNoRequeue(), onDemandProgress.get());
+        verify(mockRecorder2).removeCachedStatus(mockApp);
+      }
+    }
+  }
+
+  @Test
+  void checkEarlyExitForAppTerminatedWithoutReleaseResourcesInfiniteRetain() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(alwaysRetain);
+
+      Optional<ReconcileProgress> routineCheckProgress =
+          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+      assertTrue(routineCheckProgress.isPresent());
+      Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), routineCheckProgress.get());
+      verify(mockRecorder1).removeCachedStatus(mockApp);
+
+      Optional<ReconcileProgress> onDemandProgress =
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+      assertFalse(onDemandProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder2);
+    }
+  }
+
+  @Test
+  void checkEarlyExitForAppTerminatedWithoutReleaseResourcesExceededRetainDuration() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(exceedRetainDuration);
+
+      Optional<ReconcileProgress> routineCheckProgress =
+          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+      assertFalse(routineCheckProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder1);
+
+      Optional<ReconcileProgress> onDemandProgress =
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+      assertFalse(onDemandProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder2);
+    }
+  }
+
+  @Test
+  void checkEarlyExitForAppTerminatedWithoutReleaseResourcesWithinRetainDuration() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    ApplicationStatus succeeded =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.Succeeded);
+    ApplicationStatus failed =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.SchedulingFailure);
+    ApplicationStatus cancelled =
+        prepareApplicationStatus(
+            ApplicationStateSummary.TerminatedWithoutReleaseResources,
+            ApplicationStateSummary.RunningHealthy);
+    List<ApplicationStatus> statusList = List.of(succeeded, failed, cancelled);
+
+    for (ApplicationStatus appStatus : statusList) {
+      SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+      SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+      SparkApplication mockApp = mock(SparkApplication.class);
+      when(mockApp.getStatus()).thenReturn(appStatus);
+      when(mockApp.getSpec()).thenReturn(notExceedRetainDuration);
+
+      Optional<ReconcileProgress> routineCheckProgress =
+          routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+      assertTrue(routineCheckProgress.isPresent());
+      ReconcileProgress reconcileProgress = routineCheckProgress.get();
+      assertTrue(reconcileProgress.isCompleted());
+      assertTrue(reconcileProgress.isRequeue());
+      verifyNoMoreInteractions(mockRecorder1);
+
+      Optional<ReconcileProgress> onDemandProgress =
+          cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+      assertFalse(onDemandProgress.isPresent());
+      verifyNoMoreInteractions(mockRecorder2);
+    }
+  }
+
+  @Test
+  void checkEarlyExitForNotTerminatedApp() {
+    AppCleanUpStep routineCheck = new AppCleanUpStep();
+    AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled);
+    for (ApplicationStateSummary stateSummary : ApplicationStateSummary.values()) {
+      if (stateSummary.isTerminated()) {
+        continue;
+      }
+      ApplicationStatus status = prepareApplicationStatus(stateSummary);
+      for (ApplicationSpec appSpec : specs) {
+        SparkAppStatusRecorder mockRecorder1 = mock(SparkAppStatusRecorder.class);
+        SparkAppStatusRecorder mockRecorder2 = mock(SparkAppStatusRecorder.class);
+        SparkApplication mockApp = mock(SparkApplication.class);
+        when(mockApp.getStatus()).thenReturn(status);
+        when(mockApp.getSpec()).thenReturn(appSpec);
+
+        Optional<ReconcileProgress> routineCheckProgress =
+            routineCheck.checkEarlyExitForTerminatedApp(mockApp, mockRecorder1);
+        assertTrue(routineCheckProgress.isEmpty());
+        verifyNoMoreInteractions(mockRecorder1);
+
+        Optional<ReconcileProgress> onDemandProgress =
+            cleanUpWithReason.checkEarlyExitForTerminatedApp(mockApp, mockRecorder2);
+        assertTrue(onDemandProgress.isEmpty());
+        verifyNoMoreInteractions(mockRecorder2);
+      }
+    }
+  }
+
   private ApplicationStatus prepareApplicationStatus(ApplicationStateSummary currentStateSummary) {
     ApplicationStatus status = new ApplicationStatus();
     ApplicationState state = new ApplicationState(currentStateSummary, "foo");
+    // to make sure the state exceeds threshold
+    state.setLastTransitionTime(Instant.now().minusSeconds(10).toString());
     return status.appendNewState(state);
   }
 
@@ -309,6 +532,7 @@ private ApplicationStatus prepareApplicationStatus(
       ApplicationStateSummary currentStateSummary, ApplicationStateSummary previousStateSummary) {
     ApplicationStatus status = prepareApplicationStatus(previousStateSummary);
     ApplicationState state = new ApplicationState(currentStateSummary, "foo");
+    state.setLastTransitionTime(Instant.now().minusSeconds(5).toString());
     return status.appendNewState(state);
   }
diff --git a/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml b/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml
new file mode 100644
index 00000000..4460291f
--- /dev/null
+++ b/tests/e2e/assertions/spark-application/spark-state-transition-with-retain-check.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: spark.apache.org/v1
+kind: SparkApplication
+metadata:
+  name: spark-example-retain-duration
+  namespace: ($SPARK_APP_NAMESPACE)
+status:
+  stateTransitionHistory:
+    (*.currentStateSummary):
+      - "Submitted"
+      - "DriverRequested"
+      - "DriverStarted"
+      - "DriverReady"
+      - "RunningHealthy"
+      - "Succeeded"
+      - "TerminatedWithoutReleaseResources"
+      - "ResourceReleased"
diff --git a/tests/e2e/resource-retain-duration/chainsaw-test.yaml b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
new file mode 100644
index 00000000..2c8b5584
--- /dev/null
+++ b/tests/e2e/resource-retain-duration/chainsaw-test.yaml
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: chainsaw.kyverno.io/v1alpha1
+kind: Test
+metadata:
+  name: garbage-collect-with-retain-duration-test
+spec:
+  scenarios:
+    - bindings:
+        - name: TEST_NAME
+          value: succeeded
+        - name: APPLICATION_FILE_NAME
+          value: spark-example-retain-duration.yaml
+        - name: SPARK_APPLICATION_NAME
+          value: spark-example-retain-duration
+  steps:
+    - try:
+        - script:
+            env:
+              - name: FILE_NAME
+                value: ($APPLICATION_FILE_NAME)
+            content: kubectl apply -f $FILE_NAME
+        - assert:
+            bindings:
+              - name: SPARK_APP_NAMESPACE
+                value: default
+            timeout: 120s
+            file: "../assertions/spark-application/spark-state-transition-with-retain-check.yaml"
+      catch:
+        - describe:
+            apiVersion: spark.apache.org/v1
+            kind: SparkApplication
+            namespace: default
+      finally:
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            timeout: 120s
+            content: |
+              kubectl delete sparkapplication $SPARK_APPLICATION_NAME
diff --git a/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
new file mode 100644
index 00000000..022fdd45
--- /dev/null
+++ b/tests/e2e/resource-retain-duration/spark-example-retain-duration.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: spark.apache.org/v1
+kind: SparkApplication
+metadata:
+  name: spark-example-retain-duration
+  namespace: default
+spec:
+  mainClass: "org.apache.spark.examples.SparkPi"
+  jars: "local:///opt/spark/examples/jars/spark-examples.jar"
+  applicationTolerations:
+    resourceRetainPolicy: Always
+    resourceRetainDurationMillis: 10000
+  sparkConf:
+    spark.executor.instances: "1"
+    spark.kubernetes.container.image: "apache/spark:4.0.0-java21-scala"
+    spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
+  runtimeVersions:
+    sparkVersion: 4.0.0
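Taken together, the two manifests encode a tight timeline: the app succeeds, parks in `TerminatedWithoutReleaseResources` because of `resourceRetainPolicy: Always`, and must reach `ResourceReleased` roughly 10 seconds later, well inside the 120s assert timeout. A quick budget check (illustrative arithmetic only; the placeholder timestamp stands in for whatever transition time the operator records):

```java
import java.time.Duration;
import java.time.Instant;

class RetainDurationE2eBudgetSketch {
  public static void main(String[] args) {
    Instant terminatedAt = Instant.parse("2025-01-01T00:00:00Z"); // placeholder transition time
    Instant releaseDeadline = terminatedAt.plusMillis(10_000L);   // resourceRetainDurationMillis
    Duration assertTimeout = Duration.ofSeconds(120);             // chainsaw assert timeout

    System.out.println(releaseDeadline); // 2025-01-01T00:00:10Z
    // The release deadline leaves ~110s of slack before the assertion times out.
    System.out.println(
        assertTimeout.minus(Duration.between(terminatedAt, releaseDeadline))); // PT1M50S
  }
}
```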