3 changes: 3 additions & 0 deletions .github/workflows/build_and_test.yml
@@ -82,6 +82,7 @@ jobs:
- spark-versions
- python
- state-transition
- resource-retain-duration
- watched-namespaces
exclude:
- mode: dynamic
@@ -90,6 +91,8 @@ jobs:
test-group: python
- mode: dynamic
test-group: state-transition
- mode: dynamic
test-group: resource-retain-duration
- mode: static
test-group: watched-namespaces
steps:
@@ -17411,6 +17411,9 @@ spec:
minExecutors:
type: integer
type: object
resourceRetainDurationMillis:
default: -1
Member

What is going to happen to the old CRDs (v1alpha and v1beta)?

Contributor Author

The first commit introduces the feature to v1 only, so users would need to upgrade to v1.

I do agree that technically our operator still supports all versions and this can be introduced to all of them. I'll update all of them.

Contributor Author

Applied the same to all versions

Member

Oh, no, it seems that you misunderstood the comment, @jiangzho.

They are immutable. You should not change them.

What I asked about is the runtime behavior inside the K8s cluster.

Member

To be clear, we load the old CRDs and store them in v1 format. I was wondering what the default stored value would be for the old CRDs.

Contributor Author (@jiangzho, Jul 11, 2025)

Sorry for the confusion. I'll ensure a default value for them to keep the behavior consistent.

Contributor Author

The default of -1 gives old custom resources an unlimited retention duration, as before.

Member

Thank you for confirming, @jiangzho .

type: integer
resourceRetainPolicy:
enum:
- Always
7 changes: 6 additions & 1 deletion docs/spark_custom_resources.md
@@ -289,12 +289,17 @@ On the other hand, when developing an application, it's possible to configure
```yaml
applicationTolerations:
# Acceptable values are 'Always', 'OnFailure', 'Never'
# Setting this to 'OnFailure' would retain secondary resources if and only if the app fails
resourceRetainPolicy: OnFailure
# Secondary resources would be garbage collected 10 minutes after app termination
resourceRetainDurationMillis: 600000
```

to prevent the operator from attempting to delete the driver pod and driver resources if the app fails. Similarly,
if resourceRetainPolicy is set to `Always`, the operator would not delete driver resources
Member

Ditto. Please handle this typo fix in an independent PR: resourceRetentionPolicy -> resourceRetainPolicy

Contributor Author

Removed the unrelated typo fixes and raised #284

Member

I merged #284 .

when app ends. Note that this applies only to operator-created resources (driver pod, SparkConf
when app ends. By default, they are kept with the same lifecycle as the app. It's also
possible to configure `resourceRetainDurationMillis` to define the maximum retain duration for
these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
configmap, etc.). You may also want to tune `spark.kubernetes.driver.service.deleteOnTermination`
and `spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
resources.
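
As a rough illustration of the timing described above, the sketch below computes the instant at which retained secondary resources would become eligible for garbage collection. `ReleaseTimeSketch` and `releaseEligibleAt` are hypothetical names used only for this example and are not part of the operator; only the meaning of `resourceRetainDurationMillis` (a negative value means no time limit) is taken from the docs and the review discussion.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of the operator's API.
final class ReleaseTimeSketch {

  /**
   * Returns the instant after which retained resources may be garbage collected, or empty when
   * the duration is negative (no time limit, resources follow the retain policy).
   */
  static Optional<Instant> releaseEligibleAt(
      Instant terminatedAt, long resourceRetainDurationMillis) {
    if (resourceRetainDurationMillis < 0) {
      return Optional.empty();
    }
    return Optional.of(terminatedAt.plus(Duration.ofMillis(resourceRetainDurationMillis)));
  }

  public static void main(String[] args) {
    Instant terminatedAt = Instant.parse("2025-07-11T10:00:00Z");
    // 600000 ms is the 10-minute value used in the YAML example above.
    System.out.println(releaseEligibleAt(terminatedAt, 600000L));
    // -1 is the default: no time-based release.
    System.out.println(releaseEligibleAt(terminatedAt, -1L));
  }
}
```

With the 10-minute setting from the YAML example, an app that terminated at 10:00 would have its retained resources eligible for release from 10:10 onward.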
@@ -63,6 +63,8 @@ public class Constants {
+ "observed status for details.";
public static final String APP_CANCELLED_MESSAGE =
"Spark application has been shutdown as requested.";
public static final String APP_EXCEEDED_RETAIN_DURATION_MESSAGE =
"Spark application resources released after exceeding the configured retain duration.";
public static final String DRIVER_UNEXPECTED_REMOVED_MESSAGE =
"Driver removed. This could caused by 'exit' called in driver process with non-zero "
+ "code, involuntary disruptions or unintentional destroy behavior, check "
@@ -19,13 +19,18 @@

package org.apache.spark.k8s.operator.spec;

import java.time.Instant;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.generator.annotation.Default;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import org.apache.spark.k8s.operator.status.ApplicationState;

/** Toleration settings for a Spark application. */
@Data
@NoArgsConstructor
@@ -43,4 +48,40 @@ public class ApplicationTolerations {
@Builder.Default protected ExecutorInstanceConfig instanceConfig = new ExecutorInstanceConfig();

@Builder.Default protected ResourceRetainPolicy resourceRetainPolicy = ResourceRetainPolicy.Never;

/**
* Time-to-live in milliseconds for secondary resources of SparkApplications after termination. If
* set to a negative value, secondary resources could be retained with the same lifecycle as the
* application according to the retain policy.
*/
@Default("-1")
@Builder.Default
protected Long resourceRetainDurationMillis = -1L;

/**
* Checks whether a terminated application has exceeded the resource retain duration at the
* provided instant.
*
* @param lastObservedState last observed state of the application
* @param instant the instant at which the retain duration is evaluated
* @return true if the app has terminated, the resource retain duration is configured to a
* positive value, and the retain duration has elapsed at the given instant; false otherwise.
*/
public boolean exceedRetainDurationAtInstant(
ApplicationState lastObservedState, Instant instant) {
return lastObservedState != null
&& lastObservedState.getCurrentStateSummary().isTerminated()
&& resourceRetainDurationMillis > 0L
&& Instant.parse(lastObservedState.getLastTransitionTime())
.plusMillis(resourceRetainDurationMillis)
.isBefore(instant);
}

/**
* Indicates whether the reconciler needs to perform the retain duration check.
*
* @return true if `resourceRetainDurationMillis` is set to a non-negative value
*/
public boolean isRetainDurationEnabled() {
return resourceRetainDurationMillis >= 0L;
}
}
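
The two checks above can be tried out in isolation with the following sketch, which mirrors their conditions without depending on operator types. `RetainCheckSketch` is a hypothetical class, and the `terminated` flag and `lastTransitionTime` string stand in for the values read from `ApplicationState`. Note that, as written in the diff, a value of `0` counts as enabled (`>= 0L`) but can never be reported as exceeded (`> 0L`).

```java
import java.time.Instant;

// Stand-alone mirror of the two checks in ApplicationTolerations; illustration only.
final class RetainCheckSketch {

  /** Same conditions as exceedRetainDurationAtInstant, with the state flattened to parameters. */
  static boolean exceeded(
      boolean terminated, String lastTransitionTime, long retainMillis, Instant at) {
    return terminated
        && retainMillis > 0L
        && Instant.parse(lastTransitionTime).plusMillis(retainMillis).isBefore(at);
  }

  /** Same condition as isRetainDurationEnabled. */
  static boolean enabled(long retainMillis) {
    return retainMillis >= 0L;
  }

  public static void main(String[] args) {
    String terminatedAt = "2025-07-11T10:00:00Z";
    Instant elevenMinutesLater = Instant.parse("2025-07-11T10:11:00Z");
    // 10-minute retention, checked 11 minutes after termination.
    System.out.println(exceeded(true, terminatedAt, 600000L, elevenMinutesLater));
    // Default -1: never exceeded, and the duration check is disabled.
    System.out.println(exceeded(true, terminatedAt, -1L, elevenMinutesLater));
    System.out.println(enabled(-1L));
    // Zero: enabled, yet never exceeded.
    System.out.println(enabled(0L) + " " + exceeded(true, terminatedAt, 0L, elevenMinutesLater));
  }
}
```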
@@ -30,7 +30,6 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -47,16 +46,22 @@
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;

/**
* Cleanup all secondary resources when application is deleted, or at the end of each attempt.
* Update Application status to indicate whether another attempt would be made.
*/
@AllArgsConstructor
@NoArgsConstructor
@Slf4j
public class AppCleanUpStep extends AppReconcileStep {
private Supplier<ApplicationState> onDemandCleanUpReason;
private String stateUpdateMessage;

public AppCleanUpStep(Supplier<ApplicationState> onDemandCleanUpReason) {
super();
this.onDemandCleanUpReason = onDemandCleanUpReason;
}

/**
* Cleanup secondary resources for an application if needed and updates application status
@@ -88,42 +93,37 @@ public ReconcileProgress reconcile(
ApplicationStatus currentStatus = application.getStatus();
ApplicationState currentState = currentStatus.getCurrentState();
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
statusRecorder.removeCachedStatus(application);
return ReconcileProgress.completeAndNoRequeue();
}
String stateMessage = null;
if (isOnDemandCleanup()) {
log.info("Cleaning up application resources on demand");
} else {
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
currentState.getCurrentStateSummary())) {
statusRecorder.removeCachedStatus(application);
return ReconcileProgress.completeAndNoRequeue();
} else if (currentState.getCurrentStateSummary().isStopping()) {
if (retainReleaseResourceForPolicyAndState(
tolerations.getResourceRetainPolicy(), currentState)) {
if (tolerations.getRestartConfig() != null
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
stateMessage =
"Application is configured to restart, resources created in current "
+ "attempt would be force released.";
log.warn(stateMessage);
} else {
ApplicationState terminationState =
new ApplicationState(
ApplicationStateSummary.TerminatedWithoutReleaseResources,
"Application is terminated without releasing resources as configured.");
long requeueAfterMillis =
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
return appendStateAndRequeueAfter(
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
}
if (currentState.getCurrentStateSummary().isTerminated()) {
Optional<ReconcileProgress> terminatedAppProgress =
checkEarlyExitForTerminatedApp(application, statusRecorder);
if (terminatedAppProgress.isPresent()) {
return terminatedAppProgress.get();
}
} else if (isOnDemandCleanup()) {
log.info("Releasing secondary resources for application on demand.");
} else if (currentState.getCurrentStateSummary().isStopping()) {
if (retainReleaseResourceForPolicyAndState(
tolerations.getResourceRetainPolicy(), currentState)) {
if (tolerations.getRestartConfig() != null
&& !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
stateUpdateMessage =
"Application is configured to restart, resources created in current "
+ "attempt would be force released.";
log.warn(stateUpdateMessage);
} else {
ApplicationState terminationState =
new ApplicationState(
ApplicationStateSummary.TerminatedWithoutReleaseResources,
"Application is terminated without releasing resources as configured.");
long requeueAfterMillis =
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
return appendStateAndRequeueAfter(
context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
}
} else {
log.debug("Clean up is not expected for app, proceeding to next step.");
return ReconcileProgress.proceed();
}
} else {
log.debug("Clean up is not expected for app, proceeding to next step.");
return ReconcileProgress.proceed();
}

List<HasMetadata> resourcesToRemove = new ArrayList<>();
@@ -159,8 +159,8 @@ public ReconcileProgress reconcile(
ApplicationStatus updatedStatus;
if (onDemandCleanUpReason != null) {
ApplicationState state = onDemandCleanUpReason.get();
if (StringUtils.isNotEmpty(stateMessage)) {
state.setMessage(stateMessage);
if (StringUtils.isNotEmpty(stateUpdateMessage)) {
state.setMessage(stateUpdateMessage);
}
long requeueAfterMillis =
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -171,7 +171,7 @@ public ReconcileProgress reconcile(
currentStatus.terminateOrRestart(
tolerations.getRestartConfig(),
tolerations.getResourceRetainPolicy(),
stateMessage,
stateUpdateMessage,
SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue());
long requeueAfterMillis =
tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
@@ -184,6 +184,41 @@ public ReconcileProgress reconcile(
}
}

protected Optional<ReconcileProgress> checkEarlyExitForTerminatedApp(
final SparkApplication application, final SparkAppStatusRecorder statusRecorder) {
ApplicationStatus currentStatus = application.getStatus();
ApplicationState currentState = currentStatus.getCurrentState();
ApplicationTolerations tolerations = application.getSpec().getApplicationTolerations();
if (ApplicationStateSummary.ResourceReleased.equals(currentState.getCurrentStateSummary())) {
statusRecorder.removeCachedStatus(application);
return Optional.of(ReconcileProgress.completeAndNoRequeue());
}
if (isOnDemandCleanup()) {
return Optional.empty();
}
if (ApplicationStateSummary.TerminatedWithoutReleaseResources.equals(
currentState.getCurrentStateSummary())) {
if (tolerations.isRetainDurationEnabled()) {
Instant now = Instant.now();
if (tolerations.exceedRetainDurationAtInstant(currentState, now)) {
onDemandCleanUpReason = SparkAppStatusUtils::appExceededRetainDuration;
return Optional.empty();
} else {
Duration nextCheckDuration =
Duration.between(
Instant.now(),
Instant.parse(currentState.getLastTransitionTime())
.plusMillis(tolerations.getResourceRetainDurationMillis()));
return Optional.of(ReconcileProgress.completeAndRequeueAfter(nextCheckDuration));
}
} else {
statusRecorder.removeCachedStatus(application);
return Optional.of(ReconcileProgress.completeAndNoRequeue());
}
}
return Optional.empty();
}

protected boolean isOnDemandCleanup() {
return onDemandCleanUpReason != null;
}
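
In the branch of `checkEarlyExitForTerminatedApp` where the retain duration has not yet elapsed, the requeue delay is the time remaining until the last transition time plus the retain duration. A minimal sketch of that arithmetic follows. `RequeueDelaySketch` and `remaining` are hypothetical names, and clamping a negative result to zero is an addition of this sketch, not something the diff does.

```java
import java.time.Duration;
import java.time.Instant;

// Illustration of the requeue-delay arithmetic; not the operator's implementation.
final class RequeueDelaySketch {

  /** Time left until lastTransition + retainMillis, clamped at zero (clamp is an assumption). */
  static Duration remaining(Instant now, Instant lastTransition, long retainMillis) {
    Duration untilExpiry = Duration.between(now, lastTransition.plusMillis(retainMillis));
    return untilExpiry.isNegative() ? Duration.ZERO : untilExpiry;
  }

  public static void main(String[] args) {
    Instant lastTransition = Instant.parse("2025-07-11T10:00:00Z");
    // Four minutes into a 10-minute retention window.
    System.out.println(remaining(Instant.parse("2025-07-11T10:04:00Z"), lastTransition, 600000L));
    // Past the window: clamped to zero in this sketch.
    System.out.println(remaining(Instant.parse("2025-07-11T10:11:00Z"), lastTransition, 600000L));
  }
}
```

Computing the delay from a single captured `now` keeps the exceeded check and the remaining-time calculation consistent with each other.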
@@ -61,6 +61,11 @@ public static ApplicationState appCancelled() {
ApplicationStateSummary.ResourceReleased, Constants.APP_CANCELLED_MESSAGE);
}

public static ApplicationState appExceededRetainDuration() {
return new ApplicationState(
ApplicationStateSummary.ResourceReleased, Constants.APP_EXCEEDED_RETAIN_DURATION_MESSAGE);
}

public static boolean hasReachedState(
SparkApplication application, ApplicationState stateToCheck) {
return isValidApplicationStatus(application)
@@ -116,4 +116,61 @@ void testCleanupRunningApp() {
assertTrue(deleteControl.isRemoveFinalizer());
}
}

@SuppressWarnings("PMD.UnusedLocalVariable")
@Test
void testCleanupAppTerminatedWithoutReleaseResources() {
try (MockedConstruction<SparkAppContext> mockAppContext =
mockConstruction(
SparkAppContext.class,
(mock, context) -> {
when(mock.getResource()).thenReturn(app);
when(mock.getClient()).thenReturn(mockClient);
when(mock.getDriverPod()).thenReturn(Optional.of(mockDriver));
when(mock.getDriverPodSpec()).thenReturn(mockDriver);
when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
});
MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
// delete app
app.setStatus(
app.getStatus()
.appendNewState(
new ApplicationState(
ApplicationStateSummary.TerminatedWithoutReleaseResources, "")));
DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
assertFalse(deleteControl.isRemoveFinalizer());
utils.verify(() -> ReconcilerUtils.deleteResourceIfExists(mockClient, mockDriver, false));
assertEquals(
ApplicationStateSummary.ResourceReleased,
app.getStatus().getCurrentState().getCurrentStateSummary());

// proceed delete for terminated app
deleteControl = reconciler.cleanup(app, mockContext);
assertTrue(deleteControl.isRemoveFinalizer());
}
}

@SuppressWarnings("PMD.UnusedLocalVariable")
@Test
void testCleanupAppTerminatedResourceReleased() {
try (MockedConstruction<SparkAppContext> mockAppContext =
mockConstruction(
SparkAppContext.class,
(mock, context) -> {
when(mock.getResource()).thenReturn(app);
when(mock.getClient()).thenReturn(mockClient);
when(mock.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
when(mock.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
});
MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
// delete app
app.setStatus(
app.getStatus()
.appendNewState(new ApplicationState(ApplicationStateSummary.ResourceReleased, "")));
DeleteControl deleteControl = reconciler.cleanup(app, mockContext);
assertTrue(deleteControl.isRemoveFinalizer());
utils.verifyNoInteractions();
}
}
}