Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions executor/pkg/controller/taskaction_controller.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -215,6 +215,40 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Map transition phase to TaskAction conditions
phaseInfo := transition.Info()

// In-place task restart: when a recoverable failure occurs, restart the pod within the
// same TaskAction rather than relying on the runs service to create a new TaskAction.
var restartAttempts uint32
if !cacheShortCircuited && phaseInfo.Phase() == pluginsCore.PhaseRetryableFailure {
currentAttempts := observedAttempts(taskAction)
maxAttempts := tCtx.TaskExecutionMetadata().GetMaxAttempts()

if currentAttempts < maxAttempts {
// Abort (delete) the current pod before incrementing attempts.
// tCtx was built with the current attempt number so Abort targets the right pod.
if abortErr := p.Abort(ctx, tCtx); abortErr != nil {
logger.Error(abortErr, "failed to abort pod during in-place restart")
}
// Track the new attempt count; applied to Status.Attempts after the stateMgr block.
restartAttempts = currentAttempts + 1
// Override the transition to Queued so the TaskAction stays non-terminal.
transition = pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("restarting task (attempt %d/%d)", currentAttempts+1, maxAttempts)))
phaseInfo = transition.Info()
} else {
// All retries exhausted — convert to a permanent (terminal) failure.
execErr := phaseInfo.Err()
if execErr == nil {
execErr = &core.ExecutionError{
Kind: core.ExecutionError_USER,
Code: "MaxRetriesExceeded",
Message: fmt.Sprintf("task failed after %d attempt(s)", currentAttempts),
}
}
transition = pluginsCore.DoTransition(pluginsCore.PhaseInfoFailed(pluginsCore.PhasePermanentFailure, execErr, phaseInfo.Info()))
phaseInfo = transition.Info()
}
}
mapPhaseToConditions(taskAction, phaseInfo)

// Update StateJSON for observability
Expand All @@ -229,6 +263,14 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
taskAction.Status.PluginStateVersion = newVersion
}

// If an in-place restart was triggered, increment attempts and clear plugin state so the
// next reconcile starts fresh with PluginPhaseNotStarted and creates a new pod.
if restartAttempts > 0 {
taskAction.Status.Attempts = restartAttempts
taskAction.Status.PluginState = nil
taskAction.Status.PluginStateVersion = 0
}

taskAction.Status.PluginPhase = phaseInfo.Phase().String()
taskAction.Status.PluginPhaseVersion = phaseInfo.Version()
taskAction.Status.Attempts = observedAttempts(taskAction)
Expand Down
Loading