Skip to content

Commit 517b0f5

Browse files
committed
expand preemption retries to a general retry mechanism
Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>
1 parent 485c57c commit 517b0f5

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

43 files changed

+2799
-523
lines changed

_local/scheduler/config.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,37 @@ postgres:
2222
scheduling:
2323
indexedNodeLabels:
2424
- "kubernetes.io/hostname"
25+
retryPolicy:
26+
enabled: true
27+
globalMaxRetries: 5
28+
# Default policy: no rules = everything fails immediately
29+
default:
30+
retryLimit: 3
31+
rules: []
32+
policies:
33+
# ML training: retry OOMKilled only
34+
ml-training:
35+
retryLimit: 5
36+
rules:
37+
- action: Retry
38+
onExitCodes:
39+
operator: In
40+
values: [137]
41+
# Retry-all: traditional batch behavior (retry everything except success)
42+
retry-all:
43+
retryLimit: 5
44+
rules:
45+
- action: Retry
46+
onExitCodes:
47+
operator: NotIn
48+
values: [0]
49+
# High-retry: for testing global limit caps
50+
high-retry:
51+
retryLimit: 10
52+
rules:
53+
- action: Retry
54+
onExitCodes:
55+
operator: NotIn
56+
values: [0]
2557
metrics:
2658
port: 9001

cmd/armadactl/cmd/queue.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,19 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
6363
return fmt.Errorf("error converting queue labels to map: %s", err)
6464
}
6565

66+
retryPolicy, err := cmd.Flags().GetString("retry-policy")
67+
if err != nil {
68+
return fmt.Errorf("error reading retry-policy: %s", err)
69+
}
70+
6671
newQueue, err := queue.NewQueue(&api.Queue{
6772
Name: name,
6873
PriorityFactor: priorityFactor,
6974
UserOwners: owners,
7075
GroupOwners: groups,
7176
Cordoned: cordoned,
7277
Labels: labelsAsMap,
78+
RetryPolicy: retryPolicy,
7379
})
7480
if err != nil {
7581
return fmt.Errorf("invalid queue data: %s", err)
@@ -83,6 +89,7 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
8389
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
8490
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
8591
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
92+
cmd.Flags().String("retry-policy", "", "Name of the retry policy from scheduler config to use for this queue. Defaults to empty (uses default policy).")
8693
return cmd
8794
}
8895

@@ -230,13 +237,19 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
230237
return fmt.Errorf("error converting queue labels to map: %s", err)
231238
}
232239

240+
retryPolicy, err := cmd.Flags().GetString("retry-policy")
241+
if err != nil {
242+
return fmt.Errorf("error reading retry-policy: %s", err)
243+
}
244+
233245
newQueue, err := queue.NewQueue(&api.Queue{
234246
Name: name,
235247
PriorityFactor: priorityFactor,
236248
UserOwners: owners,
237249
GroupOwners: groups,
238250
Cordoned: cordoned,
239251
Labels: labelsAsMap,
252+
RetryPolicy: retryPolicy,
240253
})
241254
if err != nil {
242255
return fmt.Errorf("invalid queue data: %s", err)
@@ -251,5 +264,6 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
251264
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
252265
cmd.Flags().Bool("cordon", false, "Used to pause scheduling on specified queue. Defaults to false.")
253266
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
267+
cmd.Flags().String("retry-policy", "", "Name of the retry policy from scheduler config to use for this queue. Defaults to empty (uses default policy).")
254268
return cmd
255269
}

config/scheduler/config.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,25 @@ scheduling:
107107
maximumPerQueueSchedulingBurst: 1000
108108
maxJobSchedulingContextsPerExecutor: 10000
109109
maxRetries: 3
110+
# Retry policy provides fine-grained control over job retry behavior.
111+
# When enabled, it takes precedence over maxRetries.
112+
# Default behavior: all failures fail immediately (no retry).
113+
# Add Retry rules to opt-in to retry behavior for specific conditions.
114+
retryPolicy:
115+
enabled: false # Set to true to enable retry policy
116+
globalMaxRetries: 20 # Hard cap on total retries across all policies
117+
default:
118+
retryLimit: 3
119+
rules: [] # No rules = all failures fail immediately
120+
# Named policies can be referenced by queue's retry_policy field
121+
# policies:
122+
# ml-training:
123+
# retryLimit: 10
124+
# rules:
125+
# - action: Retry
126+
# onExitCodes:
127+
# operator: In
128+
# values: [137, 143]
110129
dominantResourceFairnessResourcesToConsider:
111130
- "cpu"
112131
- "memory"

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ require (
7272
github.com/redis/go-redis/v9 v9.17.3
7373
github.com/rs/zerolog v1.34.0
7474
github.com/segmentio/fasthash v1.0.3
75+
github.com/sirupsen/logrus v1.9.4
7576
github.com/xitongsys/parquet-go v1.6.2
7677
github.com/zalando/go-keyring v0.2.6
7778
go.uber.org/atomic v1.11.0
@@ -218,7 +219,6 @@ require (
218219
github.com/sagikazarmark/locafero v0.12.0 // indirect
219220
github.com/sergi/go-diff v1.4.0 // indirect
220221
github.com/shopspring/decimal v1.4.0 // indirect
221-
github.com/sirupsen/logrus v1.9.4 // indirect
222222
github.com/skeema/knownhosts v1.3.2 // indirect
223223
github.com/spaolacci/murmur3 v1.1.0 // indirect
224224
github.com/spf13/afero v1.15.0 // indirect

internal/common/database/upsert.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,10 @@ func Upsert[T any](ctx *armadacontext.Context, tx pgx.Tx, tableName string, reco
130130
// Move those rows into the main table, using ON CONFLICT rules to over-write existing rows.
131131
var b strings.Builder
132132

133-
fmt.Fprintf(&b, "INSERT INTO %s SELECT %s from %s ", tableName, strings.Join(writableNames, ","), tempTableName)
133+
// Explicitly specify target columns to handle GENERATED ALWAYS columns correctly.
134+
// Without explicit columns, INSERT matches values by position, which fails when
135+
// columns are excluded (e.g., GENERATED columns) and remaining values shift positions.
136+
fmt.Fprintf(&b, "INSERT INTO %s (%s) SELECT %s from %s ", tableName, strings.Join(writableNames, ","), strings.Join(writableNames, ","), tempTableName)
134137
fmt.Fprintf(&b, "ON CONFLICT (%s) DO UPDATE SET ", writableNames[0])
135138
for i, name := range writableNames {
136139
fmt.Fprintf(&b, "%s = EXCLUDED.%s", name, name)

internal/executor/job/submit.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"fmt"
55
"regexp"
66
"sync"
7+
"time"
78

89
"github.com/pkg/errors"
910
v1 "k8s.io/api/core/v1"
11+
networking "k8s.io/api/networking/v1"
1012
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
1113
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1214

@@ -19,6 +21,15 @@ import (
1921
util2 "github.com/armadaproject/armada/internal/executor/util"
2022
)
2123

24+
// When a job retries, the previous pod's deletion triggers async garbage collection
25+
// of owned resources (services/ingresses via OwnerReference). Creating new resources
26+
// may fail with AlreadyExists if GC hasn't completed. We use delete-then-create
27+
// with retries to handle this race condition.
28+
const (
29+
serviceIngressMaxRetries = 3
30+
serviceIngressRetryDelay = 100 * time.Millisecond
31+
)
32+
2233
type Submitter interface {
2334
SubmitJobs(jobsToSubmit []*SubmitJob) []*FailedSubmissionDetails
2435
}
@@ -130,15 +141,15 @@ func (submitService *SubmitService) submitPod(job *SubmitJob) (*v1.Pod, error) {
130141

131142
for _, service := range job.Services {
132143
service.ObjectMeta.OwnerReferences = []metav1.OwnerReference{util2.CreateOwnerReference(submittedPod)}
133-
_, err = submitService.clusterContext.SubmitService(service)
144+
err = submitService.createServiceWithRetry(service)
134145
if err != nil {
135146
return pod, err
136147
}
137148
}
138149

139150
for _, ingress := range job.Ingresses {
140151
ingress.ObjectMeta.OwnerReferences = []metav1.OwnerReference{util2.CreateOwnerReference(submittedPod)}
141-
_, err = submitService.clusterContext.SubmitIngress(ingress)
152+
err = submitService.createIngressWithRetry(ingress)
142153
if err != nil {
143154
return pod, err
144155
}
@@ -200,3 +211,59 @@ func (submitService *SubmitService) isRecoverable(err error) bool {
200211

201212
return false
202213
}
214+
215+
// createServiceWithRetry creates a service with retry handling for GC race conditions.
216+
func (submitService *SubmitService) createServiceWithRetry(service *v1.Service) error {
217+
return submitService.createWithRetry(
218+
"service",
219+
service.Namespace,
220+
service.Name,
221+
func() error { return submitService.clusterContext.DeleteService(service) },
222+
func() error { _, err := submitService.clusterContext.SubmitService(service); return err },
223+
)
224+
}
225+
226+
// createIngressWithRetry creates an ingress with retry handling for GC race conditions.
227+
func (submitService *SubmitService) createIngressWithRetry(ingress *networking.Ingress) error {
228+
return submitService.createWithRetry(
229+
"ingress",
230+
ingress.Namespace,
231+
ingress.Name,
232+
func() error { return submitService.clusterContext.DeleteIngress(ingress) },
233+
func() error { _, err := submitService.clusterContext.SubmitIngress(ingress); return err },
234+
)
235+
}
236+
237+
// createWithRetry implements a delete-then-create pattern with retries to handle
238+
// race conditions when garbage collection is still running from a previous run.
239+
func (submitService *SubmitService) createWithRetry(
240+
resourceType string,
241+
namespace string,
242+
name string,
243+
deleteFn func() error,
244+
createFn func() error,
245+
) error {
246+
for attempt := 0; attempt < serviceIngressMaxRetries; attempt++ {
247+
if err := deleteFn(); err != nil {
248+
return errors.Wrapf(err, "failed to delete existing %s %s/%s", resourceType, namespace, name)
249+
}
250+
251+
err := createFn()
252+
if err == nil {
253+
return nil
254+
}
255+
256+
if !k8s_errors.IsAlreadyExists(err) {
257+
return err
258+
}
259+
260+
if attempt < serviceIngressMaxRetries-1 {
261+
log.Warnf("%s %s/%s already exists after delete (attempt %d/%d), retrying after %v",
262+
resourceType, namespace, name, attempt+1, serviceIngressMaxRetries, serviceIngressRetryDelay)
263+
time.Sleep(serviceIngressRetryDelay)
264+
}
265+
}
266+
267+
return errors.Errorf("failed to create %s %s/%s: still exists after %d delete attempts",
268+
resourceType, namespace, name, serviceIngressMaxRetries)
269+
}

0 commit comments

Comments
 (0)