Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppReconcileStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppResourceObserveStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppRunningStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppTerminatedStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppUnknownStateStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppValidateStep;
import org.apache.spark.k8s.operator.utils.LoggingUtils;
Expand Down Expand Up @@ -150,7 +149,7 @@ public Map<String, EventSource> prepareEventSources(
protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {
List<AppReconcileStep> steps = new ArrayList<>();
steps.add(new AppValidateStep());
steps.add(new AppTerminatedStep());
steps.add(new AppCleanUpStep());
switch (app.getStatus().getCurrentState().getCurrentStateSummary()) {
case Submitted, ScheduledToRestart -> steps.add(new AppInitStep());
case DriverRequested, DriverStarted -> {
Expand All @@ -172,14 +171,6 @@ protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {
steps.add(
new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver())));
}
case DriverReadyTimedOut,
DriverStartTimedOut,
ExecutorsStartTimedOut,
Succeeded,
DriverEvicted,
Failed,
SchedulingFailure ->
steps.add(new AppCleanUpStep());
default -> steps.add(new AppUnknownStateStep());
}
return steps;
Expand All @@ -202,7 +193,6 @@ public DeleteControl cleanup(
SparkAppContext ctx = new SparkAppContext(sparkApplication, context, submissionWorker);
List<AppReconcileStep> cleanupSteps = new ArrayList<>();
cleanupSteps.add(new AppValidateStep());
cleanupSteps.add(new AppTerminatedStep());
cleanupSteps.add(new AppCleanUpStep(SparkAppStatusUtils::appCancelled));
for (AppReconcileStep step : cleanupSteps) {
ReconcileProgress progress = step.reconcile(ctx, sparkAppStatusRecorder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.function.Supplier;

Expand Down Expand Up @@ -54,38 +56,78 @@
@NoArgsConstructor
@Slf4j
public class AppCleanUpStep extends AppReconcileStep {
private Supplier<ApplicationState> cleanUpSuccessStateSupplier;
private Supplier<ApplicationState> onDemandCleanUpReason;

/**
* Cleanup secondary resources for an application if needed and updates application status
* accordingly. This step would be performed right after validation step in each reconcile as a
* sanity check. It may end the reconciliation if no more actions are needed. In addition, it can
* be performed on demand with a reason for cleanup secondary resources.
*
* <p>An app expects its secondary resources to be released if any of the below is true:
*
* <ul>
* <li>When the application is being deleted on demand(e.g. being deleted) with a reason
* <li>When the application is stopping
* <li>When the application has terminated without releasing resources, but it has exceeded
* configured retention duration
* </ul>
*
* <p>It would proceed to next steps with no actions for application in other states. Note that
* when even the reconciler decides to proceed with clean up, sub-resources may still be retained
* based on tolerations.
*
* @param context context for the app
* @param statusRecorder recorder for status updates in this reconcile
* @return the reconcile progress
*/
@Override
public ReconcileProgress reconcile(
SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
ApplicationStatus currentStatus = context.getResource().getStatus();
ApplicationTolerations tolerations =
context.getResource().getSpec().getApplicationTolerations();
ResourceRetainPolicy resourceRetainPolicy = tolerations.getResourceRetainPolicy();
SparkApplication application = context.getResource();
ApplicationStatus currentStatus = application.getStatus();
ApplicationState currentState = currentStatus.getCurrentState();
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
statusRecorder.removeCachedStatus(application);
return ReconcileProgress.completeAndNoRequeue();
}
String stateMessage = null;

if (retainReleaseResource(resourceRetainPolicy, currentStatus.getCurrentState())) {
if (tolerations.getRestartConfig() != null
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
stateMessage =
"Application is configured to restart, resources created in current "
+ "attempt would be force released.";
log.warn(stateMessage);
if (isOnDemandCleanup()) {
log.info("Cleaning up application resources on demand");
} else {
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
currentState.getCurrentStateSummary())) {
statusRecorder.removeCachedStatus(application);
return ReconcileProgress.completeAndNoRequeue();
} else if (currentState.getCurrentStateSummary().isStopping()) {
if (retainReleaseResourceForPolicyAndState(
tolerations.getResourceRetainPolicy(), currentState)) {
if (tolerations.getRestartConfig() != null
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
stateMessage =
"Application is configured to restart, resources created in current "
+ "attempt would be force released.";
log.warn(stateMessage);
} else {
ApplicationState terminationState =
new ApplicationState(
ApplicationStateSummary.TerminatedWithoutReleaseResources,
"Application is terminated without releasing resources as configured.");
long requeueAfterMillis =
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
return appendStateAndRequeueAfter(
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
}
}
} else {
ApplicationState terminationState =
new ApplicationState(
ApplicationStateSummary.TerminatedWithoutReleaseResources,
"Application is terminated without releasing resources as configured.");
long requeueAfterMillis =
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
return appendStateAndRequeueAfter(
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
log.debug("Clean up is not expected for app, proceeding to next step.");
return ReconcileProgress.proceed();
}
}

List<HasMetadata> resourcesToRemove = new ArrayList<>();
if (ApplicationStateSummary.SchedulingFailure.equals(
currentStatus.getCurrentState().getCurrentStateSummary())) {
if (isReleasingResourcesForSchedulingFailureAttempt(currentStatus)) {
// if app failed at scheduling, re-compute all spec and delete as they may not be fully
// owned by driver
try {
Expand All @@ -110,13 +152,13 @@ public ReconcileProgress reconcile(
Optional<Pod> driver = context.getDriverPod();
driver.ifPresent(resourcesToRemove::add);
}
boolean forceDelete = enableForceDelete(context.getResource());
boolean forceDelete = enableForceDelete(application);
for (HasMetadata resource : resourcesToRemove) {
ReconcilerUtils.deleteResourceIfExists(context.getClient(), resource, forceDelete);
}
ApplicationStatus updatedStatus;
if (cleanUpSuccessStateSupplier != null) {
ApplicationState state = cleanUpSuccessStateSupplier.get();
if (onDemandCleanUpReason != null) {
ApplicationState state = onDemandCleanUpReason.get();
if (StringUtils.isNotEmpty(stateMessage)) {
state.setMessage(stateMessage);
}
Expand All @@ -142,7 +184,26 @@ public ReconcileProgress reconcile(
}
}

protected boolean retainReleaseResource(
protected boolean isOnDemandCleanup() {
return onDemandCleanUpReason != null;
}

protected boolean isReleasingResourcesForSchedulingFailureAttempt(
final ApplicationStatus status) {
ApplicationState lastObservedState = status.getCurrentState();
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
lastObservedState.getCurrentStateSummary())) {
// if the app has already terminated, use the last observed state before termination
NavigableMap<Long, ApplicationState> navMap =
(NavigableMap<Long, ApplicationState>) status.getStateTransitionHistory();
Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue();
}
return ApplicationStateSummary.SchedulingFailure.equals(
lastObservedState.getCurrentStateSummary());
}

protected boolean retainReleaseResourceForPolicyAndState(
ResourceRetainPolicy resourceRetainPolicy, ApplicationState currentState) {
return switch (resourceRetainPolicy) {
case Never -> false;
Expand Down

This file was deleted.

Loading
Loading