From 07b4f68af4acbc1dbf404471032b1e95c1f9daf9 Mon Sep 17 00:00:00 2001 From: Zhou JIANG Date: Tue, 24 Jun 2025 17:38:35 -0700 Subject: [PATCH] [SPARK-52581]Revise AppCleanUpStep to include cleanup logic for all states ### What changes were proposed in this pull request? This PR updates AppCleanUpStep so that it can be added as a regular step of each reconciliation, remove the therefore-unused AppTerminatedStep ### Why are the changes needed? Previously, we have AppTerminatedStep performed at the beginning of each reconciliation, to end the actions early if app has reached terminated state. However, in some scenarios, we may still want to do "clean up" for applications that are terminated: * If an app in "terminated without releasing resources" is being deleted, we would like the AppCleanUpStep to be performed, since it covers a few corner cases that might not be covered in general delete flow * We may also to introduce the concept of "maximal retention duration" for applications along with current "retention policy" in near future to serve the lifecycle management of application secondary resources. That could also require cleaning up secondary resources for already-terminated applications. The revised AppCleanUpStep includes everything in the previous AppTerminatedStep and takes care of cleaning up resources for those in "TerminatedWithoutReleaseResources" stage as needed. ### Does this PR introduce any user-facing change? No - internal step(s) changes only ### How was this patch tested? CIs and E2Es shall cover the clean up logic ### Was this patch authored or co-authored using generative AI tooling? No --- .../reconciler/SparkAppReconciler.java | 12 +- .../reconcilesteps/AppCleanUpStep.java | 115 +++++-- .../reconcilesteps/AppTerminatedStep.java | 44 --- .../reconcilesteps/AppCleanUpStepTest.java | 297 ++++++++++++++++++ 4 files changed, 386 insertions(+), 82 deletions(-) delete mode 100644 spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java index 89c52134..e99462ed 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java @@ -59,7 +59,6 @@ import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppReconcileStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppResourceObserveStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppRunningStep; -import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppTerminatedStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppUnknownStateStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppValidateStep; import org.apache.spark.k8s.operator.utils.LoggingUtils; @@ -150,7 +149,7 @@ public Map prepareEventSources( protected List getReconcileSteps(final SparkApplication app) { List steps = new ArrayList<>(); steps.add(new AppValidateStep()); - steps.add(new AppTerminatedStep()); + steps.add(new AppCleanUpStep()); switch (app.getStatus().getCurrentState().getCurrentStateSummary()) { case Submitted, ScheduledToRestart -> steps.add(new AppInitStep()); case DriverRequested, DriverStarted -> { @@ -172,14 +171,6 @@ protected List getReconcileSteps(final SparkApplication app) { steps.add( new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver()))); } - case DriverReadyTimedOut, - DriverStartTimedOut, - ExecutorsStartTimedOut, - Succeeded, - DriverEvicted, - Failed, - SchedulingFailure -> - steps.add(new AppCleanUpStep()); default -> steps.add(new AppUnknownStateStep()); } return steps; @@ -202,7 +193,6 @@ public DeleteControl cleanup( SparkAppContext ctx = new SparkAppContext(sparkApplication, context, submissionWorker); List cleanupSteps = new ArrayList<>(); cleanupSteps.add(new AppValidateStep()); - cleanupSteps.add(new AppTerminatedStep()); cleanupSteps.add(new AppCleanUpStep(SparkAppStatusUtils::appCancelled)); for (AppReconcileStep step : cleanupSteps) { ReconcileProgress progress = step.reconcile(ctx, sparkAppStatusRecorder); diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java index f75967a1..6d4313ee 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java @@ -23,6 +23,8 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.function.Supplier; @@ -54,38 +56,78 @@ @NoArgsConstructor @Slf4j public class AppCleanUpStep extends AppReconcileStep { - private Supplier cleanUpSuccessStateSupplier; + private Supplier onDemandCleanUpReason; + /** + * Cleanup secondary resources for an application if needed and updates application status + * accordingly. This step would be performed right after validation step in each reconcile as a + * sanity check. It may end the reconciliation if no more actions are needed. In addition, it can + * be performed on demand with a reason for cleanup secondary resources. + * + *

An app expects its secondary resources to be released if any of the below is true: + * + *

    + *
  • When the application is being deleted on demand(e.g. being deleted) with a reason + *
  • When the application is stopping + *
  • When the application has terminated without releasing resources, but it has exceeded + * configured retention duration + *
+ * + *

It would proceed to next steps with no actions for application in other states. Note that + * when even the reconciler decides to proceed with clean up, sub-resources may still be retained + * based on tolerations. + * + * @param context context for the app + * @param statusRecorder recorder for status updates in this reconcile + * @return the reconcile progress + */ @Override public ReconcileProgress reconcile( SparkAppContext context, SparkAppStatusRecorder statusRecorder) { - ApplicationStatus currentStatus = context.getResource().getStatus(); - ApplicationTolerations tolerations = - context.getResource().getSpec().getApplicationTolerations(); - ResourceRetainPolicy resourceRetainPolicy = tolerations.getResourceRetainPolicy(); + SparkApplication application = context.getResource(); + ApplicationStatus currentStatus = application.getStatus(); + ApplicationState currentState = currentStatus.getCurrentState(); + ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations(); + if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) { + statusRecorder.removeCachedStatus(application); + return ReconcileProgress.completeAndNoRequeue(); + } String stateMessage = null; - - if (retainReleaseResource(resourceRetainPolicy, currentStatus.getCurrentState())) { - if (tolerations.getRestartConfig() != null - && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) { - stateMessage = - "Application is configured to restart, resources created in current " - + "attempt would be force released."; - log.warn(stateMessage); + if (isOnDemandCleanup()) { + log.info("Cleaning up application resources on demand"); + } else { + if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals( + currentState.getCurrentStateSummary())) { + statusRecorder.removeCachedStatus(application); + return ReconcileProgress.completeAndNoRequeue(); + } else if (currentState.getCurrentStateSummary().isStopping()) { + if (retainReleaseResourceForPolicyAndState( + tolerations.getResourceRetainPolicy(), currentState)) { + if (tolerations.getRestartConfig() != null + && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) { + stateMessage = + "Application is configured to restart, resources created in current " + + "attempt would be force released."; + log.warn(stateMessage); + } else { + ApplicationState terminationState = + new ApplicationState( + ApplicationStateSummary.TerminatedWithoutReleaseResources, + "Application is terminated without releasing resources as configured."); + long requeueAfterMillis = + tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis(); + return appendStateAndRequeueAfter( + context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis)); + } + } } else { - ApplicationState terminationState = - new ApplicationState( - ApplicationStateSummary.TerminatedWithoutReleaseResources, - "Application is terminated without releasing resources as configured."); - long requeueAfterMillis = - tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis(); - return appendStateAndRequeueAfter( - context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis)); + log.debug("Clean up is not expected for app, proceeding to next step."); + return ReconcileProgress.proceed(); } } + List resourcesToRemove = new ArrayList<>(); - if (ApplicationStateSummary.SchedulingFailure.equals( - currentStatus.getCurrentState().getCurrentStateSummary())) { + if (isReleasingResourcesForSchedulingFailureAttempt(currentStatus)) { // if app failed at scheduling, re-compute all spec and delete as they may not be fully // owned by driver try { @@ -110,13 +152,13 @@ public ReconcileProgress reconcile( Optional driver = context.getDriverPod(); driver.ifPresent(resourcesToRemove::add); } - boolean forceDelete = enableForceDelete(context.getResource()); + boolean forceDelete = enableForceDelete(application); for (HasMetadata resource : resourcesToRemove) { ReconcilerUtils.deleteResourceIfExists(context.getClient(), resource, forceDelete); } ApplicationStatus updatedStatus; - if (cleanUpSuccessStateSupplier != null) { - ApplicationState state = cleanUpSuccessStateSupplier.get(); + if (onDemandCleanUpReason != null) { + ApplicationState state = onDemandCleanUpReason.get(); if (StringUtils.isNotEmpty(stateMessage)) { state.setMessage(stateMessage); } @@ -142,7 +184,26 @@ public ReconcileProgress reconcile( } } - protected boolean retainReleaseResource( + protected boolean isOnDemandCleanup() { + return onDemandCleanUpReason != null; + } + + protected boolean isReleasingResourcesForSchedulingFailureAttempt( + final ApplicationStatus status) { + ApplicationState lastObservedState = status.getCurrentState(); + if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals( + lastObservedState.getCurrentStateSummary())) { + // if the app has already terminated, use the last observed state before termination + NavigableMap navMap = + (NavigableMap) status.getStateTransitionHistory(); + Map.Entry terminateState = navMap.lastEntry(); + lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue(); + } + return ApplicationStateSummary.SchedulingFailure.equals( + lastObservedState.getCurrentStateSummary()); + } + + protected boolean retainReleaseResourceForPolicyAndState( ResourceRetainPolicy resourceRetainPolicy, ApplicationState currentState) { return switch (resourceRetainPolicy) { case Never -> false; diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java deleted file mode 100644 index 16a4155d..00000000 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.k8s.operator.reconciler.reconcilesteps; - -import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.proceed; - -import org.apache.spark.k8s.operator.context.SparkAppContext; -import org.apache.spark.k8s.operator.reconciler.ReconcileProgress; -import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; - -/** Observes whether app is already terminated. If so, end the reconcile. */ -public class AppTerminatedStep extends AppReconcileStep { - @Override - public ReconcileProgress reconcile( - SparkAppContext context, SparkAppStatusRecorder statusRecorder) { - if (context - .getResource() - .getStatus() - .getCurrentState() - .getCurrentStateSummary() - .isTerminated()) { - statusRecorder.removeCachedStatus(context.getResource()); - return ReconcileProgress.completeAndNoRequeue(); - } - return proceed(); - } -} diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java index 388be418..1c4637e6 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java @@ -19,14 +19,42 @@ package org.apache.spark.k8s.operator.reconciler.reconcilesteps; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import java.time.Duration; import java.time.Instant; +import java.util.Collections; +import java.util.Optional; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.apache.spark.k8s.operator.Constants; import org.apache.spark.k8s.operator.SparkApplication; +import org.apache.spark.k8s.operator.context.SparkAppContext; +import org.apache.spark.k8s.operator.reconciler.ReconcileProgress; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.status.ApplicationState; +import org.apache.spark.k8s.operator.status.ApplicationStateSummary; +import org.apache.spark.k8s.operator.status.ApplicationStatus; +import org.apache.spark.k8s.operator.utils.ReconcilerUtils; +import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; +import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils; +@SuppressWarnings("PMD.NcssCount") class AppCleanUpStepTest { @Test void enableForceDelete() { @@ -41,4 +69,273 @@ void enableForceDelete() { .setForceTerminationGracePeriodMillis(3000L); assertTrue(appCleanUpStep.enableForceDelete(app)); } + + @Test + void routineCleanupForRunningAppExpectNoAction() { + SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); + AppCleanUpStep routineCheck = new AppCleanUpStep(); + for (ApplicationStateSummary stateSummary : ApplicationStateSummary.values()) { + if (!stateSummary.isStopping() && !stateSummary.isTerminated()) { + ApplicationStatus status = prepareApplicationStatus(stateSummary); + ApplicationSpec spec = ApplicationSpec.builder().build(); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(status); + when(mockApp.getSpec()).thenReturn(spec); + SparkAppContext mockAppContext = mock(SparkAppContext.class); + when(mockAppContext.getResource()).thenReturn(mockApp); + ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder); + Assertions.assertEquals(ReconcileProgress.proceed(), progress); + verify(mockAppContext).getResource(); + verify(mockApp).getSpec(); + verify(mockApp).getStatus(); + verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); + } + } + } + + @Test + void onDemandCleanupForRunningAppExpectDelete() { + SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + for (ApplicationStateSummary stateSummary : ApplicationStateSummary.values()) { + if (!stateSummary.isStopping() && !stateSummary.isTerminated()) { + ApplicationStatus status = prepareApplicationStatus(stateSummary); + ApplicationSpec spec = ApplicationSpec.builder().build(); + SparkApplication mockApp = mock(SparkApplication.class); + when(mockApp.getStatus()).thenReturn(status); + when(mockApp.getSpec()).thenReturn(spec); + SparkAppContext mockAppContext = mock(SparkAppContext.class); + when(mockAppContext.getResource()).thenReturn(mockApp); + KubernetesClient mockClient = mock(KubernetesClient.class); + when(mockAppContext.getClient()).thenReturn(mockClient); + Pod driverPod = mock(Pod.class); + when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod)); + when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList()); + when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList()); + + try (MockedStatic utils = Mockito.mockStatic(ReconcilerUtils.class)) { + ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, driverPod, false)); + Assertions.assertEquals( + ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress); + } + + verify(mockAppContext).getResource(); + verify(mockApp, times(2)).getSpec(); + verify(mockApp, times(2)).getStatus(); + verify(mockAppContext).getClient(); + verify(mockAppContext).getDriverPod(); + ArgumentCaptor captor = ArgumentCaptor.forClass(ApplicationState.class); + verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext), captor.capture()); + ApplicationState appState = captor.getValue(); + Assertions.assertEquals( + ApplicationStateSummary.ResourceReleased, appState.getCurrentStateSummary()); + Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE, appState.getMessage()); + verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp, mockClient, driverPod); + } + } + } + + @Test + void routineCleanupForTerminatedAppExpectNoAction() { + SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); + AppCleanUpStep routineCheck = new AppCleanUpStep(); + for (ApplicationStateSummary stateSummary : ApplicationStateSummary.values()) { + if (stateSummary.isTerminated()) { + ApplicationStatus status = prepareApplicationStatus(stateSummary); + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec spec = ApplicationSpec.builder().build(); + when(mockApp.getStatus()).thenReturn(status); + SparkAppContext mockAppContext = mock(SparkAppContext.class); + when(mockAppContext.getResource()).thenReturn(mockApp); + when(mockApp.getSpec()).thenReturn(spec); + ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder); + Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), progress); + verify(mockAppContext).getResource(); + verify(mockApp).getSpec(); + verify(mockApp).getStatus(); + verify(mockRecorder).removeCachedStatus(mockApp); + verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); + } + } + } + + @Test + void onDemandCleanupForTerminatedAppExpectNoAction() { + SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); + AppCleanUpStep routineCheck = new AppCleanUpStep(); + ApplicationStatus status = prepareApplicationStatus(ApplicationStateSummary.ResourceReleased); + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec spec = ApplicationSpec.builder().build(); + when(mockApp.getStatus()).thenReturn(status); + SparkAppContext mockAppContext = mock(SparkAppContext.class); + when(mockAppContext.getResource()).thenReturn(mockApp); + when(mockApp.getSpec()).thenReturn(spec); + ReconcileProgress progress = routineCheck.reconcile(mockAppContext, mockRecorder); + Assertions.assertEquals(ReconcileProgress.completeAndNoRequeue(), progress); + verify(mockAppContext).getResource(); + verify(mockApp).getSpec(); + verify(mockApp).getStatus(); + verify(mockRecorder).removeCachedStatus(mockApp); + verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp); + } + + @Test + void onDemandCleanupForTerminatedAppExpectDelete() { + SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus status = + prepareApplicationStatus(ApplicationStateSummary.TerminatedWithoutReleaseResources); + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec spec = ApplicationSpec.builder().build(); + when(mockApp.getStatus()).thenReturn(status); + SparkAppContext mockAppContext = mock(SparkAppContext.class); + when(mockAppContext.getResource()).thenReturn(mockApp); + when(mockApp.getSpec()).thenReturn(spec); + KubernetesClient mockClient = mock(KubernetesClient.class); + when(mockAppContext.getClient()).thenReturn(mockClient); + Pod driverPod = mock(Pod.class); + when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod)); + when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList()); + when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList()); + + try (MockedStatic utils = Mockito.mockStatic(ReconcilerUtils.class)) { + ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, driverPod, false)); + Assertions.assertEquals( + ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress); + } + + verify(mockAppContext).getResource(); + verify(mockApp, times(2)).getSpec(); + verify(mockApp, times(2)).getStatus(); + verify(mockAppContext).getClient(); + verify(mockAppContext).getDriverPod(); + ArgumentCaptor captor = ArgumentCaptor.forClass(ApplicationState.class); + verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext), captor.capture()); + ApplicationState appState = captor.getValue(); + Assertions.assertEquals( + ApplicationStateSummary.ResourceReleased, appState.getCurrentStateSummary()); + Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE, appState.getMessage()); + verifyNoMoreInteractions(mockAppContext, mockRecorder, mockApp, mockClient, driverPod); + } + + @Test + void cleanupForAppExpectDeleteWithRecompute() { + SparkAppStatusRecorder mockRecorder = mock(SparkAppStatusRecorder.class); + AppCleanUpStep cleanUpWithReason = new AppCleanUpStep(SparkAppStatusUtils::appCancelled); + ApplicationStatus status1 = prepareApplicationStatus(ApplicationStateSummary.SchedulingFailure); + ApplicationStatus status2 = + prepareApplicationStatus( + ApplicationStateSummary.SchedulingFailure, + ApplicationStateSummary.TerminatedWithoutReleaseResources); + SparkApplication mockApp1 = mock(SparkApplication.class); + SparkApplication mockApp2 = mock(SparkApplication.class); + ApplicationSpec spec = ApplicationSpec.builder().build(); + when(mockApp1.getStatus()).thenReturn(status1); + when(mockApp2.getStatus()).thenReturn(status2); + SparkAppContext mockAppContext1 = mock(SparkAppContext.class); + SparkAppContext mockAppContext2 = mock(SparkAppContext.class); + when(mockAppContext1.getResource()).thenReturn(mockApp1); + when(mockAppContext2.getResource()).thenReturn(mockApp2); + when(mockApp1.getSpec()).thenReturn(spec); + when(mockApp2.getSpec()).thenReturn(spec); + KubernetesClient mockClient = mock(KubernetesClient.class); + when(mockAppContext1.getClient()).thenReturn(mockClient); + Pod driverPod = mock(Pod.class); + Pod driverPodSpec = mock(Pod.class); + ConfigMap resource1 = mock(ConfigMap.class); + ConfigMap resource2 = mock(ConfigMap.class); + when(mockAppContext1.getDriverPod()).thenReturn(Optional.of(driverPod)); + when(mockAppContext1.getDriverPodSpec()).thenReturn(driverPodSpec); + when(mockAppContext1.getDriverPreResourcesSpec()) + .thenReturn(Collections.singletonList(resource1)); + when(mockAppContext1.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2)); + when(mockAppContext2.getDriverPod()).thenReturn(Optional.of(driverPod)); + when(mockAppContext2.getDriverPodSpec()).thenReturn(driverPodSpec); + when(mockAppContext2.getDriverPreResourcesSpec()) + .thenReturn(Collections.singletonList(resource1)); + when(mockAppContext2.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2)); + + try (MockedStatic utils = Mockito.mockStatic(ReconcilerUtils.class)) { + ReconcileProgress progress1 = cleanUpWithReason.reconcile(mockAppContext1, mockRecorder); + ReconcileProgress progress2 = cleanUpWithReason.reconcile(mockAppContext2, mockRecorder); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, resource1, false)); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, driverPodSpec, false)); + utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, resource2, false)); + Assertions.assertEquals( + ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress1); + Assertions.assertEquals( + ReconcileProgress.completeAndRequeueAfter(Duration.ofMillis(2000)), progress2); + } + + verify(mockAppContext1).getResource(); + verify(mockApp1, times(2)).getSpec(); + verify(mockApp1, times(2)).getStatus(); + verify(mockAppContext1, times(3)).getClient(); + verify(mockAppContext1).getDriverPreResourcesSpec(); + verify(mockAppContext1).getDriverPodSpec(); + verify(mockAppContext1).getDriverResourcesSpec(); + verify(mockAppContext2).getResource(); + verify(mockApp2, times(2)).getSpec(); + verify(mockApp2, times(2)).getStatus(); + verify(mockAppContext2, times(3)).getClient(); + verify(mockAppContext2).getDriverPreResourcesSpec(); + verify(mockAppContext2).getDriverPodSpec(); + verify(mockAppContext2).getDriverResourcesSpec(); + ArgumentCaptor captor = ArgumentCaptor.forClass(ApplicationState.class); + verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext1), captor.capture()); + verify(mockRecorder).appendNewStateAndPersist(eq(mockAppContext2), captor.capture()); + Assertions.assertEquals(2, captor.getAllValues().size()); + ApplicationState appState1 = captor.getAllValues().get(0); + Assertions.assertEquals( + ApplicationStateSummary.ResourceReleased, appState1.getCurrentStateSummary()); + Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE, appState1.getMessage()); + ApplicationState appState2 = captor.getAllValues().get(1); + Assertions.assertEquals( + ApplicationStateSummary.ResourceReleased, appState2.getCurrentStateSummary()); + Assertions.assertEquals(Constants.APP_CANCELLED_MESSAGE, appState2.getMessage()); + verifyNoMoreInteractions( + mockAppContext1, mockAppContext2, mockRecorder, mockApp1, mockApp2, mockClient, driverPod); + } + + private ApplicationStatus prepareApplicationStatus(ApplicationStateSummary currentStateSummary) { + ApplicationStatus status = new ApplicationStatus(); + ApplicationState state = new ApplicationState(currentStateSummary, "foo"); + return status.appendNewState(state); + } + + private ApplicationStatus prepareApplicationStatus( + ApplicationStateSummary currentStateSummary, ApplicationStateSummary previousStateSummary) { + ApplicationStatus status = prepareApplicationStatus(previousStateSummary); + ApplicationState state = new ApplicationState(currentStateSummary, "foo"); + return status.appendNewState(state); + } + + @Test + void isReleasingResourcesForSchedulingFailureAttempt() { + AppCleanUpStep appCleanUpStep = new AppCleanUpStep(); + ApplicationStatus status = new ApplicationStatus(); + assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + status = + status.appendNewState(new ApplicationState(ApplicationStateSummary.DriverRequested, "foo")); + assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + status = + status.appendNewState(new ApplicationState(ApplicationStateSummary.RunningHealthy, "foo")); + assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + status = status.appendNewState(new ApplicationState(ApplicationStateSummary.Failed, "foo")); + assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + status = + status.appendNewState( + new ApplicationState(ApplicationStateSummary.ScheduledToRestart, "foo")); + assertFalse(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + status = + status.appendNewState( + new ApplicationState(ApplicationStateSummary.SchedulingFailure, "foo")); + assertTrue(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + status = + status.appendNewState( + new ApplicationState(ApplicationStateSummary.TerminatedWithoutReleaseResources, "foo")); + assertTrue(appCleanUpStep.isReleasingResourcesForSchedulingFailureAttempt(status)); + } }