Skip to content

Commit 485c57c

Browse files
committed
resolve conflicts and fix minor issues with native preemption retries
Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>
1 parent 4d789e6 commit 485c57c

File tree

17 files changed

+525
-111
lines changed

17 files changed

+525
-111
lines changed

config/scheduler/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ scheduling:
124124
executorTimeout: "10m"
125125
maxUnacknowledgedJobsPerExecutor: 2500
126126
executorUpdateFrequency: "60s"
127+
preemptionRetry:
128+
enabled: true
127129
experimentalIndicativePricing:
128130
basePrice: 100.0
129131
basePriority: 500.0

config/server/config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ submission:
6666
defaultActiveDeadlineByResourceRequest:
6767
nvidia.com/gpu: "336h" # 14 days.
6868
assertInitContainersRequestFractionalCpu: true
69+
preemptionRetry:
70+
enabled: true
71+
maxRetryCount: 10
6972
pulsar:
7073
URL: "pulsar://pulsar:6650"
7174
jobsetEventsTopic: "events"

internal/common/constants/constants.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@ const (
3434
PoolAnnotation = "armadaproject.io/pool"
3535
ReservationTaintKey = "armadaproject.io/reservation"
3636

37-
PreemptionRetryCountMaxAnnotation = "armadaproject.io/preemptionRetryCountMax"
38-
PreemptionRetryEnabledAnnotation = "armadaproject.io/preemptionRetryEnabled"
37+
// PreemptionRetryEnabledAnnotation enables or disables preemption retries for a job.
38+
// When set to "true", the job will be rescheduled if preempted, up to the max retry count.
39+
PreemptionRetryEnabledAnnotation = "armadaproject.io/preemptionRetryEnabled"
40+
// PreemptionMaxRetryCountAnnotation specifies the maximum number of times a job can be
41+
// rescheduled after preemption. Must be a positive integer.
42+
PreemptionMaxRetryCountAnnotation = "armadaproject.io/preemptionMaxRetryCount"
3943
)
4044

4145
var schedulingAnnotations = map[string]bool{
@@ -44,6 +48,8 @@ var schedulingAnnotations = map[string]bool{
4448
GangNodeUniformityLabelAnnotation: true,
4549
FailFastAnnotation: true,
4650
JobPriceBand: true,
51+
PreemptionRetryEnabledAnnotation: true,
52+
PreemptionMaxRetryCountAnnotation: true,
4753
}
4854

4955
func IsSchedulingAnnotation(annotation string) bool {

internal/common/preemption/utils.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ import (
66
"github.com/armadaproject/armada/internal/common/constants"
77
)
88

9+
// RetryConfig contains preemption retry settings.
10+
// Used by both server (submission validation) and scheduler (runtime behavior).
11+
type RetryConfig struct {
12+
// Enabled toggles preemption retries globally.
13+
// When false, jobs with preemption retry annotations are rejected at submission.
14+
Enabled bool
15+
// MaxRetryCount is the maximum retry count users can request via annotations.
16+
// Used by server to validate job submissions. If nil, no upper bound is enforced.
17+
MaxRetryCount *uint
18+
// DefaultRetryCount is the retry count applied when a job doesn't specify one via annotation.
19+
// Used by the scheduler at runtime. If nil, defaults to 0 (no retries unless the job specifies a count).
20+
DefaultRetryCount *uint
21+
}
22+
923
// AreRetriesEnabled determines whether preemption retries are enabled at the job level. Also returns whether the
1024
// annotation was set.
1125
func AreRetriesEnabled(annotations map[string]string) (enabled bool, annotationSet bool) {
@@ -23,7 +37,7 @@ func AreRetriesEnabled(annotations map[string]string) (enabled bool, annotationS
2337

2438
// GetMaxRetryCount gets the max preemption retry count at the job level. Also returns whether the annotation was set.
2539
func GetMaxRetryCount(annotations map[string]string) (maxRetryCount uint, annotationSet bool) {
26-
maxRetryCountStr, exists := annotations[constants.PreemptionRetryCountMaxAnnotation]
40+
maxRetryCountStr, exists := annotations[constants.PreemptionMaxRetryCountAnnotation]
2741
if !exists {
2842
return 0, false
2943
}

internal/scheduler/configuration/configuration.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
authconfig "github.com/armadaproject/armada/internal/common/auth/configuration"
1010
commonconfig "github.com/armadaproject/armada/internal/common/config"
1111
grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration"
12+
"github.com/armadaproject/armada/internal/common/preemption"
1213
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
1314
armadaresource "github.com/armadaproject/armada/internal/common/resource"
1415
"github.com/armadaproject/armada/internal/common/types"
@@ -297,8 +298,8 @@ type SchedulingConfig struct {
297298
DefaultPoolSchedulePriority int
298299
Pools []PoolConfig
299300
ExperimentalIndicativeShare ExperimentalIndicativeShare
300-
// Default preemption retries settings so you don't have to annotate all jobs with retries.
301-
DefaultPreemptionRetry PreemptionRetryConfig
301+
// Preemption retry settings. Jobs can override via annotations.
302+
PreemptionRetry preemption.RetryConfig
302303
}
303304

304305
const (
@@ -482,8 +483,3 @@ type PricingApiConfig struct {
482483
// It will stub the pricing API so it returns non-zero values but won't call an external service
483484
DevModeEnabled bool
484485
}
485-
486-
type PreemptionRetryConfig struct {
487-
Enabled bool
488-
DefaultMaxRetryCount *uint
489-
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ALTER TABLE runs ADD COLUMN IF NOT EXISTS run_index bigint;
1+
ALTER TABLE runs ADD COLUMN IF NOT EXISTS run_index bigint DEFAULT 0;

internal/scheduler/jobdb/job.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"fmt"
55
"time"
66

7-
"github.com/armadaproject/armada/internal/common/preemption"
8-
"github.com/armadaproject/armada/internal/scheduler/configuration"
9-
107
"github.com/hashicorp/go-multierror"
118
"github.com/pkg/errors"
129
"golang.org/x/exp/maps"
@@ -15,6 +12,7 @@ import (
1512

1613
"github.com/armadaproject/armada/internal/common/constants"
1714
armadamaps "github.com/armadaproject/armada/internal/common/maps"
15+
"github.com/armadaproject/armada/internal/common/preemption"
1816
"github.com/armadaproject/armada/internal/common/types"
1917
"github.com/armadaproject/armada/internal/scheduler/adapters"
2018
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
@@ -846,9 +844,9 @@ func (job *Job) NumAttempts() uint {
846844
// IsEligibleForPreemptionRetry determines whether the job is eligible for preemption retries. It checks whether the
847845
// scheduler or the job has opted in for preemption retries. It then checks whether the job has exhausted the number
848846
// of retries.
849-
func (job *Job) IsEligibleForPreemptionRetry(defaultPreemptionRetryConfig configuration.PreemptionRetryConfig) bool {
847+
func (job *Job) IsEligibleForPreemptionRetry(retryConfig preemption.RetryConfig) bool {
850848
// Check if job explicitly enabled/disabled retries, falling back to platform default
851-
enabled := defaultPreemptionRetryConfig.Enabled
849+
enabled := retryConfig.Enabled
852850
if jobRetryEnabled, exists := preemption.AreRetriesEnabled(job.Annotations()); exists {
853851
enabled = jobRetryEnabled
854852
}
@@ -857,7 +855,7 @@ func (job *Job) IsEligibleForPreemptionRetry(defaultPreemptionRetryConfig config
857855
return false
858856
}
859857

860-
return job.NumPreemptedRuns() <= job.MaxPreemptionRetryCount(defaultPreemptionRetryConfig)
858+
return job.NumPreemptedRuns() <= job.MaxPreemptionRetryCount(retryConfig)
861859
}
862860

863861
func (job *Job) NumPreemptedRuns() uint {
@@ -870,11 +868,11 @@ func (job *Job) NumPreemptedRuns() uint {
870868
return preemptCount
871869
}
872870

873-
func (job *Job) MaxPreemptionRetryCount(defaultPreemptionRetryConfig configuration.PreemptionRetryConfig) uint {
871+
func (job *Job) MaxPreemptionRetryCount(retryConfig preemption.RetryConfig) uint {
874872
// Start with platform default
875873
var maxRetryCount uint
876-
if defaultPreemptionRetryConfig.DefaultMaxRetryCount != nil {
877-
maxRetryCount = *defaultPreemptionRetryConfig.DefaultMaxRetryCount
874+
if retryConfig.DefaultRetryCount != nil {
875+
maxRetryCount = *retryConfig.DefaultRetryCount
878876
}
879877

880878
// Allow jobs to override with a custom max retry count

internal/scheduler/jobdb/job_run_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,51 @@ func TestJobRun_TestRunAttempted(t *testing.T) {
123123
assert.True(t, attemptedRun.RunAttempted())
124124
}
125125

126+
func TestJobRun_InTerminalState(t *testing.T) {
127+
tests := map[string]struct {
128+
run *JobRun
129+
expected bool
130+
}{
131+
"base run is not terminal": {
132+
run: baseJobRun,
133+
expected: false,
134+
},
135+
"succeeded run is terminal": {
136+
run: baseJobRun.WithSucceeded(true),
137+
expected: true,
138+
},
139+
"failed run is terminal": {
140+
run: baseJobRun.WithFailed(true),
141+
expected: true,
142+
},
143+
"cancelled run is terminal": {
144+
run: baseJobRun.WithCancelled(true),
145+
expected: true,
146+
},
147+
"returned run is terminal": {
148+
run: baseJobRun.WithReturned(true),
149+
expected: true,
150+
},
151+
"preempted run is terminal": {
152+
run: baseJobRun.WithPreempted(true),
153+
expected: true,
154+
},
155+
"running run is not terminal": {
156+
run: baseJobRun.WithRunning(true),
157+
expected: false,
158+
},
159+
"pending run is not terminal": {
160+
run: baseJobRun.WithPending(true),
161+
expected: false,
162+
},
163+
}
164+
for name, tc := range tests {
165+
t.Run(name, func(t *testing.T) {
166+
assert.Equal(t, tc.expected, tc.run.InTerminalState())
167+
})
168+
}
169+
}
170+
126171
func TestDeepCopy(t *testing.T) {
127172
run := jobDb.CreateRun(
128173
uuid.NewString(),

0 commit comments

Comments
 (0)