Skip to content

Commit 2895467

Browse files
jiangzhodongjoon-hyun
authored andcommitted
[SPARK-52648] Add support for maximal retain duration for Spark application resources
### What changes were proposed in this pull request? This PR adds support for configuring the maximal retain duration for Spark apps. Working with the resourceRetainPolicy, it enhances the garbage collection mechanism. ### Why are the changes needed? Current resourceRetainPolicy provides flexibility for retain Spark app resources after its terminated. Introducing maximal retain duration would add one protection layer to avoid terminated resources (pods, config maps .etc) from taking quota in cluster. ### Does this PR introduce _any_ user-facing change? New configurable field `spec.applicationTolerations.resourceRetainDurationMillis` added to SparkApplication CRD ### How was this patch tested? CIs - including new unit test and e2e scenario ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#268 from jiangzho/retentionDuration. Authored-by: Zhou JIANG <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4c6b30c commit 2895467

File tree

12 files changed

+552
-54
lines changed

12 files changed

+552
-54
lines changed

.github/workflows/build_and_test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ jobs:
8282
- spark-versions
8383
- python
8484
- state-transition
85+
- resource-retain-duration
8586
- watched-namespaces
8687
exclude:
8788
- mode: dynamic
@@ -90,6 +91,8 @@ jobs:
9091
test-group: python
9192
- mode: dynamic
9293
test-group: state-transition
94+
- mode: dynamic
95+
test-group: resource-retain-duration
9396
- mode: static
9497
test-group: watched-namespaces
9598
steps:

build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17411,6 +17411,9 @@ spec:
1741117411
minExecutors:
1741217412
type: integer
1741317413
type: object
17414+
resourceRetainDurationMillis:
17415+
default: -1
17416+
type: integer
1741417417
resourceRetainPolicy:
1741517418
enum:
1741617419
- Always

docs/spark_custom_resources.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,12 +289,17 @@ On the other hand, when developing an application, it's possible to configure
289289
```yaml
290290
applicationTolerations:
291291
# Acceptable values are 'Always', 'OnFailure', 'Never'
292+
# Setting this to 'OnFailure' would retain secondary resources if and only if the app fails
292293
resourceRetainPolicy: OnFailure
294+
# Secondary resources would be garbage collected 10 minutes after app termination
295+
resourceRetainDurationMillis: 600000
293296
```
294297

295298
to avoid operator attempt to delete driver pod and driver resources if app fails. Similarly,
296299
if resourceRetainPolicy is set to `Always`, operator would not delete driver resources
297-
when app ends. Note that this applies only to operator-created resources (driver pod, SparkConf
300+
when app ends. They would be by default kept with the same lifecycle as the App. It's also
301+
possible to configure `resourceRetainDurationMillis` to define the maximal retain duration for
302+
these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
298303
configmap .etc). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
299304
and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
300305
resources.

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class Constants {
6363
+ "observed status for details.";
6464
public static final String APP_CANCELLED_MESSAGE =
6565
"Spark application has been shutdown as requested.";
66+
public static final String APP_EXCEEDED_RETAIN_DURATION_MESSAGE =
67+
"Spark application resources released after exceeding the configured retain duration.";
6668
public static final String DRIVER_UNEXPECTED_REMOVED_MESSAGE =
6769
"Driver removed. This could caused by 'exit' called in driver process with non-zero "
6870
+ "code, involuntary disruptions or unintentional destroy behavior, check "

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@
1919

2020
package org.apache.spark.k8s.operator.spec;
2121

22+
import java.time.Instant;
23+
2224
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2325
import com.fasterxml.jackson.annotation.JsonInclude;
26+
import io.fabric8.generator.annotation.Default;
2427
import lombok.AllArgsConstructor;
2528
import lombok.Builder;
2629
import lombok.Data;
2730
import lombok.NoArgsConstructor;
2831

32+
import org.apache.spark.k8s.operator.status.ApplicationState;
33+
2934
/** Toleration settings for a Spark application. */
3035
@Data
3136
@NoArgsConstructor
@@ -43,4 +48,40 @@ public class ApplicationTolerations {
4348
@Builder.Default protected ExecutorInstanceConfig instanceConfig = new ExecutorInstanceConfig();
4449

4550
@Builder.Default protected ResourceRetainPolicy resourceRetainPolicy = ResourceRetainPolicy.Never;
51+
52+
/**
53+
* Time-to-live in milliseconds for secondary resources of SparkApplications after termination. If
54+
* set to a negative value, secondary resources could be retained with the same lifecycle as the
55+
* application according to the retain policy.
56+
*/
57+
@Default("-1")
58+
@Builder.Default
59+
protected Long resourceRetainDurationMillis = -1L;
60+
61+
/**
62+
* Check whether a terminated application has exceeded the resource retain duration at the
63+
* provided instant
64+
*
65+
* @param lastObservedState last observed state of the application
66+
* @return true if the app has terminated and resource retain duration is configured to a positive
67+
* value and the app is not within retain duration; false otherwise.
68+
*/
69+
public boolean exceedRetainDurationAtInstant(
70+
ApplicationState lastObservedState, Instant instant) {
71+
return lastObservedState != null
72+
&& lastObservedState.getCurrentStateSummary().isTerminated()
73+
&& resourceRetainDurationMillis > 0L
74+
&& Instant.parse(lastObservedState.getLastTransitionTime())
75+
.plusMillis(resourceRetainDurationMillis)
76+
.isBefore(instant);
77+
}
78+
79+
/**
80+
* Indicates whether the reconciler need to perform retain duration check
81+
*
82+
* @return true `resourceRetainDurationMillis` is set to non-negative value
83+
*/
84+
public boolean isRetainDurationEnabled() {
85+
return resourceRetainDurationMillis >= 0L;
86+
}
4687
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java

Lines changed: 74 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import io.fabric8.kubernetes.api.model.HasMetadata;
3232
import io.fabric8.kubernetes.api.model.Pod;
33-
import lombok.AllArgsConstructor;
3433
import lombok.NoArgsConstructor;
3534
import lombok.extern.slf4j.Slf4j;
3635
import org.apache.commons.lang3.StringUtils;
@@ -47,16 +46,22 @@
4746
import org.apache.spark.k8s.operator.status.ApplicationStatus;
4847
import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
4948
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
49+
import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
5050

5151
/**
5252
* Cleanup all secondary resources when application is deleted, or at the end of each attempt.
5353
* Update Application status to indicate whether another attempt would be made.
5454
*/
55-
@AllArgsConstructor
5655
@NoArgsConstructor
5756
@Slf4j
5857
public class AppCleanUpStep extends AppReconcileStep {
5958
private Supplier<ApplicationState> onDemandCleanUpReason;
59+
private String stateUpdateMessage;
60+
61+
public AppCleanUpStep(Supplier<ApplicationState> onDemandCleanUpReason) {
62+
super();
63+
this.onDemandCleanUpReason = onDemandCleanUpReason;
64+
}
6065

6166
/**
6267
* Cleanup secondary resources for an application if needed and updates application status
@@ -88,42 +93,37 @@ public ReconcileProgress reconcile(
8893
ApplicationStatus currentStatus = application.getStatus();
8994
ApplicationState currentState = currentStatus.getCurrentState();
9095
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
91-
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
92-
statusRecorder.removeCachedStatus(application);
93-
return ReconcileProgress.completeAndNoRequeue();
94-
}
95-
String stateMessage = null;
96-
if (isOnDemandCleanup()) {
97-
log.info("Cleaning up application resources on demand");
98-
} else {
99-
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
100-
currentState.getCurrentStateSummary())) {
101-
statusRecorder.removeCachedStatus(application);
102-
return ReconcileProgress.completeAndNoRequeue();
103-
} else if (currentState.getCurrentStateSummary().isStopping()) {
104-
if (retainReleaseResourceForPolicyAndState(
105-
tolerations.getResourceRetainPolicy(), currentState)) {
106-
if (tolerations.getRestartConfig() != null
107-
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
108-
stateMessage =
109-
"Application is configured to restart, resources created in current "
110-
+ "attempt would be force released.";
111-
log.warn(stateMessage);
112-
} else {
113-
ApplicationState terminationState =
114-
new ApplicationState(
115-
ApplicationStateSummary.TerminatedWithoutReleaseResources,
116-
"Application is terminated without releasing resources as configured.");
117-
long requeueAfterMillis =
118-
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
119-
return appendStateAndRequeueAfter(
120-
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
121-
}
96+
if (currentState.getCurrentStateSummary().isTerminated()) {
97+
Optional<ReconcileProgress> terminatedAppProgress =
98+
checkEarlyExitForTerminatedApp(application, statusRecorder);
99+
if (terminatedAppProgress.isPresent()) {
100+
return terminatedAppProgress.get();
101+
}
102+
} else if (isOnDemandCleanup()) {
103+
log.info("Releasing secondary resources for application on demand.");
104+
} else if (currentState.getCurrentStateSummary().isStopping()) {
105+
if (retainReleaseResourceForPolicyAndState(
106+
tolerations.getResourceRetainPolicy(), currentState)) {
107+
if (tolerations.getRestartConfig() != null
108+
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
109+
stateUpdateMessage =
110+
"Application is configured to restart, resources created in current "
111+
+ "attempt would be force released.";
112+
log.warn(stateUpdateMessage);
113+
} else {
114+
ApplicationState terminationState =
115+
new ApplicationState(
116+
ApplicationStateSummary.TerminatedWithoutReleaseResources,
117+
"Application is terminated without releasing resources as configured.");
118+
long requeueAfterMillis =
119+
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
120+
return appendStateAndRequeueAfter(
121+
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
122122
}
123-
} else {
124-
log.debug("Clean up is not expected for app, proceeding to next step.");
125-
return ReconcileProgress.proceed();
126123
}
124+
} else {
125+
log.debug("Clean up is not expected for app, proceeding to next step.");
126+
return ReconcileProgress.proceed();
127127
}
128128

129129
List<HasMetadata> resourcesToRemove = new ArrayList<>();
@@ -159,8 +159,8 @@ public ReconcileProgress reconcile(
159159
ApplicationStatus updatedStatus;
160160
if (onDemandCleanUpReason != null) {
161161
ApplicationState state = onDemandCleanUpReason.get();
162-
if (StringUtils.isNotEmpty(stateMessage)) {
163-
state.setMessage(stateMessage);
162+
if (StringUtils.isNotEmpty(stateUpdateMessage)) {
163+
state.setMessage(stateUpdateMessage);
164164
}
165165
long requeueAfterMillis =
166166
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -171,7 +171,7 @@ public ReconcileProgress reconcile(
171171
currentStatus.terminateOrRestart(
172172
tolerations.getRestartConfig(),
173173
tolerations.getResourceRetainPolicy(),
174-
stateMessage,
174+
stateUpdateMessage,
175175
SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue());
176176
long requeueAfterMillis =
177177
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -184,6 +184,41 @@ public ReconcileProgress reconcile(
184184
}
185185
}
186186

187+
protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
188+
final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
189+
ApplicationStatus currentStatus = application.getStatus();
190+
ApplicationState currentState = currentStatus.getCurrentState();
191+
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
192+
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
193+
statusRecorder.removeCachedStatus(application);
194+
return Optional.of(ReconcileProgress.completeAndNoRequeue());
195+
}
196+
if (isOnDemandCleanup()) {
197+
return Optional.empty();
198+
}
199+
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
200+
currentState.getCurrentStateSummary())) {
201+
if (tolerations.isRetainDurationEnabled()) {
202+
Instant now = Instant.now();
203+
if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
204+
onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
205+
return Optional.empty();
206+
} else {
207+
Duration nextCheckDuration =
208+
Duration.between(
209+
Instant.now(),
210+
Instant.parse(currentState.getLastTransitionTime())
211+
.plusMillis(tolerations.getResourceRetainDurationMillis()));
212+
return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
213+
}
214+
} else {
215+
statusRecorder.removeCachedStatus(application);
216+
return Optional.of(ReconcileProgress.completeAndNoRequeue());
217+
}
218+
}
219+
return Optional.empty();
220+
}
221+
187222
protected boolean isOnDemandCleanup() {
188223
return onDemandCleanUpReason != null;
189224
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public static ApplicationState appCancelled() {
6161
ApplicationStateSummary.ResourceReleased, Constants.APP_CANCELLED_MESSAGE);
6262
}
6363

64+
public static ApplicationState appExceededRetainDuration() {
65+
return new ApplicationState(
66+
ApplicationStateSummary.ResourceReleased, Constants.APP_EXCEEDED_RETAIN_DURATION_MESSAGE);
67+
}
68+
6469
public static boolean hasReachedState(
6570
SparkApplication application, ApplicationState stateToCheck) {
6671
return isValidApplicationStatus(application)

spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconcilerTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,61 @@ void testCleanupRunningApp() {
116116
assertTrue(deleteControl.isRemoveFinalizer());
117117
}
118118
}
119+
120+
@SuppressWarnings("PMD.UnusedLocalVariable")
121+
@Test
122+
void testCleanupAppTerminatedWithoutReleaseResources() {
123+
try (MockedConstruction<SparkAppContext> mockAppContext =
124+
mockConstruction(
125+
SparkAppContext.class,
126+
(mock, context) -> {
127+
when(mock.getResource()).thenReturn(app);
128+
when(mock.getClient()).thenReturn(mockClient);
129+
when(mock.getDriverPod()).thenReturn(Optional.of(mockDriver));
130+
when(mock.getDriverPodSpec()).thenReturn(mockDriver);
131+
when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
132+
when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
133+
});
134+
MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
135+
// delete app
136+
app.setStatus(
137+
app.getStatus()
138+
.appendNewState(
139+
new ApplicationState(
140+
ApplicationStateSummary.TerminatedWithoutReleaseResources, "")));
141+
DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
142+
assertFalse(deleteControl.isRemoveFinalizer());
143+
utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, mockDriver, false));
144+
assertEquals(
145+
ApplicationStateSummary.ResourceReleased,
146+
app.getStatus().getCurrentState().getCurrentStateSummary());
147+
148+
// proceed delete for terminated app
149+
deleteControl = reconciler.cleanup(app, mockContext);
150+
assertTrue(deleteControl.isRemoveFinalizer());
151+
}
152+
}
153+
154+
@SuppressWarnings("PMD.UnusedLocalVariable")
155+
@Test
156+
void testCleanupAppTerminatedResourceReleased() {
157+
try (MockedConstruction<SparkAppContext> mockAppContext =
158+
mockConstruction(
159+
SparkAppContext.class,
160+
(mock, context) -> {
161+
when(mock.getResource()).thenReturn(app);
162+
when(mock.getClient()).thenReturn(mockClient);
163+
when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
164+
when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
165+
});
166+
MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
167+
// delete app
168+
app.setStatus(
169+
app.getStatus()
170+
.appendNewState(new ApplicationState(ApplicationStateSummary.ResourceReleased, "")));
171+
DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
172+
assertTrue(deleteControl.isRemoveFinalizer());
173+
utils.verifyNoInteractions();
174+
}
175+
}
119176
}

0 commit comments

Comments
 (0)