Skip to content

Commit b100e41

Browse files
jiangzhodongjoon-hyun
authored andcommitted
[SPARK-52581] Revise AppCleanUpStep to include cleanup logic for all states
### What changes were proposed in this pull request? This PR updates AppCleanUpStep so that it can be added as a regular step of each reconciliation, remove the therefore-unused AppTerminatedStep ### Why are the changes needed? Previously, we have AppTerminatedStep performed at the beginning of each reconciliation, to end the actions early if app has reached terminated state. However, in some scenarios, we may still want to do "clean up" for applications that are terminated: * If an app in "terminated without releasing resources" is being deleted, we would like the AppCleanUpStep to be performed, since it covers a few corner cases that might not be covered in general delete flow * We may also to introduce the concept of "maximal retention duration" for applications along with current "retention policy" in near future to serve the lifecycle management of application secondary resources. That could also require cleaning up secondary resources for already-terminated applications. The revised AppCleanUpStep includes everything in the previous AppTerminatedStep and takes care of cleaning up resources for those in "TerminatedWithoutReleaseResources" stage as needed. ### Does this PR introduce any user-facing change? No - internal step(s) changes only ### How was this patch tested? CIs and E2Es shall cover the clean up logic ### Was this patch authored or co-authored using generative AI tooling? No Closes #257 from jiangzho/cleanupForCancel. Authored-by: Zhou JIANG <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent dc87496 commit b100e41

File tree

4 files changed

+386
-82
lines changed

4 files changed

+386
-82
lines changed

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppReconcileStep;
5757
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppResourceObserveStep;
5858
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppRunningStep;
59-
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppTerminatedStep;
6059
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppUnknownStateStep;
6160
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppValidateStep;
6261
import org.apache.spark.k8s.operator.utils.LoggingUtils;
@@ -144,7 +143,7 @@ public List<EventSource<?, SparkApplication>> prepareEventSources(
144143
protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {
145144
List<AppReconcileStep> steps = new ArrayList<>();
146145
steps.add(new AppValidateStep());
147-
steps.add(new AppTerminatedStep());
146+
steps.add(new AppCleanUpStep());
148147
switch (app.getStatus().getCurrentState().getCurrentStateSummary()) {
149148
case Submitted, ScheduledToRestart -> steps.add(new AppInitStep());
150149
case DriverRequested, DriverStarted -> {
@@ -166,14 +165,6 @@ protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {
166165
steps.add(
167166
new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver())));
168167
}
169-
case DriverReadyTimedOut,
170-
DriverStartTimedOut,
171-
ExecutorsStartTimedOut,
172-
Succeeded,
173-
DriverEvicted,
174-
Failed,
175-
SchedulingFailure ->
176-
steps.add(new AppCleanUpStep());
177168
default -> steps.add(new AppUnknownStateStep());
178169
}
179170
return steps;
@@ -196,7 +187,6 @@ public DeleteControl cleanup(
196187
SparkAppContext ctx = new SparkAppContext(sparkApplication, context, submissionWorker);
197188
List<AppReconcileStep> cleanupSteps = new ArrayList<>();
198189
cleanupSteps.add(new AppValidateStep());
199-
cleanupSteps.add(new AppTerminatedStep());
200190
cleanupSteps.add(new AppCleanUpStep(SparkAppStatusUtils::appCancelled));
201191
for (AppReconcileStep step : cleanupSteps) {
202192
ReconcileProgress progress = step.reconcile(ctx, sparkAppStatusRecorder);

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java

Lines changed: 88 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.time.Instant;
2424
import java.util.ArrayList;
2525
import java.util.List;
26+
import java.util.Map;
27+
import java.util.NavigableMap;
2628
import java.util.Optional;
2729
import java.util.function.Supplier;
2830

@@ -54,38 +56,78 @@
5456
@NoArgsConstructor
5557
@Slf4j
5658
public class AppCleanUpStep extends AppReconcileStep {
57-
private Supplier<ApplicationState> cleanUpSuccessStateSupplier;
59+
private Supplier<ApplicationState> onDemandCleanUpReason;
5860

61+
/**
62+
* Cleanup secondary resources for an application if needed and updates application status
63+
* accordingly. This step would be performed right after validation step in each reconcile as a
64+
* sanity check. It may end the reconciliation if no more actions are needed. In addition, it can
65+
* be performed on demand with a reason for cleanup secondary resources.
66+
*
67+
* <p>An app expects its secondary resources to be released if any of the below is true:
68+
*
69+
* <ul>
70+
* <li>When the application is being deleted on demand(e.g. being deleted) with a reason
71+
* <li>When the application is stopping
72+
* <li>When the application has terminated without releasing resources, but it has exceeded
73+
* configured retention duration
74+
* </ul>
75+
*
76+
* <p>It would proceed to next steps with no actions for application in other states. Note that
77+
* when even the reconciler decides to proceed with clean up, sub-resources may still be retained
78+
* based on tolerations.
79+
*
80+
* @param context context for the app
81+
* @param statusRecorder recorder for status updates in this reconcile
82+
* @return the reconcile progress
83+
*/
5984
@Override
6085
public ReconcileProgress reconcile(
6186
SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
62-
ApplicationStatus currentStatus = context.getResource().getStatus();
63-
ApplicationTolerations tolerations =
64-
context.getResource().getSpec().getApplicationTolerations();
65-
ResourceRetainPolicy resourceRetainPolicy = tolerations.getResourceRetainPolicy();
87+
SparkApplication application = context.getResource();
88+
ApplicationStatus currentStatus = application.getStatus();
89+
ApplicationState currentState = currentStatus.getCurrentState();
90+
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
91+
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
92+
statusRecorder.removeCachedStatus(application);
93+
return ReconcileProgress.completeAndNoRequeue();
94+
}
6695
String stateMessage = null;
67-
68-
if (retainReleaseResource(resourceRetainPolicy, currentStatus.getCurrentState())) {
69-
if (tolerations.getRestartConfig() != null
70-
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
71-
stateMessage =
72-
"Application is configured to restart, resources created in current "
73-
+ "attempt would be force released.";
74-
log.warn(stateMessage);
96+
if (isOnDemandCleanup()) {
97+
log.info("Cleaning up application resources on demand");
98+
} else {
99+
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
100+
currentState.getCurrentStateSummary())) {
101+
statusRecorder.removeCachedStatus(application);
102+
return ReconcileProgress.completeAndNoRequeue();
103+
} else if (currentState.getCurrentStateSummary().isStopping()) {
104+
if (retainReleaseResourceForPolicyAndState(
105+
tolerations.getResourceRetainPolicy(), currentState)) {
106+
if (tolerations.getRestartConfig() != null
107+
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
108+
stateMessage =
109+
"Application is configured to restart, resources created in current "
110+
+ "attempt would be force released.";
111+
log.warn(stateMessage);
112+
} else {
113+
ApplicationState terminationState =
114+
new ApplicationState(
115+
ApplicationStateSummary.TerminatedWithoutReleaseResources,
116+
"Application is terminated without releasing resources as configured.");
117+
long requeueAfterMillis =
118+
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
119+
return appendStateAndRequeueAfter(
120+
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
121+
}
122+
}
75123
} else {
76-
ApplicationState terminationState =
77-
new ApplicationState(
78-
ApplicationStateSummary.TerminatedWithoutReleaseResources,
79-
"Application is terminated without releasing resources as configured.");
80-
long requeueAfterMillis =
81-
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
82-
return appendStateAndRequeueAfter(
83-
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
124+
log.debug("Clean up is not expected for app, proceeding to next step.");
125+
return ReconcileProgress.proceed();
84126
}
85127
}
128+
86129
List<HasMetadata> resourcesToRemove = new ArrayList<>();
87-
if (ApplicationStateSummary.SchedulingFailure.equals(
88-
currentStatus.getCurrentState().getCurrentStateSummary())) {
130+
if (isReleasingResourcesForSchedulingFailureAttempt(currentStatus)) {
89131
// if app failed at scheduling, re-compute all spec and delete as they may not be fully
90132
// owned by driver
91133
try {
@@ -110,13 +152,13 @@ public ReconcileProgress reconcile(
110152
Optional<Pod> driver = context.getDriverPod();
111153
driver.ifPresent(resourcesToRemove::add);
112154
}
113-
boolean forceDelete = enableForceDelete(context.getResource());
155+
boolean forceDelete = enableForceDelete(application);
114156
for (HasMetadata resource : resourcesToRemove) {
115157
ReconcilerUtils.deleteResourceIfExists(context.getClient(), resource, forceDelete);
116158
}
117159
ApplicationStatus updatedStatus;
118-
if (cleanUpSuccessStateSupplier != null) {
119-
ApplicationState state = cleanUpSuccessStateSupplier.get();
160+
if (onDemandCleanUpReason != null) {
161+
ApplicationState state = onDemandCleanUpReason.get();
120162
if (StringUtils.isNotEmpty(stateMessage)) {
121163
state.setMessage(stateMessage);
122164
}
@@ -142,7 +184,26 @@ public ReconcileProgress reconcile(
142184
}
143185
}
144186

145-
protected boolean retainReleaseResource(
187+
protected boolean isOnDemandCleanup() {
188+
return onDemandCleanUpReason != null;
189+
}
190+
191+
protected boolean isReleasingResourcesForSchedulingFailureAttempt(
192+
final ApplicationStatus status) {
193+
ApplicationState lastObservedState = status.getCurrentState();
194+
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
195+
lastObservedState.getCurrentStateSummary())) {
196+
// if the app has already terminated, use the last observed state before termination
197+
NavigableMap<Long, ApplicationState> navMap =
198+
(NavigableMap<Long, ApplicationState>) status.getStateTransitionHistory();
199+
Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
200+
lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue();
201+
}
202+
return ApplicationStateSummary.SchedulingFailure.equals(
203+
lastObservedState.getCurrentStateSummary());
204+
}
205+
206+
protected boolean retainReleaseResourceForPolicyAndState(
146207
ResourceRetainPolicy resourceRetainPolicy, ApplicationState currentState) {
147208
return switch (resourceRetainPolicy) {
148209
case Never -> false;

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppTerminatedStep.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)