Skip to content

Commit b89c5cc

Browse files
jiangzhoPeter Toth
authored andcommitted
[SPARK-52915] Support TTL for Spark apps
### What changes were proposed in this pull request? This PR adds support for configuring the ttl for Spark apps after it stops. Working with the `resourceRetainPolicy` and `resourceRetainDurationMillis`, it enhances the garbage collection mechanism at the custom resource level. ### Why are the changes needed? Introducing TTL helps user to more effectively configure the garbage collection for apps. ### Does this PR introduce _any_ user-facing change? New configurable field spec.applicationTolerations.ttlAfterStopMillis added to SparkApplication CRD ### How was this patch tested? CIs - including new unit test and revised e2e scenario ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#290 from jiangzho/resource_ttl. Authored-by: Zhou JIANG <[email protected]> Signed-off-by: Peter Toth <[email protected]>
1 parent cc4302a commit b89c5cc

File tree

8 files changed

+382
-54
lines changed

8 files changed

+382
-54
lines changed

build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17434,6 +17434,9 @@ spec:
1743417434
- OnInfrastructureFailure
1743517435
type: string
1743617436
type: object
17437+
ttlAfterStopMillis:
17438+
default: -1
17439+
type: integer
1743717440
type: object
1743817441
configMapSpecs:
1743917442
items:

docs/spark_custom_resources.md

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ applicationTolerations:
293293
resourceRetainPolicy: OnFailure
294294
# Secondary resources would be garbage collected 10 minutes after app termination
295295
resourceRetainDurationMillis: 600000
296+
# Garbage collect the SparkApplication custom resource itself 30 minutes after termination
297+
ttlAfterStopMillis: 1800000
296298
```
297299

298300
to avoid operator attempt to delete driver pod and driver resources if app fails. Similarly,
@@ -302,7 +304,54 @@ possible to configure `resourceRetainDurationMillis` to define the maximal retai
302304
these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
303305
configmap .etc). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
304306
and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
305-
resources.
307+
resources. `ttlAfterStopMillis` controls the garbage collection behavior at the SparkApplication
308+
level after it stops. When set to a non-negative value, Spark operator would garbage collect the
309+
application (and therefore all its associated resources) after given timeout. If the application
310+
is configured to restart, `resourceRetainPolicy`, `resourceRetainDurationMillis` and
311+
`ttlAfterStopMillis` would be applied only to the last attempt.
312+
313+
For example, if an app with below configuration:
314+
315+
```yaml
316+
applicationTolerations:
317+
restartConfig:
318+
restartPolicy: OnFailure
319+
maxRestartAttempts: 1
320+
resourceRetainPolicy: Always
321+
resourceRetainDurationMillis: 30000
322+
ttlAfterStopMillis: 60000
323+
```
324+
325+
ends up with status like:
326+
327+
```yaml
328+
status:
329+
#... the 1st attempt
330+
"5":
331+
currentStateSummary: Failed
332+
"6":
333+
currentStateSummary: ScheduledToRestart
334+
# ...the 2nd attempt
335+
"11":
336+
currentStateSummary: Succeeded
337+
"12":
338+
currentStateSummary: TerminatedWithoutReleaseResources
339+
```
340+
341+
The retain policy only takes effect after the final state `12`. Secondary resources are always
342+
released between attempts between `5` and `6`. TTL would be calculated based on the last state as
343+
well.
344+
345+
| Field | Type | Default Value | Description |
346+
|-----------------------------------------------------------|-----------------------------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
347+
| .spec.applicationTolerations.resourceRetainPolicy | `Always` / `OnFailure` / `Never` | Never | Configure operator to delete / retain secondary resources for an app after it terminates. |
348+
| .spec.applicationTolerations.resourceRetainDurationMillis | integer | -1 | Time to wait in milliseconds for releasing **secondary resources** after termination. Setting to negative value would disable the retention duration check for secondary resources after termination. |
349+
| .spec.applicationTolerations.ttlAfterStopMillis | integer | -1 | Time-to-live in milliseconds for SparkApplication and **all its associated secondary resources**. If set to a negative value, the application would be retained and not be garbage collected by operator. |
350+
351+
Note that `ttlAfterStopMillis` applies to the app as well as its secondary resources. If both
352+
`resourceRetainDurationMillis` and `ttlAfterStopMillis` are set to non-negative value and the
353+
latter is smaller, then it takes higher precedence: operator would remove all resources related
354+
to this app after `ttlAfterStopMillis`.
306355

307356
## Spark Cluster
308357

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationTolerations.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,34 @@ public class ApplicationTolerations {
5858
@Builder.Default
5959
protected Long resourceRetainDurationMillis = -1L;
6060

61+
/**
62+
* Time-to-live in milliseconds for SparkApplication and all its associated secondary resources
63+
* after stop. If set to a negative value, the application could be retained according to the
64+
* retain policy. If the application is configured to restart, this would apply to the last
65+
* attempt only.
66+
*/
67+
@Default("-1")
68+
@Builder.Default
69+
protected Long ttlAfterStopMillis = -1L;
70+
71+
/**
72+
* @return The effective retain duration for secondary resources, which would be the smaller value
73+
* of `resourceRetainDurationMillis` or `ttlAfterStopMillis`, if they are set to non-negative
74+
* value. Return -1 if none of them are set.
75+
*/
76+
public long computeEffectiveRetainDurationMillis() {
77+
if (resourceRetainDurationMillis < 0 && ttlAfterStopMillis < 0) {
78+
return -1L;
79+
}
80+
if (resourceRetainDurationMillis < 0) {
81+
return ttlAfterStopMillis;
82+
}
83+
if (ttlAfterStopMillis < 0) {
84+
return resourceRetainDurationMillis;
85+
}
86+
return Math.min(resourceRetainDurationMillis, ttlAfterStopMillis);
87+
}
88+
6189
/**
6290
* Check whether a terminated application has exceeded the resource retain duration at the
6391
* provided instant
@@ -68,20 +96,30 @@ public class ApplicationTolerations {
6896
*/
6997
public boolean exceedRetainDurationAtInstant(
7098
ApplicationState lastObservedState, Instant instant) {
71-
return lastObservedState != null
99+
return isRetainDurationEnabled()
100+
&& lastObservedState != null
72101
&& lastObservedState.getCurrentStateSummary().isTerminated()
73-
&& resourceRetainDurationMillis > 0L
74102
&& Instant.parse(lastObservedState.getLastTransitionTime())
75-
.plusMillis(resourceRetainDurationMillis)
103+
.plusMillis(computeEffectiveRetainDurationMillis())
76104
.isBefore(instant);
77105
}
78106

79107
/**
80108
* Indicates whether the reconciler need to perform retain duration check
81109
*
82-
* @return true `resourceRetainDurationMillis` is set to non-negative value
110+
* @return true if `resourceRetainDurationMillis` or `ttlAfterStopMillis` is set to non-negative
111+
* value
83112
*/
84113
public boolean isRetainDurationEnabled() {
85-
return resourceRetainDurationMillis >= 0L;
114+
return resourceRetainDurationMillis >= 0L || ttlAfterStopMillis >= 0L;
115+
}
116+
117+
/**
118+
* Indicates whether the reconciler need to perform ttl check
119+
*
120+
* @return true if `ttlAfterStopMillis` is set to non-negative value
121+
*/
122+
public boolean isTTLEnabled() {
123+
return ttlAfterStopMillis >= 0L;
86124
}
87125
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.k8s.operator.spec;
21+
22+
import static org.junit.jupiter.api.Assertions.*;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
class ApplicationTolerationsTest {
27+
private final ApplicationTolerations withRetainDurationOnly =
28+
ApplicationTolerations.builder().resourceRetainDurationMillis(10L).build();
29+
private final ApplicationTolerations withTTLOnly =
30+
ApplicationTolerations.builder().ttlAfterStopMillis(10L).build();
31+
private final ApplicationTolerations withNeitherRetainDurationNorTtl =
32+
ApplicationTolerations.builder().build();
33+
private final ApplicationTolerations withRetainDurationGreaterThanTtl =
34+
ApplicationTolerations.builder()
35+
.resourceRetainDurationMillis(20L)
36+
.ttlAfterStopMillis(10L)
37+
.build();
38+
private final ApplicationTolerations withRetainDurationShorterThanTtl =
39+
ApplicationTolerations.builder()
40+
.resourceRetainDurationMillis(10L)
41+
.ttlAfterStopMillis(20L)
42+
.build();
43+
44+
@Test
45+
void computeEffectiveRetainDurationMillis() {
46+
assertEquals(10L, withRetainDurationOnly.computeEffectiveRetainDurationMillis());
47+
assertEquals(10L, withTTLOnly.computeEffectiveRetainDurationMillis());
48+
assertEquals(-1, withNeitherRetainDurationNorTtl.computeEffectiveRetainDurationMillis());
49+
assertEquals(10L, withRetainDurationGreaterThanTtl.computeEffectiveRetainDurationMillis());
50+
assertEquals(10L, withRetainDurationShorterThanTtl.computeEffectiveRetainDurationMillis());
51+
}
52+
53+
@Test
54+
void isRetainDurationEnabled() {
55+
assertTrue(withRetainDurationOnly.isRetainDurationEnabled());
56+
assertTrue(withTTLOnly.isRetainDurationEnabled());
57+
assertFalse(withNeitherRetainDurationNorTtl.isRetainDurationEnabled());
58+
assertTrue(withRetainDurationGreaterThanTtl.isRetainDurationEnabled());
59+
assertTrue(withRetainDurationShorterThanTtl.isRetainDurationEnabled());
60+
}
61+
62+
@Test
63+
void isTTLEnabled() {
64+
assertFalse(withRetainDurationOnly.isTTLEnabled());
65+
assertTrue(withTTLOnly.isTTLEnabled());
66+
assertFalse(withNeitherRetainDurationNorTtl.isTTLEnabled());
67+
assertTrue(withRetainDurationGreaterThanTtl.isTTLEnabled());
68+
assertTrue(withRetainDurationShorterThanTtl.isTTLEnabled());
69+
}
70+
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import io.fabric8.kubernetes.api.model.HasMetadata;
3232
import io.fabric8.kubernetes.api.model.Pod;
33+
import io.fabric8.kubernetes.client.KubernetesClient;
3334
import lombok.NoArgsConstructor;
3435
import lombok.extern.slf4j.Slf4j;
3536
import org.apache.commons.lang3.StringUtils;
@@ -95,7 +96,7 @@ public ReconcileProgress reconcile(
9596
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
9697
if (currentState.getCurrentStateSummary().isTerminated()) {
9798
Optional<ReconcileProgress> terminatedAppProgress =
98-
checkEarlyExitForTerminatedApp(application, statusRecorder);
99+
checkEarlyExitForTerminatedApp(context.getClient(), application, statusRecorder);
99100
if (terminatedAppProgress.isPresent()) {
100101
return terminatedAppProgress.get();
101102
}
@@ -184,36 +185,70 @@ public ReconcileProgress reconcile(
184185
}
185186
}
186187

187-
protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
188+
protected Optional<ReconcileProgress> clearCacheAndFinishReconcileForApplication(
188189
final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
190+
log.debug("Cleaning up status cache and stop reconciling for application.");
191+
statusRecorder.removeCachedStatus(application);
192+
return Optional.of(ReconcileProgress.completeAndNoRequeue());
193+
}
194+
195+
protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
196+
final KubernetesClient client,
197+
final SparkApplication application,
198+
final SparkAppStatusRecorder statusRecorder) {
189199
ApplicationStatus currentStatus = application.getStatus();
190200
ApplicationState currentState = currentStatus.getCurrentState();
191201
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
202+
Instant now = Instant.now();
192203
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
193-
statusRecorder.removeCachedStatus(application);
194-
return Optional.of(ReconcileProgress.completeAndNoRequeue());
204+
// Perform TTL check after removing all secondary resources, if enabled
205+
if (isOnDemandCleanup() || !tolerations.isTTLEnabled()) {
206+
// all secondary resources have been released, no more reconciliations needed
207+
return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
208+
} else {
209+
ApplicationState lastObservedStateBeforeTermination =
210+
getLastObservedStateBeforeTermination(currentStatus);
211+
Duration nextCheckDuration =
212+
Duration.between(
213+
now,
214+
Instant.parse(lastObservedStateBeforeTermination.getLastTransitionTime())
215+
.plusMillis(tolerations.getTtlAfterStopMillis()));
216+
if (nextCheckDuration.isNegative()) {
217+
log.info("Garbage collecting application exceeded given ttl.");
218+
ReconcilerUtils.deleteResourceIfExists(client, application, true);
219+
return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
220+
} else {
221+
log.info(
222+
"Application has yet expired, reconciliation would be resumed in {} millis.",
223+
nextCheckDuration.toMillis());
224+
return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
225+
}
226+
}
195227
}
196228
if (isOnDemandCleanup()) {
197229
return Optional.empty();
198230
}
199231
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
200232
currentState.getCurrentStateSummary())) {
201233
if (tolerations.isRetainDurationEnabled()) {
202-
Instant now = Instant.now();
203234
if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
235+
log.info("Garbage collecting secondary resources for application");
204236
onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
205237
return Optional.empty();
206238
} else {
207239
Duration nextCheckDuration =
208240
Duration.between(
209-
Instant.now(),
241+
now,
210242
Instant.parse(currentState.getLastTransitionTime())
211-
.plusMillis(tolerations.getResourceRetainDurationMillis()));
243+
.plusMillis(tolerations.computeEffectiveRetainDurationMillis()));
244+
log.info(
245+
"Application is within retention, reconciliation would be resumed in {} millis.",
246+
nextCheckDuration.toMillis());
212247
return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
213248
}
214249
} else {
215-
statusRecorder.removeCachedStatus(application);
216-
return Optional.of(ReconcileProgress.completeAndNoRequeue());
250+
log.info("Retention duration check is not enabled for application.");
251+
return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
217252
}
218253
}
219254
return Optional.empty();
@@ -223,17 +258,25 @@ protected boolean isOnDemandCleanup() {
223258
return onDemandCleanUpReason != null;
224259
}
225260

226-
protected boolean isReleasingResourcesForSchedulingFailureAttempt(
227-
final ApplicationStatus status) {
261+
/**
262+
* @param status status of the application
263+
* @return The last observed state before termination if the app has terminated. If the app has
264+
* not terminated, return the last observed state
265+
*/
266+
protected ApplicationState getLastObservedStateBeforeTermination(final ApplicationStatus status) {
228267
ApplicationState lastObservedState = status.getCurrentState();
229-
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
230-
lastObservedState.getCurrentStateSummary())) {
231-
// if the app has already terminated, use the last observed state before termination
268+
if (lastObservedState.getCurrentStateSummary().isTerminated()) {
232269
NavigableMap<Long, ApplicationState> navMap =
233270
(NavigableMap<Long, ApplicationState>) status.getStateTransitionHistory();
234271
Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
235-
lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue();
272+
return navMap.lowerEntry(terminateState.getKey()).getValue();
236273
}
274+
return lastObservedState;
275+
}
276+
277+
protected boolean isReleasingResourcesForSchedulingFailureAttempt(
278+
final ApplicationStatus status) {
279+
ApplicationState lastObservedState = getLastObservedStateBeforeTermination(status);
237280
return ApplicationStateSummary.SchedulingFailure.equals(
238281
lastObservedState.getCurrentStateSummary());
239282
}

0 commit comments

Comments
 (0)