Skip to content

Commit caa5ffc

Browse files
committed
expand preemption retries to a general retry mechanism
Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>
1 parent 485c57c commit caa5ffc

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

58 files changed

+3278
-754
lines changed

_local/scheduler/config.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,25 @@ postgres:
2222
scheduling:
2323
indexedNodeLabels:
2424
- "kubernetes.io/hostname"
25+
retryPolicy:
26+
enabled: true
27+
globalMaxRetries: 5
28+
default:
29+
retryLimit: 3
30+
rules: [] # No rules = failures fail immediately
31+
policies:
32+
# Combined policy for retry tests (used by e2e-test-queue-retry)
33+
retry-all:
34+
retryLimit: 3
35+
rules:
36+
- action: Retry
37+
onExitCodes:
38+
operator: In
39+
values: [42]
40+
- action: Retry
41+
onConditions: [OOMKilled]
42+
- action: Retry
43+
onTerminationMessage:
44+
pattern: ".*TRANSIENT.*"
2545
metrics:
2646
port: 9001

cmd/armadactl/cmd/queue.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,19 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
6363
return fmt.Errorf("error converting queue labels to map: %s", err)
6464
}
6565

66+
retryPolicy, err := cmd.Flags().GetString("retry-policy")
67+
if err != nil {
68+
return fmt.Errorf("error reading retry-policy: %s", err)
69+
}
70+
6671
newQueue, err := queue.NewQueue(&api.Queue{
6772
Name: name,
6873
PriorityFactor: priorityFactor,
6974
UserOwners: owners,
7075
GroupOwners: groups,
7176
Cordoned: cordoned,
7277
Labels: labelsAsMap,
78+
RetryPolicy: retryPolicy,
7379
})
7480
if err != nil {
7581
return fmt.Errorf("invalid queue data: %s", err)
@@ -83,6 +89,7 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
8389
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
8490
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
8591
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
92+
cmd.Flags().String("retry-policy", "", "Name of the retry policy from scheduler config to use for this queue. Defaults to empty (uses default policy).")
8693
return cmd
8794
}
8895

@@ -230,13 +237,19 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
230237
return fmt.Errorf("error converting queue labels to map: %s", err)
231238
}
232239

240+
retryPolicy, err := cmd.Flags().GetString("retry-policy")
241+
if err != nil {
242+
return fmt.Errorf("error reading retry-policy: %s", err)
243+
}
244+
233245
newQueue, err := queue.NewQueue(&api.Queue{
234246
Name: name,
235247
PriorityFactor: priorityFactor,
236248
UserOwners: owners,
237249
GroupOwners: groups,
238250
Cordoned: cordoned,
239251
Labels: labelsAsMap,
252+
RetryPolicy: retryPolicy,
240253
})
241254
if err != nil {
242255
return fmt.Errorf("invalid queue data: %s", err)
@@ -251,5 +264,6 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
251264
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
252265
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
253266
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
267+
cmd.Flags().String("retry-policy", "", "Name of the retry policy from scheduler config to use for this queue. Defaults to empty (uses default policy).")
254268
return cmd
255269
}

config/scheduler/config.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,25 @@ scheduling:
107107
maximumPerQueueSchedulingBurst: 1000
108108
maxJobSchedulingContextsPerExecutor: 10000
109109
maxRetries: 3
110+
# Retry policy provides fine-grained control over job retry behavior.
111+
# When enabled, it takes precedence over maxRetries.
112+
# Default behavior: all failures fail immediately (no retry).
113+
# Add Retry rules to opt-in to retry behavior for specific conditions.
114+
retryPolicy:
115+
enabled: false # Set to true to enable retry policy
116+
globalMaxRetries: 20 # Hard cap on total retries across all policies
117+
default:
118+
retryLimit: 3
119+
rules: [] # No rules = all failures fail immediately
120+
# Named policies can be referenced by queue's retry_policy field
121+
# policies:
122+
# ml-training:
123+
# retryLimit: 10
124+
# rules:
125+
# - action: Retry
126+
# onExitCodes:
127+
# operator: In
128+
# values: [137, 143]
110129
dominantResourceFairnessResourcesToConsider:
111130
- "cpu"
112131
- "memory"

developer/config/insecure-armada.yaml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,25 @@ auth:
1313
watch_all_events: ["everyone"]
1414
execute_jobs: ["everyone"]
1515
update_executor_settings: ["everyone"]
16+
17+
# Scheduler-specific config (ignored by other components)
18+
scheduling:
19+
retryPolicy:
20+
enabled: true
21+
globalMaxRetries: 5
22+
default:
23+
retryLimit: 3
24+
rules: [] # No rules = failures fail immediately
25+
policies:
26+
retry-all:
27+
retryLimit: 3
28+
rules:
29+
- action: Retry
30+
onExitCodes:
31+
operator: In
32+
values: [42]
33+
- action: Retry
34+
onConditions: [OOMKilled]
35+
- action: Retry
36+
onTerminationMessage:
37+
pattern: ".*TRANSIENT.*"

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ require (
7272
github.com/redis/go-redis/v9 v9.17.3
7373
github.com/rs/zerolog v1.34.0
7474
github.com/segmentio/fasthash v1.0.3
75+
github.com/sirupsen/logrus v1.9.4
7576
github.com/xitongsys/parquet-go v1.6.2
7677
github.com/zalando/go-keyring v0.2.6
7778
go.uber.org/atomic v1.11.0
@@ -218,7 +219,6 @@ require (
218219
github.com/sagikazarmark/locafero v0.12.0 // indirect
219220
github.com/sergi/go-diff v1.4.0 // indirect
220221
github.com/shopspring/decimal v1.4.0 // indirect
221-
github.com/sirupsen/logrus v1.9.4 // indirect
222222
github.com/skeema/knownhosts v1.3.2 // indirect
223223
github.com/spaolacci/murmur3 v1.1.0 // indirect
224224
github.com/spf13/afero v1.15.0 // indirect

internal/armadactl/watch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (a *App) Watch(queue string, jobSetId string, raw bool, exitOnInactive bool
3737
if jobInfo != nil && jobInfo.ClusterId != "" && jobInfo.Job != nil {
3838
fmt.Fprintf(
3939
a.Out, "Found no logs for job; try '%s --tail=50\n",
40-
client.GetKubectlCommand(jobInfo.ClusterId, jobInfo.Job.Namespace, event2.JobId, int(event2.PodNumber), "logs"),
40+
client.GetKubectlCommand(jobInfo.ClusterId, jobInfo.Job.Namespace, event2.JobId, int(event2.PodNumber), nil, "logs"),
4141
)
4242
}
4343
default:

internal/binoculars/server/binoculars.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package server
22

33
import (
44
"context"
5-
"strconv"
65

76
"github.com/gogo/protobuf/types"
87

@@ -28,10 +27,17 @@ func NewBinocularsServer(logService service.LogService, cordonService service.Co
2827
func (b *BinocularsServer) Logs(ctx context.Context, request *binoculars.LogRequest) (*binoculars.LogResponse, error) {
2928
principal := auth.GetPrincipal(ctx)
3029

30+
var runIndex *int
31+
if request.IncludeRunIndex {
32+
ri := int(request.RunIndex)
33+
runIndex = &ri
34+
}
35+
podName := common.BuildPodName(request.JobId, int(request.PodNumber), runIndex)
36+
3137
logLines, err := b.logService.GetLogs(armadacontext.FromGrpcCtx(ctx), &service.LogParams{
3238
Principal: principal,
3339
Namespace: request.PodNamespace,
34-
PodName: common.PodNamePrefix + request.JobId + "-" + strconv.Itoa(int(request.PodNumber)),
40+
PodName: podName,
3541
SinceTime: request.SinceTime,
3642
LogOptions: request.LogOptions,
3743
})

internal/common/constants.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,21 @@
11
package common
22

3+
import "fmt"
4+
35
const PodNamePrefix string = "armada-"
46

7+
// PodName returns the legacy pod name format: armada-<jobId>-0
8+
// Deprecated: Use BuildPodName for new code.
59
func PodName(jobId string) string {
610
return PodNamePrefix + jobId + "-0"
711
}
12+
13+
// BuildPodName constructs a pod name.
14+
// If runIndex is nil, returns legacy format: armada-<jobId>-<podIndex>
15+
// If runIndex is non-nil, returns new format: armada-<jobId>-<podIndex>-<runIndex>
16+
func BuildPodName(jobId string, podIndex int, runIndex *int) string {
17+
if runIndex != nil {
18+
return fmt.Sprintf("%s%s-%d-%d", PodNamePrefix, jobId, podIndex, *runIndex)
19+
}
20+
return fmt.Sprintf("%s%s-%d", PodNamePrefix, jobId, podIndex)
21+
}

internal/common/database/upsert.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,10 @@ func Upsert[T any](ctx *armadacontext.Context, tx pgx.Tx, tableName string, reco
130130
// Move those rows into the main table, using ON CONFLICT rules to over-write existing rows.
131131
var b strings.Builder
132132

133-
fmt.Fprintf(&b, "INSERT INTO %s SELECT %s from %s ", tableName, strings.Join(writableNames, ","), tempTableName)
133+
// Explicitly specify target columns to handle GENERATED ALWAYS columns correctly.
134+
// Without explicit columns, INSERT matches values by position, which fails when
135+
// columns are excluded (e.g., GENERATED columns) and remaining values shift positions.
136+
fmt.Fprintf(&b, "INSERT INTO %s (%s) SELECT %s from %s ", tableName, strings.Join(writableNames, ","), strings.Join(writableNames, ","), tempTableName)
134137
fmt.Fprintf(&b, "ON CONFLICT (%s) DO UPDATE SET ", writableNames[0])
135138
for i, name := range writableNames {
136139
fmt.Fprintf(&b, "%s = EXCLUDED.%s", name, name)

internal/executor/job/submit.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"fmt"
55
"regexp"
66
"sync"
7+
"time"
78

89
"github.com/pkg/errors"
910
v1 "k8s.io/api/core/v1"
11+
networking "k8s.io/api/networking/v1"
1012
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
1113
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1214

@@ -19,6 +21,15 @@ import (
1921
util2 "github.com/armadaproject/armada/internal/executor/util"
2022
)
2123

24+
// When a job retries, the previous pod's deletion triggers async garbage collection
25+
// of owned resources (services/ingresses via OwnerReference). Creating new resources
26+
// may fail with AlreadyExists if GC hasn't completed. We use delete-then-create
27+
// with retries to handle this race condition.
28+
const (
29+
serviceIngressMaxRetries = 3
30+
serviceIngressRetryDelay = 100 * time.Millisecond
31+
)
32+
2233
type Submitter interface {
2334
SubmitJobs(jobsToSubmit []*SubmitJob) []*FailedSubmissionDetails
2435
}
@@ -130,15 +141,15 @@ func (submitService *SubmitService) submitPod(job *SubmitJob) (*v1.Pod, error) {
130141

131142
for _, service := range job.Services {
132143
service.ObjectMeta.OwnerReferences = []metav1.OwnerReference{util2.CreateOwnerReference(submittedPod)}
133-
_, err = submitService.clusterContext.SubmitService(service)
144+
err = submitService.createServiceWithRetry(service)
134145
if err != nil {
135146
return pod, err
136147
}
137148
}
138149

139150
for _, ingress := range job.Ingresses {
140151
ingress.ObjectMeta.OwnerReferences = []metav1.OwnerReference{util2.CreateOwnerReference(submittedPod)}
141-
_, err = submitService.clusterContext.SubmitIngress(ingress)
152+
err = submitService.createIngressWithRetry(ingress)
142153
if err != nil {
143154
return pod, err
144155
}
@@ -200,3 +211,59 @@ func (submitService *SubmitService) isRecoverable(err error) bool {
200211

201212
return false
202213
}
214+
215+
// createServiceWithRetry creates a service with retry handling for GC race conditions.
216+
func (submitService *SubmitService) createServiceWithRetry(service *v1.Service) error {
217+
return submitService.createWithRetry(
218+
"service",
219+
service.Namespace,
220+
service.Name,
221+
func() error { return submitService.clusterContext.DeleteService(service) },
222+
func() error { _, err := submitService.clusterContext.SubmitService(service); return err },
223+
)
224+
}
225+
226+
// createIngressWithRetry creates an ingress with retry handling for GC race conditions.
227+
func (submitService *SubmitService) createIngressWithRetry(ingress *networking.Ingress) error {
228+
return submitService.createWithRetry(
229+
"ingress",
230+
ingress.Namespace,
231+
ingress.Name,
232+
func() error { return submitService.clusterContext.DeleteIngress(ingress) },
233+
func() error { _, err := submitService.clusterContext.SubmitIngress(ingress); return err },
234+
)
235+
}
236+
237+
// createWithRetry implements a delete-then-create pattern with retries to handle
238+
// race conditions when garbage collection is still running from a previous run.
239+
func (submitService *SubmitService) createWithRetry(
240+
resourceType string,
241+
namespace string,
242+
name string,
243+
deleteFn func() error,
244+
createFn func() error,
245+
) error {
246+
for attempt := 0; attempt < serviceIngressMaxRetries; attempt++ {
247+
if err := deleteFn(); err != nil {
248+
return errors.Wrapf(err, "failed to delete existing %s %s/%s", resourceType, namespace, name)
249+
}
250+
251+
err := createFn()
252+
if err == nil {
253+
return nil
254+
}
255+
256+
if !k8s_errors.IsAlreadyExists(err) {
257+
return err
258+
}
259+
260+
if attempt < serviceIngressMaxRetries-1 {
261+
log.Warnf("%s %s/%s already exists after delete (attempt %d/%d), retrying after %v",
262+
resourceType, namespace, name, attempt+1, serviceIngressMaxRetries, serviceIngressRetryDelay)
263+
time.Sleep(serviceIngressRetryDelay)
264+
}
265+
}
266+
267+
return errors.Errorf("failed to create %s %s/%s: still exists after %d delete attempts",
268+
resourceType, namespace, name, serviceIngressMaxRetries)
269+
}

0 commit comments

Comments
 (0)