Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17426,6 +17426,9 @@ spec:
type: integer
restartBackoffMillis:
type: integer
restartCounterResetMillis:
default: -1
type: integer
restartPolicy:
enum:
- Always
Expand Down Expand Up @@ -23735,6 +23738,8 @@ spec:
properties:
id:
type: integer
restartCounter:
type: integer
type: object
stateTransitionHistory:
additionalProperties:
Expand Down Expand Up @@ -24895,6 +24900,8 @@ spec:
properties:
id:
type: integer
restartCounter:
type: integer
type: object
stateTransitionHistory:
additionalProperties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21802,6 +21802,8 @@ spec:
properties:
id:
type: integer
restartCounter:
type: integer
type: object
stateTransitionHistory:
additionalProperties:
Expand Down Expand Up @@ -21842,6 +21844,8 @@ spec:
properties:
id:
type: integer
restartCounter:
type: integer
type: object
stateTransitionHistory:
additionalProperties:
Expand Down
25 changes: 25 additions & 0 deletions docs/spark_custom_resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,31 @@ restartConfig:
restartBackoffMillis: 30000
```

### Restart Counter reset

The restartCounterResetMillis field controls automatic restart counter resets for long-running
application attempts. When set to a non-negative value (in milliseconds), the operator will reset
the restart counter if an application attempt runs successfully for at least the specified duration
before failing. This feature enables user to allow maximal x attempts if an app fails really
fast (which could indicate some underlying issue other than the app itself) while allowing
indefinite restarts when the app can survive given threshold.

For example, setting

```yaml

restartConfig:
## 1hr
restartCounterResetMillis: 3600000
maxRestartAttempts: 3

```

means the application can fail and restart up to 3 times, but if any attempt runs for more than
1 hour, the counter resets to zero, allowing another 3 restart attempts.

The default value is -1, which disables automatic counter resets.

### Timeouts

It's possible to configure applications to be proactively terminated and resubmitted in particular
Expand Down
27 changes: 25 additions & 2 deletions spark-operator-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,31 @@ tasks.register("assertGeneratedCRDMatchesHelmChart") {
"${stagedCRDFileBase}sparkclusters.spark.apache.org-v1.yaml"
].execute().text.trim()
if (generatedAppCRD != stagedAppCRD || generatedClusterCRD != stagedClusterCRD) {
throw new GradleException("Generated CRD yaml does not match the staged version in " +
"Helm Chart, please keep the chart updated.")
def errorMessage = new StringBuilder("Generated CRD yaml does not match the staged " +
"version in Helm Chart, please keep the chart updated.\n\n")

if (generatedAppCRD != stagedAppCRD) {
errorMessage.append("=== SparkApplication CRD Differences ===\n")
def appDiff = ["bash", "-c",
"diff -u <(echo '${generatedAppCRD.replace("'", "'\\''")}' " +
"| yq -P 'sort_keys(..)') <(echo '${stagedAppCRD.replace("'", "'\\''")}' " +
"| yq -P 'sort_keys(..)')"]
.execute().text
errorMessage.append(appDiff ?: "Unable to generate diff\n")
errorMessage.append("\n")
}

if (generatedClusterCRD != stagedClusterCRD) {
errorMessage.append("=== SparkCluster CRD Differences ===\n")
def clusterDiff = ["bash", "-c",
"diff -u <(echo '${generatedClusterCRD.replace("'", "'\\''")}' " +
"| yq -P 'sort_keys(..)') <(echo '${stagedClusterCRD.replace("'", "'\\''")}' " +
"| yq -P 'sort_keys(..)')"]
.execute().text
errorMessage.append(clusterDiff ?: "Unable to generate diff\n")
}

throw new GradleException(errorMessage.toString())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.generator.annotation.Default;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -37,4 +38,8 @@ public class RestartConfig {
@Builder.Default protected RestartPolicy restartPolicy = RestartPolicy.Never;
@Builder.Default protected Long maxRestartAttempts = 3L;
@Builder.Default protected Long restartBackoffMillis = 30000L;

@Default("-1")
@Builder.Default
protected Long restartCounterResetMillis = -1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import static org.apache.spark.k8s.operator.Constants.EXCEED_MAX_RETRY_ATTEMPT_MESSAGE;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

Expand Down Expand Up @@ -115,7 +119,18 @@ public ApplicationStatus terminateOrRestart(
currentAttemptSummary);
}

if (currentAttemptSummary.getAttemptInfo().getId() >= restartConfig.getMaxRestartAttempts()) {
boolean resetRestartCounter = false;
if (restartConfig.getRestartCounterResetMillis() >= 0L) {
resetRestartCounter =
calculateCurrentAttemptDuration()
.compareTo(Duration.ofMillis(restartConfig.getRestartCounterResetMillis()))
>= 0;
}

long effectiveAttemptId =
resetRestartCounter ? 0L : currentAttemptSummary.getAttemptInfo().getRestartCounter();

if (effectiveAttemptId >= restartConfig.getMaxRestartAttempts()) {
String stateMessage =
String.format(EXCEED_MAX_RETRY_ATTEMPT_MESSAGE, restartConfig.getMaxRestartAttempts());
if (stateMessageOverride != null && !stateMessageOverride.isEmpty()) {
Expand All @@ -138,7 +153,9 @@ public ApplicationStatus terminateOrRestart(
currentAttemptSummary);
}

AttemptInfo nextAttemptInfo = currentAttemptSummary.getAttemptInfo().createNextAttemptInfo();
AttemptInfo nextAttemptInfo =
currentAttemptSummary.getAttemptInfo().createNextAttemptInfo(resetRestartCounter);

ApplicationAttemptSummary nextAttemptSummary = new ApplicationAttemptSummary(nextAttemptInfo);
ApplicationState state =
new ApplicationState(ApplicationStateSummary.ScheduledToRestart, stateMessageOverride);
Expand All @@ -163,6 +180,44 @@ public ApplicationStatus terminateOrRestart(
}
}

/**
* Finds the first state of the current application attempt.
*
* <p>This method traverses the state transition history in reverse order to find the most recent
* initializing state (e.g., Submitted or ScheduledToRestart), which marks the beginning of the
* current attempt. If no initializing state is found, it returns the first entry in the history.
*
* @return The ApplicationState representing the start of the current attempt.
*/
protected ApplicationState findFirstStateOfCurrentAttempt() {
List<Map.Entry<Long, ApplicationState>> entries =
new ArrayList<>(stateTransitionHistory.entrySet());
for (int k = entries.size() - 1; k >= 0; k--) {
Map.Entry<Long, ApplicationState> entry = entries.get(k);
if (entry.getValue().getCurrentStateSummary().isInitializing()) {
return entry.getValue();
}
}
return entries.get(0).getValue();
}

/**
* Calculates the duration of the current application attempt.
*
* <p>The duration is calculated as the time between the first state of the current attempt (as
* determined by {@link #findFirstStateOfCurrentAttempt()}) and the current state's last
* transition time. This is particularly useful for determining whether the restart counter should
* be reset based on the configured {@code restartCounterResetMillis}.
*
* @return A Duration representing the time elapsed since the start of the current attempt.
*/
protected Duration calculateCurrentAttemptDuration() {
ApplicationState firstStateOfCurrentAttempt = findFirstStateOfCurrentAttempt();
return Duration.between(
Instant.parse(firstStateOfCurrentAttempt.getLastTransitionTime()),
Instant.parse(currentState.getLastTransitionTime()));
}

/**
* Creates an ApplicationState indicating that the application is terminated without releasing
* resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/** Information about an attempt. */
Expand All @@ -38,13 +39,14 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class AttemptInfo {
@Getter @Builder.Default protected final long id = 0L;
@Getter @Setter protected long restartCounter;

/**
* Creates a new AttemptInfo object representing the next attempt.
*
* @return A new AttemptInfo with an incremented ID.
*/
public AttemptInfo createNextAttemptInfo() {
return new AttemptInfo(id + 1L);
public AttemptInfo createNextAttemptInfo(boolean resetRestartCounter) {
return new AttemptInfo(id + 1L, resetRestartCounter ? 1L : restartCounter + 1);
}
}
Loading
Loading