Closed
@@ -17434,6 +17434,9 @@ spec:
- OnInfrastructureFailure
type: string
type: object
ttlAfterStopMillis:
default: -1
type: integer
type: object
configMapSpecs:
items:
51 changes: 50 additions & 1 deletion docs/spark_custom_resources.md
@@ -293,6 +293,8 @@ applicationTolerations:
resourceRetainPolicy: OnFailure
# Secondary resources would be garbage collected 10 minutes after app termination
resourceRetainDurationMillis: 600000
# Garbage collect the SparkApplication custom resource itself 30 minutes after termination
ttlAfterStopMillis: 1800000
Member

The default value is -1 in the code, isn't it?

Contributor Author

Yes, it is. This is only an example value placed in this snippet. I can add a chart for the actual default values.

```

to prevent the operator from attempting to delete the driver pod and driver resources if the app fails. Similarly,
@@ -302,7 +304,54 @@ possible to configure `resourceRetainDurationMillis` to define the maximal retai
these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
ConfigMap, etc.). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
resources.
resources. `ttlAfterStopMillis` controls garbage collection at the SparkApplication level after
the application stops. When it is set to a non-negative value, the operator garbage collects the
application (and therefore all its associated resources) once the given timeout elapses. If the
application is configured to restart, `resourceRetainPolicy`, `resourceRetainDurationMillis` and
`ttlAfterStopMillis` apply only to the last attempt.
Member

What does this mean, @jiangzho ?

  • ttlAfterStopMillis is ignored when resourceRetainDurationMillis configuration exists?
  • Or, max(ttlAfterStopMillis , resourceRetainDurationMillis) is applied?

Member

I read the code. So, Math.min(resourceRetainDurationMillis, ttlAfterStopMillis) is applied, right? I believe we need to rewrite this sentence. Given the logic, this looks inaccurate to me.

ttlAfterStopMillis would be applied only to the last attempt.

Contributor Author

Ah sorry for the slightly misleading statement, it actually refers to another perspective.

  • All 3 fields (ttlAfterStopMillis, resourceRetainDurationMillis and resourceRetainPolicy) apply only to the last attempt when the app is configured to restart
    • When an app is configured to restart, all resources related to a single attempt (driver, executor, ...) are released before the next attempt is made, regardless of the values configured in the above 3 fields. The only exception is the last attempt, when no restart is expected.

For example, if I do configure

applicationTolerations: 
  restartConfig:
     restartPolicy: OnFailure
     maxRestartAttempts: 1
  resourceRetainPolicy: Always
  resourceRetainDurationMillis: 30000 
  ttlAfterStopMillis: 60000

and my app ends up with status like

status:
... # the 1st attempt
      "5":
        currentStateSummary: Failed
        lastTransitionTime: "2025-07-30T22:43:15.293414Z"
      "6":
        currentStateSummary: ScheduledToRestart
        lastTransitionTime: "2025-07-30T22:43:15.406645Z"
... # the 2nd attempt
      "11":
        currentStateSummary: Succeeded
        lastTransitionTime: "2025-07-30T22:44:15.503645Z"
      "12":
        currentStateSummary: TerminatedWithoutReleaseResources
        lastTransitionTime: "2025-07-30T22:44:15.503645Z"

The retain policy only takes effect after state 12. Resources are always released between attempts, that is, between states 5 and 6.

Thanks for calling out the Math.min(resourceRetainDurationMillis, ttlAfterStopMillis) part. I forgot to mention it in the doc. I'll add it to the chart mentioned above.


For example, suppose an app with the following configuration:

```yaml
applicationTolerations:
  restartConfig:
    restartPolicy: OnFailure
    maxRestartAttempts: 1
  resourceRetainPolicy: Always
  resourceRetainDurationMillis: 30000
  ttlAfterStopMillis: 60000
```

ends up with a status like:

```yaml
status:
  #... the 1st attempt
  "5":
    currentStateSummary: Failed
  "6":
    currentStateSummary: ScheduledToRestart
  # ...the 2nd attempt
  "11":
    currentStateSummary: Succeeded
  "12":
    currentStateSummary: TerminatedWithoutReleaseResources
```

The retain policy takes effect only after the final state `12`. Secondary resources are always
released between attempts, that is, between states `5` and `6`. The TTL is likewise calculated
from the last state.

| Field | Type | Default Value | Description |
|-----------------------------------------------------------|-----------------------------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| .spec.applicationTolerations.resourceRetainPolicy | `Always` / `OnFailure` / `Never` | Never | Configure operator to delete / retain secondary resources for an app after it terminates. |
| .spec.applicationTolerations.resourceRetainDurationMillis | integer                           | -1            | Time to wait, in milliseconds, before releasing **secondary resources** after termination. A negative value disables the retention duration check for secondary resources. |
| .spec.applicationTolerations.ttlAfterStopMillis           | integer                           | -1            | Time-to-live, in milliseconds, for the SparkApplication and **all its associated secondary resources** after it stops. If set to a negative value, the application is retained and not garbage collected by the operator. |

Note that `ttlAfterStopMillis` applies to the app as well as its secondary resources. If both
`resourceRetainDurationMillis` and `ttlAfterStopMillis` are set to non-negative values and the
latter is smaller, it takes precedence: the operator removes all resources related to this app
after `ttlAfterStopMillis`.
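
The precedence rule can be illustrated with a small standalone sketch. It is not the operator's code: the class `RetainPrecedenceSketch` and its method name are hypothetical, and it only assumes what the text states, namely that a negative value means "not set" and that the smaller non-negative value bounds how long secondary resources are kept.

```java
// Hypothetical helper for illustration only; not part of the operator's API.
final class RetainPrecedenceSketch {

  /** Delay in milliseconds before secondary resources are released, or -1 if unbounded. */
  static long effectiveRetainMillis(long resourceRetainDurationMillis, long ttlAfterStopMillis) {
    boolean retainSet = resourceRetainDurationMillis >= 0;
    boolean ttlSet = ttlAfterStopMillis >= 0;
    if (!retainSet && !ttlSet) {
      return -1L; // neither field is set: no time-based cleanup
    }
    if (!retainSet) {
      return ttlAfterStopMillis; // only the TTL bounds the lifetime
    }
    if (!ttlSet) {
      return resourceRetainDurationMillis; // only the retain duration applies
    }
    // Both set: the smaller value wins.
    return Math.min(resourceRetainDurationMillis, ttlAfterStopMillis);
  }

  public static void main(String[] args) {
    System.out.println(effectiveRetainMillis(30_000L, 60_000L));  // 30000
    System.out.println(effectiveRetainMillis(600_000L, 60_000L)); // 60000
    System.out.println(effectiveRetainMillis(-1L, 60_000L));      // 60000
    System.out.println(effectiveRetainMillis(-1L, -1L));          // -1
  }
}
```

With the example configuration (30000 and 60000), secondary resources would be bounded by the 30-second retain duration, while the application itself is bounded by the 60-second TTL.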

## Spark Cluster

@@ -58,6 +58,34 @@ public class ApplicationTolerations {
@Builder.Default
protected Long resourceRetainDurationMillis = -1L;

/**
* Time-to-live in milliseconds for SparkApplication and all its associated secondary resources
* after stop. If set to a negative value, the application could be retained according to the
* retain policy. If the application is configured to restart, this would apply to the last
* attempt only.
*/
@Default("-1")
@Builder.Default
protected Long ttlAfterStopMillis = -1L;

/**
* @return The effective retain duration for secondary resources: the smaller of
* `resourceRetainDurationMillis` and `ttlAfterStopMillis` when both are non-negative, the
* non-negative one when only one of them is set, or -1 when neither is set.
*/
public long computeEffectiveRetainDurationMillis() {
if (resourceRetainDurationMillis < 0 && ttlAfterStopMillis < 0) {
return -1L;
}
if (resourceRetainDurationMillis < 0) {
return ttlAfterStopMillis;
}
if (ttlAfterStopMillis < 0) {
return resourceRetainDurationMillis;
}
return Math.min(resourceRetainDurationMillis, ttlAfterStopMillis);
}

/**
* Check whether a terminated application has exceeded the resource retain duration at the
* provided instant
@@ -68,20 +96,30 @@ public class ApplicationTolerations {
*/
public boolean exceedRetainDurationAtInstant(
ApplicationState lastObservedState, Instant instant) {
return lastObservedState != null
return isRetainDurationEnabled()
&& lastObservedState != null
&& lastObservedState.getCurrentStateSummary().isTerminated()
&& resourceRetainDurationMillis > 0L
&& Instant.parse(lastObservedState.getLastTransitionTime())
.plusMillis(resourceRetainDurationMillis)
.plusMillis(computeEffectiveRetainDurationMillis())
.isBefore(instant);
}

/**
* Indicates whether the reconciler needs to perform the retain duration check
*
* @return true `resourceRetainDurationMillis` is set to non-negative value
* @return true if `resourceRetainDurationMillis` or `ttlAfterStopMillis` is set to non-negative
* value
*/
public boolean isRetainDurationEnabled() {
return resourceRetainDurationMillis >= 0L;
return resourceRetainDurationMillis >= 0L || ttlAfterStopMillis >= 0L;
}

/**
* Indicates whether the reconciler needs to perform the TTL check
*
* @return true if `ttlAfterStopMillis` is set to non-negative value
*/
public boolean isTTLEnabled() {
return ttlAfterStopMillis >= 0L;
}
}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.k8s.operator.spec;

import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;

class ApplicationTolerationsTest {
private final ApplicationTolerations withRetainDurationOnly =
ApplicationTolerations.builder().resourceRetainDurationMillis(10L).build();
private final ApplicationTolerations withTTLOnly =
ApplicationTolerations.builder().ttlAfterStopMillis(10L).build();
private final ApplicationTolerations withNeitherRetainDurationNorTtl =
ApplicationTolerations.builder().build();
private final ApplicationTolerations withRetainDurationGreaterThanTtl =
ApplicationTolerations.builder()
.resourceRetainDurationMillis(20L)
.ttlAfterStopMillis(10L)
.build();
private final ApplicationTolerations withRetainDurationShorterThanTtl =
ApplicationTolerations.builder()
.resourceRetainDurationMillis(10L)
.ttlAfterStopMillis(20L)
.build();

@Test
void computeEffectiveRetainDurationMillis() {
assertEquals(10L, withRetainDurationOnly.computeEffectiveRetainDurationMillis());
assertEquals(10L, withTTLOnly.computeEffectiveRetainDurationMillis());
assertEquals(-1, withNeitherRetainDurationNorTtl.computeEffectiveRetainDurationMillis());
assertEquals(10L, withRetainDurationGreaterThanTtl.computeEffectiveRetainDurationMillis());
assertEquals(10L, withRetainDurationShorterThanTtl.computeEffectiveRetainDurationMillis());
}

@Test
void isRetainDurationEnabled() {
assertTrue(withRetainDurationOnly.isRetainDurationEnabled());
assertTrue(withTTLOnly.isRetainDurationEnabled());
assertFalse(withNeitherRetainDurationNorTtl.isRetainDurationEnabled());
assertTrue(withRetainDurationGreaterThanTtl.isRetainDurationEnabled());
assertTrue(withRetainDurationShorterThanTtl.isRetainDurationEnabled());
}

@Test
void isTTLEnabled() {
assertFalse(withRetainDurationOnly.isTTLEnabled());
assertTrue(withTTLOnly.isTTLEnabled());
assertFalse(withNeitherRetainDurationNorTtl.isTTLEnabled());
assertTrue(withRetainDurationGreaterThanTtl.isTTLEnabled());
assertTrue(withRetainDurationShorterThanTtl.isTTLEnabled());
}
}
@@ -30,6 +30,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -95,7 +96,7 @@ public ReconcileProgress reconcile(
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
if (currentState.getCurrentStateSummary().isTerminated()) {
Optional<ReconcileProgress> terminatedAppProgress =
checkEarlyExitForTerminatedApp(application, statusRecorder);
checkEarlyExitForTerminatedApp(context.getClient(), application, statusRecorder);
if (terminatedAppProgress.isPresent()) {
return terminatedAppProgress.get();
}
@@ -184,36 +185,70 @@ public ReconcileProgress reconcile(
}
}

protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
protected Optional<ReconcileProgress> clearCacheAndFinishReconcileForApplication(
final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
log.debug("Cleaning up status cache and stop reconciling for application.");
statusRecorder.removeCachedStatus(application);
return Optional.of(ReconcileProgress.completeAndNoRequeue());
}

protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
final KubernetesClient client,
final SparkApplication application,
final SparkAppStatusRecorder statusRecorder) {
ApplicationStatus currentStatus = application.getStatus();
ApplicationState currentState = currentStatus.getCurrentState();
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
Instant now = Instant.now();
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
statusRecorder.removeCachedStatus(application);
return Optional.of(ReconcileProgress.completeAndNoRequeue());
// Perform TTL check after removing all secondary resources, if enabled
if (isOnDemandCleanup() || !tolerations.isTTLEnabled()) {
// all secondary resources have been released, no more reconciliations needed
return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
} else {
ApplicationState lastObservedStateBeforeTermination =
getLastObservedStateBeforeTermination(currentStatus);
Duration nextCheckDuration =
Duration.between(
now,
Instant.parse(lastObservedStateBeforeTermination.getLastTransitionTime())
.plusMillis(tolerations.getTtlAfterStopMillis()));
if (nextCheckDuration.isNegative()) {
log.info("Garbage collecting application that exceeded the given TTL.");
ReconcilerUtils.deleteResourceIfExists(client, application, true);
return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
} else {
log.info(
"Application has not yet expired, reconciliation would be resumed in {} millis.",
nextCheckDuration.toMillis());
return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
}
}
}
if (isOnDemandCleanup()) {
return Optional.empty();
}
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
currentState.getCurrentStateSummary())) {
if (tolerations.isRetainDurationEnabled()) {
Instant now = Instant.now();
if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
log.info("Garbage collecting secondary resources for application");
onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
return Optional.empty();
} else {
Duration nextCheckDuration =
Duration.between(
Instant.now(),
now,
Instant.parse(currentState.getLastTransitionTime())
.plusMillis(tolerations.getResourceRetainDurationMillis()));
.plusMillis(tolerations.computeEffectiveRetainDurationMillis()));
log.info(
"Application is within retention, reconciliation would be resumed in {} millis.",
nextCheckDuration.toMillis());
return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
}
} else {
statusRecorder.removeCachedStatus(application);
return Optional.of(ReconcileProgress.completeAndNoRequeue());
log.info("Retention duration check is not enabled for application.");
return clearCacheAndFinishReconcileForApplication(application, statusRecorder);
}
}
return Optional.empty();
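
The TTL branch in this hunk boils down to comparing the current time with a deadline derived from a recorded transition time. A minimal standalone sketch of that arithmetic follows. `TtlCheckSketch` and `remainingTtl` are hypothetical names, and the timestamp is the one quoted in the review discussion, used here only as sample input. A negative result corresponds to the branch that deletes the application, a non-negative one to the requeue branch.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not the reconciler's implementation.
final class TtlCheckSketch {

  /** Time left until the TTL deadline; negative once the deadline has passed. */
  static Duration remainingTtl(String lastTransitionTime, long ttlAfterStopMillis, Instant now) {
    Instant deadline = Instant.parse(lastTransitionTime).plusMillis(ttlAfterStopMillis);
    return Duration.between(now, deadline);
  }

  public static void main(String[] args) {
    String stopped = "2025-07-30T22:44:15.503645Z"; // sample transition time
    long ttl = 60_000L;

    // 30 seconds after the transition: still within the TTL, so requeue.
    Duration early = remainingTtl(stopped, ttl, Instant.parse("2025-07-30T22:44:45.503645Z"));
    System.out.println(early.toMillis()); // 30000

    // 61 seconds after the transition: the TTL has elapsed, so delete.
    Duration late = remainingTtl(stopped, ttl, Instant.parse("2025-07-30T22:45:16.503645Z"));
    System.out.println(late.isNegative()); // true
  }
}
```

Passing `now` as a parameter, as the diff does by computing `Instant.now()` once per call, keeps the comparison deterministic and easy to test.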
@@ -223,17 +258,25 @@ protected boolean isOnDemandCleanup() {
return onDemandCleanUpReason != null;
}

protected boolean isReleasingResourcesForSchedulingFailureAttempt(
final ApplicationStatus status) {
/**
* @param status status of the application
* @return The last observed state before termination if the app has terminated. If the app has
* not terminated, return the last observed state
*/
protected ApplicationState getLastObservedStateBeforeTermination(final ApplicationStatus status) {
ApplicationState lastObservedState = status.getCurrentState();
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
lastObservedState.getCurrentStateSummary())) {
// if the app has already terminated, use the last observed state before termination
if (lastObservedState.getCurrentStateSummary().isTerminated()) {
NavigableMap<Long, ApplicationState> navMap =
(NavigableMap<Long, ApplicationState>) status.getStateTransitionHistory();
Map.Entry<Long, ApplicationState> terminateState = navMap.lastEntry();
lastObservedState = navMap.lowerEntry(terminateState.getKey()).getValue();
return navMap.lowerEntry(terminateState.getKey()).getValue();
}
return lastObservedState;
}

protected boolean isReleasingResourcesForSchedulingFailureAttempt(
final ApplicationStatus status) {
ApplicationState lastObservedState = getLastObservedStateBeforeTermination(status);
return ApplicationStateSummary.SchedulingFailure.equals(
lastObservedState.getCurrentStateSummary());
}
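
`getLastObservedStateBeforeTermination` relies on `NavigableMap.lastEntry` and `NavigableMap.lowerEntry` to step back one entry in the state transition history. The standalone sketch below shows that lookup on a plain `TreeMap<Long, String>`. The class and method names are hypothetical, and the state names are copied from the example status in the documentation change. Unlike the method in the diff, the sketch returns an empty `Optional` when the history holds fewer than two entries, because `lowerEntry` returns `null` in that case. Whether that situation can occur in the operator is not established here.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not the reconciler's implementation.
final class StateHistorySketch {

  /** Returns the value stored just before the latest entry, if there is one. */
  static Optional<String> stateBeforeLast(NavigableMap<Long, String> history) {
    Map.Entry<Long, String> last = history.lastEntry();
    if (last == null) {
      return Optional.empty(); // empty history
    }
    // lowerEntry returns the entry with the greatest key strictly below the given key.
    Map.Entry<Long, String> previous = history.lowerEntry(last.getKey());
    return previous == null ? Optional.empty() : Optional.of(previous.getValue());
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> history = new TreeMap<>();
    history.put(11L, "Succeeded");
    history.put(12L, "TerminatedWithoutReleaseResources");
    System.out.println(stateBeforeLast(history).orElse("<none>")); // Succeeded
  }
}
```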